Browse Source

修改newContentIdTask

luojunhui 4 months ago
parent
commit
849b6200ef
2 changed files with 159 additions and 246 deletions
  1. 6 0
      applications/const/task_const.py
  2. 153 246
      tasks/rm_match_task.py

+ 6 - 0
applications/const/task_const.py

@@ -25,6 +25,7 @@ class HistoryContentIdTaskConst:
     VIDEO_DOWNLOAD_FAIL_STATUS = 3
     VIDEO_DOWNLOAD_SUCCESS_STATUS = 2
     # 任务最多处理次数
+    TASK_INIT_PROCESS_TIMES = 0
     TASK_MAX_PROCESS_TIMES = 3
 
     # 与AIGC交互,发送处理完成的trace_id至AIGC系统
@@ -86,3 +87,8 @@ class RematchTaskConst(NewContentIdTaskConst):
     REMATCH_INIT_STATUS = 0
     REMATCH_SUCCESS_STATUS = 1
     REMATCH_FAIL_STATUS = 2
+    REMATCH_PROCESSING_STATUS = -1
+
+    # AIGC 获取结果状态
+    AIGC_GET_RESULT_STATUS = 1
+    AIGC_DONT_GET_RESULT_STATUS = 0

+ 153 - 246
tasks/rm_match_task.py

@@ -1,7 +1,6 @@
 """
 @author: luojunhui
 """
-import traceback
 from typing import Dict, List
 
 from aiomysql.cursors import DictCursor
@@ -9,12 +8,6 @@ from aiomysql.cursors import DictCursor
 from applications.log import logging
 from applications.config import Config
 from applications.const import rematch_task_const
-from applications.etl_function import download_cover
-from applications.etl_function import download_video
-from applications.etl_function import generate_video_path
-from applications.etl_function import upload_to_oss
-from applications.spider import search_videos_from_web
-from .utils import get_kimi_result
 
 
 class ReMatchTask(object):
@@ -28,232 +21,37 @@ class ReMatchTask(object):
         self.article_match_video_table = self.config.article_match_video_table
         self.article_text_table = self.config.article_text_table
         self.article_crawler_video_table = self.config.article_crawler_video_table
+        self.rematch_coroutines = int(self.config.get_config_value(key='rematchCoroutines'))
 
-    async def get_tasks(self) -> List[Dict]:
+    async def get_tasks(self, business_type) -> List[Dict]:
         """
         get task needs to rematch
         """
-        select_sql = f"""
-            SELECT DISTINCT content_id 
-            FROM article_re_match_record
-            WHERE status = {rematch_task_const.REMATCH_INIT_STATUS};
-        """
+        if business_type == 'process':
+            select_sql = f"""
+                SELECT trace_id, content_id
+                FROM article_re_match_record
+                WHERE status = {rematch_task_const.REMATCH_INIT_STATUS}
+                LIMIT {self.rematch_coroutines};
+            """
+        elif business_type == 'check':
+            select_sql = f"""
+                SELECT trace_id, content_id
+                FROM article_re_match_record
+                WHERE status = {rematch_task_const.REMATCH_PROCESSING_STATUS}
+                LIMIT {self.rematch_coroutines};
+            """
+        else:
+            return []
         result = await self.db_client.async_select(select_sql, cursor_type=DictCursor)
+        task_dict = {task['content_id']: task for task in result}
+        task_list = list(task_dict.values())
         logging(
             code="rematch_1001",
             function="get_tasks",
-            info="获取content_id数量: {}".format(len(result))
+            info="获取content_id数量: {}".format(len(task_list))
         )
-        return result
-
-    async def async_download_video_list(self, to_download_videos: List[Dict], illegal_platform_id_list: List[str]) -> int:
-        """
-        异步下载视频
-        """
-        success_count = 0
-        for video in to_download_videos:
-            # check whether video is illegal
-            out_key = "{}_{}".format(video['platform'], video['out_video_id'])
-            if out_key in illegal_platform_id_list:
-                continue
-            # start download
-            try:
-                local_video_path, local_cover_path = generate_video_path(video['platform'], video['out_video_id'])
-                # download videos
-                file_path = await download_video(
-                    file_path=local_video_path,
-                    platform=video['platform'],
-                    video_url=video['video_url']
-                )
-                if not file_path:
-                    # 说明视频下载失败,无需上传该视频, 将该条记录设置为失败状态
-                    update_sql = f"""
-                            UPDATE {self.article_crawler_video_table}
-                            SET download_status = %s
-                            WHERE id = %s;
-                    """
-
-                    await self.db_client.async_insert(
-                        sql=update_sql,
-                        params=(rematch_task_const.VIDEO_DOWNLOAD_FAIL_STATUS, video['id'])
-                    )
-                else:
-                    # download cover
-                    cover_path = await download_cover(
-                        file_path=local_cover_path,
-                        platform=video['platform'],
-                        cover_url=video['cover_url']
-                    )
-                    # upload video to oss
-                    oss_video = await upload_to_oss(
-                        local_video_path=file_path,
-                        download_type="video"
-                    )
-                    # upload cover to oss
-                    if cover_path:
-                        oss_cover = await upload_to_oss(
-                            local_video_path=cover_path,
-                            download_type="image"
-                        )
-                    else:
-                        oss_cover = None
-
-                    # change status to success
-                    update_sql = f"""
-                        UPDATE {self.article_crawler_video_table}
-                        SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
-                        WHERE id = %s;
-                    """
-                    await self.db_client.async_insert(
-                        sql=update_sql,
-                        params=(
-                            oss_video,
-                            oss_cover,
-                            rematch_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS,
-                            video['id']
-                        )
-                    )
-                    success_count += 1
-                # 如果下载的视频数已经大于3, 则直接退出循环,修改状态为ETL成功状态
-                if success_count > rematch_task_const.MIN_MATCH_VIDEO_NUM:
-                    return success_count
-            except Exception as e:
-                update_sql = f"""
-                            UPDATE {self.article_crawler_video_table}
-                            SET download_status = %s
-                            WHERE id = %s;
-                            """
-                await self.db_client.async_insert(
-                    sql=update_sql,
-                    params=(rematch_task_const.VIDEO_DOWNLOAD_FAIL_STATUS, video['id'])
-                )
-
-        return success_count
-
-    async def download_upload_task(self, content_id: str, illegal_platform_id_list: List[str], legal_not_downloaded_videos=None) -> int:
-        """
-        下载任务
-        """
-        success_count = 0
-        if legal_not_downloaded_videos:
-            # 下载legal_not_download_videos, 记录下载成功数量
-            new_download_count = await self.async_download_video_list(
-                to_download_videos=legal_not_downloaded_videos,
-                illegal_platform_id_list=illegal_platform_id_list
-            )
-            success_count += new_download_count
-            return success_count
-
-        select_sql = f"""
-            SELECT id, out_video_id, platform, video_title, video_url, cover_url
-            FROM {self.article_crawler_video_table} 
-            WHERE content_id = '{content_id}' AND download_status = {rematch_task_const.VIDEO_DOWNLOAD_INIT_STATUS};
-        """
-        to_download_videos = await self.db_client.async_select(select_sql, cursor_type=DictCursor)
-        new_download_count = await self.async_download_video_list(
-            to_download_videos=to_download_videos,
-            illegal_platform_id_list=illegal_platform_id_list
-        )
-        success_count += new_download_count
-        return success_count
-
-    async def spider_task(self, content_id: str) -> int:
-        """
-        爬虫抓取任务
-        """
-        kimi_result = await get_kimi_result(
-            content_id=content_id,
-            article_text_table=self.article_text_table,
-            db_client=self.db_client
-        )
-        # 该搜索任务是content_id粒度,因此将trace_id 传为content_id
-        search_videos_count = await search_videos_from_web(
-            info={
-                "ori_title": kimi_result['ori_title'],
-                "kimi_summary": kimi_result['kimi_summary'],
-                "kimi_keys": kimi_result['kimi_keys'],
-                "trace_id": content_id,
-                "gh_id": rematch_task_const.TASK_DEFAULT_GH_ID,
-                "content_id": content_id,
-                "crawler_video_table": self.article_crawler_video_table
-            },
-            gh_id_map={},
-            db_client=self.db_client
-        )
-        return search_videos_count
-
-    async def update_each_content_id(self, content_id: str) -> None:
-        """
-        更新每一个content_id的视频信息
-        """
-        select_sql = f"""
-            SELECT
-                id, platform, out_video_id, video_oss_path, download_status, is_illegal, video_url, cover_url
-            FROM
-                {self.article_crawler_video_table}
-            WHERE content_id = '{content_id}';
-        """
-        record_list = await self.db_client.async_select(select_sql, cursor_type=DictCursor)
-        # 获取违规视频,违规视频out_id && platform
-        illegal_videos = [record for record in record_list if record["is_illegal"] == rematch_task_const.VIDEO_UNSAFE]
-        illegal_platform_id_list = [
-            "{}_{}".format(record['platform'], record['out_video_id']) for record in illegal_videos
-        ]
-
-        # 处理非违规字段
-        legal_downloaded_videos = []
-        legal_not_downloaded_videos = []
-        for record in record_list:
-            if record['is_illegal'] == rematch_task_const.VIDEO_UNSAFE:
-                continue
-            else:
-                out_key = "{}_{}".format(record['platform'], record['out_video_id'])
-                if out_key in illegal_platform_id_list:
-                    continue
-                if record['download_status'] == rematch_task_const.VIDEO_DOWNLOAD_INIT_STATUS:
-                    legal_not_downloaded_videos.append(record)
-                elif record['download_status'] == rematch_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS:
-                    legal_downloaded_videos.append(record)
-                else:
-                    continue
-
-        if len(legal_downloaded_videos) >= rematch_task_const.MIN_MATCH_VIDEO_NUM:
-            logging(
-                code="rematch_1002",
-                info="存在{}条以上待合规已下载内容,无需处理该content_id".format(rematch_task_const.MIN_MATCH_VIDEO_NUM),
-                data={
-                    "content_id": content_id
-                }
-            )
-            return
-        elif len(legal_not_downloaded_videos + legal_downloaded_videos) >= rematch_task_const.MIN_MATCH_VIDEO_NUM:
-            logging(
-                code="rematch_1003",
-                info="存在{}条以上待合规内容".format(rematch_task_const.MIN_MATCH_VIDEO_NUM),
-                data={
-                    "content_id": content_id,
-                    "待下载数量": len(legal_not_downloaded_videos),
-                    "已下载数量": len(legal_downloaded_videos)
-                }
-            )
-            await self.download_upload_task(
-                content_id=content_id,
-                illegal_platform_id_list=illegal_platform_id_list,
-                legal_not_downloaded_videos=legal_not_downloaded_videos
-            )
-        else:
-            logging(
-                code="rematch_1004",
-                info="重新执行爬虫任务",
-                data={
-                    "content_id": content_id,
-                }
-            )
-            await self.spider_task(content_id=content_id)
-            await self.download_upload_task(
-                content_id=content_id,
-                illegal_platform_id_list=illegal_platform_id_list
-            )
+        return task_list
 
     async def check_whether_get_enough_videos(self, content_id: str) -> bool:
         """
@@ -273,7 +71,7 @@ class ReMatchTask(object):
         else:
             return False
 
-    async def update_content_id_status(self, content_id: str) -> int:
+    async def update_content_id_status_to_success(self, content_id: str) -> int:
         """
         更新content_id的status为成功
         """
@@ -288,31 +86,140 @@ class ReMatchTask(object):
         )
         return affected_rows
 
-    async def deal(self):
+    async def get_task_lock(self, trace_id: str) -> int:
         """
-        do job here
+        Lock the task so that it is not taken by another process.
+
+        Issues an UPDATE on article_re_match_record that sets the status of the
+        row identified by `trace_id` to REMATCH_PROCESSING_STATUS, but only
+        while its current status is still REMATCH_INIT_STATUS (see the
+        `status = %s` condition in the WHERE clause).
+
+        :param trace_id: trace id of the rematch record to lock
+        :return: the value returned by db_client.async_insert (presumably the
+            affected-row count; process_task treats a falsy value as
+            "lock not obtained")
+        """
+        update_sql = f"""
+            UPDATE article_re_match_record
+            SET status = %s
+            WHERE trace_id = %s and status = %s;
+        """
+        # params follow the placeholder order: new status, trace_id, expected current status
+        affected_rows = await self.db_client.async_insert(
+            sql=update_sql,
+            params=(
+                rematch_task_const.REMATCH_PROCESSING_STATUS,
+                trace_id,
+                rematch_task_const.REMATCH_INIT_STATUS
+            )
+        )
+        return affected_rows
+
+    async def whether_same_content_id_processing(self, content_id: str) -> bool:
+        """
+        Tell whether the content_id already has a rematch record that is being
+        processed or has finished successfully.
+
+        :param content_id: content id looked up in article_re_match_record
+        :return: True if any distinct status found for the content_id is
+            REMATCH_PROCESSING_STATUS or REMATCH_SUCCESS_STATUS, otherwise False
+        """
+        # NOTE(review): content_id is interpolated directly into the SQL text;
+        # consider a parameterized query if async_select supports params — confirm.
+        select_sql = f"""
+            SELECT DISTINCT status
+            FROM article_re_match_record
+            WHERE content_id = '{content_id}';
+        """
+        rows = await self.db_client.async_select(select_sql)
+        # the status is read from the first column of every returned row
+        return any(
+            row[0] in (
+                rematch_task_const.REMATCH_PROCESSING_STATUS,
+                rematch_task_const.REMATCH_SUCCESS_STATUS
+            )
+            for row in rows
+        )
+
+    async def process_content_id(self, content_id: str) -> int:
+        """
+        Reset the rows of a content_id in the article-match-video table.
+
+        Sets content_status to TASK_INIT_STATUS, success_status to
+        AIGC_DONT_GET_RESULT_STATUS and process_times to
+        TASK_INIT_PROCESS_TIMES for all rows matching the content_id.
+
+        :param content_id: content id whose rows are reset
+        :return: the value returned by db_client.async_insert for the update
+        """
+        reset_sql = f"""
+            UPDATE {self.article_match_video_table}
+            SET content_status = %s, success_status = %s, process_times = %s
+            WHERE content_id = %s;
+        """
+        # values in placeholder order: content_status, success_status, process_times, content_id
+        reset_values = (
+            rematch_task_const.TASK_INIT_STATUS,
+            rematch_task_const.AIGC_DONT_GET_RESULT_STATUS,
+            rematch_task_const.TASK_INIT_PROCESS_TIMES,
+            content_id
+        )
+        return await self.db_client.async_insert(sql=reset_sql, params=reset_values)
+
+    async def process_task(self, task_list: List[Dict]) -> None:
+        """
+        Process the fetched rematch tasks one by one.
+
+        For every task: skip it when the same content_id is already being
+        processed or has succeeded, otherwise lock the record by trace_id and
+        reset the content_id via process_content_id, logging the outcome.
+
+        :param task_list: task dicts; each one is read for the keys
+            'content_id' and 'trace_id'
         """
-        task_list = await self.get_tasks()
         for task in task_list:
             content_id = task['content_id']
-            try:
-                await self.update_each_content_id(content_id=content_id)
-
-                if await self.check_whether_get_enough_videos(content_id=content_id):
-                    # 修改状态为1
-                    await self.update_content_id_status(content_id=content_id)
-                else:
+            trace_id = task['trace_id']
+            # skip when any record of this content_id is already processing or succeeded
+            processing_flag = await self.whether_same_content_id_processing(content_id=content_id)
+            if processing_flag:
+                continue
+            else:
+                # try to claim the record; a falsy result means the lock was not obtained
+                affected_row = await self.get_task_lock(trace_id)
+                if not affected_row:
                     continue
-            except Exception as e:
-                error_stack = traceback.format_exc()
-                logging(
-                    code="rematch_1010",
-                    function="update_each_content_id",
-                    data={
-                        "error_stack": error_stack,
-                        "error": str(e),
-                        "content_id": content_id
-                    }
-                )
+                # reset the content_id rows; presumably so the matching pipeline
+                # picks them up again — TODO confirm
+                affected_rows = await self.process_content_id(content_id)
+                if affected_rows:
+                    # NOTE(review): logs here are tagged function="deal" although this
+                    # code lives in process_task — confirm this is intended
+                    logging(
+                        code="rematch_1002",
+                        function="deal",
+                        info="回滚content_id成功",
+                        data={
+                            "content_id": content_id,
+                            "trace_id": trace_id,
+                            "affected_rows": affected_rows
+                        }
+                    )
+                else:
+                    # no row was updated for this content_id; the record stays locked
+                    logging(
+                        code="rematch_1003",
+                        function="deal",
+                        info="回滚content_id失败",
+                        data={
+                            "content_id": content_id,
+                            "trace_id": trace_id
+                        }
+                    )
+
+    async def check_task(self, task_list: List[Dict]) -> None:
+        """
+        Verify whether the given tasks have finished.
+
+        For every task whose content_id passes check_whether_get_enough_videos,
+        the content_id is marked as successful and, when the update reports a
+        truthy result, the change is logged. Other tasks are left untouched.
+
+        :param task_list: task dicts; each one is read for the key 'content_id'
+        """
+        for task in task_list:
+            content_id = task['content_id']
+            # not enough videos yet: nothing to do for this content_id
+            if not await self.check_whether_get_enough_videos(content_id=content_id):
+                continue
+            updated_rows = await self.update_content_id_status_to_success(content_id=content_id)
+            if not updated_rows:
+                continue
+            logging(
+                code="rematch_1004",
+                function="check_task",
+                info="修改状态成功",
+                data={"content_id": content_id, "affected_rows": updated_rows}
+            )
 
+    async def deal(self):
+        """
+        Run one round of the rematch job.
+
+        Two phases are executed in order: "process" tasks are fetched and
+        handed to process_task, then "check" tasks are fetched and handed to
+        check_task. When a phase fetches no task, a log entry is written
+        instead (rematch_5001 for process, rematch_5002 for check).
+        """
+        # (business_type, handler, log code and log message used when nothing is fetched)
+        phases = (
+            ("process", self.process_task, "rematch_5001", "do not get article to process"),
+            ("check", self.check_task, "rematch_5002", "do not get article to check"),
+        )
 
+        for business_type, handler, empty_code, empty_info in phases:
+            task_list = await self.get_tasks(business_type=business_type)
+            if task_list:
+                await handler(task_list)
+            else:
+                logging(
+                    code=empty_code,
+                    info=empty_info
+                )