# update_rootSourceId.py
  1. """
  2. @author: luojunhui
  3. 迁移rootSourceId
  4. """
  5. import json
  6. import pymysql
  7. import datetime
  8. from concurrent.futures.thread import ThreadPoolExecutor
# Tou-liu (traffic-diversion) tencent GZH article source ids.
# NOTE(review): `source_list` is not referenced anywhere in this file —
# looks dead; confirm against other modules before removing.
source_list = [
    "touliu_tencentGzhArticle_cc284926a7d1c19f9a4e6abe5520468b",
    "touliu_tencentGzhArticle_2e4c21de3707f3b368b0cc4500d120f0",
    "touliu_tencentGzhArticle_a18c11dd294df014334f7db72830221a",
    "touliu_tencentGzhArticle_c2debdc233827497e24b633dea36c57c",
    "touliu_tencentGzhArticle_d66796826916665a23c667472ef4dd56",
    "touliu_tencentGzhArticle_f8e97355f3687f57fd4efeb635a7a3a2"
]
# rootSourceIds that map to push_type 1; any id missing from this map
# falls back to push_type 2 (see `source_id_list.get(source_id, 2)` below).
source_id_list = {
    'longArticles_2d311f88a9c1bd5a90ce88339ae93e78': 1,
    'longArticles_8d9fd0553c988e7a6bf3a6198f78d890': 1,
    'longArticles_99763b3ad92c781194dbd3eb3321542c': 1,
    'longArticles_2a27f501ef0d758c35dd3b70cf3bbfa3': 1,
    "touliu_tencentGzhArticle_cc284926a7d1c19f9a4e6abe5520468b": 1,
    "touliu_tencentGzhArticle_2e4c21de3707f3b368b0cc4500d120f0": 1,
    "touliu_tencentGzhArticle_a18c11dd294df014334f7db72830221a": 1,
    "touliu_tencentGzhArticle_c2debdc233827497e24b633dea36c57c": 1,
    "touliu_tencentGzhArticle_d66796826916665a23c667472ef4dd56": 1,
    "touliu_tencentGzhArticle_f8e97355f3687f57fd4efeb635a7a3a2": 1,
    "touliu_tencentGzhArticle_gh_68e7fdc09fe4_90bb12e53f6628fd5330310c7c3cc344": 1,
    "touliu_tencentGzhArticle_gh_68e7fdc09fe4_cd602a61ea073e41404572fce51eb297": 1,
    "touliu_tencentGzhArticle_gh_68e7fdc09fe4_d8fca9b2712f829d625d98bec37db228": 1,
    "touliu_tencentGzhArticle_gh_77f36c109fb1_1401a97f6537f32b14496cd5fe6caa70": 1,
    "touliu_tencentGzhArticle_gh_77f36c109fb1_926713998cd1513370b910ba20adda44": 1,
    "touliu_tencentGzhArticle_gh_77f36c109fb1_4ca7c1c6223501ff4f80913f8363309f": 1
}
  35. def get_data_list(request_time_stamp):
  36. """
  37. 获取数据
  38. :param request_time_stamp:
  39. :return:
  40. """
  41. connection = pymysql.connect(
  42. host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com",
  43. port=3306,
  44. user="crawler",
  45. passwd="crawler123456@",
  46. db="piaoquan-crawler",
  47. charset="utf8mb4"
  48. )
  49. sql = f"""
  50. select trace_id, gh_id, account_name, article_title, result1, result2, result3, request_time_stamp
  51. from long_articles_video where %s < request_time_stamp < %s;
  52. """
  53. cursor = connection.cursor()
  54. cursor.execute(
  55. sql,
  56. (request_time_stamp - 24 * 60 * 60, request_time_stamp)
  57. )
  58. print(request_time_stamp)
  59. data = cursor.fetchall()
  60. return data
  61. def process_each_data(data_tuple):
  62. """
  63. 处理数据
  64. :param data_tuple:
  65. :return:
  66. """
  67. trace_id = data_tuple[0]
  68. gh_id = data_tuple[1]
  69. account_name = data_tuple[2]
  70. title = data_tuple[3]
  71. result_1 = data_tuple[4]
  72. result_2 = data_tuple[5]
  73. result_3 = data_tuple[6]
  74. request_time_stamp = data_tuple[7]
  75. result_list = [result_1, result_2, result_3]
  76. for result in result_list:
  77. if result:
  78. connection = pymysql.connect(
  79. host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com",
  80. port=3306,
  81. user="crawler",
  82. passwd="crawler123456@",
  83. db="piaoquan-crawler",
  84. charset="utf8mb4"
  85. )
  86. source_id = json.loads(result)['productionPath'].split("rootSourceId%3D")[1]
  87. video_id = json.loads(result)['productionPath'].split("videos%3Fid%3D")[1].split("%26su%")[0]
  88. sql = f"""
  89. INSERT INTO long_articles_root_source_id
  90. (rootSourceId, accountName, ghId, articleTitle, requestTime, trace_id, push_type, video_id)
  91. values
  92. (%s, %s, %s, %s, %s, %s, %s, %s);
  93. """
  94. cursor = connection.cursor()
  95. cursor.execute(
  96. sql,
  97. (
  98. source_id,
  99. account_name,
  100. gh_id,
  101. title,
  102. request_time_stamp,
  103. trace_id,
  104. source_id_list.get(source_id, 2),
  105. video_id
  106. )
  107. )
  108. connection.commit()
  109. else:
  110. print("No result")
  111. def source_id_job():
  112. """
  113. 执行代码
  114. :return:
  115. """
  116. today_string = datetime.datetime.today().strftime("%Y-%m-%d")
  117. time_stamp = datetime.datetime.strptime(today_string, '%Y-%m-%d').timestamp()
  118. data_list = get_data_list(int(time_stamp))
  119. with ThreadPoolExecutor(max_workers=10) as Pool:
  120. Pool.map(process_each_data, data_list)
# if __name__ == '__main__':
#     schedule.every().day.at("01:00").do(source_id_job)
#     while True:
#         schedule.run_pending()
#         time.sleep(1)