update_rootSourceId.py

  1. """
  2. @author: luojunhui
  3. 迁移rootSourceId
  4. """
  5. import json
  6. import time
  7. import pymysql
  8. import datetime
  9. import schedule
  10. from concurrent.futures.thread import ThreadPoolExecutor
  11. source_list = [
  12. "touliu_tencentGzhArticle_cc284926a7d1c19f9a4e6abe5520468b",
  13. "touliu_tencentGzhArticle_2e4c21de3707f3b368b0cc4500d120f0",
  14. "touliu_tencentGzhArticle_a18c11dd294df014334f7db72830221a",
  15. "touliu_tencentGzhArticle_c2debdc233827497e24b633dea36c57c",
  16. "touliu_tencentGzhArticle_d66796826916665a23c667472ef4dd56",
  17. "touliu_tencentGzhArticle_f8e97355f3687f57fd4efeb635a7a3a2"
  18. ]
  19. source_id_list = {
  20. 'longArticles_2d311f88a9c1bd5a90ce88339ae93e78': 1,
  21. 'longArticles_8d9fd0553c988e7a6bf3a6198f78d890': 1,
  22. 'longArticles_99763b3ad92c781194dbd3eb3321542c': 1,
  23. 'longArticles_2a27f501ef0d758c35dd3b70cf3bbfa3': 1,
  24. "touliu_tencentGzhArticle_cc284926a7d1c19f9a4e6abe5520468b": 1,
  25. "touliu_tencentGzhArticle_2e4c21de3707f3b368b0cc4500d120f0": 1,
  26. "touliu_tencentGzhArticle_a18c11dd294df014334f7db72830221a": 1,
  27. "touliu_tencentGzhArticle_c2debdc233827497e24b633dea36c57c": 1,
  28. "touliu_tencentGzhArticle_d66796826916665a23c667472ef4dd56": 1,
  29. "touliu_tencentGzhArticle_f8e97355f3687f57fd4efeb635a7a3a2": 1
  30. }
def get_data_list(request_time_stamp):
    """
    Fetch the rows written in the 24 hours before request_time_stamp
    :param request_time_stamp: unix timestamp in seconds
    :return: tuple of rows from long_articles_video
    """
    connection = pymysql.connect(
        host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com",
        port=3306,
        user="crawler",
        passwd="crawler123456@",
        db="piaoquan-crawler",
        charset="utf8mb4"
    )
    # MySQL does not treat "a < col < b" as a range filter, so the 24-hour
    # window is written as two explicit comparisons.
    sql = """
        select trace_id, gh_id, account_name, article_title, result1, result2, result3, request_time_stamp
        from long_articles_video
        where request_time_stamp > %s and request_time_stamp < %s;
    """
    cursor = connection.cursor()
    cursor.execute(
        sql,
        (request_time_stamp - 24 * 60 * 60, request_time_stamp)
    )
    print(request_time_stamp)
    data = cursor.fetchall()
    connection.close()
    return data
def process_each_data(data_tuple):
    """
    Process one row: extract the rootSourceId from each non-empty result
    payload and insert it into long_articles_root_source_id
    :param data_tuple: one row returned by get_data_list
    :return:
    """
    trace_id = data_tuple[0]
    gh_id = data_tuple[1]
    account_name = data_tuple[2]
    title = data_tuple[3]
    result_1 = data_tuple[4]
    result_2 = data_tuple[5]
    result_3 = data_tuple[6]
    request_time_stamp = data_tuple[7]
    result_list = [result_1, result_2, result_3]
    for result in result_list:
        if result:
            connection = pymysql.connect(
                host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com",
                port=3306,
                user="crawler",
                passwd="crawler123456@",
                db="piaoquan-crawler",
                charset="utf8mb4"
            )
            # productionPath appears to carry the rootSourceId as a URL-encoded
            # query parameter, so split on "rootSourceId%3D" ("%3D" decodes to "=").
            source_id = json.loads(result)['productionPath'].split("rootSourceId%3D")[1]
            sql = """
                INSERT INTO long_articles_root_source_id
                (rootSourceId, accountName, ghId, articleTitle, requestTime, trace_id, push_type)
                values
                (%s, %s, %s, %s, %s, %s, %s);
            """
            cursor = connection.cursor()
            cursor.execute(
                sql,
                (
                    source_id,
                    account_name,
                    gh_id,
                    title,
                    request_time_stamp,
                    trace_id,
                    # push_type: 1 for ids in source_id_list, otherwise 2
                    source_id_list.get(source_id, 2)
                )
            )
            connection.commit()
            connection.close()
        else:
            print("No result")
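

# The helper below is a sketch for illustration only and is not called anywhere
# in this script. It isolates the extraction used in process_each_data; the
# example productionPath value is made up, only the "rootSourceId%3D" marker
# (URL-encoded "rootSourceId=") comes from the code above.
def _extract_root_source_id(result_json: str) -> str:
    """Return the text after the URL-encoded 'rootSourceId=' marker."""
    return json.loads(result_json)['productionPath'].split("rootSourceId%3D")[1]

# Example (hypothetical payload):
# _extract_root_source_id('{"productionPath": "/pages/video?rootSourceId%3DlongArticles_2d311f88a9c1bd5a90ce88339ae93e78"}')
# -> 'longArticles_2d311f88a9c1bd5a90ce88339ae93e78'
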
def source_id_job():
    """
    Run the migration for the previous day: fetch the rows from the 24 hours
    before today's midnight and insert their rootSourceIds concurrently
    :return:
    """
    today_string = datetime.datetime.today().strftime("%Y-%m-%d")
    time_stamp = datetime.datetime.strptime(today_string, '%Y-%m-%d').timestamp()
    data_list = get_data_list(int(time_stamp))
    with ThreadPoolExecutor(max_workers=10) as pool:
        pool.map(process_each_data, data_list)


# if __name__ == '__main__':
#     schedule.every().day.at("01:00").do(source_id_job)
#     while True:
#         schedule.run_pending()
#         time.sleep(1)
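
# Manual entry point, kept as a commented sketch (not part of the original, which
# only has the scheduled loop above): run the migration once for a one-off backfill.
# if __name__ == '__main__':
#     source_id_job()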