123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- # core/video_processor.py
- import time
- import traceback
- from typing import Dict, Optional, Any
- from core.models.video_item import VideoItem
- from core.utils.extractors import extract_fields
- from core.utils.helpers import generate_titles
- from core.utils.log.logger_manager import LoggerManager
- from services.pipeline import PiaoQuanPipeline
- import uuid
class VideoProcessor:
    """
    Video processor for a single crawled item.

    Stages, in order:
    1. Field normalization (raw payload -> ``VideoItem`` keyword arguments).
    2. Data filtering through ``PiaoQuanPipeline``.
    3. Video handling (title generation) when a Feishu sheet id is configured.
    """

    def __init__(self, platform: str, mode: str, field_map: Dict,
                 feishu_sheetid: Optional[str] = None, logger=None, aliyun_log=None):
        """
        Store the processing configuration and resolve the loggers.

        :param platform: platform identifier; also used in session/trace ids.
        :param mode: crawl mode; stored on each item as ``strategy``.
        :param field_map: mapping handed to ``extract_fields``.
        :param feishu_sheetid: sheet id for title generation; a falsy value
            disables that step.
        :param logger: logger to use; when falsy one is obtained from
            ``LoggerManager.get_logger``.
        :param aliyun_log: Aliyun logger to use; when falsy one is obtained
            from ``LoggerManager.get_aliyun_logger``.
        """
        self.platform = platform
        self.mode = mode
        self.field_map = field_map
        self.feishu_sheetid = feishu_sheetid
        self.logger = logger or LoggerManager.get_logger(platform=platform, mode=mode)
        self.aliyun_log = aliyun_log or LoggerManager.get_aliyun_logger(platform=platform, mode=mode)

    def _log_failure(self, message: str, exc: Exception) -> None:
        """
        Log ``message`` together with the exception text and its traceback.

        Must be called from inside an ``except`` block so that
        ``traceback.format_exc()`` can see the exception being handled.
        """
        self.logger.error(f"{message}: {exc} \n {traceback.format_exc()}")

    async def process_single_video(self, raw_data: Dict, user_info: Dict,
                                   rule_dict: Dict, env: str = "prod") -> Optional[Dict]:
        """
        Run the complete processing flow for one video.

        :param raw_data: raw payload for the video.
        :param user_info: account info; ``uid`` and ``nick_name`` are read.
        :param rule_dict: filter rules forwarded to the pipeline.
        :param env: environment name forwarded to the pipeline.
        :return: the processed video, or ``None`` when any stage rejects the
            item or fails.
        """
        try:
            # 1. Extract fields and build the video object.
            video_obj = await self._create_video_item(raw_data, user_info)
            if not video_obj:
                return None

            # 2. Data filtering.
            if not await self._filter_video(video_obj, rule_dict, env):
                return None

            # 3. Integrated handling (title generation, etc.).
            processed_video = await self._integrated_handling(video_obj)
            # A falsy result is normalized to None, as before.
            return processed_video or None
        except Exception as e:
            self._log_failure("视频处理失败", e)
            return None

    async def _create_video_item(self, raw_data: Dict, user_info: Dict) -> Optional[Dict]:
        """
        Build the video data object from the raw payload and user info.

        :return: whatever ``VideoItem.produce_item()`` yields, or ``None`` when
            field extraction fails, the user id is missing, or an exception
            is raised.
        """
        try:
            # Extract the fields described by ``self.field_map``.
            item_kwargs = extract_fields(raw_data, self.field_map,
                                         logger=self.logger,
                                         aliyun_log=self.aliyun_log)
            if not item_kwargs:
                self.logger.error("字段提取失败")
                return None

            # Tolerate a missing user_info mapping: it is reported as an
            # empty user id instead of surfacing as an AttributeError.
            profile = user_info or {}
            user_id = profile.get("uid")
            user_name = profile.get("nick_name")
            if not user_id:
                self.logger.error("用户ID为空")
                return None

            # Attach the platform-related information.
            item_kwargs.update({
                "user_id": str(user_id),
                "user_name": user_name,
                "platform": self.platform,
                "strategy": self.mode,
                # Guarantees the ``session`` field is present.
                "session": f"{self.platform}_{int(time.time())}",
            })

            video_item = VideoItem(**item_kwargs)
            return await video_item.produce_item()
        except Exception as e:
            self._log_failure("创建视频项失败", e)
            return None

    async def _filter_video(self, video: Dict, rule_dict: Dict, env: str) -> bool:
        """
        Run the video through ``PiaoQuanPipeline``.

        :return: the result of ``pipeline.process_item()``; ``False`` when the
            pipeline raises.
        """
        try:
            pipeline = PiaoQuanPipeline(
                platform=self.platform,
                mode=self.mode,
                rule_dict=rule_dict,
                env=env,
                item=video,
                trace_id=f"{self.platform}_{uuid.uuid1()}",
            )
            return await pipeline.process_item()
        except Exception as e:
            self._log_failure("过滤视频失败", e)
            return False

    async def _integrated_handling(self, video: Dict) -> Optional[Dict]:
        """
        Integrated video handling: title generation and related business logic.

        Best-effort step: when no sheet id is configured, when title
        generation yields a falsy result, or when it raises, the original
        ``video`` is returned so the flow is not interrupted.
        """
        try:
            if not self.feishu_sheetid:
                self.logger.debug("未配置飞书sheetid,跳过标题生成")
                return video

            # Invoke the title-generation logic.
            processed_video = await generate_titles(
                self.feishu_sheetid,
                video,
                self.logger,
                self.aliyun_log,
            )
            if processed_video:
                self.logger.info(f"视频标题处理完成: {video}")
                return processed_video

            self.logger.warning("标题生成失败,使用原始视频数据")
            return video
        except Exception as e:
            self._log_failure("集成视频处理失败", e)
            # On failure fall back to the original data; do not break the flow.
            return video
|