"""
@author: luojunhui
"""
import asyncio
import json

from static.config import db_article, db_video, mysql_coroutines
from applications.functions.log import logging
from applications.functions.pqFunctions import *
from applications.functions.apollo import Config


class MatchTask3(object):
    """
    Process articles whose content has already been matched to mini-program videos.

    For each pending article this either marks it as a category mismatch for the
    target account, or reuses already-uploaded videos (oss_status = 1) recorded for
    the same content_id to fill the article's recall video ids.

    NOTE(review): several queries below interpolate content_id into the SQL text.
    The values originate from our own tables, but parameterising them would be
    safer if the clients' async_select supports a params argument — TODO confirm.
    """

    def __init__(self, pq_client, denet_client, long_article_client):
        """
        Initialise the task with its database clients and Apollo config.

        :param pq_client: async client used for the {db_article} / {db_video} tables
        :param denet_client: async client used for the produce_plan tables
        :param long_article_client: async client used for the article_category table
        """
        self.config = Config(env="prod")
        self.pq_client = pq_client
        self.denet_client = denet_client
        self.long_article_client = long_article_client
        # gh_id -> collection of categories that account must not receive,
        # stored in Apollo as a JSON string.
        self.account_negative_category = json.loads(
            self.config.get_config_value("account_negative_category")
        )

    async def getTaskList(self):
        """
        Fetch a batch of pending tasks.

        Selects articles with content_status = 0 and process_times <= 3 that
        already have at least 3 videos with oss_status = 1, ordered by
        request_time_stamp, limited to ``mysql_coroutines`` rows.

        :return: list of task dicts with keys trace_id, content_id, gh_id,
                 title, text, content_status, process_times
        """
        select_sql1 = f"""
            SELECT ART.trace_id, ART.content_id, ART.gh_id, ART.article_title,
                   ART.article_text, ART.content_status, ART.process_times
            FROM {db_article} ART
            JOIN (
                select content_id, count(1) as cnt
                from {db_video}
                where oss_status = 1
                group by content_id
            ) VID on ART.content_id = VID.content_id and VID.cnt >= 3
            WHERE ART.content_status = 0 and ART.process_times <= 3
            ORDER BY request_time_stamp
            LIMIT {mysql_coroutines};
        """
        tasks = await self.pq_client.async_select(sql=select_sql1)
        task_obj_list = [
            {
                "trace_id": item[0],
                "content_id": item[1],
                "gh_id": item[2],
                "title": item[3],
                "text": item[4],
                "content_status": item[5],
                "process_times": item[6]
            }
            # Tolerate a falsy (empty / None) select result.
            for item in (tasks or [])
        ]
        logging(
            code="9001",
            function="task3.get_task",
            info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
            data=[x['content_id'] for x in task_obj_list]
        )
        return task_obj_list

    async def getHistoryVideoOssPath(self, content_id):
        """
        Return the already-uploaded videos recorded for a content_id.

        :param content_id: article content id
        :return: list of dicts (title, uid, videoPath, coverPath) ordered by
                 request_time descending when at least 3 exist, otherwise None
        """
        select_sql = f"""
            SELECT video_title, uid, video_path, cover_path
            FROM {db_video}
            where content_id = '{content_id}' and oss_status = 1
            order by request_time DESC;
        """
        content_videos = await self.pq_client.async_select(select_sql)
        video_list = [
            {
                "title": line[0],
                "uid": line[1],
                "videoPath": line[2],
                "coverPath": line[3]
            }
            for line in content_videos
        ]
        if len(video_list) >= 3:
            return video_list
        else:
            return None

    async def useExistOssPath(self, video_info_list, params):
        """
        Fill an article's recall videos from videos that already exist.

        Looks up a non-null kimi_title already stored for the same content_id,
        obtains new video ids for ``video_info_list`` via getNewVideoIds, and
        writes kimi_title, the first three ids, content_status = 2 and
        process_times + 1 onto the row identified by trace_id.

        :param video_info_list: video dicts as returned by getHistoryVideoOssPath
        :param params: task dict (uses trace_id, content_id, process_times)
        :raises ValueError: if no non-null kimi_title exists for this content_id
        """
        trace_id = params['trace_id']
        content_id = params['content_id']
        select_sql = f"""
            SELECT kimi_title
            FROM {db_article}
            WHERE content_id = '{content_id}' and kimi_title is not null
            limit 1;
        """
        info = await self.pq_client.async_select(sql=select_sql)
        if not info:
            raise ValueError(
                "no kimi_title found for content_id {}".format(content_id)
            )
        # Take the single selected column of the first row, not the row tuple.
        kimi_title = info[0][0]
        video_id_list = await getNewVideoIds(video_info_list)
        vid1, vid2, vid3 = video_id_list[0], video_id_list[1], video_id_list[2]
        update_sql = f"""
            UPDATE {db_article}
            SET
                kimi_title=%s,
                recall_video_id1=%s,
                recall_video_id2=%s,
                recall_video_id3=%s,
                content_status=%s,
                process_times = %s
            WHERE trace_id = %s
        """
        await self.pq_client.async_insert(
            sql=update_sql,
            params=(
                kimi_title,
                vid1,
                vid2,
                vid3,
                2,
                int(params['process_times']) + 1,
                trace_id
            )
        )
        logging(
            code="9002",
            info="已从历史文章更新,文章id: {}".format(content_id),
            trace_id=trace_id
        )

    async def get_content_pool_level(self, content_id) -> str:
        """
        Return the content-pool tag string (produce_plan.plan_tag) of an article.

        :param content_id: matched against produce_plan_exe_record.plan_exe_id
        :return: the plan_tag, or "ERROR" when no row exists or the tag is NULL
        """
        select_sql = f"""
            SELECT produce_plan.plan_tag
            FROM produce_plan
            JOIN produce_plan_exe_record
            ON produce_plan.id = produce_plan_exe_record.plan_id
            WHERE produce_plan_exe_record.plan_exe_id = '{content_id}';
        """
        result = await self.denet_client.async_select(sql=select_sql)
        # A NULL plan_tag is treated like a missing row so callers can always
        # call .split() on the return value.
        if result and result[0][0] is not None:
            return result[0][0]
        else:
            logging(
                code="5858",
                function="task3.get_content_pool_level",
                info="没有找到该文章的内容池等级",
                data={'content_id': content_id}
            )
            return "ERROR"

    async def check_title_category(self, content_id, gh_id, trace_id) -> bool:
        """
        Check whether the article's category is in the account's negative list.

        :param content_id: matched against article_category.produce_content_id
        :param gh_id: account id used to look up the negative category list
        :param trace_id: used for logging only
        :return: True if the article's category is negative for this account
        """
        bad_category_list = self.account_negative_category.get(gh_id, [])
        logging(
            code="history1101",
            info="该账号的 negative 类型列表",
            trace_id=trace_id,
            data=bad_category_list
        )
        if bad_category_list:
            sql = f"""
                SELECT category
                FROM article_category
                WHERE produce_content_id = '{content_id}';
            """
            result = await self.long_article_client.async_select(sql)
            if result:
                category = result[0][0]
                logging(
                    code="history1102",
                    info="文章的品类-{}".format(category),
                    trace_id=trace_id
                )
                if category in bad_category_list:
                    return True
        return False

    async def processTask(self, params):
        """
        Process one task.

        1. If the article's pool tags contain "autoArticlePoolLevel4" and its
           category is negative for the account, move it from status 0 to the
           mismatch status (95) and stop.
        2. Otherwise, if at least 3 uploaded videos already exist for the
           content_id, reuse them via useExistOssPath.

        :param params: task dict as produced by getTaskList
        """
        MISMATCH_STATUS = 95
        TASK_INIT_STATUS = 0
        content_id = params['content_id']
        trace_id = params['trace_id']
        gh_id = params['gh_id']
        flow_pool_level = await self.get_content_pool_level(content_id)
        flow_pool_level_list = flow_pool_level.split("/")
        if "autoArticlePoolLevel4" in flow_pool_level_list:
            # Is the article's category in this account's negative list?
            negative_category_status = await self.check_title_category(
                content_id=content_id,
                gh_id=gh_id,
                trace_id=trace_id
            )
            if negative_category_status:
                logging(
                    code="history1002",
                    info="文章属于该账号的 negative 类型",
                    trace_id=trace_id
                )
                # Conditional on the initial status so that only one worker
                # wins when several pick up the same task.
                update_sql = f"""
                    UPDATE {db_article}
                    SET content_status = %s
                    WHERE trace_id = %s and content_status = %s;
                """
                affected_rows = await self.pq_client.async_insert(
                    sql=update_sql,
                    params=(
                        MISMATCH_STATUS,
                        trace_id,
                        TASK_INIT_STATUS
                    )
                )
                if affected_rows == 0:
                    print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")
                    return
                # Only report the status change once it actually happened.
                logging(
                    code="history1003",
                    info="已经修改该文章状态为 品类不匹配状态",
                    trace_id=trace_id
                )
                return

        # Reuse existing videos if at least 3 uploaded ones exist. Starting a
        # new crawl when fewer exist is a planned follow-up, not done here.
        oss_path_list = await self.getHistoryVideoOssPath(content_id=content_id)
        if oss_path_list:
            logging(
                code="9001",
                info="存在历史文章",
                trace_id=trace_id
            )
            try:
                await self.useExistOssPath(video_info_list=oss_path_list, params=params)
            except Exception as e:
                # Best effort: one failed article must not abort the batch.
                print("useExistOssPath failed, trace_id={}: {}".format(trace_id, e))

    async def deal(self):
        """
        Fetch a batch of tasks and process them concurrently.
        """
        task_list = await self.getTaskList()
        if task_list:
            tasks = [self.processTask(params) for params in task_list]
            # gather() without return_exceptions: the first exception that
            # escapes a processTask call propagates out of deal().
            await asyncio.gather(*tasks)
        else:
            logging(
                code="9008",
                info="没有要处理的请求"
            )