ソースを参照

限流、违规报警优化

luojunhui 1 週間 前
コミット
80fc945e88

+ 1 - 0
app/domains/llm_tasks/decode_video/__init__.py

@@ -0,0 +1 @@
+from .decode_video_produce import DecodeVideoProduce

+ 9 - 0
app/domains/llm_tasks/decode_video/_const.py

@@ -0,0 +1,9 @@
class VideoDecodeConst:
    """Shared constants for the video-decode pipeline."""

    class TaskStatus:
        # Lifecycle states of a decode task row.
        INIT = 0        # newly created, not yet picked up
        PROCESSING = 1  # currently being decoded
        SUCCESS = 2     # decode finished successfully
        FAILED = 99     # terminal failure state

    class SceneType:
        # hotsencetype value that marks a daily-article video
        # (sic: the upstream column name misspells "scene").
        DAILY_ARTICLE: str = "1058"

+ 70 - 0
app/domains/llm_tasks/decode_video/_mapper.py

@@ -0,0 +1,70 @@
+from app.core.database import DatabaseManager
+
+
class VideoDecodeMapper:
    """Data-access helpers for the video-decode tables.

    All methods delegate to the shared pool's async_fetch / async_save.
    """

    def __init__(self, pool: DatabaseManager):
        self.pool = pool

    async def fetch_video_source_content(self, root_source_id: str):
        """Fetch the article/video linkage row for a root_source_id."""
        sql = """
            SELECT content_id, gh_id, video_id, trace_id
            FROM long_articles_root_source_id
            WHERE root_source_id = %s;
        """
        return await self.pool.async_fetch(query=sql, params=(root_source_id,))

    async def fetch_video_match_result_v1(self, gh_id: str, content_id: str):
        """Fetch the match response keyed by gh_id + content_id."""
        sql = """
            SELECT response FROM long_articles_match_videos WHERE gh_id = %s AND content_id = %s;
        """
        return await self.pool.async_fetch(query=sql, params=(gh_id, content_id))

    async def fetch_video_match_result_v2(self, trace_id: str):
        """Fetch the match response keyed by trace_id."""
        sql = """
            SELECT response FROM long_articles_match_videos WHERE trace_id = %s
        """
        return await self.pool.async_fetch(query=sql, params=(trace_id,))

    async def save_video_to_decode_data(self, data: tuple):
        """Insert one row into video_decode_data (duplicates are ignored)."""
        sql = """
            INSERT IGNORE INTO video_decode_data
                (video_id, channel, hot_scene_type, video_path, title, root_source_id, dt)
            VALUES
                (%s, %s, %s, %s, %s, %s, %s);
        """
        return await self.pool.async_save(query=sql, params=data)

    async def update_video_decode_data(self, ori_status, new_status, video_path):
        # Placeholder for future status transitions; intentionally a no-op.
        pass

    async def insert_into_decode_task_queue(self, data: tuple):
        """Insert one row into video_decode_queue (duplicates are ignored)."""
        sql = """
            INSERT IGNORE INTO video_decode_queue
                (video_path, sample_video_id, dt)
            VALUES
                (%s, %s, %s);
        """
        return await self.pool.async_save(query=sql, params=data)

    async def fetch_decode_task_queue(self, video_path):
        """Fetch the queue row for an oss video path, if one exists."""
        sql = """
            SELECT video_path, sample_video_id, dt
            FROM video_decode_queue
            WHERE video_path = %s;
            """
        return await self.pool.async_fetch(query=sql, params=(video_path,))

+ 92 - 0
app/domains/llm_tasks/decode_video/_utils.py

@@ -0,0 +1,92 @@
+import json
+from typing import List, Dict
+from datetime import datetime, timedelta
+
+from odps.types import Record
+
+from app.infra.shared.tools import fetch_from_odps
+from app.infra.internal.piaoquan import fetch_piaoquan_video_list_detail
+
+
+class VideoDecodeUtils:
+    # 获取检测的账号 list
+    @staticmethod
+    def get_top_head_videos(duration="day"):
+        uv_threshold = 500
+        match duration:
+            case "day":
+                days = 1
+            case "history":
+                days = 7
+            case _:
+                return []
+        # dt = (datetime.today() - timedelta(days=days)).strftime("%Y%m%d")
+        dt = "20260420"
+        query = f"""
+            WITH top_video AS 
+        (
+            SELECT  dt, channel, 合作方名,包名,公众号名,hotsencetype, videoid, rootsourceid,title,`merge二级品类`
+                    ,COUNT(DISTINCT mid) AS 访问uv
+            FROM    loghubods.opengid_base_data
+            WHERE   dt >= '{dt}'
+            AND     dt <= '20260427'
+            AND     hotsencetype != 1167
+            AND     videoid IS NOT NULL
+            AND     usersharedepth = 0
+            AND     channel IN ('公众号合作-即转-稳定','小程序投流-稳定','服务号合作-Daily-自选','群/企微合作-稳定','公众号买号','服务号买号','公众号投流-稳定','公众号代运营-Daily-系统','公众号合作-Daily-自选','服务号投流','公众号完全代投放','群/企微合作-稳定')
+            GROUP BY dt
+                     ,channel
+                     ,合作方名
+                     ,包名
+                     ,公众号名
+                     ,hotsencetype
+                     ,videoid
+                     ,rootsourceid
+                     ,title
+                     ,`merge二级品类`
+                     ,推荐状态
+            HAVING  访问uv >= {uv_threshold}
+        )
+        SELECT  DISTINCT channel
+                ,公众号名
+                ,hotsencetype
+                ,videoid
+                ,rootsourceid
+                ,title
+        FROM    top_video
+        WHERE   访问uv >= {uv_threshold}
+;
+        """
+        result = fetch_from_odps(query)
+        return result
+
+    @staticmethod
+    def process_odps_data(odps_list: List[Record]):
+        return [
+            {
+                "channel": i.channel,
+                "account_name": i.公众号名,
+                "hot_scene_type": i.hotsencetype,  # 大数据单词写错。场景: scene
+                "video_id": i.videoid,
+                "root_source_id": i.rootsourceid,
+                "title": i.title,
+            }
+            for i in odps_list
+        ]
+
+    @staticmethod
+    def get_match_video_real_path(raw_match_videos: List, video_id: int):
+        _response = raw_match_videos[0]["response"]
+        match_video = json.loads(_response)
+
+        for i in match_video:
+            if i["videoId"] == video_id:
+                return i["videoOss"]
+
+        return None
+
+    @staticmethod
+    async def get_pq_video_real_path(video_id):
+        response = await fetch_piaoquan_video_list_detail(video_list=[video_id])
+        video_path = response["data"][0]["ossVideoPath"]
+        return video_path

+ 131 - 0
app/domains/llm_tasks/decode_video/decode_video_produce.py

@@ -0,0 +1,131 @@
+"""
+建立待解构视频 video_id 队列, 以视频的 oss_path 作为唯一视频粒度
+"""
+
from datetime import datetime
from typing import Dict

from app.core.database import DatabaseManager
from app.core.observability import LogService

from app.infra.shared import run_tasks_with_asyncio_task_group

from ._const import VideoDecodeConst
from ._utils import VideoDecodeUtils
from ._mapper import VideoDecodeMapper
+
+
class DecodeVideoProduce(VideoDecodeConst):
    """
    Build the queue of videos awaiting structural decode; a video's oss path
    is the unit of uniqueness.
    """

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        self.log_service: LogService = log_service
        self.tool: VideoDecodeUtils = VideoDecodeUtils()
        self.mapper: VideoDecodeMapper = VideoDecodeMapper(pool)

    async def save_decode_info(
        self, video_id, channel, hot_scene_type, video_path, title, root_source_id, dt
    ):
        """Persist video metadata and, if not already queued, enqueue it for decode."""
        inserted = await self.mapper.save_video_to_decode_data(
            data=(
                video_id,
                channel,
                hot_scene_type,
                video_path,
                title,
                root_source_id,
                dt,
            )
        )
        if not inserted:
            # INSERT IGNORE affected 0 rows: this video_id already exists.
            print(f"INSERT error!!!! Duplicated VideoId, video_id {video_id}")
            return

        if await self.mapper.fetch_decode_task_queue(video_path):
            # This oss path is already in (or through) the decode queue.
            print("该视频已经解构成功,无需解构")
            return

        # BUG FIX: the queue row previously used a hard-coded dt ("20260429");
        # use the caller-supplied dt so it matches the video_decode_data row.
        await self.mapper.insert_into_decode_task_queue(
            data=(video_path, video_id, dt)
        )

    async def decode_daily_video(self, video_obj: Dict, dt: str):
        """Resolve a daily-article video's oss path through the root_source_id mapping.

        Falls back to `decode_other_video` whenever the article row, the match
        result, or the id mapping cannot be resolved consistently.
        """
        root_source_id = video_obj["root_source_id"]  # guaranteed upstream
        video_id = int(video_obj["video_id"])

        # Look up the article row that links root_source_id to a video.
        article_info = await self.mapper.fetch_video_source_content(root_source_id)
        if not article_info:
            return await self.decode_other_video(video_obj=video_obj, dt=dt)
        gh_id = article_info[0]["gh_id"]
        content_id = article_info[0]["content_id"]
        inner_vid = article_info[0]["video_id"]
        trace_id = article_info[0]["trace_id"]
        if inner_vid != video_id:
            print(
                f"error!!!!, root_source_id 和  video_id 映射失败 内部视频id{inner_vid}, 内部视频 type{type(inner_vid)}, 外部视频id{video_id}, 外部视频类型{type(video_id)}"
            )
            return await self.decode_other_video(video_obj=video_obj, dt=dt)

        # Fetch the matched mini-program videos: v1 by gh_id + content_id,
        # then v2 by trace_id as a fallback.
        match_video_info = await self.mapper.fetch_video_match_result_v1(
            gh_id=gh_id, content_id=content_id
        )
        if not match_video_info:
            match_video_info = await self.mapper.fetch_video_match_result_v2(
                trace_id=trace_id
            )

        if not match_video_info:
            print(
                f"error!!!!, gh_id{gh_id}, content_id{content_id}, trace_id{trace_id}"
            )
            return await self.decode_other_video(video_obj=video_obj, dt=dt)

        video_path = self.tool.get_match_video_real_path(match_video_info, video_id)
        if not video_path:
            # Robustness fix: the match response may not contain this video_id;
            # previously a None path was written into video_decode_data.
            return await self.decode_other_video(video_obj=video_obj, dt=dt)
        return await self.save_decode_info(
            video_id=video_id,
            channel=video_obj["channel"],
            hot_scene_type=video_obj["hot_scene_type"],
            video_path=video_path,
            title=video_obj["title"],
            root_source_id=video_obj["root_source_id"],
            dt=dt,
        )

    async def decode_other_video(self, video_obj: Dict, dt: str):
        """Resolve a non-daily video's oss path via the piaoquan detail API."""
        video_id = video_obj["video_id"]
        video_path = await self.tool.get_pq_video_real_path(video_id=video_id)
        await self.save_decode_info(
            video_id=video_id,
            channel=video_obj["channel"],
            hot_scene_type=video_obj["hot_scene_type"],
            video_path=video_path,
            title=video_obj["title"],
            root_source_id=video_obj["root_source_id"],
            dt=dt,
        )

    async def process_single_video(self, video_obj: Dict):
        """Dispatch one video to the daily-article or generic decode path."""
        # BUG FIX: dt was hard-coded to a debug date ("20260427"); use today.
        dt = datetime.today().strftime("%Y%m%d")
        hot_scene_type = video_obj.get("hot_scene_type")
        if hot_scene_type == self.SceneType.DAILY_ARTICLE:
            return await self.decode_daily_video(video_obj, dt)
        else:
            return await self.decode_other_video(video_obj, dt)

    async def deal(self):
        """Entry point: pull top videos from ODPS and fan out with bounded concurrency."""
        odps_video_list = self.tool.get_top_head_videos(duration="history")
        if not odps_video_list:
            return
        task_list = self.tool.process_odps_data(odps_video_list)

        await run_tasks_with_asyncio_task_group(
            task_list=task_list,
            handler=self.process_single_video,
            description="解构视频生产",
            unit="video",
            max_concurrency=10,
        )