威望0
积分7946
贡献0
在线时间763 小时
UID1
注册时间2021-4-14
最后登录2024-11-21
管理员
- UID
- 1
- 威望
- 0
- 积分
- 7946
- 贡献
- 0
- 注册时间
- 2021-4-14
- 最后登录
- 2024-11-21
- 在线时间
- 763 小时
|
- # -*- coding: utf-8 -*-
- from WindPy import *
- from datetime import datetime, timedelta
- import time
- import cx_Oracle
- import sys,traceback
- import os
- os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
- year = int(datetime.now().strftime('%Y'))
- today = int(datetime.now().strftime('%Y%m%d'))
- dayOfWeek = datetime.strptime(str(today),"%Y%m%d").date().weekday() + 1
- print("today:%d,dayOfWeek:%d" %(today,dayOfWeek))
- def getDataFromSjzx(today,sql,param,callFunName):
- conn_sjzx = cx_Oracle.connect('stage/stage@10.201.200.29:1521/orcl')
- cur_sjzx = conn_sjzx.cursor()
- cur_sjzx.prepare(sql)
- print("[getDataFromSjzx---%s]传入参数:%s" % (callFunName,param))
- print("[getDataFromSjzx---%s]传入Sql语句:%s" % (callFunName,sql))
- print("[getDataFromSjzx---%s]ips绑定变量:%s" % (callFunName,cur_sjzx.bindnames()))
- cur_sjzx.execute(None, param)
- fromData = cur_sjzx.fetchall()
- desc = cur_sjzx.description
- cur_sjzx.close()
- conn_sjzx.close()
- length = len(desc)
- dataListFromSjzx = []
- for data in fromData:
- row = {}
- for i in range(length):
- if data[i] is not None:
- row[desc[i][0].lower()] = data[i]
- dataListFromSjzx.append(row)
- print("[getDataFromSjzx---%s]日期查询[%d]总记录数:%d" % (callFunName ,today, len(dataListFromSjzx)))
- return dataListFromSjzx
- def updateDataToIps(sql,param,callFunName):
- conn_ips = cx_Oracle.connect('ips/ips@10.201.200.42:1521/xjsdb')
- cursor_ips = conn_ips.cursor()
- cursor_ips.prepare(sql)
- #print("[updateDataToIps]传入参数:%s" % (param))
- print("[updateDataToIps---%s]传入Sql语句:%s" % (callFunName,sql))
- print("[updateDataToIps---%s]ips绑定变量:%s" % (callFunName,cursor_ips.bindnames()))
- cursor_ips.executemany(None, param)
- conn_ips.commit()
- cursor_ips.close()
- conn_ips.close()
- def updateDeleteDataToIps(delete_sql,delete_param,insert_sql,insert_param,callFunName):
- conn_ips = cx_Oracle.connect('ips/ips@10.201.200.42:1521/xjsdb')
- cursor_ips = conn_ips.cursor()
- cursor_ips.prepare(delete_sql)
- print("[updateDataToIps---%s]传入Sql语句(删除):%s" % (callFunName,delete_sql))
- print("[updateDataToIps---%s]ips绑定变量(删除):%s" % (callFunName,cursor_ips.bindnames()))
- cursor_ips.execute(None,delete_param)
- cursor_ips.prepare(insert_sql)
- print("[updateDataToIps---%s]传入Sql语句(插入):%s" % (callFunName,insert_sql))
- print("[updateDataToIps---%s]ips绑定变量(插入):%s" % (callFunName,cursor_ips.bindnames()))
- cursor_ips.executemany(None, insert_param)
- conn_ips.commit()
- cursor_ips.close()
- conn_ips.close()
- def synTradeDayData():
- select_sql = "select t.trade_days, t.s_info_exchmarket " \
- " from wind2_asharecalendar t " \
- " where to_number(t.trade_days) >= :today " \
- " and t.s_info_exchmarket = :marketType " \
- " order by to_number(t.trade_days) "
- param = {}
- marketType = "SSE"
- param["today"] = today
- param["marketType"] = marketType
- dataListFromSjzx = []
- dataListFromSjzx = getDataFromSjzx(today, select_sql, param, "synTradeDayData")
- dataListToIps = []
- dataListToIps = fillTradeDay(dataListFromSjzx)
- insert_sql = "insert into t_trade_day (l_trade_date,vc_market_type) values (:tradeDay,:marketType)"
- delete_sql = "delete from t_trade_day a where a.l_trade_date >= :today and a.vc_market_type = :marketType"
- if (dataListToIps is not None and len(dataListToIps) > 0):
- updateDeleteDataToIps(delete_sql,param,insert_sql,dataListToIps,"synTradeDayData")
- def fillTradeDay(dataListFromSjzx):
- dataListToIps = []
- for data in dataListFromSjzx:
- result = {}
- result["tradeDay"] = data['trade_days']
- result["marketType"] = data['s_info_exchmarket']
- dataListToIps.append(result)
- return dataListToIps
- def synSpecialNonTradedayData():
- if (dayOfWeek==6 or dayOfWeek==7):
- print("[synSpecialNonTradedayData]日期[%d]为周%d,无需进行同步处理" % (today,dayOfWeek))
- return
- select_sql = "select nvl(count(1),0) l_count " \
- " from wind2_asharecalendar t " \
- " where to_number(t.trade_days) = :today " \
- " and t.s_info_exchmarket = :marketType "
- param = {}
- marketType = "SSE"
- param["today"] = today
- param["marketType"] = marketType
- dataListFromSjzx = []
- dataListFromSjzx = getDataFromSjzx(today, select_sql, param, "synSpecialNonTradedayData")
- if (dataListFromSjzx is not None and dataListFromSjzx[0]["l_count"] > 0):
- print("[synSpecialNonTradedayData]日期[%d]为周%d且为交易日,无需进行同步处理" % (today, dayOfWeek))
- return
- dataListToIps = []
- dataListToIps = fillSpecialNonTradeday(dataListFromSjzx)
- delete_sql = "delete from t_special_non_tradeday a where a.l_trade_date >= :today and a.vc_market_type = :marketType"
- insert_sql = "insert into t_special_non_tradeday (l_trade_date,vc_market_type,l_day_week) values (:tradeDay,:marketType,:dayOfWeek)"
- if (dataListToIps is not None and len(dataListToIps) > 0):
- updateDeleteDataToIps(delete_sql, param, insert_sql, dataListToIps, "synSpecialNonTradedayData")
- def fillSpecialNonTradeday(dataListFromSjzx):
- dataListToIps = []
- result = {}
- result["tradeDay"] = today
- result["marketType"] = "SSE"
- result["dayOfWeek"] = dayOfWeek
- dataListToIps.append(result)
- return dataListToIps
- def synStockInfoWind():
- select_sql = "select a.ob_object_id vc_object_id, " \
- " a.f1_0001 vc_wind_code, " \
- " a.f4_0001 vc_stock_code, " \
- " a.f6_0001 vc_stock_name, " \
- " a.f10_0001 vc_type_wind, " \
- " a.f11_0001 vc_type_detail_wind, " \
- " a.f12_0001 vc_stock_type, " \
- " a.f14_0001 vc_market_type, " \
- " a.f16_0001 vc_stock_id_wind, " \
- " a.f17_0001 vc_company_id_wind, " \
- " to_number(b.s_info_listdate) l_listing_date, " \
- " nvl(to_number(b.s_info_delistdate),99999999) l_delisting_date " \
- " from wind_tb_object_0001 a, wind2_asharedescription b " \
- " where a.f12_0001 = :stockType " \
- " and a.f1_0001 = b.s_info_windcode " \
- " and b.s_info_listdate is not null " \
- " and (to_number(b.s_info_listdate) >= :today or " \
- " (b.s_info_delistdate is not null and to_number(b.s_info_delistdate) >= :today)) "
- param = {}
- stockType = "A"
- param["today"] = today
- param["stockType"] = stockType
- dataListFromSjzx = []
- dataListFromSjzx = getDataFromSjzx(today, select_sql, param, "synStockInfoWind")
- dataListToIps = []
- deleteStockCodeStr = ""
- (dataListToIps,deleteStockCodeStr) = fillStockInfoWind(dataListFromSjzx)
- print("[synStockInfoWind]待删除数据:%s" % (deleteStockCodeStr))
- delete_sql = "delete from t_stock_info_wind where vc_wind_code in ("+deleteStockCodeStr+")"
- insert_sql = "insert into t_stock_info_wind ( "\
- " vc_object_id ,vc_wind_code ,vc_stock_code ,vc_stock_name , "\
- " vc_type_wind ,vc_type_detail_wind ,vc_stock_type ,vc_market_type , "\
- " vc_stock_id_wind ,vc_company_id_wind ,l_listing_date ,l_delisting_date) "\
- " values (:objectId,:windCode,:stockCode,:stockName, "\
- " :typeWind,:typeDetailWind,:stockType,:marketType, "\
- " :stockIdWind,:companyIdWind,:listingDate,:delistingDate) "
- if (dataListToIps is not None and len(dataListToIps) > 0):
- if (deleteStockCodeStr is not None and len(deleteStockCodeStr) > 0):
- updateDeleteDataToIps(delete_sql, {}, insert_sql, dataListToIps, "synStockInfoWind")
- else:
- updateDataToIps(insert_sql, dataListToIps, "synStockInfoWind")
- def fillStockInfoWind(dataListFromSjzx):
- dataListToIps = []
- deleteStockCodeStr = ""
- for data in dataListFromSjzx:
- result = {}
- result["objectId"] = data["vc_object_id"]
- result["windCode"] = data["vc_wind_code"]
- result["stockCode"] = data["vc_stock_code"]
- result["stockName"] = data["vc_stock_name"]
- result["typeWind"] = data["vc_type_wind"]
- result["typeDetailWind"] = data["vc_type_detail_wind"]
- result["stockType"] = data["vc_stock_type"]
- result["marketType"] = data["vc_market_type"]
- result["stockIdWind"] = data["vc_stock_id_wind"]
- result["companyIdWind"] = data["vc_company_id_wind"]
- result["listingDate"] = data["l_listing_date"]
- result["delistingDate"] = data["l_delisting_date"]
- if (result["delistingDate"] is not None and result["delistingDate"] > 0 and result["delistingDate"] < 99999999):
- if (deleteStockCodeStr is None or len(deleteStockCodeStr) <= 0):
- deleteStockCodeStr = "'" + result["windCode"] + "'"
- else:
- deleteStockCodeStr = deleteStockCodeStr + "," + "'" + result["windCode"] + "'"
- dataListToIps.append(result)
- return (dataListToIps,deleteStockCodeStr)
- def synWind2AshareeodpricesData():
- table = "wind2_ashareeodprices_" + str(year)
- select_sql = "select a.object_id,a.s_info_windcode,nvl(to_number(a.trade_dt),0) trade_dt,a.crncy_code, " \
- " nvl(to_number(a.s_dq_preclose),0) s_dq_preclose, " \
- " nvl(to_number(a.s_dq_open),0) s_dq_open, " \
- " nvl(to_number(a.s_dq_close),0) s_dq_close, " \
- " nvl(to_number(a.s_dq_volume),0) s_dq_volume, " \
- " nvl(to_number(a.s_dq_amount),0) s_dq_amount, " \
- " a.s_dq_tradestatus, " \
- " a.l_createdate,a.l_createtime " \
- " from " + table + " a " \
- " where to_number(a.trade_dt) >= :today"
- param = {}
- param["today"] = today
- dataListFromSjzx = []
- dataListFromSjzx = getDataFromSjzx(today,select_sql,param,"synWind2AshareeodpricesData")
- dataListToIps = []
- dataListToIps = fillWind2AshareeodpricesData(dataListFromSjzx)
- insert_sql = "insert into " + table + "_bak ( " \
- " object_id,s_info_windcode,trade_dt,crncy_code, " \
- " s_dq_preclose,s_dq_open,s_dq_close,s_dq_volume,s_dq_amount, " \
- " s_dq_tradestatus,l_createdate,l_createtime) " \
- " values(:objectId,:windcode,:tradeDt,:crncyCode, " \
- " :dqPreclose,:dqOpen,:dqClose,:dqVolume,:dqAmount, " \
- " :dqTradestatus,:createDate,:createTime)"
- if (dataListToIps is not None and len(dataListToIps) > 0):
- updateDataToIps(insert_sql, dataListToIps, "synWind2AshareeodpricesData")
- def fillWind2AshareeodpricesData(dataListFromSjzx):
- dataListToIps = []
- for data in dataListFromSjzx:
- result = {}
- result["objectId"] = data["object_id"]
- result["windcode"] = data["s_info_windcode"]
- result["tradeDt"] = data["trade_dt"]
- result["crncyCode"] = data["crncy_code"]
- result["dqPreclose"] = data["s_dq_preclose"]
- result["dqOpen"] = data["s_dq_open"]
- result["dqClose"] = data["s_dq_close"]
- result["dqVolume"] = data["s_dq_volume"]
- result["dqAmount"] = data["s_dq_amount"]
- result["dqTradestatus"] = data["s_dq_tradestatus"]
- result["createDate"] = data["l_createdate"]
- result["createTime"] = data["l_createtime"]
- dataListToIps.append(result)
- return dataListToIps
- def synAshareDividendData():
- select_sql = "select a.s_info_windcode vc_wind_code, "\
- " a.ex_dt l_ex_date, "\
- " sum(a.cash_dvd_per_sh_pre_tax) en_cash_dvd_per_sh_pre_tax, "\
- " sum(a.cash_dvd_per_sh_after_tax) en_cash_dvd_per_sh_after_tax "\
- " from wind2_asharedividend a "\
- " where a.ex_dt is not null "\
- " and to_number(a.ex_dt) >= :today "\
- " group by a.s_info_windcode, a.ex_dt order by a.ex_dt desc"
- param = {}
- param["today"] = today
- dataListFromSjzx = []
- dataListFromSjzx = getDataFromSjzx(today, select_sql, param, "synAshareDividendData")
- dataListToIps = []
- dataListToIps = fillAshareDividendData(dataListFromSjzx)
- insert_sql = "insert into t_ashare_dividend ( "\
- " vc_wind_code,l_ex_date,en_cash_dvd_per_sh_pre_tax,en_cash_dvd_per_sh_after_tax) "\
- " values(:windCode,:exDate,:cashDvdPerShPreTax,:cashDvdPerShAfterTax)"
- if (dataListToIps is not None and len(dataListToIps) > 0):
- updateDataToIps(insert_sql, dataListToIps, "synAshareDividendData")
- def fillAshareDividendData(dataListFromSjzx):
- dataListToIps = []
- for data in dataListFromSjzx:
- result = {}
- result["windCode"] = data["vc_wind_code"]
- result["exDate"] = data["l_ex_date"]
- result["cashDvdPerShPreTax"] = data["en_cash_dvd_per_sh_pre_tax"]
- result["cashDvdPerShAfterTax"] = data["en_cash_dvd_per_sh_after_tax"]
- dataListToIps.append(result)
- return dataListToIps
- def synCapitalStockWindData():
- select_sql = "select capital.vc_wind_code, " \
- " capital.l_change_date, " \
- " capital.l_capital_type, " \
- " capital.en_captial_value " \
- " from (select b.f1_0001 vc_wind_code, " \
- " to_number(a.f50_1432) l_change_date, "\
- " '1' l_capital_type, " \
- " a.f27_1432 en_captial_value " \
- " from wind_tb_object_1432 a, wind_tb_object_0001 b " \
- " where a.f1_1432 = b.f17_0001 " \
- " and b.f12_0001 = 'A' " \
- " and a.ob_is_valid_1432 = '1' " \
- " and to_number(a.f50_1432) >= :today " \
- " union all " \
- " select d.f1_0001 vc_wind_code, " \
- " to_number(c.f2_1931) l_change_date, "\
- " '2' l_capital_type, " \
- " c.f4_1931 en_captial_value " \
- " from wind_tb_object_1931 c, wind_tb_object_0001 d " \
- " where c.f1_1931 = d.f16_0001 " \
- " and d.f12_0001 = 'A' " \
- " and to_number(c.f2_1931) >= :today) capital " \
- " order by capital.l_change_date"
- param = {}
- param["today"] = today
- dataListFromSjzx = []
- dataListFromSjzx = getDataFromSjzx(today, select_sql, param, "synCapitalStockWindData")
- dataListToIps = []
- dataListToIps = fillCapitalStockWindData(dataListFromSjzx)
- insert_sql = "insert into t_capital_stock_wind ( "\
- " vc_wind_code,l_change_date,l_capital_type,en_captial_value) "\
- " values(:windCode,:changeDate,:capitalType,:captialValue)"
- if (dataListToIps is not None and len(dataListToIps) > 0):
- updateDataToIps(insert_sql, dataListToIps, "synCapitalStockWindData")
- def fillCapitalStockWindData(dataListFromSjzx):
- dataListToIps = []
- for data in dataListFromSjzx:
- result = {}
- result["windCode"] = data["vc_wind_code"]
- result["changeDate"] = data["l_change_date"]
- result["capitalType"] = data["l_capital_type"]
- result["captialValue"] = data["en_captial_value"]
- dataListToIps.append(result)
- return dataListToIps
- def synSwIndustryClass():
- select_sql = "select c.f1_0001 vc_wind_code, " \
- " c.f6_0001 vc_stock_name, " \
- " to_number(a.f3_1476) l_import_date, " \
- " to_number(nvl(a.f4_1476,0)) l_remove_date, " \
- " a.f5_1476 l_new_flag, " \
- " b.levelnum-1 l_industry_level, " \
- " (case when b.levelnum=2 then substr(a.f2_1476,1,4) " \
- " when b.levelnum=3 then substr(a.f2_1476,1,6) " \
- " when b.levelnum=4 then substr(a.f2_1476,1,8) " \
- " else '' end) vc_industry_code, " \
- " b.name vc_industry_name " \
- " from wind_tb_object_1476 a, wind_tb_object_1022 b, wind_tb_object_0001 c " \
- " where b.used = '1' " \
- " and b.code like '61%' " \
- " and c.f12_0001 = 'A' " \
- " and a.f1_1476 = c.f16_0001 " \
- " and (to_number(a.f3_1476) >= :today or " \
- " to_number(nvl(a.f4_1476,0)) >= :today) " \
- " and ((b.levelnum = 2 and substr(b.code,1,4) = substr(a.f2_1476,1,4)) or " \
- " (b.levelnum = 3 and substr(b.code,1,6) = substr(a.f2_1476,1,6)) or " \
- " (b.levelnum = 4 and substr(b.code,1,8) = substr(a.f2_1476,1,8))) " \
- " order by b.levelnum "
- param = {}
- param["today"] = today
- dataListFromSjzx = []
- dataListFromSjzx = getDataFromSjzx(today, select_sql, param, "synSwIndustryClass")
- dataListToIps = []
- dataListToIps = fillSwIndustryClass(dataListFromSjzx)
- insert_sql = "insert into t_sw_industry_class ( "\
- " vc_wind_code,vc_stock_name,l_import_date,l_remove_date, "\
- " l_new_flag,l_industry_level,vc_industry_code,vc_industry_name) "\
- " values(:windCode,:stockName,:importDate,:removeDate, "\
- " :newFlag,:industryLevel,:industryCode,:industryName)"
- if (dataListToIps is not None and len(dataListToIps) > 0):
- updateDataToIps(insert_sql, dataListToIps, "synSwIndustryClass")
- def fillSwIndustryClass(dataListFromSjzx):
- dataListToIps = []
- for data in dataListFromSjzx:
- result = {}
- result["windCode"] = data["vc_wind_code"]
- result["stockName"] = data["vc_stock_name"]
- result["importDate"] = data["l_import_date"]
- result["removeDate"] = data["l_remove_date"]
- result["newFlag"] = data["l_new_flag"]
- result["industryLevel"] = data["l_industry_level"]
- result["industryCode"] = data["vc_industry_code"]
- result["industryName"] = data["vc_industry_name"]
- dataListToIps.append(result)
- return dataListToIps
- if __name__ == "__main__":
- synTradeDayData()
- synSpecialNonTradedayData()
- synStockInfoWind()
- synWind2AshareeodpricesData()
- synAshareDividendData()
- synCapitalStockWindData()
- synSwIndustryClass()
复制代码 |
|