Explorar o código

解构增长头部视频

luojunhui hai 1 semana
pai
achega
b4e22773c3

+ 1 - 1
app/domains/llm_tasks/decode_video/__init__.py

@@ -1 +1 @@
-from .decode_video_produce import DecodeVideoProduce
+from .decode_video_produce import DecodeVideoProduce

+ 5 - 0
app/domains/llm_tasks/decode_video/_const.py

@@ -1,4 +1,9 @@
 class VideoDecodeConst:
+    TASK_NAME = "decode_video_produce"
+    FEISHU_BOT_ENV = "long_articles_task"
+    FAIL_RATE_THRESHOLD = 0.1
+    ERROR_SAMPLE_LIMIT = 5
+
     class TaskStatus:
         INIT = 0
         PROCESSING = 1

+ 11 - 10
app/domains/llm_tasks/decode_video/_mapper.py

@@ -46,8 +46,17 @@ class VideoDecodeMapper:
             params=data,
         )
 
-    async def update_video_decode_data(self, ori_status, new_status, video_path):
-        pass
+    async def update_video_decode_data_status(self, video_id, ori_status, new_status):
+        query = """
+            UPDATE video_decode_data
+            SET status = %s
+            WHERE video_id = %s AND status = %s;
+        """
+        affected_rows = await self.pool.async_save(
+            query=query,
+            params=(new_status, video_id, ori_status),
+        )
+        return bool(affected_rows)
 
     async def insert_into_decode_task_queue(self, data: tuple):
         query = """
@@ -60,11 +69,3 @@ class VideoDecodeMapper:
             query=query,
             params=data,
         )
-
-    async def fetch_decode_task_queue(self, video_path):
-        query = """
-            SELECT video_path, sample_video_id, dt
-            FROM video_decode_queue
-            WHERE video_path = %s;
-            """
-        return await self.pool.async_fetch(query=query, params=(video_path,))

+ 146 - 22
app/domains/llm_tasks/decode_video/decode_video_produce.py

@@ -2,6 +2,8 @@
 建立待解构视频 video_id 队列, 以视频的 oss_path 作为唯一视频粒度
 """
 
+import time
+import traceback
 from functools import partial
 from typing import Dict
 
@@ -9,6 +11,8 @@ from app.core.database import DatabaseManager
 from app.core.observability import LogService
 
 from app.infra.shared import run_tasks_with_asyncio_task_group
+from app.infra.shared.tools import generate_task_trace_id
+from app.infra.external import feishu_robot
 
 from ._const import VideoDecodeConst
 from ._utils import VideoDecodeUtils
@@ -21,9 +25,21 @@ class DecodeVideoProduce(VideoDecodeConst):
         self.tool: VideoDecodeUtils = VideoDecodeUtils()
         self.mapper: VideoDecodeMapper = VideoDecodeMapper(pool)
 
+    async def _trace_log(self, **kwargs):
+        payload = {"task": self.TASK_NAME, **kwargs}
+        await self.log_service.log(contents=payload)
+
     async def save_decode_info(
         self, video_id, channel, hot_scene_type, video_path, title, root_source_id, dt
     ):
+        if not video_path:
+            await self._trace_log(
+                event="video_path_missing",
+                video_id=video_id,
+                message="video_path 为空,跳过入队",
+            )
+            return
+
         # 存储到 video_decode_data 表中
         insert_row = await self.mapper.save_video_to_decode_data(
             data=(
@@ -37,20 +53,33 @@ class DecodeVideoProduce(VideoDecodeConst):
             )
         )
         if not insert_row:
-            print(f"INSERT error!!!! Duplicated VideoId, video_id {video_id}")
+            await self._trace_log(
+                event="video_duplicated",
+                video_id=video_id,
+                message="重复 video_id,跳过",
+            )
             return
 
-        # 进入解构队列
-        decoding_queue = await self.mapper.fetch_decode_task_queue(video_path)
-        if decoding_queue:
-            # 已经解构成功
-            print("该视频已经解构成功,无需解构")
+        # 进入解构队列(INSERT IGNORE 保证幂等,避免先查后插的并发竞争)
+        insert_queue_row = await self.mapper.insert_into_decode_task_queue(
+            data=(video_path, video_id, dt)
+        )
+        if not insert_queue_row:
+            await self._trace_log(
+                event="video_already_in_queue",
+                video_id=video_id,
+                video_path=video_path,
+                message="该视频已在解构队列中,跳过",
+            )
             return
 
-        else:
-            await self.mapper.insert_into_decode_task_queue(
-                data=(video_path, video_id, dt)
-            )
+        await self._trace_log(
+            event="video_enqueued",
+            video_id=video_id,
+            video_path=video_path,
+            dt=dt,
+            message="视频已成功加入解构队列",
+        )
 
     async def decode_daily_video(self, video_obj: Dict, dt: str):
         root_source_id = video_obj["root_source_id"]  # 上游肯定有,不然上游就会报错
@@ -59,14 +88,24 @@ class DecodeVideoProduce(VideoDecodeConst):
         # 查文章信息
         article_info = await self.mapper.fetch_video_source_content(root_source_id)
         if not article_info:
+            await self._trace_log(
+                event="daily_article_not_found",
+                video_id=video_id,
+                root_source_id=root_source_id,
+                message="未找到文章信息,降级走普通视频链路",
+            )
             return await self.decode_other_video(video_obj=video_obj, dt=dt)
         gh_id = article_info[0]["gh_id"]
         content_id = article_info[0]["content_id"]
         inner_vid = article_info[0]["video_id"]
         trace_id = article_info[0]["trace_id"]
         if inner_vid != video_id:
-            print(
-                f"error!!!!, root_source_id 和  video_id 映射失败 内部视频id{inner_vid}, 内部视频 type{type(inner_vid)}, 外部视频id{video_id}, 外部视频类型{type(video_id)}"
+            await self._trace_log(
+                event="video_id_mismatch",
+                video_id=video_id,
+                inner_video_id=inner_vid,
+                root_source_id=root_source_id,
+                message="root_source_id 与 video_id 映射失败,降级走普通视频链路",
             )
             return await self.decode_other_video(video_obj=video_obj, dt=dt)
 
@@ -80,8 +119,13 @@ class DecodeVideoProduce(VideoDecodeConst):
             )
 
         if not match_video_info:
-            print(
-                f"error!!!!, gh_id{gh_id}, content_id{content_id}, trace_id{trace_id}"
+            await self._trace_log(
+                event="match_video_not_found",
+                video_id=video_id,
+                gh_id=gh_id,
+                content_id=content_id,
+                content_trace_id=trace_id,
+                message="未命中匹配视频,降级走普通视频链路",
             )
             return await self.decode_other_video(video_obj=video_obj, dt=dt)
 
@@ -97,7 +141,7 @@ class DecodeVideoProduce(VideoDecodeConst):
         )
 
     async def decode_other_video(self, video_obj: Dict, dt: str):
-        video_id = video_obj["video_id"]
+        video_id = int(video_obj["video_id"])
         video_path = await self.tool.get_pq_video_real_path(video_id=video_id)
         await self.save_decode_info(
             video_id=video_id,
@@ -109,24 +153,104 @@ class DecodeVideoProduce(VideoDecodeConst):
             dt=dt,
         )
 
-    async def process_single_video(self, video_obj: Dict, dt):
+    async def process_single_video(self, video_obj: Dict, dt, trace_id: str):
+        video_id = int(video_obj["video_id"])
         hot_scene_type = video_obj.get("hot_scene_type")
-        if hot_scene_type == self.SceneType.DAILY_ARTICLE:
-            return await self.decode_daily_video(video_obj, dt)
-        else:
-            return await self.decode_other_video(video_obj, dt)
+        try:
+            if hot_scene_type == self.SceneType.DAILY_ARTICLE:
+                await self.decode_daily_video(video_obj, dt)
+            else:
+                await self.decode_other_video(video_obj, dt)
+
+            await self._trace_log(
+                event="video_processed",
+                trace_id=trace_id,
+                video_id=video_id,
+                hot_scene_type=hot_scene_type,
+                status="success",
+            )
+        except Exception as e:
+            await self._trace_log(
+                event="video_process_failed",
+                trace_id=trace_id,
+                video_id=video_id,
+                hot_scene_type=hot_scene_type,
+                status="fail",
+                error=str(e),
+                traceback=traceback.format_exc(),
+            )
+            raise
 
    async def deal(self, execute_dt="20260401"):
        """Produce the daily decode-video queue for ``execute_dt``.

        Pipeline: pull candidate videos from ODPS, fan them out through
        ``process_single_video`` with bounded concurrency, emit structured
        start/complete logs, and alert the Feishu bot when any task failed.

        Args:
            execute_dt: partition date string, e.g. "20260401".
        """
        # One trace id ties together every log line of this run.
        trace_id = generate_task_trace_id()
        start_time = time.time()

        # NOTE(review): get_top_head_videos is called synchronously inside
        # an async method — presumably an ODPS SDK call; confirm it does
        # not block the event loop for long.
        odps_video_list = self.tool.get_top_head_videos(execute_dt=execute_dt)
        if not odps_video_list:
            await self._trace_log(
                event="task_empty",
                trace_id=trace_id,
                dt=execute_dt,
                message="未获取到待解构视频",
            )
            return

        # task_list may be smaller than odps_video_list after filtering.
        task_list = self.tool.process_odps_data(odps_video_list)
        handler = partial(self.process_single_video, dt=execute_dt, trace_id=trace_id)

        await self._trace_log(
            event="task_start",
            trace_id=trace_id,
            dt=execute_dt,
            total_videos=len(odps_video_list),
            processed_videos=len(task_list),
        )

        result = await run_tasks_with_asyncio_task_group(
            task_list=task_list,
            handler=handler,
            description="解构视频生产",
            unit="video",
            max_concurrency=10,
        )

        duration = time.time() - start_time
        error_count = len(result["errors"])
        total_task = result["total_task"]
        # Guard against division by zero on an empty task list.
        fail_rate = error_count / total_task if total_task else 0

        await self._trace_log(
            event="task_complete",
            trace_id=trace_id,
            dt=execute_dt,
            total_task=total_task,
            success_count=result["processed_task"],
            error_count=error_count,
            fail_rate=fail_rate,
            duration_seconds=duration,
            avg_time_per_video=duration / total_task if total_task else 0,
        )

        if error_count:
            # NOTE(review): assumes result["errors"] yields
            # (index, task_obj, exception) triples — confirm against
            # run_tasks_with_asyncio_task_group's contract.
            error_samples = [
                {
                    "index": idx,
                    "video_id": task_obj.get("video_id"),
                    "error": str(err),
                }
                for idx, task_obj, err in result["errors"][: self.ERROR_SAMPLE_LIMIT]
            ]
            # @-mention the group only past the failure-rate threshold.
            await feishu_robot.bot(
                title="视频解构生产任务异常",
                detail={
                    "dt": execute_dt,
                    "trace_id": trace_id,
                    "总任务数": total_task,
                    "成功数": result["processed_task"],
                    "失败数": error_count,
                    "失败率": round(fail_rate, 4),
                    "错误样例": error_samples,
                },
                mention=fail_rate >= self.FAIL_RATE_THRESHOLD,
                env=self.FEISHU_BOT_ENV,
            )