- """
- 建立待解构视频 video_id 队列, 以视频的 oss_path 作为唯一视频粒度
- """
- import time
- import traceback
- from functools import partial
- from typing import Dict
- from app.core.database import DatabaseManager
- from app.core.observability import LogService
- from app.infra.shared import run_tasks_with_asyncio_task_group
- from app.infra.shared.tools import generate_task_trace_id
- from app.infra.external import feishu_robot
- from ._const import VideoDecodeConst
- from ._utils import VideoDecodeUtils
- from ._mapper import VideoDecodeMapper
class DecodeVideoProduce(VideoDecodeConst):
    """Producer that builds the queue of video_ids awaiting decode.

    A video's OSS path (``oss_path``) is the unique key at video granularity:
    the same path is never enqueued twice (enforced by INSERT IGNORE in the
    mapper layer).
    """

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        self.log_service: LogService = log_service
        self.tool: VideoDecodeUtils = VideoDecodeUtils()
        self.mapper: VideoDecodeMapper = VideoDecodeMapper(pool)

    async def _trace_log(self, **kwargs):
        """Emit one structured trace-log entry tagged with this task's name."""
        payload = {"task": self.TASK_NAME, **kwargs}
        await self.log_service.log(contents=payload)

    async def save_decode_info(
        self, video_id, channel, hot_scene_type, video_path, title, root_source_id, dt
    ):
        """Persist video metadata and enqueue the video for decoding.

        Returns early (with a trace log) when ``video_path`` is empty, when
        ``video_id`` already exists in ``video_decode_data``, or when the
        video is already present in the decode queue.
        """
        if not video_path:
            await self._trace_log(
                event="video_path_missing",
                video_id=video_id,
                message="video_path 为空,跳过入队",
            )
            return
        # Persist into the video_decode_data table.
        insert_row = await self.mapper.save_video_to_decode_data(
            data=(
                video_id,
                channel,
                hot_scene_type,
                video_path,
                title,
                root_source_id,
                dt,
            )
        )
        if not insert_row:
            await self._trace_log(
                event="video_duplicated",
                video_id=video_id,
                message="重复 video_id,跳过",
            )
            return
        # Enter the decode queue. INSERT IGNORE keeps this idempotent and
        # avoids the check-then-insert race under concurrency.
        insert_queue_row = await self.mapper.insert_into_decode_task_queue(
            data=(video_path, video_id, dt)
        )
        if not insert_queue_row:
            await self._trace_log(
                event="video_already_in_queue",
                video_id=video_id,
                video_path=video_path,
                message="该视频已在解构队列中,跳过",
            )
            return
        await self._trace_log(
            event="video_enqueued",
            video_id=video_id,
            video_path=video_path,
            dt=dt,
            message="视频已成功加入解构队列",
        )

    async def decode_daily_video(self, video_obj: Dict, dt: str):
        """Resolve a daily-article video to its matched mini-program video.

        Falls back to the plain-video path (``decode_other_video``) whenever
        the article lookup, the video_id mapping, or the match lookup fails.
        """
        # Guaranteed present by upstream; upstream errors out otherwise.
        root_source_id = video_obj["root_source_id"]
        video_id = int(video_obj["video_id"])
        # Look up the source article for this root_source_id.
        article_info = await self.mapper.fetch_video_source_content(root_source_id)
        if not article_info:
            await self._trace_log(
                event="daily_article_not_found",
                video_id=video_id,
                root_source_id=root_source_id,
                message="未找到文章信息,降级走普通视频链路",
            )
            return await self.decode_other_video(video_obj=video_obj, dt=dt)
        gh_id = article_info[0]["gh_id"]
        content_id = article_info[0]["content_id"]
        inner_vid = article_info[0]["video_id"]
        trace_id = article_info[0]["trace_id"]
        if inner_vid != video_id:
            await self._trace_log(
                event="video_id_mismatch",
                video_id=video_id,
                inner_video_id=inner_vid,
                root_source_id=root_source_id,
                message="root_source_id 与 video_id 映射失败,降级走普通视频链路",
            )
            return await self.decode_other_video(video_obj=video_obj, dt=dt)
        # Look up the matched mini-program video (v1 first, then v2 fallback).
        match_video_info = await self.mapper.fetch_video_match_result_v1(
            gh_id=gh_id, content_id=content_id
        )
        if not match_video_info:
            match_video_info = await self.mapper.fetch_video_match_result_v2(
                trace_id=trace_id
            )
        if not match_video_info:
            await self._trace_log(
                event="match_video_not_found",
                video_id=video_id,
                gh_id=gh_id,
                content_id=content_id,
                content_trace_id=trace_id,
                message="未命中匹配视频,降级走普通视频链路",
            )
            return await self.decode_other_video(video_obj=video_obj, dt=dt)
        video_path = self.tool.get_match_video_real_path(match_video_info, video_id)
        return await self.save_decode_info(
            video_id=video_id,
            channel=video_obj["channel"],
            hot_scene_type=video_obj["hot_scene_type"],
            video_path=video_path,
            title=video_obj["title"],
            root_source_id=video_obj["root_source_id"],
            dt=dt,
        )

    async def decode_other_video(self, video_obj: Dict, dt: str):
        """Enqueue a plain (non-daily-article) video via its PQ real path."""
        video_id = int(video_obj["video_id"])
        video_path = await self.tool.get_pq_video_real_path(video_id=video_id)
        # Return the result for consistency with decode_daily_video.
        return await self.save_decode_info(
            video_id=video_id,
            channel=video_obj["channel"],
            hot_scene_type=video_obj["hot_scene_type"],
            video_path=video_path,
            title=video_obj["title"],
            root_source_id=video_obj["root_source_id"],
            dt=dt,
        )

    async def process_single_video(self, video_obj: Dict, dt, trace_id: str):
        """Process one video: skip if already decoded, else route by scene type.

        Re-raises on failure so the task-group runner can count the error.
        """
        video_id = int(video_obj["video_id"])
        # Already decoded? Nothing to do.
        if await self.mapper.is_video_decoded(video_id):
            return
        hot_scene_type = video_obj.get("hot_scene_type")
        try:
            if hot_scene_type == self.SceneType.DAILY_ARTICLE:
                await self.decode_daily_video(video_obj, dt)
            else:
                await self.decode_other_video(video_obj, dt)
            await self._trace_log(
                event="video_processed",
                trace_id=trace_id,
                video_id=video_id,
                hot_scene_type=hot_scene_type,
                status="success",
            )
        except Exception as e:
            await self._trace_log(
                event="video_process_failed",
                trace_id=trace_id,
                video_id=video_id,
                hot_scene_type=hot_scene_type,
                status="fail",
                error=str(e),
                traceback=traceback.format_exc(),
            )
            raise

    async def deal(self, execute_dt=None):
        """Entry point: fetch the day's top videos and enqueue them for decode.

        :param execute_dt: partition date (defaults to yesterday).
        Sends a Feishu alert (with samples) when any task fails; mentions
        the group when the failure rate crosses FAIL_RATE_THRESHOLD.
        """
        if not execute_dt:
            execute_dt = self.tool.get_yesterday_dt()
        trace_id = generate_task_trace_id()
        start_time = time.time()
        odps_video_list = self.tool.get_top_head_videos(execute_dt=execute_dt)
        if not odps_video_list:
            await self._trace_log(
                event="task_empty",
                trace_id=trace_id,
                dt=execute_dt,
                message="未获取到待解构视频",
            )
            return
        task_list = self.tool.process_odps_data(odps_video_list)
        handler = partial(self.process_single_video, dt=execute_dt, trace_id=trace_id)
        await self._trace_log(
            event="task_start",
            trace_id=trace_id,
            dt=execute_dt,
            total_videos=len(odps_video_list),
            processed_videos=len(task_list),
        )
        result = await run_tasks_with_asyncio_task_group(
            task_list=task_list,
            handler=handler,
            description="解构视频生产",
            unit="video",
            max_concurrency=10,
        )
        duration = time.time() - start_time
        error_count = len(result["errors"])
        total_task = result["total_task"]
        fail_rate = error_count / total_task if total_task else 0
        await self._trace_log(
            event="task_complete",
            trace_id=trace_id,
            dt=execute_dt,
            total_task=total_task,
            success_count=result["processed_task"],
            error_count=error_count,
            fail_rate=fail_rate,
            duration_seconds=duration,
            avg_time_per_video=duration / total_task if total_task else 0,
        )
        if error_count:
            # errors entries are (index, task_obj, exception); only the task
            # object is needed for the sample links. Fixed: the URL previously
            # carried a stray trailing space that broke the link.
            error_samples = [
                f"https://admin.piaoquantv.com/cms/post-detail/{task_obj.get('video_id')}/detail/"
                for _, task_obj, _ in result["errors"][: self.ERROR_SAMPLE_LIMIT]
            ]
            await feishu_robot.bot(
                title="视频解构生产任务异常",
                detail={
                    "dt": execute_dt,
                    "trace_id": trace_id,
                    "总任务数": total_task,
                    "成功数": result["processed_task"],
                    "失败数": error_count,
                    "失败率": round(fail_rate, 4),
                    "错误样例": error_samples,
                },
                mention=fail_rate >= self.FAIL_RATE_THRESHOLD,
                env=self.FEISHU_BOT_ENV,
            )
|