migrateRootSourceId.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. """
  2. @author: luojunhui
  3. 迁移rootSourceId
  4. """
  5. import json
  6. import time
  7. import datetime
  8. import schedule
  9. from tqdm import tqdm
  10. from applications import Functions, longArticlesMySQL, PQMySQL, log
  11. class UpdateRootSourceId(object):
  12. """
  13. 更新 rootSourceId
  14. """
  15. db_client = PQMySQL()
  16. lam = longArticlesMySQL()
  17. source_id_list = {
  18. 'longArticles_2d311f88a9c1bd5a90ce88339ae93e78': 1,
  19. 'longArticles_8d9fd0553c988e7a6bf3a6198f78d890': 1,
  20. 'longArticles_99763b3ad92c781194dbd3eb3321542c': 1,
  21. 'longArticles_2a27f501ef0d758c35dd3b70cf3bbfa3': 1,
  22. "touliu_tencentGzhArticle_cc284926a7d1c19f9a4e6abe5520468b": 1,
  23. "touliu_tencentGzhArticle_2e4c21de3707f3b368b0cc4500d120f0": 1,
  24. "touliu_tencentGzhArticle_a18c11dd294df014334f7db72830221a": 1,
  25. "touliu_tencentGzhArticle_c2debdc233827497e24b633dea36c57c": 1,
  26. "touliu_tencentGzhArticle_d66796826916665a23c667472ef4dd56": 1,
  27. "touliu_tencentGzhArticle_f8e97355f3687f57fd4efeb635a7a3a2": 1,
  28. "touliu_tencentGzhArticle_gh_68e7fdc09fe4_90bb12e53f6628fd5330310c7c3cc344": 1,
  29. "touliu_tencentGzhArticle_gh_68e7fdc09fe4_cd602a61ea073e41404572fce51eb297": 1,
  30. "touliu_tencentGzhArticle_gh_68e7fdc09fe4_d8fca9b2712f829d625d98bec37db228": 1,
  31. "touliu_tencentGzhArticle_gh_77f36c109fb1_1401a97f6537f32b14496cd5fe6caa70": 1,
  32. "touliu_tencentGzhArticle_gh_77f36c109fb1_926713998cd1513370b910ba20adda44": 1,
  33. "touliu_tencentGzhArticle_gh_77f36c109fb1_4ca7c1c6223501ff4f80913f8363309f": 1
  34. }
  35. @classmethod
  36. def getDataList(cls, request_timestamp):
  37. """
  38. :param request_timestamp:
  39. :return:
  40. """
  41. start_dt = request_timestamp - 1 * 24 * 3600
  42. sql = f"""
  43. select trace_id, gh_id, account_name, response, request_timestamp
  44. from long_articles_match_videos
  45. where request_timestamp >= {start_dt}
  46. and request_timestamp <= {request_timestamp}
  47. and content_status = 4;
  48. """
  49. result = cls.lam.select(sql)
  50. log(
  51. task="migrateRootSourceId",
  52. function="getDataList",
  53. message="一共找到了: {} 条记录".format(len(result))
  54. )
  55. return result
  56. @classmethod
  57. def processEachData(cls, data_tuple):
  58. """
  59. 处理数据
  60. :param data_tuple:
  61. :return:
  62. """
  63. trace_id = data_tuple[0]
  64. gh_id = data_tuple[1]
  65. account_name = data_tuple[2]
  66. request_timestamp = data_tuple[4]
  67. response = json.loads(data_tuple[3])
  68. if response:
  69. for result in response:
  70. source_id = result['rootSourceId']
  71. video_id = result['videoId']
  72. sql = f"""
  73. INSERT INTO long_articles_root_source_id
  74. (rootSourceId, accountName, ghId, requestTime, trace_id, push_type, video_id)
  75. values
  76. (%s, %s, %s, %s, %s, %s, %s);
  77. """
  78. try:
  79. cls.db_client.update(
  80. sql=sql,
  81. params=(
  82. source_id,
  83. account_name,
  84. gh_id,
  85. request_timestamp,
  86. trace_id,
  87. cls.source_id_list.get(source_id, 2),
  88. video_id
  89. )
  90. )
  91. log(
  92. task="migrateRootSourceId",
  93. function="processEachData",
  94. message="更新消息成功",
  95. data={"trace_id": trace_id}
  96. )
  97. except Exception as e:
  98. log(
  99. task="migrateRootSourceId",
  100. function="processEachData",
  101. message="更新消息失败,报错信息是: {}".format(e),
  102. status="fail",
  103. data={"trace_id": trace_id}
  104. )
  105. else:
  106. print("No result")
  107. @classmethod
  108. def sourceIdJob(cls):
  109. """
  110. 执行代码
  111. :return:
  112. """
  113. today_string = datetime.datetime.today().strftime("%Y-%m-%d")
  114. time_stamp = datetime.datetime.strptime(today_string, '%Y-%m-%d').timestamp()
  115. data_list = cls.getDataList(int(time_stamp))
  116. for item in tqdm(data_list):
  117. try:
  118. cls.processEachData(item)
  119. except Exception as e:
  120. print(e)
  121. def source_id_job():
  122. """
  123. :return:
  124. """
  125. S = UpdateRootSourceId()
  126. S.sourceIdJob()
  127. if __name__ == '__main__':
  128. # source_id_job()
  129. schedule.every().day.at("02:00").do(Functions().job_with_thread, source_id_job)
  130. while True:
  131. schedule.run_pending()
  132. time.sleep(1)