php中文网 | cnphp.com

 找回密码
 立即注册

QQ登录

只需一步,快速开始

搜索
查看: 336|回复: 0

Python实现Oracle数据库同步

[复制链接]

3138

主题

3148

帖子

1万

积分

管理员

Rank: 9Rank: 9Rank: 9

UID
1
威望
0
积分
7946
贡献
0
注册时间
2021-4-14
最后登录
2024-11-21
在线时间
763 小时
QQ
发表于 2023-11-1 20:41:52 | 显示全部楼层 |阅读模式
  1. # -*- coding: utf-8 -*-
  2. from WindPy import *
  3. from datetime import datetime, timedelta
  4. import time
  5. import cx_Oracle
  6. import sys,traceback
  7. import os
  8. os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'

  9. year = int(datetime.now().strftime('%Y'))
  10. today = int(datetime.now().strftime('%Y%m%d'))
  11. dayOfWeek = datetime.strptime(str(today),"%Y%m%d").date().weekday() + 1
  12. print("today:%d,dayOfWeek:%d" %(today,dayOfWeek))

  13. def getDataFromSjzx(today,sql,param,callFunName):
  14.   conn_sjzx = cx_Oracle.connect('stage/stage@10.201.200.29:1521/orcl')
  15.   cur_sjzx = conn_sjzx.cursor()
  16.   cur_sjzx.prepare(sql)
  17.   print("[getDataFromSjzx---%s]传入参数:%s" % (callFunName,param))
  18.   print("[getDataFromSjzx---%s]传入Sql语句:%s" % (callFunName,sql))
  19.   print("[getDataFromSjzx---%s]ips绑定变量:%s" % (callFunName,cur_sjzx.bindnames()))
  20.   cur_sjzx.execute(None, param)
  21.   fromData = cur_sjzx.fetchall()
  22.   desc = cur_sjzx.description
  23.   cur_sjzx.close()
  24.   conn_sjzx.close()
  25.   length = len(desc)
  26.   dataListFromSjzx = []
  27.   for data in fromData:
  28.     row = {}
  29.     for i in range(length):
  30.       if data[i] is not None:
  31.         row[desc[i][0].lower()] = data[i]
  32.     dataListFromSjzx.append(row)
  33.   print("[getDataFromSjzx---%s]日期查询[%d]总记录数:%d" % (callFunName ,today, len(dataListFromSjzx)))
  34.   return dataListFromSjzx

  35. def updateDataToIps(sql,param,callFunName):
  36.   conn_ips = cx_Oracle.connect('ips/ips@10.201.200.42:1521/xjsdb')
  37.   cursor_ips = conn_ips.cursor()
  38.   cursor_ips.prepare(sql)
  39.   #print("[updateDataToIps]传入参数:%s" % (param))
  40.   print("[updateDataToIps---%s]传入Sql语句:%s" % (callFunName,sql))
  41.   print("[updateDataToIps---%s]ips绑定变量:%s" % (callFunName,cursor_ips.bindnames()))
  42.   cursor_ips.executemany(None, param)
  43.   conn_ips.commit()
  44.   cursor_ips.close()
  45.   conn_ips.close()

  46. def updateDeleteDataToIps(delete_sql,delete_param,insert_sql,insert_param,callFunName):
  47.   conn_ips = cx_Oracle.connect('ips/ips@10.201.200.42:1521/xjsdb')
  48.   cursor_ips = conn_ips.cursor()
  49.   cursor_ips.prepare(delete_sql)
  50.   print("[updateDataToIps---%s]传入Sql语句(删除):%s" % (callFunName,delete_sql))
  51.   print("[updateDataToIps---%s]ips绑定变量(删除):%s" % (callFunName,cursor_ips.bindnames()))
  52.   cursor_ips.execute(None,delete_param)
  53.   cursor_ips.prepare(insert_sql)
  54.   print("[updateDataToIps---%s]传入Sql语句(插入):%s" % (callFunName,insert_sql))
  55.   print("[updateDataToIps---%s]ips绑定变量(插入):%s" % (callFunName,cursor_ips.bindnames()))
  56.   cursor_ips.executemany(None, insert_param)
  57.   conn_ips.commit()
  58.   cursor_ips.close()
  59.   conn_ips.close()

  60. def synTradeDayData():
  61.   select_sql = "select t.trade_days, t.s_info_exchmarket " \
  62.                "  from wind2_asharecalendar t " \
  63.                " where to_number(t.trade_days) >= :today " \
  64.                "   and t.s_info_exchmarket = :marketType " \
  65.                " order by to_number(t.trade_days) "
  66.   param = {}
  67.   marketType = "SSE"
  68.   param["today"] = today
  69.   param["marketType"] = marketType
  70.   dataListFromSjzx = []
  71.   dataListFromSjzx = getDataFromSjzx(today, select_sql, param, "synTradeDayData")
  72.   dataListToIps = []
  73.   dataListToIps = fillTradeDay(dataListFromSjzx)
  74.   insert_sql = "insert into t_trade_day (l_trade_date,vc_market_type) values (:tradeDay,:marketType)"
  75.   delete_sql = "delete from t_trade_day a where a.l_trade_date >= :today and a.vc_market_type = :marketType"
  76.   if (dataListToIps is not None and len(dataListToIps) > 0):
  77.     updateDeleteDataToIps(delete_sql,param,insert_sql,dataListToIps,"synTradeDayData")

  78. def fillTradeDay(dataListFromSjzx):
  79.   dataListToIps = []
  80.   for data in dataListFromSjzx:
  81.     result = {}
  82.     result["tradeDay"] = data['trade_days']
  83.     result["marketType"] = data['s_info_exchmarket']
  84.     dataListToIps.append(result)
  85.   return dataListToIps

  86. def synSpecialNonTradedayData():
  87.   if (dayOfWeek==6 or dayOfWeek==7):
  88.     print("[synSpecialNonTradedayData]日期[%d]为周%d,无需进行同步处理" % (today,dayOfWeek))
  89.     return
  90.   select_sql = "select nvl(count(1),0) l_count " \
  91.                "  from wind2_asharecalendar t " \
  92.                " where to_number(t.trade_days) = :today " \
  93.                "   and t.s_info_exchmarket = :marketType "
  94.   param = {}
  95.   marketType = "SSE"
  96.   param["today"] = today
  97.   param["marketType"] = marketType
  98.   dataListFromSjzx = []
  99.   dataListFromSjzx = getDataFromSjzx(today, select_sql, param, "synSpecialNonTradedayData")
  100.   if (dataListFromSjzx is not None and dataListFromSjzx[0]["l_count"] > 0):
  101.     print("[synSpecialNonTradedayData]日期[%d]为周%d且为交易日,无需进行同步处理" % (today, dayOfWeek))
  102.     return
  103.   dataListToIps = []
  104.   dataListToIps = fillSpecialNonTradeday(dataListFromSjzx)
  105.   delete_sql = "delete from t_special_non_tradeday a where a.l_trade_date >= :today and a.vc_market_type = :marketType"
  106.   insert_sql = "insert into t_special_non_tradeday (l_trade_date,vc_market_type,l_day_week) values (:tradeDay,:marketType,:dayOfWeek)"
  107.   if (dataListToIps is not None and len(dataListToIps) > 0):
  108.     updateDeleteDataToIps(delete_sql, param, insert_sql, dataListToIps, "synSpecialNonTradedayData")

  109. def fillSpecialNonTradeday(dataListFromSjzx):
  110.   dataListToIps = []
  111.   result = {}
  112.   result["tradeDay"] = today
  113.   result["marketType"] = "SSE"
  114.   result["dayOfWeek"] = dayOfWeek
  115.   dataListToIps.append(result)
  116.   return dataListToIps

  117. def synStockInfoWind():
  118.   select_sql = "select a.ob_object_id      vc_object_id, " \
  119.                "       a.f1_0001           vc_wind_code, " \
  120.                "       a.f4_0001           vc_stock_code, " \
  121.                "       a.f6_0001           vc_stock_name, " \
  122.                "       a.f10_0001          vc_type_wind, " \
  123.                "       a.f11_0001          vc_type_detail_wind, " \
  124.                "       a.f12_0001          vc_stock_type, " \
  125.                "       a.f14_0001          vc_market_type, " \
  126.                "       a.f16_0001          vc_stock_id_wind, " \
  127.                "       a.f17_0001          vc_company_id_wind, " \
  128.                "       to_number(b.s_info_listdate)   l_listing_date, " \
  129.                "       nvl(to_number(b.s_info_delistdate),99999999) l_delisting_date " \
  130.                "  from wind_tb_object_0001 a, wind2_asharedescription b " \
  131.                " where a.f12_0001 = :stockType " \
  132.                "   and a.f1_0001 = b.s_info_windcode " \
  133.                "   and b.s_info_listdate is not null " \
  134.                "   and (to_number(b.s_info_listdate) >= :today or " \
  135.                "       (b.s_info_delistdate is not null and to_number(b.s_info_delistdate) >= :today)) "
  136.   param = {}
  137.   stockType = "A"
  138.   param["today"] = today
  139.   param["stockType"] = stockType
  140.   dataListFromSjzx = []
  141.   dataListFromSjzx = getDataFromSjzx(today, select_sql, param, "synStockInfoWind")
  142.   dataListToIps = []
  143.   deleteStockCodeStr = ""
  144.   (dataListToIps,deleteStockCodeStr) = fillStockInfoWind(dataListFromSjzx)
  145.   print("[synStockInfoWind]待删除数据:%s" % (deleteStockCodeStr))
  146.   delete_sql = "delete from t_stock_info_wind where vc_wind_code in ("+deleteStockCodeStr+")"
  147.   insert_sql = "insert into t_stock_info_wind ( "\
  148.                "    vc_object_id ,vc_wind_code ,vc_stock_code ,vc_stock_name , "\
  149.                "    vc_type_wind ,vc_type_detail_wind ,vc_stock_type ,vc_market_type , "\
  150.                "    vc_stock_id_wind ,vc_company_id_wind ,l_listing_date ,l_delisting_date) "\
  151.                "  values (:objectId,:windCode,:stockCode,:stockName, "\
  152.                "    :typeWind,:typeDetailWind,:stockType,:marketType, "\
  153.                "    :stockIdWind,:companyIdWind,:listingDate,:delistingDate) "
  154.   if (dataListToIps is not None and len(dataListToIps) > 0):
  155.     if (deleteStockCodeStr is not None and len(deleteStockCodeStr) > 0):
  156.       updateDeleteDataToIps(delete_sql, {}, insert_sql, dataListToIps, "synStockInfoWind")
  157.     else:
  158.       updateDataToIps(insert_sql, dataListToIps, "synStockInfoWind")

  159. def fillStockInfoWind(dataListFromSjzx):
  160.   dataListToIps = []
  161.   deleteStockCodeStr = ""
  162.   for data in dataListFromSjzx:
  163.     result = {}
  164.     result["objectId"] = data["vc_object_id"]
  165.     result["windCode"] = data["vc_wind_code"]
  166.     result["stockCode"] = data["vc_stock_code"]
  167.     result["stockName"] = data["vc_stock_name"]
  168.     result["typeWind"] = data["vc_type_wind"]
  169.     result["typeDetailWind"] = data["vc_type_detail_wind"]
  170.     result["stockType"] = data["vc_stock_type"]
  171.     result["marketType"] = data["vc_market_type"]
  172.     result["stockIdWind"] = data["vc_stock_id_wind"]
  173.     result["companyIdWind"] = data["vc_company_id_wind"]
  174.     result["listingDate"] = data["l_listing_date"]
  175.     result["delistingDate"] = data["l_delisting_date"]
  176.     if (result["delistingDate"] is not None and result["delistingDate"] > 0 and result["delistingDate"] < 99999999):
  177.       if (deleteStockCodeStr is None or len(deleteStockCodeStr) <= 0):
  178.         deleteStockCodeStr = "'" + result["windCode"] + "'"
  179.       else:
  180.         deleteStockCodeStr = deleteStockCodeStr + "," + "'" + result["windCode"] + "'"
  181.     dataListToIps.append(result)
  182.   return (dataListToIps,deleteStockCodeStr)

  183. def synWind2AshareeodpricesData():
  184.   table = "wind2_ashareeodprices_" + str(year)
  185.   select_sql = "select a.object_id,a.s_info_windcode,nvl(to_number(a.trade_dt),0) trade_dt,a.crncy_code, " \
  186.                "       nvl(to_number(a.s_dq_preclose),0) s_dq_preclose, " \
  187.                "       nvl(to_number(a.s_dq_open),0) s_dq_open, " \
  188.                "       nvl(to_number(a.s_dq_close),0) s_dq_close, " \
  189.                "       nvl(to_number(a.s_dq_volume),0) s_dq_volume, " \
  190.                "       nvl(to_number(a.s_dq_amount),0) s_dq_amount, " \
  191.                 "      a.s_dq_tradestatus, " \
  192.                "       a.l_createdate,a.l_createtime " \
  193.                "  from " + table + " a " \
  194.                " where to_number(a.trade_dt) >= :today"
  195.   param = {}
  196.   param["today"] = today
  197.   dataListFromSjzx = []
  198.   dataListFromSjzx = getDataFromSjzx(today,select_sql,param,"synWind2AshareeodpricesData")
  199.   dataListToIps = []
  200.   dataListToIps = fillWind2AshareeodpricesData(dataListFromSjzx)
  201.   insert_sql = "insert into " + table + "_bak ( " \
  202.                "         object_id,s_info_windcode,trade_dt,crncy_code, " \
  203.                "         s_dq_preclose,s_dq_open,s_dq_close,s_dq_volume,s_dq_amount, " \
  204.                "         s_dq_tradestatus,l_createdate,l_createtime) " \
  205.                "  values(:objectId,:windcode,:tradeDt,:crncyCode, " \
  206.                "         :dqPreclose,:dqOpen,:dqClose,:dqVolume,:dqAmount, " \
  207.                "         :dqTradestatus,:createDate,:createTime)"
  208.   if (dataListToIps is not None and len(dataListToIps) > 0):
  209.     updateDataToIps(insert_sql, dataListToIps, "synWind2AshareeodpricesData")

  210. def fillWind2AshareeodpricesData(dataListFromSjzx):
  211.   dataListToIps = []
  212.   for data in dataListFromSjzx:
  213.     result = {}
  214.     result["objectId"] = data["object_id"]
  215.     result["windcode"] = data["s_info_windcode"]
  216.     result["tradeDt"] = data["trade_dt"]
  217.     result["crncyCode"] = data["crncy_code"]
  218.     result["dqPreclose"] = data["s_dq_preclose"]
  219.     result["dqOpen"] = data["s_dq_open"]
  220.     result["dqClose"] = data["s_dq_close"]
  221.     result["dqVolume"] = data["s_dq_volume"]
  222.     result["dqAmount"] = data["s_dq_amount"]
  223.     result["dqTradestatus"] = data["s_dq_tradestatus"]
  224.     result["createDate"] = data["l_createdate"]
  225.     result["createTime"] = data["l_createtime"]
  226.     dataListToIps.append(result)
  227.   return dataListToIps

  228. def synAshareDividendData():
  229.   select_sql = "select a.s_info_windcode vc_wind_code, "\
  230.                "       a.ex_dt l_ex_date, "\
  231.                "       sum(a.cash_dvd_per_sh_pre_tax) en_cash_dvd_per_sh_pre_tax, "\
  232.                "       sum(a.cash_dvd_per_sh_after_tax) en_cash_dvd_per_sh_after_tax "\
  233.                "   from wind2_asharedividend a "\
  234.                " where a.ex_dt is not null "\
  235.                "   and to_number(a.ex_dt) >= :today "\
  236.                "   group by a.s_info_windcode, a.ex_dt order by a.ex_dt desc"
  237.   param = {}
  238.   param["today"] = today
  239.   dataListFromSjzx = []
  240.   dataListFromSjzx = getDataFromSjzx(today, select_sql, param, "synAshareDividendData")
  241.   dataListToIps = []
  242.   dataListToIps = fillAshareDividendData(dataListFromSjzx)
  243.   insert_sql = "insert into t_ashare_dividend ( "\
  244.                "         vc_wind_code,l_ex_date,en_cash_dvd_per_sh_pre_tax,en_cash_dvd_per_sh_after_tax) "\
  245.                "  values(:windCode,:exDate,:cashDvdPerShPreTax,:cashDvdPerShAfterTax)"
  246.   if (dataListToIps is not None and len(dataListToIps) > 0):
  247.     updateDataToIps(insert_sql, dataListToIps, "synAshareDividendData")

  248. def fillAshareDividendData(dataListFromSjzx):
  249.   dataListToIps = []
  250.   for data in dataListFromSjzx:
  251.     result = {}
  252.     result["windCode"] = data["vc_wind_code"]
  253.     result["exDate"] = data["l_ex_date"]
  254.     result["cashDvdPerShPreTax"] = data["en_cash_dvd_per_sh_pre_tax"]
  255.     result["cashDvdPerShAfterTax"] = data["en_cash_dvd_per_sh_after_tax"]
  256.     dataListToIps.append(result)
  257.   return dataListToIps

  258. def synCapitalStockWindData():
  259.   select_sql = "select capital.vc_wind_code, " \
  260.                "       capital.l_change_date, " \
  261.                "       capital.l_capital_type, " \
  262.                "       capital.en_captial_value " \
  263.                "  from (select b.f1_0001 vc_wind_code, " \
  264.                "               to_number(a.f50_1432) l_change_date, "\
  265.                "               '1' l_capital_type, " \
  266.                "               a.f27_1432 en_captial_value " \
  267.                "          from wind_tb_object_1432 a, wind_tb_object_0001 b " \
  268.                "        where a.f1_1432 = b.f17_0001 " \
  269.                "          and b.f12_0001 = 'A' " \
  270.                "          and a.ob_is_valid_1432 = '1' " \
  271.                "          and to_number(a.f50_1432) >= :today " \
  272.                "        union all " \
  273.                "        select d.f1_0001 vc_wind_code, " \
  274.                "               to_number(c.f2_1931) l_change_date, "\
  275.                "               '2' l_capital_type, " \
  276.                "               c.f4_1931 en_captial_value " \
  277.                "          from wind_tb_object_1931 c, wind_tb_object_0001 d " \
  278.                "        where c.f1_1931 = d.f16_0001 " \
  279.                "          and d.f12_0001 = 'A' " \
  280.                "          and to_number(c.f2_1931) >= :today) capital " \
  281.                "  order by capital.l_change_date"
  282.   param = {}
  283.   param["today"] = today
  284.   dataListFromSjzx = []
  285.   dataListFromSjzx = getDataFromSjzx(today, select_sql, param, "synCapitalStockWindData")
  286.   dataListToIps = []
  287.   dataListToIps = fillCapitalStockWindData(dataListFromSjzx)
  288.   insert_sql = "insert into t_capital_stock_wind ( "\
  289.                "         vc_wind_code,l_change_date,l_capital_type,en_captial_value) "\
  290.                "  values(:windCode,:changeDate,:capitalType,:captialValue)"
  291.   if (dataListToIps is not None and len(dataListToIps) > 0):
  292.     updateDataToIps(insert_sql, dataListToIps, "synCapitalStockWindData")

  293. def fillCapitalStockWindData(dataListFromSjzx):
  294.   dataListToIps = []
  295.   for data in dataListFromSjzx:
  296.     result = {}
  297.     result["windCode"] = data["vc_wind_code"]
  298.     result["changeDate"] = data["l_change_date"]
  299.     result["capitalType"] = data["l_capital_type"]
  300.     result["captialValue"] = data["en_captial_value"]
  301.     dataListToIps.append(result)
  302.   return dataListToIps

  303. def synSwIndustryClass():
  304.   select_sql = "select c.f1_0001 vc_wind_code, " \
  305.                "       c.f6_0001 vc_stock_name, " \
  306.                "       to_number(a.f3_1476) l_import_date, " \
  307.                "       to_number(nvl(a.f4_1476,0)) l_remove_date, " \
  308.                "       a.f5_1476 l_new_flag, " \
  309.                "       b.levelnum-1 l_industry_level, " \
  310.                "       (case when b.levelnum=2 then substr(a.f2_1476,1,4) " \
  311.                "             when b.levelnum=3 then substr(a.f2_1476,1,6) " \
  312.                "             when b.levelnum=4 then substr(a.f2_1476,1,8) " \
  313.                "        else '' end) vc_industry_code, " \
  314.                "       b.name vc_industry_name " \
  315.                "  from wind_tb_object_1476 a, wind_tb_object_1022 b, wind_tb_object_0001 c " \
  316.                " where b.used = '1' " \
  317.                "   and b.code like '61%' " \
  318.                "   and c.f12_0001 = 'A' " \
  319.                "   and a.f1_1476 = c.f16_0001 " \
  320.                "   and (to_number(a.f3_1476) >= :today or  " \
  321.                "        to_number(nvl(a.f4_1476,0)) >= :today) " \
  322.                "   and ((b.levelnum = 2 and substr(b.code,1,4) = substr(a.f2_1476,1,4)) or  " \
  323.                "        (b.levelnum = 3 and substr(b.code,1,6) = substr(a.f2_1476,1,6)) or  " \
  324.                "        (b.levelnum = 4 and substr(b.code,1,8) = substr(a.f2_1476,1,8))) " \
  325.                "   order by b.levelnum "
  326.   param = {}
  327.   param["today"] = today
  328.   dataListFromSjzx = []
  329.   dataListFromSjzx = getDataFromSjzx(today, select_sql, param, "synSwIndustryClass")
  330.   dataListToIps = []
  331.   dataListToIps = fillSwIndustryClass(dataListFromSjzx)
  332.   insert_sql = "insert into t_sw_industry_class ( "\
  333.                "         vc_wind_code,vc_stock_name,l_import_date,l_remove_date, "\
  334.                "         l_new_flag,l_industry_level,vc_industry_code,vc_industry_name) "\
  335.                "  values(:windCode,:stockName,:importDate,:removeDate, "\
  336.                "         :newFlag,:industryLevel,:industryCode,:industryName)"
  337.   if (dataListToIps is not None and len(dataListToIps) > 0):
  338.     updateDataToIps(insert_sql, dataListToIps, "synSwIndustryClass")

  339. def fillSwIndustryClass(dataListFromSjzx):
  340.   dataListToIps = []
  341.   for data in dataListFromSjzx:
  342.     result = {}
  343.     result["windCode"] = data["vc_wind_code"]
  344.     result["stockName"] = data["vc_stock_name"]
  345.     result["importDate"] = data["l_import_date"]
  346.     result["removeDate"] = data["l_remove_date"]
  347.     result["newFlag"] = data["l_new_flag"]
  348.     result["industryLevel"] = data["l_industry_level"]
  349.     result["industryCode"] = data["vc_industry_code"]
  350.     result["industryName"] = data["vc_industry_name"]
  351.     dataListToIps.append(result)
  352.   return dataListToIps

  353. if __name__ == "__main__":
  354.   synTradeDayData()
  355.   synSpecialNonTradedayData()
  356.   synStockInfoWind()
  357.   synWind2AshareeodpricesData()
  358.   synAshareDividendData()
  359.   synCapitalStockWindData()
  360.   synSwIndustryClass()
复制代码

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

QQ|php中文网 | cnphp.com ( 赣ICP备2021002321号-2 )

GMT+8, 2024-11-22 02:43 , Processed in 0.812303 second(s), 41 queries , Gzip On.

Powered by Discuz! X3.4 Licensed

Copyright © 2001-2020, Tencent Cloud.

申明:本站所有资源皆搜集自网络,相关版权归版权持有人所有,如有侵权,请电邮(fiorkn@foxmail.com)告之,本站会尽快删除。

快速回复 返回顶部 返回列表