""" @author: luojunhui """ import asyncio from static.config import db_article from applications.schedule import search_videos from applications.functions.log import logging from static.config import mysql_coroutines class MatchTask2(object): """ 定时执行任务 """ def __init__(self, mysql_client): """ :param mysql_client: """ self.mysql_client = mysql_client async def get_task(self): """ 获取任务 :return: """ select_sql = f""" SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times FROM {db_article} WHERE content_status = 0 and process_times <= 5 ORDER BY request_time_stamp ASC LIMIT {mysql_coroutines}; """ task_list = await self.mysql_client.async_select(sql=select_sql) task_obj_list = [ { "trace_id": item[0], "content_id": item[1], "gh_id": item[2], "title": item[3], "text": item[4], "content_status": item[5], "process_times": item[6] } for item in task_list ] logging( code="9001", info="本次任务获取到 {} 条视频".format(len(task_obj_list)), data=task_obj_list ) return task_obj_list async def get_history_contents(self, content_id): """ check whether the content id exists :return: trace_id or None """ select_sql = f""" SELECT trace_id, content_status FROM {db_article} WHERE content_id = '{content_id}' ORDER BY id DESC; """ result = await self.mysql_client.async_select(select_sql) if result: for item in result: trace_id, content_status = item if content_status == 2: return trace_id elif content_status == 3: update_sql = f""" UPDATE {db_article} SET content_status = 3 WHERE trace_id = %s; """ await self.mysql_client.async_insert(update_sql, trace_id) else: continue return None else: return None async def insert_history_contents_videos(self, history_trace_id, params): """ 插入历史视频id :return: """ select_sql = f""" SELECT kimi_title, recall_video_id1, recall_video_id2, recall_video_id3 FROM {db_article} WHERE trace_id = '{history_trace_id}'; """ info = await self.mysql_client.async_select(sql=select_sql) kimi_title, vid1, vid2, vid3 = info[0] update_sql = f""" UPDATE {db_article} SET kimi_title=%s, recall_video_id1=%s, recall_video_id2=%s, recall_video_id3=%s, content_status=%s, process_times = %s WHERE trace_id = %s; """ await self.mysql_client.async_insert( update_sql, params=( kimi_title, vid1, {"NULL" if vid2 is None else vid2}, {"NULL" if vid2 is None else vid3}, 2, int(params['process_times']) + 1, params['trace_id'] ) ) logging( code="9002", info="已从历史文章更新,历史id: {}".format(history_trace_id), trace_id=params['trace_id'] ) async def process_video_id(self, title, trace_id, process_times): """ 如果video_id在标题中,则做特殊处理 :return: """ video_id = title.split("video_id=")[-1] update_sql = f""" UPDATE {db_article} SET recall_video_id1 = %s, content_status = %s, process_times = %s WHERE trace_id = %s;""" await self.mysql_client.async_insert( sql=update_sql, params=(video_id, 2, int(process_times) + 1, trace_id) ) async def start_process(self, params): """ 开始处理 :param params: :return: """ # 更新文章contentId为1, 说明该文章正在处理中 update_sql = f""" UPDATE {db_article} SET content_status = %s WHERE trace_id = %s; """ await self.mysql_client.async_insert( sql=update_sql, params=(1, params['trace_id']) ) try: # 判断标题中是否包含video_id if "video_id=" in params['title']: logging( code="9006", info="视频生成文本测试", trace_id=params['trace_id'] ) await self.process_video_id( title=params['title'], trace_id=params['trace_id'], process_times=params['process_times'] ) else: await search_videos( params={"title": params['title'], "content": params['text'], "trace_id": params['trace_id']}, trace_id=params['trace_id'], gh_id=params['gh_id'], mysql_client=self.mysql_client ) # 执行完成之后,判断是否存在视频id select_sql = f""" SELECT recall_video_id1, recall_video_id2, recall_video_id3 FROM {db_article} WHERE trace_id = '{params["trace_id"]}'; """ result = await self.mysql_client.async_select(sql=select_sql) vid1, vid2, vid3 = result[0] if vid1: update_sql2 = f""" UPDATE {db_article} SET content_status = %s, process_times = %s WHERE trace_id = %s; """ await self.mysql_client.async_insert( sql=update_sql2, params=(2, int(params['process_times']) + 1, params['trace_id']) ) logging( code="9008", info="视频搜索成功, 状态修改为2", trace_id=params['trace_id'] ) else: update_sql3 = f""" UPDATE {db_article} SET content_status = %s, process_times = %s WHERE trace_id = %s; """ await self.mysql_client.async_insert( sql=update_sql3, params=(0, int(params['process_times']) + 1, params["trace_id"]) ) logging( code="9018", info="视频搜索失败,回退状态为0", trace_id=params['trace_id'] ) except Exception as e: logging( code="9018", info="{}异常错误:{}, 回退状态为0".format(params['trace_id'], e), trace_id=params['trace_id'] ) update_sql4 = f""" UPDATE {db_article} SET content_status = %s, process_times = %s WHERE trace_id = %s; """ await self.mysql_client.async_insert( sql=update_sql4, params=(0, int(params['process_times']) + 1, params["trace_id"]) ) async def process_task(self, params): """ 异步执行 :param params: :return: """ content_id = params['content_id'] trace_id = params['trace_id'] # 判断该文章是否已经生成了 history_trace_id = await self.get_history_contents(content_id) if history_trace_id: # 说明已经存在了结果, 将该条记录下的video_id拿出来 logging( code="9001", info="存在历史文章", trace_id=trace_id, function="find_history_article" ) await self.insert_history_contents_videos(history_trace_id, params) else: logging( code="9003", info="未找到历史文章", trace_id=trace_id, function="find_history_article" ) async def deal(self): """ 处理 :return: """ task_list = await self.get_task() if task_list: tasks = [self.process_task(params) for params in task_list] await asyncio.gather(*tasks) else: logging( code="9008", info="没有要处理的请求" )