"""
@author: luojunhui
"""
import asyncio

from static.config import db_article, db_video, mysql_coroutines
from applications.functions.log import logging
from applications.functions.pqFunctions import *


class MatchTask3(object):
    """
    Process articles that have already been matched to mini-program videos.

    For an article whose ``content_id`` already has at least three videos with
    ``oss_status = 1`` in ``db_video``, the article row in ``db_article`` is
    updated with a kimi_title and three recalled video ids, instead of running
    a new crawl.
    """

    def __init__(self, mysql_client):
        """
        :param mysql_client: async MySQL client; this class awaits its
            ``async_select(sql=...)`` and ``async_insert(sql=..., params=...)``
            coroutines.
        """
        self.mysql_client = mysql_client

    async def getTaskList(self):
        """
        Fetch a batch of pending articles.

        Selects articles with ``content_status = 0`` and ``process_times <= 3``
        whose ``content_id`` has at least 3 videos with ``oss_status = 1``,
        ordered by ``request_time_stamp`` and limited to ``mysql_coroutines``
        rows.

        :return: list of task dicts with keys ``trace_id``, ``content_id``,
            ``gh_id``, ``title``, ``text``, ``content_status``, ``process_times``.
        """
        select_sql1 = f"""
            SELECT
                ART.trace_id,
                ART.content_id,
                ART.gh_id,
                ART.article_title,
                ART.article_text,
                ART.content_status,
                ART.process_times
            FROM {db_article} ART
            JOIN (
                select content_id, count(1) as cnt
                from {db_video}
                where oss_status = 1
                group by content_id
            ) VID on ART.content_id = VID.content_id and VID.cnt >= 3
            WHERE ART.content_status = 0 and ART.process_times <= 3
            ORDER BY request_time_stamp
            LIMIT {mysql_coroutines};
        """
        tasks = await self.mysql_client.async_select(sql=select_sql1)
        task_obj_list = [
            {
                "trace_id": item[0],
                "content_id": item[1],
                "gh_id": item[2],
                "title": item[3],
                "text": item[4],
                "content_status": item[5],
                "process_times": item[6]
            } for item in tasks
        ]
        logging(
            code="9001",
            info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
            data=task_obj_list
        )
        return task_obj_list

    async def getHistoryVideoOssPath(self, content_id):
        """
        Return the stored videos (``oss_status = 1``) for a content_id.

        :param content_id: content id shared by the article and its videos.
        :return: list of dicts with keys ``title``, ``uid``, ``videoPath``,
            ``coverPath`` (newest ``request_time`` first) when there are at
            least 3 such videos; otherwise ``None``.
        """
        # NOTE(review): content_id is interpolated directly into SQL. It comes
        # from a DB row here, but switch to a bound parameter if async_select
        # supports one.
        select_sql = f"""
            SELECT video_title, uid, video_path, cover_path
            FROM {db_video}
            where content_id = '{content_id}' and oss_status = 1
            order by request_time DESC;
        """
        content_videos = await self.mysql_client.async_select(sql=select_sql)
        video_list = [
            {
                "title": line[0],
                "uid": line[1],
                "videoPath": line[2],
                "coverPath": line[3]
            }
            for line in content_videos
        ]
        if len(video_list) >= 3:
            return video_list
        return None

    async def useExistOssPath(self, video_info_list, params):
        """
        Fill an article row from already-existing videos.

        Reads an existing non-null ``kimi_title`` for the same content_id,
        obtains new video ids via ``getNewVideoIds`` and writes kimi_title,
        the first three ids, ``content_status = 2`` and an incremented
        ``process_times`` into the row identified by ``trace_id``.

        :param video_info_list: video dicts as returned by
            ``getHistoryVideoOssPath``.
        :param params: task dict as produced by ``getTaskList``.
        :raises ValueError: if no kimi_title exists for the content_id, or
            fewer than 3 video ids are returned.
        """
        trace_id = params['trace_id']
        content_id = params['content_id']
        # NOTE(review): same SQL-interpolation caveat as getHistoryVideoOssPath.
        select_sql = f"""
            SELECT kimi_title
            FROM {db_article}
            WHERE content_id = '{content_id}' and kimi_title is not null
            limit 1;
        """
        info = await self.mysql_client.async_select(sql=select_sql)
        if not info:
            raise ValueError("no kimi_title found for content_id {}".format(content_id))
        # Rows are positional tuples; take the column value, not the whole row.
        kimi_title = info[0][0]

        video_id_list = await getNewVideoIds(video_info_list)
        if not video_id_list or len(video_id_list) < 3:
            raise ValueError(
                "expected at least 3 video ids for content_id {}, got {}".format(
                    content_id, video_id_list
                )
            )
        vid1, vid2, vid3 = video_id_list[0], video_id_list[1], video_id_list[2]

        update_sql = f"""
            UPDATE {db_article}
            SET
                kimi_title=%s,
                recall_video_id1=%s,
                recall_video_id2=%s,
                recall_video_id3=%s,
                content_status=%s,
                process_times = %s
            WHERE trace_id = %s
        """
        await self.mysql_client.async_insert(
            sql=update_sql,
            params=(
                kimi_title,
                vid1,
                vid2,
                vid3,
                2,
                int(params['process_times']) + 1,
                trace_id
            )
        )
        logging(
            code="9002",
            info="已从历史文章更新,文章id: {}".format(content_id),
            trace_id=trace_id
        )

    async def processTask(self, params):
        """
        Handle a single task.

        :param params: task dict as produced by ``getTaskList``.
        """
        content_id = params['content_id']
        trace_id = params['trace_id']
        # Check whether the article already has at least 3 stored videos.
        # TODO: when it does not, start a new crawl task (not implemented yet).
        oss_path_list = await self.getHistoryVideoOssPath(content_id=content_id)
        if not oss_path_list:
            return
        # Results already exist: reuse the stored videos for this article.
        logging(
            code="9001",
            info="存在历史文章",
            trace_id=trace_id
        )
        try:
            await self.useExistOssPath(video_info_list=oss_path_list, params=params)
        except Exception as e:
            # Best-effort: one failing article must not abort the whole batch
            # gathered in ``deal``; report it with its trace_id and move on.
            print("trace_id: {}, useExistOssPath failed: {!r}".format(trace_id, e))

    async def deal(self):
        """
        Fetch a batch of tasks and process them concurrently.
        """
        task_list = await self.getTaskList()
        if task_list:
            tasks = [self.processTask(params) for params in task_list]
            await asyncio.gather(*tasks)
        else:
            logging(
                code="9008",
                info="没有要处理的请求"
            )