video_processor.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. # core/video_processor.py
  2. import time
  3. import traceback
  4. from typing import Dict, Optional, Any
  5. from core.models.video_item import VideoItem
  6. from core.utils.extractors import extract_fields
  7. from core.utils.helpers import generate_titles
  8. from core.utils.log.logger_manager import LoggerManager
  9. from services.pipeline import PiaoQuanPipeline
  10. import uuid
  11. class VideoProcessor:
  12. """
  13. 视频处理器:
  14. 1、字段统一
  15. 2、数据过滤
  16. 3、视频处理(标题生成)
  17. """
  18. def __init__(self, platform: str, mode: str, field_map: Dict,
  19. feishu_sheetid: str = None, logger=None, aliyun_log=None):
  20. self.platform = platform
  21. self.mode = mode
  22. self.field_map = field_map
  23. self.feishu_sheetid = feishu_sheetid
  24. self.logger = logger or LoggerManager.get_logger(platform=platform, mode=mode)
  25. self.aliyun_log = aliyun_log or LoggerManager.get_aliyun_logger(platform=platform, mode=mode)
  26. async def process_single_video(self, raw_data: Dict, user_info: Dict,
  27. rule_dict: Dict, env: str = "prod") -> Optional[Dict]:
  28. """
  29. 完整的单条视频处理流程
  30. """
  31. try:
  32. # 1. 提取字段并创建视频对象
  33. video_obj = await self._create_video_item(raw_data, user_info)
  34. if not video_obj:
  35. return None
  36. # 2. 数据过滤
  37. if not await self._filter_video(video_obj, rule_dict, env):
  38. return None
  39. # 3. 集成视频处理(标题生成等)
  40. processed_video = await self._integrated_handling(video_obj)
  41. if not processed_video:
  42. return None
  43. return processed_video
  44. except Exception as e:
  45. self.logger.error(f"视频处理失败: {e}")
  46. return None
  47. async def _create_video_item(self, raw_data: Dict, user_info: Dict) -> Optional[Dict]:
  48. """创建视频数据对象"""
  49. try:
  50. # 提取字段
  51. item_kwargs = extract_fields(raw_data, self.field_map,
  52. logger=self.logger,
  53. aliyun_log=self.aliyun_log)
  54. if not item_kwargs:
  55. self.logger.error("字段提取失败")
  56. return None
  57. # 确保必需的用户信息存在
  58. user_id = user_info.get("uid")
  59. user_name = user_info.get("nick_name")
  60. if not user_id:
  61. self.logger.error("用户ID为空")
  62. return None
  63. # 添加平台相关信息
  64. item_kwargs.update({
  65. "user_id": str(user_id),
  66. "user_name": user_name,
  67. "platform": self.platform,
  68. "strategy": self.mode,
  69. "session": f"{self.platform}_{int(time.time())}" # 确保session字段存在
  70. })
  71. # 创建视频项
  72. video_item = VideoItem(**item_kwargs)
  73. return await video_item.produce_item()
  74. except Exception as e:
  75. self.logger.error(f"创建视频项失败: {e}")
  76. return None
  77. async def _filter_video(self, video: Dict, rule_dict: Dict, env: str) -> bool:
  78. """过滤视频数据"""
  79. try:
  80. pipeline = PiaoQuanPipeline(
  81. platform=self.platform,
  82. mode=self.mode,
  83. rule_dict=rule_dict,
  84. env=env,
  85. item=video,
  86. trace_id=f"{self.platform}_{uuid.uuid1()}"
  87. )
  88. return await pipeline.process_item()
  89. except Exception as e:
  90. self.logger.error(f"过滤视频失败: {e}")
  91. return False
  92. async def _integrated_handling(self, video: Dict) -> Optional[Dict]:
  93. """
  94. 集成视频处理 - 标题生成等业务逻辑
  95. 返回处理后的视频数据,如果处理失败返回None
  96. """
  97. try:
  98. if self.feishu_sheetid:
  99. # 调用标题生成逻辑
  100. processed_video = await generate_titles(
  101. self.feishu_sheetid,
  102. video,
  103. self.logger,
  104. self.aliyun_log
  105. )
  106. if processed_video:
  107. self.logger.info(f"视频标题处理完成: {video}")
  108. return processed_video
  109. else:
  110. self.logger.warning("标题生成失败,使用原始视频数据")
  111. return video
  112. else:
  113. self.logger.debug("未配置飞书sheetid,跳过标题生成")
  114. return video
  115. except Exception as e:
  116. tb = traceback.format_exc()
  117. self.logger.error(f"集成视频处理失败: {e} \n {tb}")
  118. # 处理失败时返回原始数据,不中断流程
  119. return video