소스 검색

Merge branch 'feature/luojunhui/20260428-decode-task-improve' of Server/LongArticleTaskServer into master

luojunhui 1 주 전
부모
커밋
af05394b10

+ 3 - 0
app/core/config/global_settings.py

@@ -26,6 +26,9 @@ class GlobalConfigSettings(BaseSettings):
         default_factory=PiaoquanCrawlerDatabaseConfig
     )
     growth_db: GrowthDatabaseConfig = Field(default_factory=GrowthDatabaseConfig)
+    content_decode_db: ContentDeconstructionSupplyConfig = Field(
+        default_factory=ContentDeconstructionSupplyConfig
+    )
 
     # ============ 外部服务配置 ============
     deepseek: DeepSeekConfig = Field(default_factory=DeepSeekConfig)

+ 2 - 0
app/core/config/settings/__init__.py

@@ -10,6 +10,7 @@ from .mysql import GrowthDatabaseConfig
 from .mysql import LongArticlesDatabaseConfig
 from .mysql import LongVideoDatabaseConfig
 from .mysql import PiaoquanCrawlerDatabaseConfig
+from .mysql import ContentDeconstructionSupplyConfig
 from .read_rate_limited import ReadRateLimited
 from .task_chinese_name import TaskChineseNameConfig
 
@@ -29,4 +30,5 @@ __ALL__ = [
     "PiaoquanCrawlerDatabaseConfig",
     "TaskChineseNameConfig",
     "ReadRateLimited",
+    "ContentDeconstructionSupplyConfig",
 ]

+ 14 - 0
app/core/config/settings/mysql.py

@@ -103,3 +103,17 @@ class GrowthDatabaseConfig(DatabaseConfig):
     model_config = SettingsConfigDict(
         env_prefix="GROWTH_DB_", env_file=".env", case_sensitive=False, extra="ignore"
     )
+
+
+class ContentDeconstructionSupplyConfig(DatabaseConfig):
+    """解构数据库配置"""
+
+    host: str = "rm-t4nh1xx6o2a6vj8qu3o.mysql.singapore.rds.aliyuncs.com"
+    user: str = "content_rw"
+    password: str = "bC1aH4bA1lB0"
+    db: str = "content-deconstruction-supply"
+
+    model_config = SettingsConfigDict(
+        env_prefix="CONTENT_DECONSREUCTION_DB_", env_file=".env", case_sensitive=False, extra="ignore"
+    )
+

+ 1 - 0
app/core/database/mysql_pools.py

@@ -19,6 +19,7 @@ class DatabaseManager(LogService):
             "long_video": config.long_video_db,
             "long_articles": config.long_articles_db,
             "piaoquan_crawler": config.piaoquan_crawler_db,
+            "content_decode": config.content_decode_db,
         }
         self.pools = {}
 

+ 2 - 0
app/domains/llm_tasks/__init__.py

@@ -2,10 +2,12 @@ from .process_title import TitleRewrite
 from .process_title import ArticlePoolCategoryGeneration
 from .process_title import ExtractTitleFeatures
 from .candidate_account_process import CandidateAccountQualityScoreRecognizer
+from .decode_video import DecodeVideoProduce
 
 __all__ = [
     "TitleRewrite",
     "CandidateAccountQualityScoreRecognizer",
     "ArticlePoolCategoryGeneration",
     "ExtractTitleFeatures",
+    "DecodeVideoProduce"
 ]

+ 6 - 0
app/domains/llm_tasks/decode_video/__init__.py

@@ -0,0 +1,6 @@
+from .decode_video_produce import DecodeVideoProduce
+
+
+__all__ = [
+    "DecodeVideoProduce"
+]

+ 14 - 0
app/domains/llm_tasks/decode_video/_const.py

@@ -0,0 +1,14 @@
+class VideoDecodeConst:
+    """Shared constants for the video-deconstruction produce task."""
+
+    # Task identifier attached to every structured log entry.
+    TASK_NAME = "decode_video_produce"
+    # Feishu alert-bot environment key used when reporting failures.
+    FEISHU_BOT_ENV = "long_articles_task"
+    # Mention people in the Feishu alert once the failure rate reaches this.
+    FAIL_RATE_THRESHOLD = 0.1
+    # Max number of failed-video sample links included in one alert.
+    ERROR_SAMPLE_LIMIT = 20
+
+    class TaskStatus:
+        # Status values of rows in aigc_topic_decode_task_result.
+        INIT = 0
+        PROCESSING = 1
+        SUCCESS = 2
+        FAILED = 99
+
+    class SceneType:
+        # Hot-scene type marking daily-article traffic. Kept as a str to match
+        # the ODPS `hotsencetype` field — NOTE(review): confirm ODPS returns it
+        # as a string, otherwise the equality check upstream never matches.
+        DAILY_ARTICLE: str = "1058"

+ 81 - 0
app/domains/llm_tasks/decode_video/_mapper.py

@@ -0,0 +1,81 @@
+from app.core.database import DatabaseManager
+
+from ._const import VideoDecodeConst
+
+
+class VideoDecodeMapper(VideoDecodeConst):
+    """Data-access layer for the video-deconstruction produce task.
+
+    All queries run on the pool's default database except is_video_decoded,
+    which targets the separate "content_decode" database.
+    """
+
+    def __init__(self, pool: DatabaseManager):
+        # Shared async DB manager; holds one pool per configured database.
+        self.pool = pool
+
+    async def fetch_video_source_content(self, root_source_id: str):
+        """Fetch the article row(s) that a root_source_id maps to."""
+        query = """
+            SELECT content_id, gh_id, video_id, trace_id
+            FROM long_articles_root_source_id
+            WHERE root_source_id = %s;
+        """
+        return await self.pool.async_fetch(query=query, params=(root_source_id,))
+
+    async def fetch_video_match_result_v1(self, gh_id: str, content_id: str):
+        """Fetch the matched-video JSON response by (gh_id, content_id)."""
+        query = """
+            SELECT response FROM long_articles_match_videos WHERE gh_id = %s AND content_id = %s;
+        """
+        return await self.pool.async_fetch(
+            query=query,
+            params=(
+                gh_id,
+                content_id,
+            ),
+        )
+
+    async def fetch_video_match_result_v2(self, trace_id: str):
+        """Fallback lookup of the matched-video response by trace_id."""
+        query = """
+            SELECT response FROM long_articles_match_videos WHERE trace_id = %s
+        """
+        return await self.pool.async_fetch(query=query, params=(trace_id,))
+
+    async def save_video_to_decode_data(self, data: tuple):
+        """
+        Persist one video row into video_decode_data.
+
+        INSERT IGNORE makes the call idempotent on duplicates; the affected
+        row count (0 for a duplicate) is returned to the caller.
+        """
+        query = """
+            INSERT IGNORE INTO video_decode_data
+                (video_id, channel, hot_scene_type, video_path, title, root_source_id, dt)
+            VALUES
+                (%s, %s, %s, %s, %s, %s, %s);
+        """
+        return await self.pool.async_save(
+            query=query,
+            params=data,
+        )
+
+    async def update_video_decode_data_status(self, video_id, ori_status, new_status):
+        """Compare-and-set the status of a video_decode_data row.
+
+        Returns True only if a row in ori_status was actually updated, which
+        makes the transition safe against concurrent workers.
+        """
+        query = """
+            UPDATE video_decode_data
+            SET status = %s
+            WHERE video_id = %s AND status = %s;
+        """
+        affected_rows = await self.pool.async_save(
+            query=query,
+            params=(new_status, video_id, ori_status),
+        )
+        return bool(affected_rows)
+
+    async def insert_into_decode_task_queue(self, data: tuple):
+        """Enqueue (video_path, sample_video_id, dt) into video_decode_queue.
+
+        INSERT IGNORE keeps the queue unique per video path; returns the
+        affected row count (0 when the video is already queued).
+        """
+        query = """
+            INSERT IGNORE INTO video_decode_queue
+                (video_path, sample_video_id, dt)
+            VALUES
+                (%s, %s, %s);
+        """
+        return await self.pool.async_save(
+            query=query,
+            params=data,
+        )
+
+    async def is_video_decoded(self, video_id: int):
+        """Return a truthy result if the video already has a SUCCESS decode row.
+
+        Note: queries the separate "content_decode" database, not the default.
+        """
+        query = """
+            SELECT vid FROM aigc_topic_decode_task_result WHERE vid = %s AND status = %s;
+        """
+        return await self.pool.async_fetch(
+            query=query, db_name="content_decode", params=(video_id, self.TaskStatus.SUCCESS)
+        )

+ 86 - 0
app/domains/llm_tasks/decode_video/_utils.py

@@ -0,0 +1,86 @@
+import json
+from typing import List
+
+from datetime import datetime, timedelta
+from odps.types import Record
+
+from app.infra.shared.tools import fetch_from_odps
+from app.infra.internal.piaoquan import fetch_piaoquan_video_list_detail
+
+
+class VideoDecodeUtils:
+    """Stateless helpers: ODPS extraction, row shaping, video-path resolution."""
+
+    # Fetch the list of top head videos to inspect for the given partition date.
+    @staticmethod
+    def get_top_head_videos(execute_dt):
+        # Minimum distinct-visitor (UV) count for a video to qualify.
+        uv_threshold = 500
+        query = f"""
+            WITH top_video AS 
+        (
+            SELECT  dt, channel, 合作方名,包名,公众号名,hotsencetype, videoid, rootsourceid,title,`merge二级品类`
+                    ,COUNT(DISTINCT mid) AS 访问uv
+            FROM    loghubods.opengid_base_data
+            WHERE   dt = '{execute_dt}'
+            AND     hotsencetype != 1167
+            AND     videoid IS NOT NULL
+            AND     usersharedepth = 0
+            AND     channel IN ('公众号合作-即转-稳定','小程序投流-稳定','服务号合作-Daily-自选','群/企微合作-稳定','公众号买号','服务号买号','公众号投流-稳定','公众号代运营-Daily-系统','公众号合作-Daily-自选','服务号投流','公众号完全代投放','群/企微合作-稳定')
+            GROUP BY dt
+                     ,channel
+                     ,合作方名
+                     ,包名
+                     ,公众号名
+                     ,hotsencetype
+                     ,videoid
+                     ,rootsourceid
+                     ,title
+                     ,`merge二级品类`
+                     ,推荐状态
+            HAVING  访问uv >= {uv_threshold}
+        )
+        SELECT  DISTINCT channel
+                ,公众号名
+                ,hotsencetype
+                ,videoid
+                ,rootsourceid
+                ,title
+        FROM    top_video
+        WHERE   访问uv >= {uv_threshold}
+;
+        """
+        result = fetch_from_odps(query)
+        return result
+
+    @staticmethod
+    def process_odps_data(odps_list: List[Record]):
+        """Map raw ODPS records to plain dicts with English keys."""
+        return [
+            {
+                "channel": i.channel,
+                "account_name": i.公众号名,
+                "hot_scene_type": i.hotsencetype,  # upstream misspelling of "scene" in the warehouse column; kept as-is
+                "video_id": i.videoid,
+                "root_source_id": i.rootsourceid,
+                "title": i.title,
+            }
+            for i in odps_list
+        ]
+
+    @staticmethod
+    def get_match_video_real_path(raw_match_videos: List, video_id: int):
+        """Return the OSS path of video_id from a match-result row, else None.
+
+        Assumes raw_match_videos is non-empty (callers check before calling)
+        and that its first row's "response" is a JSON array of dicts with
+        "videoId" / "videoOss" keys.
+        """
+        _response = raw_match_videos[0]["response"]
+        match_video = json.loads(_response)
+
+        for i in match_video:
+            if i["videoId"] == video_id:
+                return i["videoOss"]
+
+        return None
+
+    @staticmethod
+    async def get_pq_video_real_path(video_id):
+        """Resolve a video's OSS path via the Piaoquan detail API.
+
+        NOTE(review): assumes response["data"] is non-empty and carries
+        "ossVideoPath" — a miss raises KeyError/IndexError, which the caller's
+        per-video try/except is expected to absorb. Confirm API contract.
+        """
+        response = await fetch_piaoquan_video_list_detail(video_list=[video_id])
+        video_path = response["data"][0]["ossVideoPath"]
+        return video_path
+
+    @staticmethod
+    def get_yesterday_dt():
+        """Yesterday's date as a YYYYMMDD string (the default ODPS partition)."""
+        return (datetime.today() - timedelta(days=1)).strftime("%Y%m%d")

+ 259 - 0
app/domains/llm_tasks/decode_video/decode_video_produce.py

@@ -0,0 +1,259 @@
+"""
+建立待解构视频 video_id 队列, 以视频的 oss_path 作为唯一视频粒度
+"""
+
+import time
+import traceback
+from functools import partial
+from typing import Dict
+
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
+
+from app.infra.shared import run_tasks_with_asyncio_task_group
+from app.infra.shared.tools import generate_task_trace_id
+from app.infra.external import feishu_robot
+
+from ._const import VideoDecodeConst
+from ._utils import VideoDecodeUtils
+from ._mapper import VideoDecodeMapper
+
+
+class DecodeVideoProduce(VideoDecodeConst):
+    """Build the queue of videos awaiting deconstruction.
+
+    Pulls yesterday's (or a given date's) top head videos from ODPS, resolves
+    each video's OSS path, and inserts it into the decode data table and the
+    decode task queue. Failures are sampled into a Feishu alert.
+    """
+
+    def __init__(self, pool: DatabaseManager, log_service: LogService):
+        self.log_service: LogService = log_service
+        self.tool: VideoDecodeUtils = VideoDecodeUtils()
+        self.mapper: VideoDecodeMapper = VideoDecodeMapper(pool)
+
+    async def _trace_log(self, **kwargs):
+        # Structured log with the task name always attached.
+        payload = {"task": self.TASK_NAME, **kwargs}
+        await self.log_service.log(contents=payload)
+
+    async def save_decode_info(
+        self, video_id, channel, hot_scene_type, video_path, title, root_source_id, dt
+    ):
+        """Persist one resolved video and enqueue it for deconstruction.
+
+        Silently skips (with a trace log) when the path is missing, the video
+        is a duplicate, or it is already queued — none of these are errors.
+        """
+        if not video_path:
+            await self._trace_log(
+                event="video_path_missing",
+                video_id=video_id,
+                message="video_path 为空,跳过入队",
+            )
+            return
+
+        # Persist into the video_decode_data table.
+        insert_row = await self.mapper.save_video_to_decode_data(
+            data=(
+                video_id,
+                channel,
+                hot_scene_type,
+                video_path,
+                title,
+                root_source_id,
+                dt,
+            )
+        )
+        if not insert_row:
+            await self._trace_log(
+                event="video_duplicated",
+                video_id=video_id,
+                message="重复 video_id,跳过",
+            )
+            return
+
+        # Enqueue for deconstruction (INSERT IGNORE keeps this idempotent and
+        # avoids the check-then-insert concurrency race).
+        insert_queue_row = await self.mapper.insert_into_decode_task_queue(
+            data=(video_path, video_id, dt)
+        )
+        if not insert_queue_row:
+            await self._trace_log(
+                event="video_already_in_queue",
+                video_id=video_id,
+                video_path=video_path,
+                message="该视频已在解构队列中,跳过",
+            )
+            return
+
+        await self._trace_log(
+            event="video_enqueued",
+            video_id=video_id,
+            video_path=video_path,
+            dt=dt,
+            message="视频已成功加入解构队列",
+        )
+
+    async def decode_daily_video(self, video_obj: Dict, dt: str):
+        """Resolve a daily-article video via its article/match records.
+
+        Falls back to decode_other_video whenever any lookup step fails.
+        """
+        root_source_id = video_obj["root_source_id"]  # guaranteed by upstream; upstream would have failed otherwise
+        video_id = int(video_obj["video_id"])
+
+        # Look up the source article info.
+        article_info = await self.mapper.fetch_video_source_content(root_source_id)
+        if not article_info:
+            await self._trace_log(
+                event="daily_article_not_found",
+                video_id=video_id,
+                root_source_id=root_source_id,
+                message="未找到文章信息,降级走普通视频链路",
+            )
+            return await self.decode_other_video(video_obj=video_obj, dt=dt)
+        gh_id = article_info[0]["gh_id"]
+        content_id = article_info[0]["content_id"]
+        inner_vid = article_info[0]["video_id"]
+        trace_id = article_info[0]["trace_id"]
+        if inner_vid != video_id:
+            await self._trace_log(
+                event="video_id_mismatch",
+                video_id=video_id,
+                inner_video_id=inner_vid,
+                root_source_id=root_source_id,
+                message="root_source_id 与 video_id 映射失败,降级走普通视频链路",
+            )
+            return await self.decode_other_video(video_obj=video_obj, dt=dt)
+
+        # Look up the matched mini-program video info (v1 by gh_id/content_id,
+        # then v2 by trace_id as a fallback).
+        match_video_info = await self.mapper.fetch_video_match_result_v1(
+            gh_id=gh_id, content_id=content_id
+        )
+        if not match_video_info:
+            match_video_info = await self.mapper.fetch_video_match_result_v2(
+                trace_id=trace_id
+            )
+
+        if not match_video_info:
+            await self._trace_log(
+                event="match_video_not_found",
+                video_id=video_id,
+                gh_id=gh_id,
+                content_id=content_id,
+                content_trace_id=trace_id,
+                message="未命中匹配视频,降级走普通视频链路",
+            )
+            return await self.decode_other_video(video_obj=video_obj, dt=dt)
+
+        video_path = self.tool.get_match_video_real_path(match_video_info, video_id)
+        return await self.save_decode_info(
+            video_id=video_id,
+            channel=video_obj["channel"],
+            hot_scene_type=video_obj["hot_scene_type"],
+            video_path=video_path,
+            title=video_obj["title"],
+            root_source_id=video_obj["root_source_id"],
+            dt=dt,
+        )
+
+    async def decode_other_video(self, video_obj: Dict, dt: str):
+        """Resolve a non-daily video's path via the Piaoquan API and save it."""
+        video_id = int(video_obj["video_id"])
+        video_path = await self.tool.get_pq_video_real_path(video_id=video_id)
+        await self.save_decode_info(
+            video_id=video_id,
+            channel=video_obj["channel"],
+            hot_scene_type=video_obj["hot_scene_type"],
+            video_path=video_path,
+            title=video_obj["title"],
+            root_source_id=video_obj["root_source_id"],
+            dt=dt,
+        )
+
+    async def process_single_video(self, video_obj: Dict, dt, trace_id: str):
+        """Process one video: skip if decoded, route by scene type, log outcome.
+
+        Re-raises on failure so the task-group runner records it in "errors".
+        """
+        video_id = int(video_obj["video_id"])
+        # Skip videos that already have a deconstruction result.
+        if await self.mapper.is_video_decoded(video_id):
+            return
+
+        hot_scene_type = video_obj.get("hot_scene_type")
+        try:
+            if hot_scene_type == self.SceneType.DAILY_ARTICLE:
+                await self.decode_daily_video(video_obj, dt)
+            else:
+                await self.decode_other_video(video_obj, dt)
+
+            await self._trace_log(
+                event="video_processed",
+                trace_id=trace_id,
+                video_id=video_id,
+                hot_scene_type=hot_scene_type,
+                status="success",
+            )
+        except Exception as e:
+            await self._trace_log(
+                event="video_process_failed",
+                trace_id=trace_id,
+                video_id=video_id,
+                hot_scene_type=hot_scene_type,
+                status="fail",
+                error=str(e),
+                traceback=traceback.format_exc(),
+            )
+            raise
+
+    async def deal(self, execute_dt=None):
+        """Entry point: fan out all of execute_dt's videos, then report.
+
+        execute_dt defaults to yesterday (YYYYMMDD). Sends a Feishu alert when
+        any video failed, mentioning people if the fail rate is high enough.
+        """
+        if not execute_dt:
+            execute_dt = self.tool.get_yesterday_dt()
+
+        trace_id = generate_task_trace_id()
+        start_time = time.time()
+
+        odps_video_list = self.tool.get_top_head_videos(execute_dt=execute_dt)
+        if not odps_video_list:
+            await self._trace_log(
+                event="task_empty",
+                trace_id=trace_id,
+                dt=execute_dt,
+                message="未获取到待解构视频",
+            )
+            return
+
+        task_list = self.tool.process_odps_data(odps_video_list)
+        handler = partial(self.process_single_video, dt=execute_dt, trace_id=trace_id)
+
+        await self._trace_log(
+            event="task_start",
+            trace_id=trace_id,
+            dt=execute_dt,
+            total_videos=len(odps_video_list),
+            processed_videos=len(task_list),
+        )
+
+        result = await run_tasks_with_asyncio_task_group(
+            task_list=task_list,
+            handler=handler,
+            description="解构视频生产",
+            unit="video",
+            max_concurrency=10,
+        )
+
+        duration = time.time() - start_time
+        error_count = len(result["errors"])
+        total_task = result["total_task"]
+        fail_rate = error_count / total_task if total_task else 0
+
+        await self._trace_log(
+            event="task_complete",
+            trace_id=trace_id,
+            dt=execute_dt,
+            total_task=total_task,
+            success_count=result["processed_task"],
+            error_count=error_count,
+            fail_rate=fail_rate,
+            duration_seconds=duration,
+            avg_time_per_video=duration / total_task if total_task else 0,
+        )
+
+        if error_count:
+            # NOTE(review): assumes each entry of result["errors"] is an
+            # (index, task_obj, exception) 3-tuple — confirm against
+            # run_tasks_with_asyncio_task_group's contract.
+            error_samples = [
+                f"https://admin.piaoquantv.com/cms/post-detail/{task_obj.get('video_id')}/detail/ "
+                for idx, task_obj, err in result["errors"][: self.ERROR_SAMPLE_LIMIT]
+            ]
+            await feishu_robot.bot(
+                title="视频解构生产任务异常",
+                detail={
+                    "dt": execute_dt,
+                    "trace_id": trace_id,
+                    "总任务数": total_task,
+                    "成功数": result["processed_task"],
+                    "失败数": error_count,
+                    "失败率": round(fail_rate, 4),
+                    "错误样例": error_samples,
+                },
+                mention=fail_rate >= self.FAIL_RATE_THRESHOLD,
+                env=self.FEISHU_BOT_ENV,
+            )

+ 2 - 0
app/jobs/domains/llm_task.py

@@ -6,6 +6,7 @@ from app.domains.llm_tasks import TitleRewrite
 from app.domains.llm_tasks import ArticlePoolCategoryGeneration
 from app.domains.llm_tasks import CandidateAccountQualityScoreRecognizer
 from app.domains.llm_tasks import ExtractTitleFeatures
+from app.domains.llm_tasks import DecodeVideoProduce
 
 __all__ = [
     "CreateAdPlatformArticlesDecodeTask",
@@ -16,4 +17,5 @@ __all__ = [
     "ArticlePoolCategoryGeneration",
     "CandidateAccountQualityScoreRecognizer",
     "ExtractTitleFeatures",
+    "DecodeVideoProduce",
 ]

+ 7 - 0
app/jobs/task_handler.py

@@ -357,6 +357,13 @@ class TaskHandler:
         await task.deal(data=self.data)
         return TaskStatus.SUCCESS
 
+    @register("produce_decode_growth_head_videos")
+    async def _produce_decode_growth_head_videos(self) -> int:
+        """Produce the queue of growth head videos awaiting deconstruction."""
+        task = DecodeVideoProduce(pool=self.db_client, log_service=self.log_client)
+        # deal() defaults execute_dt to yesterday. Unlike the handler above,
+        # no payload is forwarded — NOTE(review): confirm self.data (e.g. an
+        # explicit execute date) is intentionally ignored here.
+        await task.deal()
+        return TaskStatus.SUCCESS
+
     # ==================== 统计分析类任务 ====================
 
     @register("update_account_read_rate_avg")