| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225 | 
							- """
 
- @author: luojunhui
 
- """
 
- from typing import Dict, List
 
- from aiomysql.cursors import DictCursor
 
- from applications.log import logging
 
- from applications.config import Config
 
- from applications.const import rematch_task_const
 
class ReMatchTask(object):
    """
    Re-match task.

    Rolls back articles recorded in ``article_re_match_record`` so their videos
    get matched again ("process" phase). Later it marks those records as
    successful once enough usable videos exist ("check" phase).
    """

    def __init__(self, db_client):
        """
        :param db_client: async database client; this class calls its
            ``async_select`` (reads) and ``async_insert`` (UPDATE statements).
        """
        self.db_client = db_client
        self.config = Config()
        self.article_match_video_table = self.config.article_match_video_table
        self.article_text_table = self.config.article_text_table
        self.article_crawler_video_table = self.config.article_crawler_video_table
        # Used as the LIMIT of every batch fetched by ``get_tasks``.
        self.rematch_coroutines = int(self.config.get_config_value(key='rematchCoroutines'))

    async def get_tasks(self, business_type) -> List[Dict]:
        """
        Fetch a batch of re-match records to work on.

        :param business_type: ``'process'`` selects records in the init status;
            ``'check'`` selects records in the processing status. Any other value
            returns an empty list without querying the database.
        :return: at most ``self.rematch_coroutines`` dicts with ``trace_id`` and
            ``content_id`` keys, de-duplicated by ``content_id``. When several
            records share a content_id, the last one returned by the query is kept.
        """
        status_by_business_type = {
            'process': rematch_task_const.REMATCH_INIT_STATUS,
            'check': rematch_task_const.REMATCH_PROCESSING_STATUS,
        }
        if business_type not in status_by_business_type:
            return []
        select_sql = f"""
            SELECT trace_id, content_id
            FROM article_re_match_record
            WHERE status = {status_by_business_type[business_type]}
            LIMIT {self.rematch_coroutines};
        """
        result = await self.db_client.async_select(select_sql, cursor_type=DictCursor)
        # One task per content_id, so the same article is not handled twice in a batch.
        task_dict = {task['content_id']: task for task in result or []}
        task_list = list(task_dict.values())
        logging(
            code="rematch_1001",
            function="get_tasks",
            info="获取content_id数量: {}".format(len(task_list))
        )
        return task_list

    async def check_whether_get_enough_videos(self, content_id: str) -> bool:
        """
        Check whether ``content_id`` already has enough usable videos.

        A video counts when its ``download_status`` equals
        ``VIDEO_DOWNLOAD_SUCCESS_STATUS`` and its ``is_illegal`` equals
        ``VIDEO_SAFE`` in the crawler video table.

        :return: True when the count is at least ``MIN_MATCH_VIDEO_NUM``;
            False otherwise, including when the query returns no rows.
        """
        # NOTE(review): content_id is interpolated into the SQL text. Switch to a
        # bound parameter if async_select supports one.
        select_sql = f"""
            SELECT count(1)
            FROM {self.article_crawler_video_table}
            WHERE content_id = '{content_id}' 
                AND download_status = {rematch_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS} 
                AND is_illegal = {rematch_task_const.VIDEO_SAFE};
        """
        count_tuple = await self.db_client.async_select(select_sql)
        if not count_tuple:
            return False
        return count_tuple[0][0] >= rematch_task_const.MIN_MATCH_VIDEO_NUM

    async def update_content_id_status_to_success(self, content_id: str) -> int:
        """
        Set the status of every re-match record for ``content_id`` to success.

        ``content_id`` is passed as a bound parameter rather than formatted into
        the SQL. That keeps quotes in it from breaking the statement, and keeps a
        literal ``%`` from colliding with driver-side parameter substitution.

        :return: number of affected rows as reported by ``async_insert``.
        """
        update_sql = """
            UPDATE article_re_match_record
            SET status = %s
            WHERE content_id = %s;
        """
        affected_rows = await self.db_client.async_insert(
            update_sql,
            params=(rematch_task_const.REMATCH_SUCCESS_STATUS, content_id)
        )
        return affected_rows

    async def get_task_lock(self, trace_id: str) -> int:
        """
        Lock a task so other processes cannot grab it.

        Moves the record from the init status to the processing status. The
        ``status = init`` condition makes this a compare-and-set, so at most one
        caller can succeed for a given trace_id.

        :return: affected rows; 0 means the record was no longer in the init status.
        """
        update_sql = """
            UPDATE article_re_match_record
            SET status = %s
            WHERE trace_id = %s and status = %s;
        """
        affected_rows = await self.db_client.async_insert(
            sql=update_sql,
            params=(
                rematch_task_const.REMATCH_PROCESSING_STATUS,
                trace_id,
                rematch_task_const.REMATCH_INIT_STATUS
            )
        )
        return affected_rows

    async def whether_same_content_id_processing(self, content_id: str) -> bool:
        """
        Check whether any re-match record for ``content_id`` is currently processing.

        NOTE(review): the original comment described this as "processing or
        done". The query only tests for the processing status, and records
        already in the success status do not count. Confirm which is intended.

        :return: True if some record for this content_id is in the processing status.
        """
        # NOTE(review): content_id is interpolated into the SQL text. Switch to a
        # bound parameter if async_select supports one.
        select_sql = f"""
            SELECT DISTINCT status
            FROM article_re_match_record
            WHERE content_id = '{content_id}';
        """
        response = await self.db_client.async_select(select_sql)
        return any(
            row[0] == rematch_task_const.REMATCH_PROCESSING_STATUS
            for row in response or ()
        )

    async def process_content_id(self, content_id: str) -> int:
        """
        Roll back the match state of ``content_id`` so it is matched again.

        Resets ``content_status``, ``success_status`` and ``process_times`` on every
        row of the article-match-video table for this content_id.

        :return: number of affected rows as reported by ``async_insert``.
        """
        update_sql = f"""
            UPDATE {self.article_match_video_table}
            SET content_status = %s, success_status = %s, process_times = %s
            WHERE content_id = %s;
        """
        affected_rows = await self.db_client.async_insert(
            sql=update_sql,
            params=(
                rematch_task_const.TASK_INIT_STATUS,
                rematch_task_const.AIGC_DONT_GET_RESULT_STATUS,
                rematch_task_const.TASK_INIT_PROCESS_TIMES,
                content_id
            )
        )
        return affected_rows

    async def process_task(self, task_list: List[Dict]) -> None:
        """
        Handle each pending task.

        For each task:
        - skip it if another record for the same content_id is already processing;
        - skip it if it cannot be locked;
        - otherwise roll back its content_id and log the outcome.
        """
        for task in task_list:
            content_id = task['content_id']
            trace_id = task['trace_id']

            if await self.whether_same_content_id_processing(content_id=content_id):
                continue
            if not await self.get_task_lock(trace_id):
                # Another worker moved this record out of the init status first.
                continue

            affected_rows = await self.process_content_id(content_id)
            if affected_rows:
                logging(
                    code="rematch_1002",
                    function="deal",
                    info="回滚content_id成功",
                    data={
                        "content_id": content_id,
                        "trace_id": trace_id,
                        "affected_rows": affected_rows
                    }
                )
            else:
                logging(
                    code="rematch_1003",
                    function="deal",
                    info="回滚content_id失败",
                    data={
                        "content_id": content_id,
                        "trace_id": trace_id
                    }
                )

    async def check_task(self, task_list: List[Dict]) -> None:
        """
        Check whether processing tasks have finished.

        A content_id with enough usable videos has its re-match records
        marked as successful.
        """
        for task in task_list:
            content_id = task['content_id']
            if not await self.check_whether_get_enough_videos(content_id=content_id):
                continue
            affected_rows = await self.update_content_id_status_to_success(content_id=content_id)
            if affected_rows:
                logging(
                    code="rematch_1004",
                    function="check_task",
                    info="修改状态成功",
                    data={
                        "content_id": content_id,
                        "affected_rows": affected_rows
                    }
                )

    async def deal(self):
        """
        Run one round of the job.

        First process newly queued tasks, then check tasks that are in progress.
        """
        # Process phase
        task_list = await self.get_tasks(business_type="process")
        if task_list:
            await self.process_task(task_list)
        else:
            logging(
                code="rematch_5001",
                info="do not get article to process"
            )

        # Check phase
        task_list = await self.get_tasks(business_type="check")
        if task_list:
            await self.check_task(task_list)
        else:
            logging(
                code="rematch_5002",
                info="do not get article to check"
            )
 
 
  |