123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- # encodings : utf-8
- """
- @author: luojunhui
- """
- import datetime
- import time
- import json
- import pymysql
- import requests
- from functions import MatchRate
def find_fail_data(dt, max_age_seconds=3600):
    """
    Collect the trace ids of requests that have stayed in status 1 too long.

    For every start timestamp returned by
    ``MatchRate.generate_stamp_list(dt, dt)`` a 24-hour window (expressed in
    milliseconds) is queried through ``MatchRate.match_rate``. Each returned
    row is read as ``(status, trace_id, ...)``. Rows with status 1 are kept
    when the integer after the last ``-`` of their trace id, treated as a
    Unix timestamp in seconds, is more than ``max_age_seconds`` in the past.

    NOTE(review): status 1 presumably means "still processing" (2 = success,
    3 = failure) -- confirm against ``MatchRate.match_rate``.

    :param dt: date string passed as both start and end to
        ``generate_stamp_list`` (the scheduler passes ``YYYYMMDD``).
    :param max_age_seconds: minimum age, in seconds, for a status-1 request
        to be reported. Defaults to one hour.
    :return: tuple of trace id strings; an empty tuple when nothing matches
        or when no window is produced.
    """
    matcher = MatchRate()
    window_ms = 24 * 60 * 60 * 1000

    # Gather candidates from every window instead of stopping at one, so the
    # result is well defined for zero, one or several windows.
    pending_trace_ids = []
    for stamp in matcher.generate_stamp_list(dt, dt):
        start_ms = int(stamp)
        rows = matcher.match_rate(start_ms, start_ms + window_ms)
        pending_trace_ids.extend(row[1] for row in rows if row[0] == 1)

    # Read the clock once so every trace id is compared to the same instant.
    now = int(time.time())
    return tuple(
        trace_id
        for trace_id in pending_trace_ids
        if now - int(trace_id.split("-")[-1]) > max_age_seconds
    )
def find_defeat_info(trace_id_tuple):
    """
    Load the article rows that belong to the given trace ids.

    The ids are bound as query parameters rather than formatted into the SQL
    text: a one-element Python tuple renders as ``('x',)`` and an empty one
    as ``()``, neither of which is valid inside an SQL ``IN`` clause.

    :param trace_id_tuple: iterable of trace id strings; may be empty or
        ``None``.
    :return: rows of ``(trace_id, article_title, gh_id, kimi_summary,
        kimi_keys)`` as returned by ``cursor.fetchall()``; an empty tuple
        when no ids are given (the database is not contacted in that case).
    """
    trace_ids = tuple(trace_id_tuple or ())
    if not trace_ids:
        return ()

    # One %s placeholder per id; the driver escapes the bound values.
    placeholders = ", ".join(["%s"] * len(trace_ids))
    select_sql = f"""
        select trace_id, article_title, gh_id, `kimi_summary`, `kimi_keys`
        from `long_articles_video` where `trace_id` in ({placeholders});
    """
    # NOTE(review): connection settings and credentials are hard-coded here;
    # consider loading them from configuration or the environment.
    connection = pymysql.connect(
        host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com",  # database host (internal network address)
        port=3306,  # port number
        user="crawler",  # MySQL user name
        passwd="crawler123456@",  # MySQL login password
        db="piaoquan-crawler",  # database name
        charset="utf8mb4"  # should match the text encoding used by the database
    )
    try:
        with connection.cursor() as cursor:
            cursor.execute(select_sql, trace_ids)
            return cursor.fetchall()
    finally:
        # Always release the connection, even when the query fails.
        connection.close()
def request_for_research(result):
    """
    Post one failed record to the re-search service and print its reply.

    :param result: indexable record whose first five items are sent as
        trace_id, title, ghId, kimi_summary and kimi_keys, in that order.
    """
    field_names = ("trace_id", "title", "ghId", "kimi_summary", "kimi_keys")
    payload = {
        name: result[position] for position, name in enumerate(field_names)
    }

    started_at = time.time()
    response = requests.post(
        "http://47.99.132.47:8111/re_search_videos",
        json=payload,
        headers={"Content-Type": "application/json"},
        timeout=600,
    )
    finished_at = time.time()

    # Report the wall-clock duration of the request, then the decoded reply.
    print("total cost: ", finished_at - started_at, " s")
    reply = response.json()
    print(json.dumps(reply, ensure_ascii=False, indent=4))
def job2():
    """
    Scheduled job: re-submit today's long-pending requests for re-search.

    Steps:
      1. ask ``find_fail_data`` for the trace ids of today's stale requests;
      2. load their article rows with ``find_defeat_info``;
      3. call ``request_for_research`` for every row.

    A request that raises a ``requests`` error or returns a body that cannot
    be decoded is reported and skipped, so one bad record does not abort the
    rest of the batch. The final message reports how many records were
    actually re-submitted.

    :return: None
    """
    date_str = datetime.datetime.today().strftime("%Y%m%d")
    trace_id_t = find_fail_data(dt=date_str)
    fail_list = find_defeat_info(trace_id_t) if trace_id_t else ()

    if not fail_list:
        # Covers both "no stale trace ids" and "no matching article rows".
        print("{} No videos Find".format(datetime.datetime.now()))
        return

    print("{} find {} defeat requests".format(datetime.datetime.now(), len(fail_list)))
    count = 0  # records re-submitted without error
    for obj in fail_list:
        print(obj)
        try:
            request_for_research(obj)
        except (requests.RequestException, ValueError) as err:
            # ValueError covers a reply body that is not valid JSON.
            print("{} re_search failed for {}: {}".format(datetime.datetime.now(), obj[0], err))
            continue
        count += 1
    # Take a fresh timestamp: each request may block for up to 600 seconds.
    print("{} success re_search {} defeat requests".format(datetime.datetime.now(), count))
if __name__ == '__main__':
    # Run the retry job forever, pausing one hour between runs.
    while True:
        # Captured before the job starts, so the message below carries the
        # start time of this round. The module-level name is kept unchanged.
        now_time_str = str(datetime.datetime.now())
        job2()
        print(f"{now_time_str}: 执行程序完成, 等待一小时")
        time.sleep(3600)
|