import time
import uuid
from typing import Optional, Union

from pydantic import BaseModel, Field, validator

from services.clean_title import clean_title

# Epoch values with exactly this many digits are treated as milliseconds
# and divided by 1000 to obtain seconds.
_MILLISECOND_DIGITS = 13

# Format used for the human-readable publish time.
_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

# Attributes that must be truthy for ``produce_item()`` to return a dict.
_REQUIRED_FIELDS = (
    "video_id",
    "user_id",
    "user_name",
    "out_video_id",
    "session",
    "video_url",
    "cover_url",
    "platform",
    "strategy",
)


def _normalize_timestamp(raw: Optional[Union[int, str]]) -> int:
    """Return *raw* as an integer epoch timestamp in seconds.

    - Falsy input (``None``, ``0``, ``""``) yields the current time.
    - Input that cannot be parsed as an integer yields the current time.
    - A value whose absolute magnitude has exactly 13 digits is treated as
      milliseconds and floor-divided by 1000.
    - Any other integer value is returned unchanged.
    """
    now = int(time.time())
    if not raw:
        return now
    try:
        # int() tolerates surrounding whitespace and an explicit sign, so
        # parse first and judge the magnitude of the number afterwards
        # instead of counting characters of the raw text.
        value = int(raw)
    except (TypeError, ValueError):
        return now
    if len(str(abs(value))) == _MILLISECOND_DIGITS:
        value //= 1000
    return value


class VideoItem(BaseModel):
    """
    Video record with field validation and preprocessing.

    - After construction, the async ``prepare()`` method cleans the title
      and fills in / normalizes the timestamp and session fields.
    - ``produce_item()`` runs ``prepare()`` and returns the final data as a
      dict, or ``None`` when a required field is empty.
    """

    video_id: Optional[str] = Field(default_factory=lambda: str(uuid.uuid4()))
    user_id: str
    user_name: str
    out_video_id: str
    # Optional fields carry an explicit ``None`` default so the model does
    # not depend on the implicit default for bare ``Optional`` annotations.
    out_user_id: Optional[str] = None
    video_url: str
    cover_url: str
    platform: str
    strategy: str
    session: Optional[str] = None
    video_title: Optional[str] = None
    publish_time_stamp: Optional[Union[int, str]] = None
    update_time_stamp: Optional[Union[int, str]] = None
    duration: int = 0
    play_cnt: int = 0
    like_cnt: int = 0
    comment_cnt: int = 0
    share_cnt: int = 0
    width: int = 0
    height: int = 0
    publish_time_str: Optional[str] = None
    publish_time: Optional[str] = None

    # Validator ensuring the numeric fields are non-negative.
    @validator(
        'duration', 'play_cnt', 'like_cnt', 'comment_cnt',
        'share_cnt', 'width', 'height',
    )
    def validate_non_negative(cls, v, field):
        """Reject negative values for the counter / dimension fields.

        Raises:
            ValueError: if the value is below zero.
        """
        if v < 0:
            raise ValueError(f'{field.name} must be non-negative')
        return v

    @validator('video_url', 'cover_url')
    def validate_url(cls, v, field):
        """Require a non-empty URL to start with ``http://`` or ``https://``.

        An empty string passes this validator; ``produce_item()`` rejects
        it later because both URL fields are required to be truthy.

        Raises:
            ValueError: if a non-empty value has neither prefix.
        """
        if v and not v.startswith(('http://', 'https://')):
            raise ValueError(f'{field.name} must be a valid URL')
        return v

    async def prepare(self) -> None:
        """
        Asynchronous preprocessing: clean the title and fill in the publish
        time, update time and session.

        Mutates the instance in place:
        - ``video_title`` is passed through ``clean_title`` when non-empty.
        - ``publish_time_stamp`` / ``update_time_stamp`` become integer
          epoch seconds (13-digit values are treated as milliseconds;
          missing or unparsable values become the current time).
        - ``publish_time_str`` and ``publish_time`` are set to the local
          time formatted as ``YYYY-MM-DD HH:MM:SS``.
        - ``session`` defaults to ``"<platform>_<current epoch seconds>"``.
        """
        # Title cleaning.
        if self.video_title:
            self.video_title = await clean_title(self.video_title)

        # Publish time: normalize, then derive the formatted string.
        self.publish_time_stamp = _normalize_timestamp(self.publish_time_stamp)
        self.publish_time_str = time.strftime(
            _TIME_FORMAT, time.localtime(self.publish_time_stamp)
        )
        self.publish_time = self.publish_time_str

        # Update time defaults to the current time.
        self.update_time_stamp = _normalize_timestamp(self.update_time_stamp)

        if not self.session:
            self.session = f"{self.platform}_{int(time.time())}"

    async def produce_item(self) -> Optional[dict]:
        """
        Asynchronously build the final data dict.

        Runs ``prepare()`` first, then checks that every required field is
        truthy.

        Returns:
            The model as a dict, or ``None`` if any required field is
            missing or empty.
        """
        await self.prepare()
        if any(not getattr(self, name, None) for name in _REQUIRED_FIELDS):
            return None
        return self.dict()