"""
@author: luojunhui
"""
from typing import Dict, List

from aiomysql.cursors import DictCursor

from applications.log import logging
from applications.config import Config
from applications.const import rematch_task_const


class ReMatchTask(object):
    """
    Re-match task.

    Picks records from ``article_re_match_record`` and rolls back the matching
    state of their content_id in the article/video match table ("process").
    Later it marks those records as succeeded once enough downloaded, safe
    crawler videos exist for the content_id ("check").
    """

    def __init__(self, db_client):
        """
        :param db_client: async MySQL client; this class awaits its
            ``async_select`` and ``async_insert`` methods.
        """
        self.db_client = db_client
        self.config = Config()
        self.article_match_video_table = self.config.article_match_video_table
        self.article_text_table = self.config.article_text_table
        self.article_crawler_video_table = self.config.article_crawler_video_table
        # Used as the LIMIT of each batch fetched by get_tasks.
        self.rematch_coroutines = int(self.config.get_config_value(key='rematchCoroutines'))

    async def get_tasks(self, business_type) -> List[Dict]:
        """
        Fetch a batch of re-match records that need work.

        :param business_type: ``'process'`` selects records in the init status,
            ``'check'`` selects records in the processing status; any other
            value returns an empty list without querying.
        :return: a list of ``{'trace_id': ..., 'content_id': ...}`` dicts with
            at most one entry per content_id (the last row seen wins).
        """
        status_by_business_type = {
            'process': rematch_task_const.REMATCH_INIT_STATUS,
            'check': rematch_task_const.REMATCH_PROCESSING_STATUS,
        }
        if business_type not in status_by_business_type:
            return []

        select_sql = f"""
            SELECT trace_id, content_id
            FROM article_re_match_record
            WHERE status = {status_by_business_type[business_type]}
            LIMIT {self.rematch_coroutines};
        """
        result = await self.db_client.async_select(select_sql, cursor_type=DictCursor)
        # Deduplicate by content_id so one content_id is not handled twice per batch.
        # `or []` guards against the client returning None instead of an empty result.
        task_dict = {task['content_id']: task for task in result or []}
        task_list = list(task_dict.values())
        logging(
            code="rematch_1001",
            function="get_tasks",
            info="获取content_id数量: {}".format(len(task_list))
        )
        return task_list

    async def check_whether_get_enough_videos(self, content_id: str) -> bool:
        """
        Check whether the content_id already has enough usable videos.

        A video counts when its download status is "success" and it is marked
        safe (not illegal).

        :param content_id: content id to check.
        :return: True if the count reaches ``MIN_MATCH_VIDEO_NUM``.
        """
        # NOTE(review): content_id is interpolated directly because async_select
        # is only ever called without bind params here; switch to a
        # parameterized query if the client supports one.
        select_sql = f"""
            SELECT count(1)
            FROM {self.article_crawler_video_table}
            WHERE content_id = '{content_id}'
                AND download_status = {rematch_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
                AND is_illegal = {rematch_task_const.VIDEO_SAFE};
        """
        count_tuple = await self.db_client.async_select(select_sql)
        # COUNT always yields one row; the guard only covers a None/empty client result.
        count = count_tuple[0][0] if count_tuple else 0
        return count >= rematch_task_const.MIN_MATCH_VIDEO_NUM

    async def update_content_id_status_to_success(self, content_id: str) -> int:
        """
        Mark every re-match record of the content_id as succeeded.

        :param content_id: content id whose records are updated.
        :return: number of affected rows.
        """
        # Bind content_id as a parameter: the driver %-formats the query with
        # the params, so an interpolated value containing '%' or a quote would
        # break (or inject into) the statement.
        update_sql = """
            UPDATE article_re_match_record
            SET status = %s
            WHERE content_id = %s;
        """
        affected_rows = await self.db_client.async_insert(
            sql=update_sql,
            params=(rematch_task_const.REMATCH_SUCCESS_STATUS, content_id)
        )
        return affected_rows

    async def get_task_lock(self, trace_id: str) -> int:
        """
        Lock a task so that other processes cannot claim it.

        The record moves from the init status to the processing status only if
        it is still in the init status, so at most one caller gets a non-zero
        row count.

        :param trace_id: trace id of the record to lock.
        :return: number of affected rows (0 means the lock was not acquired).
        """
        update_sql = """
            UPDATE article_re_match_record
            SET status = %s
            WHERE trace_id = %s and status = %s;
        """
        affected_rows = await self.db_client.async_insert(
            sql=update_sql,
            params=(
                rematch_task_const.REMATCH_PROCESSING_STATUS,
                trace_id,
                rematch_task_const.REMATCH_INIT_STATUS
            )
        )
        return affected_rows

    async def whether_same_content_id_processing(self, content_id: str) -> bool:
        """
        Check whether any re-match record of this content_id is already in the
        processing status.

        :param content_id: content id to look up.
        :return: True if at least one of its records has the processing status.
        """
        # NOTE(review): see check_whether_get_enough_videos about interpolation.
        select_sql = f"""
            SELECT DISTINCT status
            FROM article_re_match_record
            WHERE content_id = '{content_id}';
        """
        response = await self.db_client.async_select(select_sql)
        return any(
            row[0] == rematch_task_const.REMATCH_PROCESSING_STATUS
            for row in response or ()
        )

    async def process_content_id(self, content_id: str) -> int:
        """
        Reset the match state of the content_id in the article/video match table.

        Sets content_status, success_status and process_times back to their
        initial values so the content is matched again.

        :param content_id: content id to reset.
        :return: number of affected rows.
        """
        update_sql = f"""
            UPDATE {self.article_match_video_table}
            SET content_status = %s, success_status = %s, process_times = %s
            WHERE content_id = %s;
        """
        affected_rows = await self.db_client.async_insert(
            sql=update_sql,
            params=(
                rematch_task_const.TASK_INIT_STATUS,
                rematch_task_const.AIGC_DONT_GET_RESULT_STATUS,
                rematch_task_const.TASK_INIT_PROCESS_TIMES,
                content_id
            )
        )
        return affected_rows

    async def process_task(self, task_list: List[Dict]) -> None:
        """
        Roll back each task's content_id, one task at a time.

        A task is skipped if another record of the same content_id is already
        processing, or if its own lock cannot be acquired.

        :param task_list: dicts carrying ``content_id`` and ``trace_id``.
        """
        for task in task_list:
            content_id = task['content_id']
            trace_id = task['trace_id']

            if await self.whether_same_content_id_processing(content_id=content_id):
                continue
            if not await self.get_task_lock(trace_id):
                continue

            affected_rows = await self.process_content_id(content_id)
            if affected_rows:
                logging(
                    code="rematch_1002",
                    function="deal",
                    info="回滚content_id成功",
                    data={
                        "content_id": content_id,
                        "trace_id": trace_id,
                        "affected_rows": affected_rows
                    }
                )
            else:
                logging(
                    code="rematch_1003",
                    function="deal",
                    info="回滚content_id失败",
                    data={
                        "content_id": content_id,
                        "trace_id": trace_id
                    }
                )

    async def check_task(self, task_list: List[Dict]) -> None:
        """
        Check whether the tasks have finished.

        A content_id counts as finished once it has enough usable videos, and
        its records are then marked as succeeded.

        :param task_list: dicts carrying ``content_id``.
        """
        for task in task_list:
            content_id = task['content_id']
            if not await self.check_whether_get_enough_videos(content_id=content_id):
                continue

            affected_rows = await self.update_content_id_status_to_success(content_id=content_id)
            if affected_rows:
                logging(
                    code="rematch_1004",
                    function="check_task",
                    info="修改状态成功",
                    data={
                        "content_id": content_id,
                        "affected_rows": affected_rows
                    }
                )

    async def deal(self):
        """
        Entry point: run one "process" batch, then one "check" batch.
        """
        # Process tasks.
        task_list = await self.get_tasks(business_type="process")
        if task_list:
            await self.process_task(task_list)
        else:
            logging(
                code="rematch_5001",
                info="do not get article to process"
            )

        # Check tasks.
        task_list = await self.get_tasks(business_type="check")
        if task_list:
            await self.check_task(task_list)
        else:
            logging(
                code="rematch_5002",
                info="do not get article to check"
            )