# research_app.py
  1. """
  2. @author: luojunhui
  3. """
  4. import datetime
  5. import time
  6. import json
  7. import pymysql
  8. import requests
  9. from functions import MatchRate
  10. def find_fail_data(dt):
  11. """
  12. 查找1h失败的数据
  13. :param dt:
  14. :return:
  15. """
  16. M = MatchRate()
  17. time_stamp_list = M.generate_stamp_list(dt, dt)
  18. for item in time_stamp_list:
  19. s_d = int(item)
  20. e_d = int(item) + 24 * 60 * 60 * 1000
  21. result = M.match_rate(s_d, e_d)
  22. s = 0
  23. f = 0
  24. p = 0
  25. w = []
  26. for obj in result:
  27. if obj[0] == 2:
  28. s += 1
  29. elif obj[0] == 3:
  30. f += 1
  31. elif obj[0] == 1:
  32. p += 1
  33. w.append(obj[1])
  34. long_time_data = [i for i in w if int(time.time()) - int(i.split("-")[-1]) > 3600]
  35. return tuple(long_time_data)
  36. def find_defeat_info(trace_id_tuple):
  37. """
  38. 查找失败的视频
  39. :return:
  40. """
  41. select_sql = f"""
  42. select trace_id, article_title, article_text, gh_id, account_name
  43. from `long_articles_video` where `trace_id` in {trace_id_tuple};
  44. """
  45. connection = pymysql.connect(
  46. host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com", # 数据库IP地址,内网地址
  47. port=3306, # 端口号
  48. user="crawler", # mysql用户名
  49. passwd="crawler123456@", # mysql用户登录密码
  50. db="piaoquan-crawler", # 数据库名
  51. charset="utf8mb4" # 如果数据库里面的文本是utf8编码的,charset指定是utf8
  52. )
  53. cursor = connection.cursor()
  54. cursor.execute(select_sql)
  55. fail_list = cursor.fetchall()
  56. return fail_list
  57. def request_for_research(result):
  58. """
  59. research from new machine
  60. :param result:
  61. """
  62. params = {
  63. "trace_id": result[0],
  64. "title": result[1],
  65. "ghId": result[3],
  66. "content": result[2],
  67. "accountName": result[4]
  68. }
  69. url = "http://47.99.132.47:8111/re_search_videos"
  70. a = time.time()
  71. header = {
  72. "Content-Type": "application/json",
  73. }
  74. response = requests.post(url, json=params, headers=header, timeout=600)
  75. b = time.time()
  76. print("total cost: ", b - a, " s")
  77. print(json.dumps(response.json(), ensure_ascii=False, indent=4))
  78. def job2():
  79. """
  80. 定时任务
  81. :return:
  82. """
  83. date_str = datetime.datetime.today().strftime("%Y%m%d")
  84. trace_id_t = find_fail_data(dt=date_str)
  85. fail_list = find_defeat_info(trace_id_t)
  86. now_time_str = datetime.datetime.now().__str__()
  87. if fail_list:
  88. print("{} find {} defeat requests".format(now_time_str, len(fail_list)))
  89. count = 1
  90. for obj in fail_list:
  91. request_for_research(obj)
  92. count += 1
  93. print("{} success re_search {} defeat requests".format(now_time_str, count))
  94. else:
  95. print("{} No videos Find".format(now_time_str))
  96. if __name__ == '__main__':
  97. while True:
  98. now_time_str = datetime.datetime.now().__str__()
  99. job2()
  100. print("{}: 执行程序完成, 等待一小时".format(now_time_str))
  101. time.sleep(60 * 60)