""" @author: luojunhui """ from applications.static.config import db_article from applications.schedule import search_videos class ProcessDeal(object): """ 定时执行任务 """ def __init__(self, mysql_client): """ :param mysql_client: """ self.mysql_client = mysql_client async def get_task(self): """ 获取任务 :return: """ select_sql = f""" SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times FROM {db_article} WHERE content_status = 0 and process_times <= 5 ORDER BY request_time_stamp ASC LIMIT 10; """ print(select_sql) task_list = await self.mysql_client.async_select(sql=select_sql) task_obj_list = [ { "trace_id": item[0], "content_id": item[1], "gh_id": item[2], "title": item[3], "text": item[4], "content_status": item[5], "process_times": item[6] } for item in task_list ] return task_obj_list async def get_history_contents(self, content_id): """ check whether the content id exists :return: trace_id or None """ select_sql = f""" SELECT trace_id, content_status FROM {db_article} WHERE content_id = '{content_id}' ORDER BY id DESC; """ result = await self.mysql_client.async_select(select_sql) if result: for item in result: trace_id, content_status = item if content_status == 2: return trace_id else: continue return None else: return None async def judge_content_processing(self, content_id): """ 判断该content_id是否在处理中 :param content_id: :return: """ select_sql = f""" SELECT trace_id, content_status FROM {db_article} WHERE content_id = '{content_id}' ORDER BY id DESC; """ result = await self.mysql_client.async_select(select_sql) if result: for item in result: trace_id, content_status = item if content_status == 1: return False return True else: return True async def insert_history_contents_videos(self, history_trace_id, params): """ 插入历史视频id :return: """ select_sql = f""" SELECT kimi_title, recall_video_id1, recall_video_id2, recall_video_id3 FROM {db_article} WHERE trace_id = '{history_trace_id}'; """ info = await self.mysql_client.async_select(sql=select_sql) kimi_title, vid1, vid2, vid3 = info[0] update_sql = f""" UPDATE {db_article} SET kimi_title='{kimi_title}', recall_video_id1={vid1}, recall_video_id2={"NULL" if vid2 is None else vid2}, recall_video_id3={"NULL" if vid3 is None else vid3}, content_status=2, process_times = {int(params['process_times']) + 1} WHERE trace_id = '{params['trace_id']}' """ await self.mysql_client.async_insert(update_sql) async def process_video_id(self, title, trace_id, process_times): """ 如果video_id在标题中,则做特殊处理 :return: """ video_id = title.split("video_id=")[-1] update_sql = f""" UPDATE {db_article} SET recall_video_id1 = '{video_id}', content_status = 2, process_times = {int(process_times) + 1} WHERE trace_id = '{trace_id}';""" await self.mysql_client.async_insert(update_sql) async def start_process(self, params): """ 开始处理 :param params: :return: """ # 更新文章contentId为1, 说明该文章正在处理中 update_sql = f""" UPDATE {db_article} SET content_status = 1 WHERE trace_id = '{params["trace_id"]}' """ await self.mysql_client.async_insert(sql=update_sql) try: # 判断标题中是否包含video_id if "video_id=" in params['title']: await self.process_video_id( title=params['title'], trace_id=params['trace_id'], process_times=params['process_times'] ) else: print("开始搜索视频") await search_videos( params={"title": params['title'], "content": params['text'], "trace_id": params['trace_id']}, trace_id=params['trace_id'], gh_id=params['gh_id'], mysql_client=self.mysql_client ) # 执行完成之后,判断是否存在视频id select_sql = f""" SELECT recall_video_id1, recall_video_id2, recall_video_id3 FROM {db_article} WHERE trace_id = '{params["trace_id"]}'; """ result = await self.mysql_client.async_select(sql=select_sql) vid1, vid2, vid3 = result[0] if vid1: update_sql2 = f""" UPDATE {db_article} SET content_status = 2, process_times = {int(params['process_times']) + 1} WHERE trace_id = '{params["trace_id"]}'; """ await self.mysql_client.async_insert(sql=update_sql2) print("搜索视频成功") else: print("搜索视频失败") update_sql3 = f""" UPDATE {db_article} SET content_status = 0, process_times = {int(params['process_times']) + 1} WHERE trace_id = '{params["trace_id"]}'; """ await self.mysql_client.async_insert(sql=update_sql3) except Exception as e: print("{}异常错误:{}".format(params['trace_id'], e)) update_sql4 = f""" UPDATE {db_article} SET content_status = 0, process_times = {int(params['process_times']) + 1} WHERE trace_id = '{params["trace_id"]}'; """ await self.mysql_client.async_insert(sql=update_sql4) async def deal(self): """ 处理 :return: """ task_list = await self.get_task() if task_list: for params in task_list: content_id = params['content_id'] # 判断该文章是否已经生成了 history_trace_id = await self.get_history_contents(content_id) if history_trace_id: # 说明已经存在了结果, 将该条记录下的video_id拿出来 print("该文章已经成功请求") await self.insert_history_contents_videos(history_trace_id, params) else: flag = await self.judge_content_processing(content_id) if flag: print("开始处理这条视频") await self.start_process(params=params) else: print("该文章id正在处理中") else: print("没有要处理的视频")