# migrateRootSourceId.py
"""
@author: luojunhui
Migrate rootSourceId
"""
  5. import json
  6. import time
  7. import pymysql
  8. import datetime
  9. import schedule
  10. from tqdm import tqdm
  11. from concurrent.futures.thread import ThreadPoolExecutor
  12. from applications import Functions, PQMySQL
  13. class UpdateRootSourceId(object):
  14. """
  15. 更新 rootSourceId
  16. """
  17. db_client = PQMySQL()
  18. source_id_list = {
  19. 'longArticles_2d311f88a9c1bd5a90ce88339ae93e78': 1,
  20. 'longArticles_8d9fd0553c988e7a6bf3a6198f78d890': 1,
  21. 'longArticles_99763b3ad92c781194dbd3eb3321542c': 1,
  22. 'longArticles_2a27f501ef0d758c35dd3b70cf3bbfa3': 1,
  23. "touliu_tencentGzhArticle_cc284926a7d1c19f9a4e6abe5520468b": 1,
  24. "touliu_tencentGzhArticle_2e4c21de3707f3b368b0cc4500d120f0": 1,
  25. "touliu_tencentGzhArticle_a18c11dd294df014334f7db72830221a": 1,
  26. "touliu_tencentGzhArticle_c2debdc233827497e24b633dea36c57c": 1,
  27. "touliu_tencentGzhArticle_d66796826916665a23c667472ef4dd56": 1,
  28. "touliu_tencentGzhArticle_f8e97355f3687f57fd4efeb635a7a3a2": 1,
  29. "touliu_tencentGzhArticle_gh_68e7fdc09fe4_90bb12e53f6628fd5330310c7c3cc344": 1,
  30. "touliu_tencentGzhArticle_gh_68e7fdc09fe4_cd602a61ea073e41404572fce51eb297": 1,
  31. "touliu_tencentGzhArticle_gh_68e7fdc09fe4_d8fca9b2712f829d625d98bec37db228": 1,
  32. "touliu_tencentGzhArticle_gh_77f36c109fb1_1401a97f6537f32b14496cd5fe6caa70": 1,
  33. "touliu_tencentGzhArticle_gh_77f36c109fb1_926713998cd1513370b910ba20adda44": 1,
  34. "touliu_tencentGzhArticle_gh_77f36c109fb1_4ca7c1c6223501ff4f80913f8363309f": 1
  35. }
  36. @classmethod
  37. def getDataList(cls, request_time_stamp):
  38. """
  39. :param request_time_stamp:
  40. :return:
  41. """
  42. start_dt = request_time_stamp - 1 * 24 * 3600
  43. sql = f"""
  44. select trace_id, gh_id, account_name, article_title, result1, result2, result3, request_time_stamp
  45. from long_articles_video
  46. where request_time_stamp > {start_dt}
  47. and request_time_stamp < {request_time_stamp}
  48. and content_status = 2;
  49. """
  50. result = cls.db_client.select(sql)
  51. return result
  52. @classmethod
  53. def processEachData(cls, data_tuple):
  54. """
  55. 处理数据
  56. :param data_tuple:
  57. :return:
  58. """
  59. trace_id = data_tuple[0]
  60. gh_id = data_tuple[1]
  61. account_name = data_tuple[2]
  62. title = data_tuple[3]
  63. result_1 = data_tuple[4]
  64. result_2 = data_tuple[5]
  65. result_3 = data_tuple[6]
  66. request_time_stamp = data_tuple[7]
  67. result_list = [result_1, result_2, result_3]
  68. for result in result_list:
  69. if result:
  70. source_id = json.loads(result)['productionPath'].split("rootSourceId%3D")[1]
  71. video_id = json.loads(result)['productionPath'].split("videos%3Fid%3D")[1].split("%26su%")[0]
  72. sql = f"""
  73. INSERT INTO long_articles_root_source_id
  74. (rootSourceId, accountName, ghId, articleTitle, requestTime, trace_id, push_type, video_id)
  75. values
  76. (%s, %s, %s, %s, %s, %s, %s, %s);
  77. """
  78. cls.db_client.update(
  79. sql=sql,
  80. params=(
  81. source_id,
  82. account_name,
  83. gh_id,
  84. title,
  85. request_time_stamp,
  86. trace_id,
  87. cls.source_id_list.get(source_id, 2),
  88. video_id
  89. )
  90. )
  91. else:
  92. print("No result")
  93. @classmethod
  94. def sourceIdJob(cls):
  95. """
  96. 执行代码
  97. :return:
  98. """
  99. today_string = datetime.datetime.today().strftime("%Y-%m-%d")
  100. time_stamp = datetime.datetime.strptime(today_string, '%Y-%m-%d').timestamp()
  101. data_list = cls.getDataList(int(time_stamp))
  102. for item in tqdm(data_list):
  103. try:
  104. cls.processEachData(item)
  105. except Exception as e:
  106. print(e)
  107. def source_id_job():
  108. """
  109. :return:
  110. """
  111. S = UpdateRootSourceId()
  112. S.sourceIdJob()
  113. if __name__ == '__main__':
  114. # source_id_job()
  115. schedule.every().day.at("01:00").do(Functions().job_with_thread, source_id_job)
  116. while True:
  117. schedule.run_pending()
  118. time.sleep(1)