# video_item.py (after modification)
import time
import uuid
from typing import Optional, Union

from pydantic import BaseModel, Field, field_validator, ConfigDict

from services.clean_title import clean_title


def _normalize_timestamp(value: Optional[Union[int, str]]) -> Union[int, str]:
    """Return *value* as a seconds-based timestamp, defaulting to now.

    * Empty / ``None`` / ``0`` -> current time in seconds.
    * ``str`` -> parsed with ``int()``; a 13-character string is treated as
      milliseconds and divided by 1000. Unparsable strings fall back to the
      current time.
    * ``int`` whose decimal form is 13 characters long -> treated as
      milliseconds and divided by 1000.
    * Anything else is returned unchanged.
    """
    if not value:
        return int(time.time())

    if isinstance(value, str):
        try:
            parsed = int(value)
        except ValueError:
            # Best-effort: an unparsable timestamp is replaced by "now".
            return int(time.time())
        return parsed // 1000 if len(value) == 13 else parsed

    if isinstance(value, int) and len(str(value)) == 13:
        return value // 1000

    return value


class VideoItem(BaseModel):
    """Pydantic model describing one crawled video record.

    Call :meth:`produce_item` to normalize the record and obtain the final
    dict, or ``None`` when a mandatory field is empty.
    """

    # Pydantic 2.x configuration
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Generated as a random UUID string when the caller does not supply one.
    video_id: Optional[Union[int, str]] = Field(default_factory=lambda: str(uuid.uuid4()))
    user_id: Union[int, str]
    user_name: str
    out_video_id: Union[int, str]
    # NOTE(review): no default is declared, so under Pydantic 2 this key must
    # be supplied (None is accepted as a value) — confirm that is intended.
    out_user_id: Optional[Union[int, str]]
    video_url: str
    cover_url: str
    platform: str
    strategy: str
    session: Optional[str] = None

    video_title: Optional[str] = None
    publish_time_stamp: Optional[Union[int, str]] = None
    update_time_stamp: Optional[Union[int, str]] = None

    duration: int = 0
    play_cnt: int = 0
    like_cnt: int = 0
    comment_cnt: int = 0
    share_cnt: int = 0
    width: int = 0
    height: int = 0

    # Both are filled in by prepare() from publish_time_stamp.
    publish_time_str: Optional[str] = None
    publish_time: Optional[str] = None

    # Pydantic 2.x validator syntax
    @field_validator('duration', 'play_cnt', 'like_cnt', 'comment_cnt', 'share_cnt', 'width', 'height')
    @classmethod
    def validate_non_negative(cls, v: int) -> int:
        """Reject negative values for the numeric counters and dimensions."""
        if v < 0:
            raise ValueError('Value must be non-negative')
        return v

    @field_validator('video_url', 'cover_url')
    @classmethod
    def validate_url(cls, v: str) -> str:
        """Require an http(s) scheme on non-empty URLs.

        Empty strings pass here; produce_item() rejects them later through
        its mandatory-field check.
        """
        if v and not v.startswith(('http://', 'https://')):
            raise ValueError('URL must start with http:// or https://')
        return v

    async def prepare(self):
        """Asynchronously pre-process the item in place.

        * Passes a non-empty ``video_title`` through ``clean_title``.
        * Normalizes ``publish_time_stamp`` and ``update_time_stamp`` via
          ``_normalize_timestamp``.
        * Derives ``publish_time_str`` / ``publish_time`` from the publish
          timestamp, formatted with ``time.localtime``.
        * Fills ``session`` with ``"<platform>_<now>"`` when it is empty.
        """
        if self.video_title:
            self.video_title = await clean_title(self.video_title)

        self.publish_time_stamp = _normalize_timestamp(self.publish_time_stamp)
        self.publish_time_str = time.strftime(
            "%Y-%m-%d %H:%M:%S", time.localtime(self.publish_time_stamp)
        )
        self.publish_time = self.publish_time_str

        self.update_time_stamp = _normalize_timestamp(self.update_time_stamp)

        if not self.session:
            self.session = f"{self.platform}_{int(time.time())}"

    async def produce_item(self) -> Optional[dict]:
        """Asynchronously build the final data dict.

        Runs prepare() first, then returns ``None`` if any mandatory field
        is missing or falsy; otherwise returns ``self.model_dump()``.
        """
        await self.prepare()

        must_fields = (
            "video_id", "user_id", "user_name", "out_video_id", "session",
            "video_url", "cover_url", "platform", "strategy",
        )
        if any(not getattr(self, name, None) for name in must_fields):
            return None

        # Pydantic 2.x: use model_dump() instead of dict()
        return self.model_dump()