Prechádzať zdrojové kódy

Merge branch '2024-12-23-rematch-task' of Server/title_with_video into 2024-09-23newDbTasks

luojunhui 10 mesiacov pred
rodič
commit
f90de9f88b

+ 22 - 0
RematchTask.py

@@ -0,0 +1,22 @@
+"""
+@author: luojunhui
+"""
+import time
+import datetime
+import asyncio
+from applications.db import AsyncMySQLClient
+from tasks.rm_match_task import ReMatchTask
+
+
+async def main_job():
+    """
+    main job
+    :return:
+    """
+    async with AsyncMySQLClient() as long_articles_pool:
+        new_content_id_task = ReMatchTask(long_articles_pool)
+        await new_content_id_task.deal()
+
+
+if __name__ == '__main__':
+    asyncio.run(main_job())

+ 5 - 2
applications/const/__init__.py

@@ -2,9 +2,12 @@
 @author: luojunhui
 """
 from .server_const import ServerConst
-from .task_const import HistoryContentIdTaskConst, NewContentIdTaskConst
+from .task_const import HistoryContentIdTaskConst
+from .task_const import NewContentIdTaskConst
+from .task_const import RematchTaskConst
 
 
 server_const = ServerConst()
 new_content_id_task_const = NewContentIdTaskConst()
-history_content_id_task_const = HistoryContentIdTaskConst()
+history_content_id_task_const = HistoryContentIdTaskConst()
+rematch_task_const = RematchTaskConst()

+ 21 - 3
applications/const/task_const.py

@@ -19,9 +19,13 @@ class HistoryContentIdTaskConst:
     EXIT_STATUS = 97
     # 文章品类不匹配
     MISMATCH_STATUS = 96
-    # 视频已经下载成功状态
+
+    # 视频下载状态
+    VIDEO_DOWNLOAD_INIT_STATUS = 0
+    VIDEO_DOWNLOAD_FAIL_STATUS = 3
     VIDEO_DOWNLOAD_SUCCESS_STATUS = 2
     # 任务最多处理次数
+    TASK_INIT_PROCESS_TIMES = 0
     TASK_MAX_PROCESS_TIMES = 3
 
     # 与AIGC交互,发送处理完成的trace_id至AIGC系统
@@ -72,5 +76,19 @@ class NewContentIdTaskConst(HistoryContentIdTaskConst):
     KIMI_SUCCESS_STATUS = 1
     KIMI_FAIL_STATUS = 2
 
-    # 视频下载失败状态
-    VIDEO_DOWNLOAD_FAIL_STATUS = 3
+
+class RematchTaskConst(NewContentIdTaskConst):
+    """
+    Rematch Task const
+    """
+    TASK_DEFAULT_GH_ID = 'DEFAULT_ID'
+
+    # 待重新匹配状态
+    REMATCH_INIT_STATUS = 0
+    REMATCH_SUCCESS_STATUS = 1
+    REMATCH_FAIL_STATUS = 2
+    REMATCH_PROCESSING_STATUS = -1
+
+    # AIGC 获取结果状态
+    AIGC_GET_RESULT_STATUS = 1
+    AIGC_DONT_GET_RESULT_STATUS = 0

+ 5 - 2
applications/db/__init__.py

@@ -3,6 +3,8 @@
 """
 import aiomysql
 
+from aiomysql.cursors import Cursor
+
 from applications.config import denet_config, long_articles_config
 
 
@@ -47,14 +49,15 @@ class AsyncMySQLClient(object):
         self.mysql_pool.close()
         await self.mysql_pool.wait_closed()
 
-    async def async_select(self, sql):
+    async def async_select(self, sql, cursor_type=Cursor):
         """
         select method
         :param sql:
+        :param cursor_type:
         :return:
         """
         async with self.mysql_pool.acquire() as conn:
-            async with conn.cursor() as cursor:
+            async with conn.cursor(cursor_type) as cursor:
                 await cursor.execute(sql)
                 result = await cursor.fetchall()
                 return result

+ 21 - 2
tasks/new_contentId_task.py

@@ -48,13 +48,14 @@ class NewContentIdTask(object):
                 SELECT content_id, count(1) as cnt
                 FROM {self.article_crawler_video_table}
                 WHERE download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS}
+                    AND is_illegal = {NewContentIdTaskConst.VIDEO_SAFE}
                 GROUP BY content_id
             ) t2
             ON t1.content_id = t2.content_id
             WHERE
                     t1.content_status = {NewContentIdTaskConst.TASK_INIT_STATUS}
                 AND t1.process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
-                AND t2.cnt IS NULL
+                AND COALESCE(t2.cnt, 0) < {NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM}
             ORDER BY flow_pool_level, request_timestamp
             LIMIT {self.spider_coroutines};
         """
@@ -253,6 +254,22 @@ class NewContentIdTask(object):
         else:
             return
 
+    async def get_illegal_out_ids(self, content_id: str) -> List[str]:
+        """
+        获取违规的外站视频id
+        """
+        select_sql = f"""
+            SELECT platform, out_video_id
+            FROM {self.article_crawler_video_table}
+            WHERE content_id = '{content_id}' and is_illegal = {NewContentIdTaskConst.VIDEO_UNSAFE};
+        """
+        response = await self.long_articles_client.async_select(select_sql)
+        if response:
+            result = ["{}_{}".format(line[0], line[1]) for line in response]
+            return result
+        else:
+            return []
+
     async def kimi_task(self, params):
         """
         执行 kimi 任务
@@ -515,11 +532,13 @@ class NewContentIdTask(object):
                 return False
 
             # download videos
+            illegal_videos = await self.get_illegal_out_ids(content_id)
             downloaded_count = await async_download_videos(
                 trace_id=trace_id,
                 content_id=content_id,
                 article_crawler_video_table=self.article_crawler_video_table,
-                db_client=self.long_articles_client
+                db_client=self.long_articles_client,
+                illegal_videos=illegal_videos
             )
 
             if downloaded_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:

+ 225 - 0
tasks/rm_match_task.py

@@ -0,0 +1,225 @@
+"""
+@author: luojunhui
+"""
+from typing import Dict, List
+
+from aiomysql.cursors import DictCursor
+
+from applications.log import logging
+from applications.config import Config
+from applications.const import rematch_task_const
+
+
+class ReMatchTask(object):
+    """
+    重新匹配任务
+    """
+
+    def __init__(self, db_client):
+        self.db_client = db_client
+        self.config = Config()
+        self.article_match_video_table = self.config.article_match_video_table
+        self.article_text_table = self.config.article_text_table
+        self.article_crawler_video_table = self.config.article_crawler_video_table
+        self.rematch_coroutines = int(self.config.get_config_value(key='rematchCoroutines'))
+
+    async def get_tasks(self, business_type) -> List[Dict]:
+        """
+        get task needs to rematch
+        """
+        if business_type == 'process':
+            select_sql = f"""
+                SELECT trace_id, content_id
+                FROM article_re_match_record
+                WHERE status = {rematch_task_const.REMATCH_INIT_STATUS}
+                LIMIT {self.rematch_coroutines};
+            """
+        elif business_type == 'check':
+            select_sql = f"""
+                SELECT trace_id, content_id
+                FROM article_re_match_record
+                WHERE status = {rematch_task_const.REMATCH_PROCESSING_STATUS}
+                LIMIT {self.rematch_coroutines};
+            """
+        else:
+            return []
+        result = await self.db_client.async_select(select_sql, cursor_type=DictCursor)
+        task_dict = {task['content_id']: task for task in result}
+        task_list = list(task_dict.values())
+        logging(
+            code="rematch_1001",
+            function="get_tasks",
+            info="获取content_id数量: {}".format(len(task_list))
+        )
+        return task_list
+
+    async def check_whether_get_enough_videos(self, content_id: str) -> bool:
+        """
+        check 该content_id是否存在足够的视频
+        """
+        select_sql = f"""
+            SELECT count(1)
+            FROM {self.article_crawler_video_table}
+            WHERE content_id = '{content_id}' 
+                AND download_status = {rematch_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS} 
+                AND is_illegal = {rematch_task_const.VIDEO_SAFE};
+        """
+        count_tuple = await self.db_client.async_select(select_sql)
+        count = count_tuple[0][0]
+        if count >= rematch_task_const.MIN_MATCH_VIDEO_NUM:
+            return True
+        else:
+            return False
+
+    async def update_content_id_status_to_success(self, content_id: str) -> int:
+        """
+        更新content_id的status为成功
+        """
+        update_sql = f"""
+            UPDATE article_re_match_record
+            SET status = %s
+            WHERE content_id = '{content_id}';
+        """
+        affected_rows = await self.db_client.async_insert(
+            update_sql,
+            params=(rematch_task_const.REMATCH_SUCCESS_STATUS,)
+        )
+        return affected_rows
+
+    async def get_task_lock(self, trace_id: str) -> int:
+        """
+        将任务上锁,防止被其他进程抢占
+        """
+        update_sql = f"""
+            UPDATE article_re_match_record
+            SET status = %s
+            WHERE trace_id = %s and status = %s;
+        """
+        affected_rows = await self.db_client.async_insert(
+            sql=update_sql,
+            params=(
+                rematch_task_const.REMATCH_PROCESSING_STATUS,
+                trace_id,
+                rematch_task_const.REMATCH_INIT_STATUS
+            )
+        )
+        return affected_rows
+
+    async def whether_same_content_id_processing(self, content_id: str) -> bool:
+        """
+        是否相同的content_id处理中 or 处理完成
+        """
+        select_sql = f"""
+            SELECT DISTINCT status
+            FROM article_re_match_record
+            WHERE content_id = '{content_id}';
+        """
+        response = await self.db_client.async_select(select_sql)
+        status_list = list(i[0] for i in response)
+        for status in status_list:
+            if status in [rematch_task_const.REMATCH_PROCESSING_STATUS, rematch_task_const.REMATCH_SUCCESS_STATUS]:
+                return True
+        return False
+
+    async def process_content_id(self, content_id: str) -> int:
+        """
+        处理content_id
+        """
+        update_sql = f"""
+            UPDATE {self.article_match_video_table}
+            SET content_status = %s, success_status = %s, process_times = %s
+            WHERE content_id = %s;
+        """
+        affected_rows = await self.db_client.async_insert(
+            sql=update_sql,
+            params=(
+                rematch_task_const.TASK_INIT_STATUS,
+                rematch_task_const.AIGC_DONT_GET_RESULT_STATUS,
+                rematch_task_const.TASK_INIT_PROCESS_TIMES,
+                content_id
+            )
+        )
+        return affected_rows
+
+    async def process_task(self, task_list: List[Dict]) -> None:
+        """
+        处理
+        """
+        for task in task_list:
+            content_id = task['content_id']
+            trace_id = task['trace_id']
+            processing_flag = await self.whether_same_content_id_processing(content_id=content_id)
+            if processing_flag:
+                continue
+            else:
+                affected_row = await self.get_task_lock(trace_id)
+                if not affected_row:
+                    continue
+                affected_rows = await self.process_content_id(content_id)
+                if affected_rows:
+                    logging(
+                        code="rematch_1002",
+                        function="deal",
+                        info="回滚content_id成功",
+                        data={
+                            "content_id": content_id,
+                            "trace_id": trace_id,
+                            "affected_rows": affected_rows
+                        }
+                    )
+                else:
+                    logging(
+                        code="rematch_1003",
+                        function="deal",
+                        info="回滚content_id失败",
+                        data={
+                            "content_id": content_id,
+                            "trace_id": trace_id
+                        }
+                    )
+
+    async def check_task(self, task_list: List[Dict]) -> None:
+        """
+        校验任务是否完成
+        """
+        for task in task_list:
+            content_id = task['content_id']
+            enough_video_flag = await self.check_whether_get_enough_videos(content_id=content_id)
+            if enough_video_flag:
+                affected_rows = await self.update_content_id_status_to_success(content_id=content_id)
+                if affected_rows:
+                    logging(
+                        code="rematch_1004",
+                        function="check_task",
+                        info="修改状态成功",
+                        data={
+                            "content_id": content_id,
+                            "affected_rows": affected_rows
+                        }
+                    )
+            else:
+                continue
+
+    async def deal(self):
+        """
+        do job here
+        """
+        # 处理任务
+        task_list = await self.get_tasks(business_type="process")
+        if task_list:
+            await self.process_task(task_list)
+        else:
+            logging(
+                code="rematch_5001",
+                info="do not get article to process"
+            )
+
+        # 校验任务
+        task_list = await self.get_tasks(business_type="check")
+        if task_list:
+            await self.check_task(task_list)
+        else:
+            logging(
+                code="rematch_5002",
+                info="do not get article to check"
+            )

+ 4 - 1
tasks/utils/etl_task.py

@@ -6,7 +6,7 @@ from applications.const import new_content_id_task_const
 from applications.log import logging
 
 
-async def async_download_videos(trace_id, content_id, article_crawler_video_table, db_client):
+async def async_download_videos(trace_id, content_id, article_crawler_video_table, db_client, illegal_videos):
     """
     下载视频
     """
@@ -31,6 +31,9 @@ async def async_download_videos(trace_id, content_id, article_crawler_video_tabl
             "user_id": line[6],
             "trace_id": line[7]
         }
+        out_key = "{}_{}".format(params['platform'], params['video_id'])
+        if out_key in illegal_videos:
+            continue
         try:
             local_video_path, local_cover_path = generate_video_path(params['platform'], params['video_id'])
             # download videos