# update_rootSourceId.py
  1. """
  2. @author: luojunhui
  3. 迁移rootSourceId
  4. """
  5. import json
  6. import time
  7. import pymysql
  8. import datetime
  9. import schedule
  10. from concurrent.futures.thread import ThreadPoolExecutor
  11. source_list = [
  12. "longArticles_3f4d2a1c1cece4cf88d348f46fa3c20d",
  13. "longArticles_2872ec9931e405b04c70b5b35d64fa07",
  14. "longArticles_8ceadda6dfd935c9f85c5f3b5abd32a0",
  15. "longArticles_6b1f49e1bc19e22b1a2c2c252722f099",
  16. "longArticles_480827356b0eabc4b03b21acf4c2e664",
  17. "longArticles_60ac70dc7cbf8bf40fac1d51c007213e",
  18. "touliu_tencentGzhArticle_cc284926a7d1c19f9a4e6abe5520468b",
  19. "touliu_tencentGzhArticle_2e4c21de3707f3b368b0cc4500d120f0",
  20. "touliu_tencentGzhArticle_a18c11dd294df014334f7db72830221a",
  21. "touliu_tencentGzhArticle_c2debdc233827497e24b633dea36c57c",
  22. "touliu_tencentGzhArticle_d66796826916665a23c667472ef4dd56",
  23. "touliu_tencentGzhArticle_f8e97355f3687f57fd4efeb635a7a3a2"
  24. ]
  25. source_id_list = {
  26. "longArticles_3f4d2a1c1cece4cf88d348f46fa3c20d": 1,
  27. "longArticles_2872ec9931e405b04c70b5b35d64fa07": 1,
  28. "longArticles_8ceadda6dfd935c9f85c5f3b5abd32a0": 1,
  29. "longArticles_6b1f49e1bc19e22b1a2c2c252722f099": 1,
  30. "longArticles_480827356b0eabc4b03b21acf4c2e664": 1,
  31. "longArticles_60ac70dc7cbf8bf40fac1d51c007213e": 1,
  32. "touliu_tencentGzhArticle_cc284926a7d1c19f9a4e6abe5520468b": 1,
  33. "touliu_tencentGzhArticle_2e4c21de3707f3b368b0cc4500d120f0": 1,
  34. "touliu_tencentGzhArticle_a18c11dd294df014334f7db72830221a": 1,
  35. "touliu_tencentGzhArticle_c2debdc233827497e24b633dea36c57c": 1,
  36. "touliu_tencentGzhArticle_d66796826916665a23c667472ef4dd56": 1,
  37. "touliu_tencentGzhArticle_f8e97355f3687f57fd4efeb635a7a3a2": 1
  38. }
  39. def get_data_list(request_time_stamp):
  40. """
  41. 获取数据
  42. :param request_time_stamp:
  43. :return:
  44. """
  45. connection = pymysql.connect(
  46. host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com",
  47. port=3306,
  48. user="crawler",
  49. passwd="crawler123456@",
  50. db="piaoquan-crawler",
  51. charset="utf8mb4"
  52. )
  53. sql = f"""
  54. select trace_id, gh_id, account_name, article_title, result1, result2, result3, request_time_stamp
  55. from long_articles_video where %s < request_time_stamp < %s;
  56. """
  57. cursor = connection.cursor()
  58. cursor.execute(
  59. sql,
  60. (request_time_stamp - 24 * 60 * 60, request_time_stamp)
  61. )
  62. print(request_time_stamp)
  63. data = cursor.fetchall()
  64. return data
  65. def process_each_data(data_tuple):
  66. """
  67. 处理数据
  68. :param data_tuple:
  69. :return:
  70. """
  71. trace_id = data_tuple[0]
  72. gh_id = data_tuple[1]
  73. account_name = data_tuple[2]
  74. title = data_tuple[3]
  75. result_1 = data_tuple[4]
  76. result_2 = data_tuple[5]
  77. result_3 = data_tuple[6]
  78. request_time_stamp = data_tuple[7]
  79. result_list = [result_1, result_2, result_3]
  80. for result in result_list:
  81. if result:
  82. connection = pymysql.connect(
  83. host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com",
  84. port=3306,
  85. user="crawler",
  86. passwd="crawler123456@",
  87. db="piaoquan-crawler",
  88. charset="utf8mb4"
  89. )
  90. source_id = json.loads(result)['productionPath'].split("rootSourceId%3D")[1]
  91. sql = f"""
  92. INSERT INTO long_articles_root_source_id
  93. (rootSourceId, accountName, ghId, articleTitle, requestTime, trace_id, push_type)
  94. values
  95. (%s, %s, %s, %s, %s, %s, %s);
  96. """
  97. cursor = connection.cursor()
  98. cursor.execute(
  99. sql,
  100. (
  101. source_id,
  102. account_name,
  103. gh_id,
  104. title,
  105. request_time_stamp,
  106. trace_id,
  107. source_id_list.get(source_id, 2)
  108. )
  109. )
  110. connection.commit()
  111. else:
  112. print("No result")
  113. def source_id_job():
  114. """
  115. 执行代码
  116. :return:
  117. """
  118. today_string = datetime.datetime.today().strftime("%Y-%m-%d")
  119. time_stamp = datetime.datetime.strptime(today_string, '%Y-%m-%d').timestamp()
  120. data_list = get_data_list(int(time_stamp))
  121. with ThreadPoolExecutor(max_workers=10) as Pool:
  122. Pool.map(process_each_data, data_list)
# Entry point (disabled): run the migration daily at 01:00.
# NOTE(review): the original commented-out code scheduled `job`, which does
# not exist — the function is `source_id_job`.
# if __name__ == '__main__':
#     schedule.every().day.at("01:00").do(source_id_job)
#     while True:
#         schedule.run_pending()
#         time.sleep(1)