ソースを参照

新增-rematch task

luojunhui 4 ヶ月 前
コミット
28c226ea2d

+ 22 - 0
RematchTask.py

@@ -0,0 +1,22 @@
+"""
+@author: luojunhui
+"""
+import time
+import datetime
+import asyncio
+from applications.db import AsyncMySQLClient
+from tasks.rm_match_task import ReMatchTask
+
+
+async def main_job():
+    """
+    main job
+    :return:
+    """
+    async with AsyncMySQLClient() as long_articles_pool:
+        new_content_id_task = ReMatchTask(long_articles_pool)
+        await new_content_id_task.deal()
+
+
+if __name__ == '__main__':
+    asyncio.run(main_job())

+ 5 - 2
applications/const/__init__.py

@@ -2,9 +2,12 @@
 @author: luojunhui
 """
 from .server_const import ServerConst
-from .task_const import HistoryContentIdTaskConst, NewContentIdTaskConst
+from .task_const import HistoryContentIdTaskConst
+from .task_const import NewContentIdTaskConst
+from .task_const import RematchTaskConst
 
 
 server_const = ServerConst()
 new_content_id_task_const = NewContentIdTaskConst()
-history_content_id_task_const = HistoryContentIdTaskConst()
+history_content_id_task_const = HistoryContentIdTaskConst()
+rematch_task_const = RematchTaskConst()

+ 13 - 0
applications/const/task_const.py

@@ -73,4 +73,17 @@ class NewContentIdTaskConst(HistoryContentIdTaskConst):
     KIMI_FAIL_STATUS = 2
 
     # 视频下载失败状态
+    VIDEO_DOWNLOAD_INIT_STATUS = 0
     VIDEO_DOWNLOAD_FAIL_STATUS = 3
+
+
+class RematchTaskConst(NewContentIdTaskConst):
+    """
+    Constants for the rematch task; inherits everything from NewContentIdTaskConst.
+    """
+    # Default gh_id used by the rematch task
+    TASK_DEFAULT_GH_ID = 'DEFAULT_ID'
+
+    # Rematch status values (0 = waiting to be rematched)
+    REMATCH_INIT_STATUS = 0
+    REMATCH_SUCCESS_STATUS = 1
+    REMATCH_FAIL_STATUS = 2

+ 6 - 3
applications/db/__init__.py

@@ -3,6 +3,8 @@
 """
 import aiomysql
 
+from aiomysql.cursors import Cursor
+
 from applications.config import denet_config, long_articles_config
 
 
@@ -47,17 +49,18 @@ class AsyncMySQLClient(object):
         self.mysql_pool.close()
         await self.mysql_pool.wait_closed()
 
-    async def async_select(self, sql):
+    async def async_select(self, sql, cursor_type=Cursor):
         """
         select method
         :param sql:
+        :param cursor_type:
         :return:
         """
         async with self.mysql_pool.acquire() as conn:
-            async with conn.cursor() as cursor:
+            async with conn.cursor(cursor_type) as cursor:
                 await cursor.execute(sql)
                 result = await cursor.fetchall()
-                return result
+            return result
 
     async def async_insert(self, sql, params):
         """

+ 318 - 0
tasks/rm_match_task.py

@@ -0,0 +1,318 @@
+"""
+@author: luojunhui
+"""
+import traceback
+from typing import Dict, List
+
+from aiomysql.cursors import DictCursor
+
+from applications.log import logging
+from applications.config import Config
+from applications.const import rematch_task_const
+from applications.etl_function import download_cover
+from applications.etl_function import download_video
+from applications.etl_function import generate_video_path
+from applications.etl_function import upload_to_oss
+from applications.spider import search_videos_from_web
+from .utils import get_kimi_result
+
+
+class ReMatchTask(object):
+    """
+    重新匹配任务
+    """
+
+    def __init__(self, db_client):
+        self.db_client = db_client
+        self.config = Config()
+        self.article_match_video_table = self.config.article_match_video_table
+        self.article_text_table = self.config.article_text_table
+        self.article_crawler_video_table = self.config.article_crawler_video_table
+
+    async def get_tasks(self) -> List[Dict]:
+        """
+        get task needs to rematch
+        """
+        select_sql = f"""
+            SELECT DISTINCT content_id 
+            FROM article_re_match_record
+            WHERE status = {rematch_task_const.REMATCH_INIT_STATUS};
+        """
+        result = await self.db_client.async_select(select_sql, cursor_type=DictCursor)
+        logging(
+            code="rematch_1001",
+            function="get_tasks",
+            info="获取content_id数量: {}".format(len(result))
+        )
+        return result
+
+    async def async_download_video_list(self, to_download_videos: List[Dict], illegal_platform_id_list: List[str]) -> int:
+        """
+        异步下载视频
+        """
+        success_count = 0
+        for video in to_download_videos:
+            # check whether video is illegal
+            out_key = "{}_{}".format(video['platform'], video['out_video_id'])
+            if out_key in illegal_platform_id_list:
+                continue
+            # start download
+            try:
+                local_video_path, local_cover_path = generate_video_path(video['platform'], video['out_video_id'])
+                # download videos
+                file_path = await download_video(
+                    file_path=local_video_path,
+                    platform=video['platform'],
+                    video_url=video['video_url']
+                )
+                if not file_path:
+                    # 说明视频下载失败,无需上传该视频, 将该条记录设置为失败状态
+                    update_sql = f"""
+                            UPDATE {self.article_crawler_video_table}
+                            SET download_status = %s
+                            WHERE id = %s;
+                    """
+
+                    await self.db_client.async_insert(
+                        sql=update_sql,
+                        params=(rematch_task_const.VIDEO_DOWNLOAD_FAIL_STATUS, video['id'])
+                    )
+                else:
+                    # download cover
+                    cover_path = await download_cover(
+                        file_path=local_cover_path,
+                        platform=video['platform'],
+                        cover_url=video['cover_url']
+                    )
+                    # upload video to oss
+                    oss_video = await upload_to_oss(
+                        local_video_path=file_path,
+                        download_type="video"
+                    )
+                    # upload cover to oss
+                    if cover_path:
+                        oss_cover = await upload_to_oss(
+                            local_video_path=cover_path,
+                            download_type="image"
+                        )
+                    else:
+                        oss_cover = None
+
+                    # change status to success
+                    update_sql = f"""
+                        UPDATE {self.article_crawler_video_table}
+                        SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
+                        WHERE id = %s;
+                    """
+                    await self.db_client.async_insert(
+                        sql=update_sql,
+                        params=(
+                            oss_video,
+                            oss_cover,
+                            rematch_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS,
+                            video['id']
+                        )
+                    )
+                    success_count += 1
+                # 如果下载的视频数已经大于3, 则直接退出循环,修改状态为ETL成功状态
+                if success_count > rematch_task_const.MIN_MATCH_VIDEO_NUM:
+                    return success_count
+            except Exception as e:
+                update_sql = f"""
+                            UPDATE {self.article_crawler_video_table}
+                            SET download_status = %s
+                            WHERE id = %s;
+                            """
+                await self.db_client.async_insert(
+                    sql=update_sql,
+                    params=(rematch_task_const.VIDEO_DOWNLOAD_FAIL_STATUS, video['id'])
+                )
+
+        return success_count
+
+    async def download_upload_task(self, content_id: str, illegal_platform_id_list: List[str], legal_not_downloaded_videos=None) -> int:
+        """
+        下载任务
+        """
+        success_count = 0
+        if legal_not_downloaded_videos:
+            # 下载legal_not_download_videos, 记录下载成功数量
+            new_download_count = await self.async_download_video_list(
+                to_download_videos=legal_not_downloaded_videos,
+                illegal_platform_id_list=illegal_platform_id_list
+            )
+            success_count += new_download_count
+            return success_count
+
+        select_sql = f"""
+            SELECT id, out_video_id, platform, video_title, video_url, cover_url
+            FROM {self.article_crawler_video_table} 
+            WHERE content_id = {content_id} AND download_status = {rematch_task_const.VIDEO_DOWNLOAD_INIT_STATUS};
+        """
+        to_download_videos = await self.db_client.async_select(select_sql, cursor_type=DictCursor)
+        new_download_count = await self.async_download_video_list(
+            to_download_videos=to_download_videos,
+            illegal_platform_id_list=illegal_platform_id_list
+        )
+        success_count += new_download_count
+        return success_count
+
+    async def spider_task(self, content_id: str) -> int:
+        """
+        爬虫抓取任务
+        """
+        kimi_result = await get_kimi_result(
+            content_id=content_id,
+            article_text_table=self.article_text_table,
+            db_client=self.db_client
+        )
+        # 该搜索任务是content_id粒度,因此将trace_id 传为content_id
+        search_videos_count = await search_videos_from_web(
+            info={
+                "ori_title": kimi_result['ori_title'],
+                "kimi_summary": kimi_result['kimi_summary'],
+                "kimi_keys": kimi_result['kimi_keys'],
+                "trace_id": content_id,
+                "gh_id": rematch_task_const.TASK_DEFAULT_GH_ID,
+                "content_id": content_id,
+                "crawler_video_table": self.article_crawler_video_table
+            },
+            gh_id_map={},
+            db_client=self.db_client
+        )
+        return search_videos_count
+
+    async def update_each_content_id(self, content_id: str) -> None:
+        """
+        更新每一个content_id的视频信息
+        """
+        select_sql = f"""
+            SELECT
+                id, platform, out_video_id, video_oss_path, download_status, is_illegal, video_url, cover_url
+            FROM
+                {self.article_crawler_video_table}
+            WHERE content_id = '{content_id}';
+        """
+        record_list = await self.db_client.async_select(select_sql, cursor_type=DictCursor)
+        # 获取违规视频,违规视频out_id && platform
+        illegal_videos = [record for record in record_list if record["is_illegal"] == rematch_task_const.VIDEO_UNSAFE]
+        illegal_platform_id_list = [
+            "{}_{}".format(record['platform'], record['out_video_id']) for record in illegal_videos
+        ]
+
+        # 处理非违规字段
+        legal_downloaded_videos = []
+        legal_not_downloaded_videos = []
+        for record in record_list:
+            if record['is_illegal'] == rematch_task_const.VIDEO_UNSAFE:
+                continue
+            else:
+                out_key = "{}_{}".format(record['platform'], record['out_video_id'])
+                if out_key in illegal_platform_id_list:
+                    continue
+                if record['download_status'] == rematch_task_const.VIDEO_DOWNLOAD_INIT_STATUS:
+                    legal_not_downloaded_videos.append(record)
+                elif record['download_status'] == rematch_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS:
+                    legal_downloaded_videos.append(record)
+                else:
+                    continue
+
+        if len(legal_downloaded_videos) >= rematch_task_const.MIN_MATCH_VIDEO_NUM:
+            logging(
+                code="rematch_1002",
+                info="存在{}条以上待合规已下载内容,无需处理该content_id".format(rematch_task_const.MIN_MATCH_VIDEO_NUM),
+                data={
+                    "content_id": content_id
+                }
+            )
+            return
+        elif len(legal_not_downloaded_videos + legal_downloaded_videos) >= rematch_task_const.MIN_MATCH_VIDEO_NUM:
+            logging(
+                code="rematch_1003",
+                info="存在{}条以上待合规内容".format(rematch_task_const.MIN_MATCH_VIDEO_NUM),
+                data={
+                    "content_id": content_id,
+                    "待下载数量": len(legal_not_downloaded_videos),
+                    "已下载数量": len(legal_downloaded_videos)
+                }
+            )
+            await self.download_upload_task(
+                content_id=content_id,
+                illegal_platform_id_list=illegal_platform_id_list,
+                legal_not_downloaded_videos=legal_not_downloaded_videos
+            )
+        else:
+            logging(
+                code="rematch_1004",
+                info="重新执行爬虫任务",
+                data={
+                    "content_id": content_id,
+                }
+            )
+            await self.spider_task(content_id=content_id)
+            await self.download_upload_task(
+                content_id=content_id,
+                illegal_platform_id_list=illegal_platform_id_list
+            )
+
+    async def check_whether_get_enough_videos(self, content_id: str) -> bool:
+        """
+        check 该content_id是否存在足够的视频
+        """
+        select_sql = f"""
+            SELECT count(1)
+            FROM {self.article_crawler_video_table}
+            WHERE content_id = '{content_id}' 
+                AND download_status = {rematch_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS} 
+                AND is_illegal = {rematch_task_const.VIDEO_SAFE};
+        """
+        count_tuple = await self.db_client.async_select(select_sql)
+        count = count_tuple[0][0]
+        if count >= rematch_task_const.MIN_MATCH_VIDEO_NUM:
+            return True
+        else:
+            return False
+
+    async def update_content_id_status(self, content_id: str) -> int:
+        """
+        更新content_id的status为成功
+        """
+        update_sql = f"""
+            UPDATE article_re_match_record
+            SET status = %s
+            WHERE content_id = '{content_id}';
+        """
+        affected_rows = await self.db_client.async_insert(
+            update_sql,
+            params=(rematch_task_const.REMATCH_SUCCESS_STATUS,)
+        )
+        return affected_rows
+
+    async def deal(self):
+        """
+        do job here
+        """
+        task_list = await self.get_tasks()
+        for task in task_list:
+            content_id = task['content_id']
+            try:
+                await self.update_each_content_id(content_id=content_id)
+
+                if await self.check_whether_get_enough_videos(content_id=content_id):
+                    # 修改状态为1
+                    await self.update_content_id_status(content_id=content_id)
+                else:
+                    continue
+            except Exception as e:
+                error_stack = traceback.format_exc()
+                logging(
+                    code="rematch_1010",
+                    function="update_each_content_id",
+                    data={
+                        "error_stack": error_stack,
+                        "error": str(e),
+                        "content_id": content_id
+                    }
+                )
+
+