"""Build the queue of video_ids awaiting decoding.

Each video's oss_path is used as the unique, video-level key.
"""

import time
import traceback
from functools import partial
from typing import Dict

from app.core.database import DatabaseManager
from app.core.observability import LogService
from app.infra.shared import run_tasks_with_asyncio_task_group
from app.infra.shared.tools import generate_task_trace_id
from app.infra.external import feishu_robot

from ._const import VideoDecodeConst
from ._utils import VideoDecodeUtils
from ._mapper import VideoDecodeMapper


class DecodeVideoProduce(VideoDecodeConst):
    """Producer side of the video-decode pipeline.

    Fetches the candidate videos for a given day, resolves the real storage
    path of each one and registers it in the decode table and decode task
    queue through ``VideoDecodeMapper``.
    """

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        """Wire up the collaborators.

        Args:
            pool: Database manager handed to the mapper.
            log_service: Sink used for every structured trace log.
        """
        self.log_service: LogService = log_service
        self.tool: VideoDecodeUtils = VideoDecodeUtils()
        self.mapper: VideoDecodeMapper = VideoDecodeMapper(pool)

    async def _trace_log(self, **kwargs) -> None:
        """Emit one structured log entry tagged with this task's name.

        ``kwargs`` are merged after the ``task`` key, so a caller-supplied
        ``task`` keyword would override ``self.TASK_NAME``.
        """
        payload = {"task": self.TASK_NAME, **kwargs}
        await self.log_service.log(contents=payload)

    async def save_decode_info(
        self, video_id, channel, hot_scene_type, video_path, title, root_source_id, dt
    ):
        """Persist one video and push it onto the decode task queue.

        The method is a no-op (apart from a trace log) when ``video_path`` is
        empty, when the decode-data insert reports no inserted row, or when
        the queue insert reports no inserted row.

        Args:
            video_id: Identifier of the video.
            channel: Channel value forwarded to the decode-data insert.
            hot_scene_type: Scene type forwarded to the decode-data insert.
            video_path: Resolved storage path of the video; falsy means the
                video cannot be enqueued.
            title: Video title.
            root_source_id: Root source id forwarded to the decode-data insert.
            dt: Partition date the video belongs to.
        """
        if not video_path:
            await self._trace_log(
                event="video_path_missing",
                video_id=video_id,
                message="video_path 为空,跳过入队",
            )
            return

        # Store the video in the video_decode_data table.
        insert_row = await self.mapper.save_video_to_decode_data(
            data=(
                video_id,
                channel,
                hot_scene_type,
                video_path,
                title,
                root_source_id,
                dt,
            )
        )
        if not insert_row:
            await self._trace_log(
                event="video_duplicated",
                video_id=video_id,
                message="重复 video_id,跳过",
            )
            return

        # Enter the decode queue (INSERT IGNORE keeps this idempotent and
        # avoids the race of a check-then-insert sequence).
        insert_queue_row = await self.mapper.insert_into_decode_task_queue(
            data=(video_path, video_id, dt)
        )
        if not insert_queue_row:
            await self._trace_log(
                event="video_already_in_queue",
                video_id=video_id,
                video_path=video_path,
                message="该视频已在解构队列中,跳过",
            )
            return

        await self._trace_log(
            event="video_enqueued",
            video_id=video_id,
            video_path=video_path,
            dt=dt,
            message="视频已成功加入解构队列",
        )

    async def _save_video_obj(self, video_obj: Dict, video_id: int, video_path, dt: str):
        """Forward the fields of ``video_obj`` plus the resolved path to ``save_decode_info``."""
        return await self.save_decode_info(
            video_id=video_id,
            channel=video_obj["channel"],
            hot_scene_type=video_obj["hot_scene_type"],
            video_path=video_path,
            title=video_obj["title"],
            root_source_id=video_obj["root_source_id"],
            dt=dt,
        )

    async def decode_daily_video(self, video_obj: Dict, dt: str):
        """Handle a daily-article video, falling back to the generic flow.

        Resolves the article behind ``root_source_id``, then the matched
        video, and saves the matched video's real path. Falls back to
        ``decode_other_video`` when the article is missing, when the
        article's video id differs from the requested one, or when no match
        result is found.

        Args:
            video_obj: Row describing the video; must contain
                ``root_source_id``, ``video_id``, ``channel``,
                ``hot_scene_type`` and ``title``.
            dt: Partition date being processed.
        """
        # Upstream guarantees this key; otherwise upstream would already have failed.
        root_source_id = video_obj["root_source_id"]
        video_id = int(video_obj["video_id"])

        # Look up the article information.
        article_info = await self.mapper.fetch_video_source_content(root_source_id)
        if not article_info:
            await self._trace_log(
                event="daily_article_not_found",
                video_id=video_id,
                root_source_id=root_source_id,
                message="未找到文章信息,降级走普通视频链路",
            )
            return await self.decode_other_video(video_obj=video_obj, dt=dt)

        article = article_info[0]
        gh_id = article["gh_id"]
        content_id = article["content_id"]
        inner_vid = article["video_id"]
        trace_id = article["trace_id"]

        # NOTE(review): video_id is coerced to int above but inner_vid is used
        # as returned by the mapper; if that column comes back as a string this
        # comparison always reports a mismatch — confirm the column type.
        if inner_vid != video_id:
            await self._trace_log(
                event="video_id_mismatch",
                video_id=video_id,
                inner_video_id=inner_vid,
                root_source_id=root_source_id,
                message="root_source_id 与 video_id 映射失败,降级走普通视频链路",
            )
            return await self.decode_other_video(video_obj=video_obj, dt=dt)

        # Look up the matched mini-program video; try v1 first, then v2.
        match_video_info = await self.mapper.fetch_video_match_result_v1(
            gh_id=gh_id, content_id=content_id
        )
        if not match_video_info:
            match_video_info = await self.mapper.fetch_video_match_result_v2(
                trace_id=trace_id
            )

        if not match_video_info:
            await self._trace_log(
                event="match_video_not_found",
                video_id=video_id,
                gh_id=gh_id,
                content_id=content_id,
                content_trace_id=trace_id,
                message="未命中匹配视频,降级走普通视频链路",
            )
            return await self.decode_other_video(video_obj=video_obj, dt=dt)

        video_path = self.tool.get_match_video_real_path(match_video_info, video_id)
        return await self._save_video_obj(
            video_obj=video_obj, video_id=video_id, video_path=video_path, dt=dt
        )

    async def decode_other_video(self, video_obj: Dict, dt: str):
        """Handle a non-daily-article video.

        Resolves the real path through ``get_pq_video_real_path`` and saves it.

        Args:
            video_obj: Row describing the video; must contain ``video_id``,
                ``channel``, ``hot_scene_type``, ``title`` and
                ``root_source_id``.
            dt: Partition date being processed.
        """
        video_id = int(video_obj["video_id"])
        video_path = await self.tool.get_pq_video_real_path(video_id=video_id)
        await self._save_video_obj(
            video_obj=video_obj, video_id=video_id, video_path=video_path, dt=dt
        )

    async def process_single_video(self, video_obj: Dict, dt, trace_id: str):
        """Process one video and trace-log the outcome.

        Videos that already have a decode result are skipped silently.
        Any exception is logged as ``video_process_failed`` and re-raised.

        Args:
            video_obj: Row describing the video.
            dt: Partition date being processed.
            trace_id: Trace id of the enclosing task run.

        Raises:
            Exception: Whatever the underlying steps raise, after logging.
        """
        # Raw value first, so the failure log still carries an id when the
        # int() conversion itself is what fails.
        video_id = video_obj.get("video_id")
        hot_scene_type = video_obj.get("hot_scene_type")
        try:
            # Kept inside the try so a malformed id or a failing lookup is
            # trace-logged like every other per-video failure.
            video_id = int(video_obj["video_id"])

            # If the video already has a decode result, there is nothing to do.
            if await self.mapper.is_video_decoded(video_id):
                return

            if hot_scene_type == self.SceneType.DAILY_ARTICLE:
                await self.decode_daily_video(video_obj, dt)
            else:
                await self.decode_other_video(video_obj, dt)

            await self._trace_log(
                event="video_processed",
                trace_id=trace_id,
                video_id=video_id,
                hot_scene_type=hot_scene_type,
                status="success",
            )
        except Exception as e:
            await self._trace_log(
                event="video_process_failed",
                trace_id=trace_id,
                video_id=video_id,
                hot_scene_type=hot_scene_type,
                status="fail",
                error=str(e),
                traceback=traceback.format_exc(),
            )
            raise

    async def deal(self, execute_dt=None):
        """Run the producer task for one day.

        Fetches the video list for ``execute_dt``, processes each video with
        a concurrency limit of 10, logs summary statistics, and sends a Feishu
        notification when at least one video failed.

        Args:
            execute_dt: Date to process; when falsy, the value returned by
                ``self.tool.get_yesterday_dt()`` is used.
        """
        if not execute_dt:
            execute_dt = self.tool.get_yesterday_dt()

        trace_id = generate_task_trace_id()
        start_time = time.time()

        odps_video_list = self.tool.get_top_head_videos(execute_dt=execute_dt)
        if not odps_video_list:
            await self._trace_log(
                event="task_empty",
                trace_id=trace_id,
                dt=execute_dt,
                message="未获取到待解构视频",
            )
            return

        task_list = self.tool.process_odps_data(odps_video_list)
        handler = partial(self.process_single_video, dt=execute_dt, trace_id=trace_id)

        await self._trace_log(
            event="task_start",
            trace_id=trace_id,
            dt=execute_dt,
            total_videos=len(odps_video_list),
            processed_videos=len(task_list),
        )

        result = await run_tasks_with_asyncio_task_group(
            task_list=task_list,
            handler=handler,
            description="解构视频生产",
            unit="video",
            max_concurrency=10,
        )

        duration = time.time() - start_time
        error_count = len(result["errors"])
        total_task = result["total_task"]
        # Guard against division by zero when no task was scheduled.
        fail_rate = error_count / total_task if total_task else 0

        await self._trace_log(
            event="task_complete",
            trace_id=trace_id,
            dt=execute_dt,
            total_task=total_task,
            success_count=result["processed_task"],
            error_count=error_count,
            fail_rate=fail_rate,
            duration_seconds=duration,
            avg_time_per_video=duration / total_task if total_task else 0,
        )

        if error_count:
            # Each error entry is a 3-tuple; only the task object (middle
            # element) is needed to build the admin link.
            error_samples = [
                f"https://admin.piaoquantv.com/cms/post-detail/{task_obj.get('video_id')}/detail/ "
                for _, task_obj, _ in result["errors"][: self.ERROR_SAMPLE_LIMIT]
            ]
            await feishu_robot.bot(
                title="视频解构生产任务异常",
                detail={
                    "dt": execute_dt,
                    "trace_id": trace_id,
                    "总任务数": total_task,
                    "成功数": result["processed_task"],
                    "失败数": error_count,
                    "失败率": round(fail_rate, 4),
                    "错误样例": error_samples,
                },
                # Only @-mention when the failure rate reaches the threshold.
                mention=fail_rate >= self.FAIL_RATE_THRESHOLD,
                env=self.FEISHU_BOT_ENV,
            )