"""
@author: luojunhui
"""
import asyncio

from static.config import db_article, db_video
from applications.schedule import search_videos
from applications.functions.log import logging
from static.config import spider_coroutines

# Column order of the task SELECT in ``MatchTask1.get_task``; used as the keys
# of the task dicts handed to ``process_task`` / ``start_process``.
_TASK_FIELDS = (
    "trace_id",
    "content_id",
    "gh_id",
    "title",
    "text",
    "content_status",
    "process_times",
)


class MatchTask1(object):
    """
    Scheduled job: pick pending articles from ``db_article`` and recall
    matching videos for them.

    ``content_status`` values written by this class:
        0 -> pending / re-queued for another attempt
        1 -> being processed
        2 -> videos recalled successfully
        3 -> gave up after repeated failures

    NOTE(review): several SELECTs interpolate ``content_id`` directly into the
    SQL text; switch them to bound parameters if ``async_select`` supports them.
    """

    def __init__(self, mysql_client):
        """
        :param mysql_client: async MySQL client exposing ``async_select`` and
            ``async_insert`` (rows are read positionally, i.e. as tuples)
        """
        self.mysql_client = mysql_client

    @staticmethod
    def _first_column(rows):
        """Return column 0 of every row returned by ``async_select`` (tolerates None)."""
        return [row[0] for row in rows or ()]

    @staticmethod
    def _retry_status(process_times):
        """Return 0 (re-queue) while fewer than 3 attempts were made, else 3 (give up)."""
        return 0 if int(process_times) < 3 else 3

    async def _set_status(self, trace_id, content_status, process_times):
        """Write ``content_status`` and ``process_times`` for a single trace_id."""
        update_sql = f"""
            UPDATE {db_article}
            SET content_status = %s, process_times = %s
            WHERE trace_id = %s;
        """
        await self.mysql_client.async_insert(
            sql=update_sql,
            params=(content_status, process_times, trace_id)
        )

    async def get_task(self):
        """
        Fetch the next batch of pending tasks.

        Selects up to ``spider_coroutines`` distinct content_ids that have a
        pending row (content_status = 0, process_times <= 3), oldest request
        first, then loads every row (one per trace_id) for those content_ids.

        :return: list of task dicts keyed by ``_TASK_FIELDS``; ``[]`` when
            nothing is pending
        """
        select_sql1 = f"""
            SELECT DISTINCT (content_id)
            FROM {db_article}
            WHERE content_status = 0 and process_times <= 3
            ORDER BY request_time_stamp ASC
            LIMIT {spider_coroutines};
        """
        content_ids = self._first_column(
            await self.mysql_client.async_select(select_sql1)
        )
        if not content_ids:
            return []
        # Render the ids as a SQL tuple literal. Only the outer list brackets are
        # swapped for parentheses, so a bracket inside an id is left untouched.
        content_ids_tuple = "({})".format(str(content_ids)[1:-1])
        select_sql = f"""
            SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times
            FROM {db_article}
            WHERE content_id in {content_ids_tuple} and process_times <= 3
            ORDER BY request_time_stamp ASC;
        """
        task_list = await self.mysql_client.async_select(sql=select_sql)
        task_obj_list = [dict(zip(_TASK_FIELDS, item)) for item in task_list or ()]
        logging(
            code="9001",
            info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
            data=task_obj_list
        )
        return task_obj_list

    async def get_history_videos(self, content_id):
        """
        Check whether videos were already recalled for this content.

        :param content_id: article content id
        :return: the raw rows (``video_id`` in column 0), newest request first,
            when at least 3 rows with ``video_status = 1`` exist; otherwise None
        """
        select_sql = f"""
            SELECT video_id
            FROM {db_video}
            where content_id = '{content_id}' and video_status = 1
            order by request_time DESC;
        """
        content_videos = await self.mysql_client.async_select(select_sql)
        videos = list(content_videos or ())
        return videos if len(videos) >= 3 else None

    async def judge_content_processing(self, content_id):
        """
        Check whether ``content_id`` is free to be processed.

        :param content_id: article content id
        :return: False if any row for this content_id has content_status = 1
            (in processing); True otherwise, including when no rows exist
        """
        select_sql = f"""
            SELECT trace_id, content_status
            FROM {db_article}
            WHERE content_id = '{content_id}'
            ORDER BY id DESC;
        """
        result = await self.mysql_client.async_select(select_sql)
        return not any(content_status == 1 for _trace_id, content_status in result or ())

    async def use_exists_contents_videos(self, video_id_list, params):
        """
        Fill this trace's recall slots from videos recalled earlier for the
        same content_id, and mark it done (content_status = 2).

        :param video_id_list: rows as returned by ``get_history_videos``
            (``video_id`` in column 0)
        :param params: task dict as produced by ``get_task``
        :raises IndexError: if no row for this content_id has a non-null kimi_title
        """
        trace_id = params['trace_id']
        content_id = params['content_id']
        select_sql = f"""
            SELECT kimi_title
            FROM {db_article}
            WHERE content_id = '{content_id}' and kimi_title is not null
            limit 1;
        """
        info = await self.mysql_client.async_select(sql=select_sql)
        # Bind the column value itself, not the whole result row.
        kimi_title = info[0][0]
        update_sql = f"""
            UPDATE {db_article}
            SET kimi_title=%s, recall_video_id1=%s, recall_video_id2=%s, recall_video_id3=%s, content_status=%s, process_times = %s
            WHERE trace_id = %s
        """
        # Extract video_id from each row; pad so missing slots fall back to "NULL".
        vid1, vid2, vid3 = (self._first_column(video_id_list) + [None, None, None])[:3]
        await self.mysql_client.async_insert(
            sql=update_sql,
            params=(
                kimi_title,
                vid1,
                "NULL" if vid2 is None else vid2,
                "NULL" if vid3 is None else vid3,
                2,
                int(params['process_times']) + 1,
                trace_id
            )
        )
        logging(
            code="9002",
            info="已从历史文章更新,文章id: {}".format(content_id),
            trace_id=trace_id
        )

    async def start_process(self, params):
        """
        Run video recall for one task and record the outcome.

        Marks the row as in processing (content_status = 1), calls
        ``search_videos``, then reads back the video ids stored for the
        content_id:
          * at least one id   -> store up to three ids, content_status = 2
          * none, or an error -> content_status = 0 while process_times < 3,
                                 otherwise 3
        process_times is incremented on every outcome.

        :param params: task dict as produced by ``get_task``
        """
        trace_id = params['trace_id']
        content_id = params['content_id']
        # Mark the article as in processing (content_status = 1).
        update_sql = f"""
            UPDATE {db_article}
            SET content_status = %s
            WHERE trace_id = %s;
        """
        await self.mysql_client.async_insert(sql=update_sql, params=(1, trace_id))
        try:
            await search_videos(
                params={
                    "title": params['title'],
                    "content": params['text'],
                    "trace_id": trace_id,
                    "content_id": content_id
                },
                trace_id=trace_id,
                gh_id=params['gh_id'],
                mysql_client=self.mysql_client
            )
            next_times = int(params['process_times']) + 1
            select_sql = f"""
                SELECT video_id
                FROM {db_video}
                WHERE content_id = '{content_id}'
            """
            result = await self.mysql_client.async_select(sql=select_sql)
            # Take the video_id column and pad to three slots: indexing result[0..2]
            # directly raised IndexError whenever fewer than 3 videos were recalled.
            vid1, vid2, vid3 = (self._first_column(result) + [None, None, None])[:3]
            if vid1 or vid2 or vid3:
                update_sql2 = f"""
                    UPDATE {db_article}
                    SET recall_video_id1 = %s, recall_video_id2 = %s, recall_video_id3 = %s, content_status = %s, process_times = %s
                    WHERE trace_id = %s;
                """
                # NOTE(review): parameter binding stores the string "NULL" here,
                # not SQL NULL; kept unchanged - confirm downstream expects it.
                await self.mysql_client.async_insert(
                    sql=update_sql2,
                    params=(
                        vid1 if vid1 else "NULL",
                        vid2 if vid2 else "NULL",
                        vid3 if vid3 else "NULL",
                        2,
                        next_times,  # plain int (previously a one-element set literal)
                        trace_id
                    )
                )
                logging(
                    code="9008",
                    info="视频搜索成功, 状态修改为2",
                    trace_id=trace_id
                )
            else:
                status = self._retry_status(params['process_times'])
                await self._set_status(trace_id, status, next_times)
                if status == 0:
                    logging(
                        code="9018",
                        info="视频搜索失败,回退状态为0",
                        trace_id=trace_id
                    )
                else:
                    logging(
                        code="9019",
                        info="视频多次搜索失败,状态修改为3",
                        trace_id=trace_id
                    )
        except Exception as e:
            # Top-level boundary for one task: log, then re-queue or give up so
            # the row does not stay stuck at content_status = 1.
            status = self._retry_status(params['process_times'])
            if status == 0:
                logging(
                    code="9018",
                    info="{}异常错误:{}, 回退状态为0".format(trace_id, e),
                    trace_id=trace_id
                )
            else:
                logging(
                    code="9019",
                    info="{}异常错误:{}, 状态修改为3".format(trace_id, e),
                    trace_id=trace_id
                )
            await self._set_status(trace_id, status, int(params['process_times']) + 1)

    async def process_task(self, params):
        """
        Process one task: detect existing results, otherwise start a recall
        unless another row with the same content_id is already in processing.

        :param params: task dict as produced by ``get_task``
        """
        content_id = params['content_id']
        trace_id = params['trace_id']
        # Check whether this content already has at least 3 videos with
        # video_status = 1; if not, start a new crawl (marked upstream as a
        # point for later optimisation).
        video_id_list = await self.get_history_videos(content_id=content_id)
        if video_id_list:
            # Results already exist for this content_id.
            logging(
                code="9001",
                info="存在历史文章",
                trace_id=trace_id
            )
            # NOTE(review): reuse of historical videos is disabled, so this row's
            # content_status is not updated here and get_task may select it again.
            # await self.use_exists_contents_videos(video_id_list=video_id_list, params=params)
            return
        if await self.judge_content_processing(content_id):
            logging(
                code="9004",
                info="无正在处理的文章ID, 开始处理",
                trace_id=trace_id
            )
            await self.start_process(params=params)
        else:
            logging(
                code="9003",
                info="该文章ID正在请求--文章ID {}".format(content_id),
                trace_id=trace_id
            )

    async def deal(self):
        """
        Entry point: fetch pending tasks and process one per content_id concurrently.
        """
        task_list = await self.get_task()
        # One task per content_id. get_task orders rows by request_time_stamp ASC,
        # so a later (newer) row for the same content_id overwrites an earlier one.
        unique_tasks = {task['content_id']: task for task in task_list}
        if unique_tasks:
            await asyncio.gather(
                *(self.process_task(params) for params in unique_tasks.values())
            )
        else:
            logging(
                code="9008",
                info="没有要处理的请求"
            )