# core/video_processor.py
import time
import traceback
import uuid
from typing import Dict, Optional, Any

from core.models.video_item import VideoItem
from core.utils.extractors import extract_fields
from core.utils.helpers import generate_titles
from core.utils.log.logger_manager import LoggerManager
from services.pipeline import PiaoQuanPipeline


class VideoProcessor:
    """
    Video processor that runs one crawled record through three stages:

    1. Field normalization (raw payload -> ``VideoItem`` data)
    2. Data filtering (``PiaoQuanPipeline``)
    3. Video handling (title generation)

    Every stage catches its own exceptions, logs them with a traceback and
    reports failure through its return value instead of raising.
    """

    def __init__(self, platform: str, mode: str, field_map: Dict,
                 feishu_sheetid: Optional[str] = None, logger=None, aliyun_log=None):
        """
        Args:
            platform: Platform identifier; stored on produced items as
                ``platform`` and used to look up the default loggers.
            mode: Crawl mode; stored on produced items as ``strategy`` and
                used to look up the default loggers.
            field_map: Mapping handed to ``extract_fields`` to pull the item
                fields out of the raw payload.
            feishu_sheetid: Sheet id forwarded to ``generate_titles``. When
                falsy, the title-generation stage is skipped.
            logger: Logger to use; falls back to ``LoggerManager.get_logger``.
            aliyun_log: Aliyun logger to use; falls back to
                ``LoggerManager.get_aliyun_logger``.
        """
        self.platform = platform
        self.mode = mode
        self.field_map = field_map
        self.feishu_sheetid = feishu_sheetid
        self.logger = logger or LoggerManager.get_logger(platform=platform, mode=mode)
        self.aliyun_log = aliyun_log or LoggerManager.get_aliyun_logger(platform=platform, mode=mode)

    def _log_failure(self, message: str, exc: Exception) -> None:
        """
        Log ``message`` together with the exception text and its traceback.

        Must be called from inside an ``except`` block: the traceback is read
        from the exception currently being handled.
        """
        tb = traceback.format_exc()
        self.logger.error(f"{message}: {exc} \n {tb}")

    async def process_single_video(self, raw_data: Dict, user_info: Dict, rule_dict: Dict,
                                   env: str = "prod") -> Optional[Dict]:
        """
        Run the complete processing flow for a single video.

        Args:
            raw_data: Raw payload of one video.
            user_info: User record; ``uid`` and ``nick_name`` are read from it.
            rule_dict: Rules forwarded to the filtering pipeline.
            env: Environment name forwarded to the filtering pipeline.

        Returns:
            The processed video data, or ``None`` when any stage rejects the
            video, yields an empty result, or raises.
        """
        try:
            # 1. Extract the fields and build the video object.
            video_obj = await self._create_video_item(raw_data, user_info)
            if not video_obj:
                return None

            # 2. Filter the data.
            if not await self._filter_video(video_obj, rule_dict, env):
                return None

            # 3. Integrated video handling (title generation, etc.).
            processed_video = await self._integrated_handling(video_obj)
            if not processed_video:
                return None

            return processed_video
        except Exception as e:
            self._log_failure("视频处理失败", e)
            return None

    async def _create_video_item(self, raw_data: Dict, user_info: Dict) -> Optional[Dict]:
        """
        Build the video data object from the raw payload and the user record.

        Returns:
            Whatever ``VideoItem.produce_item()`` yields (annotated as a dict;
            presumably falsy when the item is invalid - confirm against
            ``VideoItem``), or ``None`` when field extraction fails, the user
            id is missing, or an exception occurs.
        """
        try:
            # Extract the mapped fields from the raw payload.
            item_kwargs = extract_fields(raw_data, self.field_map, logger=self.logger,
                                         aliyun_log=self.aliyun_log)
            if not item_kwargs:
                self.logger.error("字段提取失败")
                return None

            # The user id is mandatory; the nickname is passed through as-is.
            user_id = user_info.get("uid")
            user_name = user_info.get("nick_name")
            if not user_id:
                self.logger.error("用户ID为空")
                return None

            # Attach the platform-related information.
            item_kwargs.update({
                "user_id": str(user_id),
                "user_name": user_name,
                "platform": self.platform,
                "strategy": self.mode,
                # Always populate ``session`` so the field is guaranteed to exist.
                "session": f"{self.platform}_{int(time.time())}",
            })

            video_item = VideoItem(**item_kwargs)
            return await video_item.produce_item()
        except Exception as e:
            self._log_failure("创建视频项失败", e)
            return None

    async def _filter_video(self, video: Dict, rule_dict: Dict, env: str) -> bool:
        """
        Run the video through ``PiaoQuanPipeline``.

        Returns:
            The result of ``pipeline.process_item()`` (treated by the caller as
            truthy = keep the video), or ``False`` when the pipeline raises.
        """
        try:
            pipeline = PiaoQuanPipeline(
                platform=self.platform,
                mode=self.mode,
                rule_dict=rule_dict,
                env=env,
                item=video,
                trace_id=f"{self.platform}_{uuid.uuid1()}",
            )
            return await pipeline.process_item()
        except Exception as e:
            self._log_failure("过滤视频失败", e)
            return False

    async def _integrated_handling(self, video: Dict) -> Optional[Dict]:
        """
        Integrated video handling - title generation and related business logic.

        Returns:
            The data returned by ``generate_titles`` when it is truthy;
            otherwise the original ``video`` (no sheet id configured, title
            generation returned nothing, or an exception occurred).
        """
        try:
            if not self.feishu_sheetid:
                self.logger.debug("未配置飞书sheetid,跳过标题生成")
                return video

            # Invoke the title-generation logic.
            processed_video = await generate_titles(
                self.feishu_sheetid,
                video,
                self.logger,
                self.aliyun_log,
            )
            if processed_video:
                self.logger.info(f"视频标题处理完成: {video}")
                return processed_video

            self.logger.warning("标题生成失败,使用原始视频数据")
            return video
        except Exception as e:
            self._log_failure("集成视频处理失败", e)
            # Fall back to the original data so a title failure does not
            # interrupt the overall flow.
            return video