""" @author: luojunhui """ import asyncio from static.config import db_article, db_video from applications.functions.log import logging from static.config import mysql_coroutines class MatchTask5(object): """ 定时执行任务 """ def __init__(self, mysql_client): """ :param mysql_client: """ self.mysql_client = mysql_client async def get_task(self): """ 获取任务 :return: """ select_sql = f""" SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times FROM {db_article} WHERE content_status = 0 and process_times <= 5 and account_name = '万事如意一家子' ORDER BY request_time_stamp DESC LIMIT {mysql_coroutines}; """ task_list = await self.mysql_client.async_select(sql=select_sql) task_obj_list = [ { "trace_id": item[0], "content_id": item[1], "gh_id": item[2], "title": item[3], "text": item[4], "content_status": item[5], "process_times": item[6] } for item in task_list ] print("本次任务获取到 {} 条视频".format(len(task_obj_list))) # logging( # code="9001", # info="本次任务获取到 {} 条视频".format(len(task_obj_list)), # data=task_obj_list # ) return task_obj_list async def get_history_videos(self, content_id): """ check whether the contents videos exists :param content_id: :return: """ select_sql = f""" SELECT video_id FROM {db_video} where content_id = '{content_id}' and video_status = 1 order by request_time DESC; """ content_videos = await self.mysql_client.async_select(select_sql) videos = [vid for vid in content_videos] print(len(videos)) if len(videos) >= 3: return videos else: return None async def use_exists_contents_videos(self, video_id_list, params): """ 使用已经存在的视频id :return: """ trace_id = params['trace_id'] content_id = params['content_id'] select_sql = f""" SELECT kimi_title FROM {db_article} WHERE content_id = '{content_id}' and kimi_title is not null limit 1; """ info = await self.mysql_client.async_select(sql=select_sql) kimi_title = info[0] update_sql = f""" UPDATE {db_article} SET kimi_title=%s, recall_video_id1=%s, recall_video_id2=%s, recall_video_id3=%s, content_status=%s, process_times = %s WHERE trace_id = %s """ vid1, vid2, vid3 = video_id_list[0], video_id_list[1], video_id_list[2] await self.mysql_client.async_insert( sql=update_sql, params=( kimi_title, video_id_list[0], "NULL" if vid2 is None else vid2, "NULL" if vid3 is None else vid3, 2, int(params['process_times']) + 1, trace_id ) ) logging( code="9002", info="已从历史文章更新,文章id: {}".format(content_id), trace_id=trace_id ) async def process_task(self, params): """ 异步执行 :param params: :return: """ content_id = params['content_id'] print(content_id) # 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点 video_id_list = await self.get_history_videos(content_id=content_id) print(video_id_list) if video_id_list: # 说明已经存在了结果, 将该条记录下的video_id拿出来 print("存在历史文章") await self.use_exists_contents_videos(video_id_list=video_id_list, params=params) else: pass async def deal(self): """ 处理 :return: """ task_list = await self.get_task() print(len(task_list)) if task_list: tasks = [self.process_task(params) for params in task_list] await asyncio.gather(*tasks) else: logging( code="9008", info="没有要处理的请求" )