migrateRootSourceId.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. """
  2. @author: luojunhui
  3. 迁移rootSourceId
  4. """
  5. import json
  6. import time
  7. import datetime
  8. import schedule
  9. from tqdm import tqdm
  10. from applications import Functions, PQMySQL, log
  class UpdateRootSourceId(object):
      """
      Copy rootSourceId records out of ``long_articles_video`` into
      ``long_articles_root_source_id``.

      All methods are classmethods and share the single ``db_client`` created
      when the class body is executed.
      """

      # Shared database client used by every query and insert in this class.
      db_client = PQMySQL()

      # rootSourceIds that are written with push_type 1; any id that is not
      # listed here is written with push_type 2 (see processEachData).
      source_id_list = {
          'longArticles_2d311f88a9c1bd5a90ce88339ae93e78': 1,
          'longArticles_8d9fd0553c988e7a6bf3a6198f78d890': 1,
          'longArticles_99763b3ad92c781194dbd3eb3321542c': 1,
          'longArticles_2a27f501ef0d758c35dd3b70cf3bbfa3': 1,
          "touliu_tencentGzhArticle_cc284926a7d1c19f9a4e6abe5520468b": 1,
          "touliu_tencentGzhArticle_2e4c21de3707f3b368b0cc4500d120f0": 1,
          "touliu_tencentGzhArticle_a18c11dd294df014334f7db72830221a": 1,
          "touliu_tencentGzhArticle_c2debdc233827497e24b633dea36c57c": 1,
          "touliu_tencentGzhArticle_d66796826916665a23c667472ef4dd56": 1,
          "touliu_tencentGzhArticle_f8e97355f3687f57fd4efeb635a7a3a2": 1,
          "touliu_tencentGzhArticle_gh_68e7fdc09fe4_90bb12e53f6628fd5330310c7c3cc344": 1,
          "touliu_tencentGzhArticle_gh_68e7fdc09fe4_cd602a61ea073e41404572fce51eb297": 1,
          "touliu_tencentGzhArticle_gh_68e7fdc09fe4_d8fca9b2712f829d625d98bec37db228": 1,
          "touliu_tencentGzhArticle_gh_77f36c109fb1_1401a97f6537f32b14496cd5fe6caa70": 1,
          "touliu_tencentGzhArticle_gh_77f36c109fb1_926713998cd1513370b910ba20adda44": 1,
          "touliu_tencentGzhArticle_gh_77f36c109fb1_4ca7c1c6223501ff4f80913f8363309f": 1
      }

      @classmethod
      def getDataList(cls, request_time_stamp):
          """
          Fetch the rows of ``long_articles_video`` with ``content_status = 2``
          whose ``request_time_stamp`` lies in the 24 hours before
          ``request_time_stamp``.

          The window is half-open, ``[request_time_stamp - 86400,
          request_time_stamp)``, so that consecutive daily runs neither skip
          nor duplicate a row that falls exactly on a day boundary.

          :param request_time_stamp: exclusive upper bound of the window,
              as a Unix timestamp in seconds
          :return: whatever ``db_client.select`` returns for the query; each
              row holds (trace_id, gh_id, account_name, article_title,
              result1, result2, result3, request_time_stamp)
          """
          # Coerce to int so that only integers are interpolated into the SQL text.
          end_ts = int(request_time_stamp)
          start_dt = end_ts - 1 * 24 * 3600
          sql = f"""
              select trace_id, gh_id, account_name, article_title, result1, result2, result3, request_time_stamp
              from long_articles_video
              where request_time_stamp >= {start_dt}
              and request_time_stamp < {end_ts}
              and content_status = 2;
          """
          result = cls.db_client.select(sql)
          log(
              task="migrateRootSourceId",
              function="getDataList",
              message="一共找到了: {} 条记录".format(len(result))
          )
          return result

      @staticmethod
      def _parseProductionPath(result):
          """
          Extract the rootSourceId and the video id from one serialized result.

          :param result: JSON text containing a ``productionPath`` string
          :return: ``(source_id, video_id)``
          :raises ValueError: ``result`` is not valid JSON
          :raises KeyError: the JSON object has no ``productionPath`` key
          :raises IndexError: an expected marker is absent from the path
          """
          production_path = json.loads(result)['productionPath']
          # The path is URL-encoded: %3D is "=", %3F is "?", %26 is "&".
          source_id = production_path.split("rootSourceId%3D")[1]
          video_id = production_path.split("videos%3Fid%3D")[1].split("%26su%")[0]
          return source_id, video_id

      @classmethod
      def processEachData(cls, data_tuple):
          """
          Insert one ``long_articles_root_source_id`` row for every non-empty
          result column of a single ``long_articles_video`` row.

          Each result is handled independently: a result that cannot be
          parsed, or whose insert fails, is logged with ``status="fail"`` and
          the remaining results of the same row are still processed.

          :param data_tuple: one row as selected by ``getDataList``, in the
              order (trace_id, gh_id, account_name, article_title, result1,
              result2, result3, request_time_stamp)
          :return: None
          """
          (trace_id, gh_id, account_name, title,
           result_1, result_2, result_3, request_time_stamp) = data_tuple[:8]

          # Built once per row; the values are bound through %s placeholders.
          sql = """
              INSERT INTO long_articles_root_source_id
              (rootSourceId, accountName, ghId, articleTitle, requestTime, trace_id, push_type, video_id)
              values
              (%s, %s, %s, %s, %s, %s, %s, %s);
          """

          for result in (result_1, result_2, result_3):
              if not result:
                  print("No result")
                  continue

              try:
                  source_id, video_id = cls._parseProductionPath(result)
              except (ValueError, TypeError, KeyError, IndexError, AttributeError) as e:
                  # A malformed result must not prevent the other results of
                  # this row from being migrated.
                  log(
                      task="migrateRootSourceId",
                      function="processEachData",
                      message="更新消息失败,报错信息是: {}".format(e),
                      status="fail",
                      data={"trace_id": trace_id}
                  )
                  continue

              try:
                  cls.db_client.update(
                      sql=sql,
                      params=(
                          source_id,
                          account_name,
                          gh_id,
                          title,
                          request_time_stamp,
                          trace_id,
                          # push_type: 1 for ids in source_id_list, 2 otherwise.
                          cls.source_id_list.get(source_id, 2),
                          video_id
                      )
                  )
                  log(
                      task="migrateRootSourceId",
                      function="processEachData",
                      message="更新消息成功",
                      data={"trace_id": trace_id}
                  )
              except Exception as e:
                  log(
                      task="migrateRootSourceId",
                      function="processEachData",
                      message="更新消息失败,报错信息是: {}".format(e),
                      status="fail",
                      data={"trace_id": trace_id}
                  )

      @classmethod
      def sourceIdJob(cls):
          """
          Run the migration for the 24 hours that end at today's local midnight.

          A failure on one row is logged and does not stop the remaining rows.

          :return: None
          """
          # Naive local midnight of the current day, as a Unix timestamp.
          midnight = datetime.datetime.combine(datetime.date.today(), datetime.time.min)
          data_list = cls.getDataList(int(midnight.timestamp()))
          for item in tqdm(data_list):
              try:
                  cls.processEachData(item)
              except Exception as e:
                  # Record the failure in the task log rather than on stdout only.
                  log(
                      task="migrateRootSourceId",
                      function="sourceIdJob",
                      message="处理数据失败,报错信息是: {}".format(e),
                      status="fail",
                      data={"record": str(item)}
                  )
  def source_id_job():
      """
      Scheduler entry point: create an UpdateRootSourceId instance and run
      its sourceIdJob once.

      :return: None
      """
      UpdateRootSourceId().sourceIdJob()
  if __name__ == '__main__':
      # source_id_job()
      # Register source_id_job to be started through Functions().job_with_thread
      # every day at 01:00.
      daily_at_one = schedule.every().day.at("01:00")
      daily_at_one.do(Functions().job_with_thread, source_id_job)
      # Poll the scheduler once per second, forever.
      while True:
          schedule.run_pending()
          time.sleep(1)
  137. # log(
  138. # task="migrateRootSourceId",
  139. # function="main",
  140. # message="迁移 source_id 任务正常执行"
  141. # )