decode_video_produce.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. """
  2. 建立待解构视频 video_id 队列, 以视频的 oss_path 作为唯一视频粒度
  3. """
  4. import time
  5. import traceback
  6. from functools import partial
  7. from typing import Dict
  8. from app.core.database import DatabaseManager
  9. from app.core.observability import LogService
  10. from app.infra.shared import run_tasks_with_asyncio_task_group
  11. from app.infra.shared.tools import generate_task_trace_id
  12. from app.infra.external import feishu_robot
  13. from ._const import VideoDecodeConst
  14. from ._utils import VideoDecodeUtils
  15. from ._mapper import VideoDecodeMapper
  16. class DecodeVideoProduce(VideoDecodeConst):
  17. def __init__(self, pool: DatabaseManager, log_service: LogService):
  18. self.log_service: LogService = log_service
  19. self.tool: VideoDecodeUtils = VideoDecodeUtils()
  20. self.mapper: VideoDecodeMapper = VideoDecodeMapper(pool)
  21. async def _trace_log(self, **kwargs):
  22. payload = {"task": self.TASK_NAME, **kwargs}
  23. await self.log_service.log(contents=payload)
  24. async def save_decode_info(
  25. self, video_id, channel, hot_scene_type, video_path, title, root_source_id, dt
  26. ):
  27. if not video_path:
  28. await self._trace_log(
  29. event="video_path_missing",
  30. video_id=video_id,
  31. message="video_path 为空,跳过入队",
  32. )
  33. return
  34. # 存储到 video_decode_data 表中
  35. insert_row = await self.mapper.save_video_to_decode_data(
  36. data=(
  37. video_id,
  38. channel,
  39. hot_scene_type,
  40. video_path,
  41. title,
  42. root_source_id,
  43. dt,
  44. )
  45. )
  46. if not insert_row:
  47. await self._trace_log(
  48. event="video_duplicated",
  49. video_id=video_id,
  50. message="重复 video_id,跳过",
  51. )
  52. return
  53. # 进入解构队列(INSERT IGNORE 保证幂等,避免先查后插的并发竞争)
  54. insert_queue_row = await self.mapper.insert_into_decode_task_queue(
  55. data=(video_path, video_id, dt)
  56. )
  57. if not insert_queue_row:
  58. await self._trace_log(
  59. event="video_already_in_queue",
  60. video_id=video_id,
  61. video_path=video_path,
  62. message="该视频已在解构队列中,跳过",
  63. )
  64. return
  65. await self._trace_log(
  66. event="video_enqueued",
  67. video_id=video_id,
  68. video_path=video_path,
  69. dt=dt,
  70. message="视频已成功加入解构队列",
  71. )
  72. async def decode_daily_video(self, video_obj: Dict, dt: str):
  73. root_source_id = video_obj["root_source_id"] # 上游肯定有,不然上游就会报错
  74. video_id = int(video_obj["video_id"])
  75. # 查文章信息
  76. article_info = await self.mapper.fetch_video_source_content(root_source_id)
  77. if not article_info:
  78. await self._trace_log(
  79. event="daily_article_not_found",
  80. video_id=video_id,
  81. root_source_id=root_source_id,
  82. message="未找到文章信息,降级走普通视频链路",
  83. )
  84. return await self.decode_other_video(video_obj=video_obj, dt=dt)
  85. gh_id = article_info[0]["gh_id"]
  86. content_id = article_info[0]["content_id"]
  87. inner_vid = article_info[0]["video_id"]
  88. trace_id = article_info[0]["trace_id"]
  89. if inner_vid != video_id:
  90. await self._trace_log(
  91. event="video_id_mismatch",
  92. video_id=video_id,
  93. inner_video_id=inner_vid,
  94. root_source_id=root_source_id,
  95. message="root_source_id 与 video_id 映射失败,降级走普通视频链路",
  96. )
  97. return await self.decode_other_video(video_obj=video_obj, dt=dt)
  98. # 查匹配小程序信息
  99. match_video_info = await self.mapper.fetch_video_match_result_v1(
  100. gh_id=gh_id, content_id=content_id
  101. )
  102. if not match_video_info:
  103. match_video_info = await self.mapper.fetch_video_match_result_v2(
  104. trace_id=trace_id
  105. )
  106. if not match_video_info:
  107. await self._trace_log(
  108. event="match_video_not_found",
  109. video_id=video_id,
  110. gh_id=gh_id,
  111. content_id=content_id,
  112. content_trace_id=trace_id,
  113. message="未命中匹配视频,降级走普通视频链路",
  114. )
  115. return await self.decode_other_video(video_obj=video_obj, dt=dt)
  116. video_path = self.tool.get_match_video_real_path(match_video_info, video_id)
  117. return await self.save_decode_info(
  118. video_id=video_id,
  119. channel=video_obj["channel"],
  120. hot_scene_type=video_obj["hot_scene_type"],
  121. video_path=video_path,
  122. title=video_obj["title"],
  123. root_source_id=video_obj["root_source_id"],
  124. dt=dt,
  125. )
  126. async def decode_other_video(self, video_obj: Dict, dt: str):
  127. video_id = int(video_obj["video_id"])
  128. video_path = await self.tool.get_pq_video_real_path(video_id=video_id)
  129. await self.save_decode_info(
  130. video_id=video_id,
  131. channel=video_obj["channel"],
  132. hot_scene_type=video_obj["hot_scene_type"],
  133. video_path=video_path,
  134. title=video_obj["title"],
  135. root_source_id=video_obj["root_source_id"],
  136. dt=dt,
  137. )
  138. async def process_single_video(self, video_obj: Dict, dt, trace_id: str):
  139. video_id = int(video_obj["video_id"])
  140. # 判断视频是否解构过,如果有解构结果,那么直接 return
  141. if await self.mapper.is_video_decoded(video_id):
  142. return
  143. hot_scene_type = video_obj.get("hot_scene_type")
  144. try:
  145. if hot_scene_type == self.SceneType.DAILY_ARTICLE:
  146. await self.decode_daily_video(video_obj, dt)
  147. else:
  148. await self.decode_other_video(video_obj, dt)
  149. await self._trace_log(
  150. event="video_processed",
  151. trace_id=trace_id,
  152. video_id=video_id,
  153. hot_scene_type=hot_scene_type,
  154. status="success",
  155. )
  156. except Exception as e:
  157. await self._trace_log(
  158. event="video_process_failed",
  159. trace_id=trace_id,
  160. video_id=video_id,
  161. hot_scene_type=hot_scene_type,
  162. status="fail",
  163. error=str(e),
  164. traceback=traceback.format_exc(),
  165. )
  166. raise
  167. async def deal(self, execute_dt=None):
  168. if not execute_dt:
  169. execute_dt = self.tool.get_yesterday_dt()
  170. trace_id = generate_task_trace_id()
  171. start_time = time.time()
  172. odps_video_list = self.tool.get_top_head_videos(execute_dt=execute_dt)
  173. if not odps_video_list:
  174. await self._trace_log(
  175. event="task_empty",
  176. trace_id=trace_id,
  177. dt=execute_dt,
  178. message="未获取到待解构视频",
  179. )
  180. return
  181. task_list = self.tool.process_odps_data(odps_video_list)
  182. handler = partial(self.process_single_video, dt=execute_dt, trace_id=trace_id)
  183. await self._trace_log(
  184. event="task_start",
  185. trace_id=trace_id,
  186. dt=execute_dt,
  187. total_videos=len(odps_video_list),
  188. processed_videos=len(task_list),
  189. )
  190. result = await run_tasks_with_asyncio_task_group(
  191. task_list=task_list,
  192. handler=handler,
  193. description="解构视频生产",
  194. unit="video",
  195. max_concurrency=10,
  196. )
  197. duration = time.time() - start_time
  198. error_count = len(result["errors"])
  199. total_task = result["total_task"]
  200. fail_rate = error_count / total_task if total_task else 0
  201. await self._trace_log(
  202. event="task_complete",
  203. trace_id=trace_id,
  204. dt=execute_dt,
  205. total_task=total_task,
  206. success_count=result["processed_task"],
  207. error_count=error_count,
  208. fail_rate=fail_rate,
  209. duration_seconds=duration,
  210. avg_time_per_video=duration / total_task if total_task else 0,
  211. )
  212. if error_count:
  213. error_samples = [
  214. f"https://admin.piaoquantv.com/cms/post-detail/{task_obj.get('video_id')}/detail/ "
  215. for idx, task_obj, err in result["errors"][: self.ERROR_SAMPLE_LIMIT]
  216. ]
  217. await feishu_robot.bot(
  218. title="视频解构生产任务异常",
  219. detail={
  220. "dt": execute_dt,
  221. "trace_id": trace_id,
  222. "总任务数": total_task,
  223. "成功数": result["processed_task"],
  224. "失败数": error_count,
  225. "失败率": round(fail_rate, 4),
  226. "错误样例": error_samples,
  227. },
  228. mention=fail_rate >= self.FAIL_RATE_THRESHOLD,
  229. env=self.FEISHU_BOT_ENV,
  230. )