123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
# video_item.py (revised)
- import time
- import uuid
- from typing import Optional, Union
- from pydantic import BaseModel, Field, field_validator, ConfigDict
- from services.clean_title import clean_title
class VideoItem(BaseModel):
    """Pydantic 2.x model for a single video record.

    Construction validates the URL and counter fields.  ``prepare()``
    normalises the title, the timestamps and the session in place, and
    ``produce_item()`` returns the final dict, or ``None`` when a mandatory
    field is empty.
    """

    # Pydantic 2.x configuration.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Falls back to a freshly generated UUID4 string when not supplied.
    video_id: Optional[Union[int, str]] = Field(default_factory=lambda: str(uuid.uuid4()))
    user_id: Union[int, str]
    user_name: str
    out_video_id: Union[int, str]
    # Explicit default: in Pydantic 2.x an Optional annotation without a
    # default is still a *required* field, which is inconsistent with the
    # other optional fields and with the mandatory list in produce_item().
    out_user_id: Optional[Union[int, str]] = None
    video_url: str
    cover_url: str
    platform: str
    strategy: str
    session: Optional[str] = None
    video_title: Optional[str] = None
    publish_time_stamp: Optional[Union[int, str]] = None
    update_time_stamp: Optional[Union[int, str]] = None
    duration: int = 0
    play_cnt: int = 0
    like_cnt: int = 0
    comment_cnt: int = 0
    share_cnt: int = 0
    width: int = 0
    height: int = 0
    publish_time_str: Optional[str] = None
    publish_time: Optional[str] = None

    # Pydantic 2.x validator syntax.
    @field_validator('duration', 'play_cnt', 'like_cnt', 'comment_cnt', 'share_cnt', 'width', 'height')
    @classmethod
    def validate_non_negative(cls, v: int) -> int:
        """Reject negative values for the duration, counter and size fields.

        Raises:
            ValueError: if ``v`` is below zero.
        """
        if v < 0:
            raise ValueError('Value must be non-negative')
        return v

    @field_validator('video_url', 'cover_url')
    @classmethod
    def validate_url(cls, v: str) -> str:
        """Require an http(s) scheme on non-empty URLs.

        An empty string passes validation here; produce_item() later treats
        it as a missing mandatory field.

        Raises:
            ValueError: if ``v`` is non-empty and lacks an http(s) prefix.
        """
        if v and not v.startswith(('http://', 'https://')):
            raise ValueError('URL must start with http:// or https://')
        return v

    @staticmethod
    def _to_epoch_seconds(value: Optional[Union[int, str]], fallback: int) -> Optional[Union[int, str]]:
        """Normalise a timestamp to integer epoch seconds.

        * A falsy value (``None``, ``0``, ``""``) yields ``fallback``.
        * A string is parsed with ``int()``; an unparsable string yields
          ``fallback``.
        * An integer with exactly 13 characters in its decimal form is
          treated as milliseconds and divided by 1000.
        * Anything else is returned unchanged.
        """
        if not value:
            return fallback
        if isinstance(value, str):
            try:
                value = int(value)
            except ValueError:
                return fallback
        # The width check runs on the parsed integer rather than the raw
        # string, so padding or leading zeros cannot cause a seconds value
        # to be mistaken for milliseconds (or the reverse).
        if isinstance(value, int) and len(str(value)) == 13:
            return value // 1000
        return value

    async def prepare(self):
        """Asynchronously normalise the item's fields in place.

        * ``video_title`` is passed through the awaited ``clean_title`` helper.
        * ``publish_time_stamp`` / ``update_time_stamp`` are converted to
          epoch seconds, defaulting to the current time when missing or
          unparsable.
        * ``publish_time_str`` and ``publish_time`` are set to the publish
          time formatted as ``YYYY-mm-dd HH:MM:SS`` in the local timezone.
        * ``session`` defaults to ``"<platform>_<current epoch seconds>"``.
        """
        if self.video_title:
            self.video_title = await clean_title(self.video_title)

        # Read the clock once so every default derived from "now" agrees.
        now = int(time.time())

        self.publish_time_stamp = self._to_epoch_seconds(self.publish_time_stamp, now)
        self.publish_time_str = time.strftime(
            "%Y-%m-%d %H:%M:%S", time.localtime(self.publish_time_stamp)
        )
        self.publish_time = self.publish_time_str

        self.update_time_stamp = self._to_epoch_seconds(self.update_time_stamp, now)

        if not self.session:
            self.session = f"{self.platform}_{now}"

    async def produce_item(self) -> Optional[dict]:
        """Asynchronously build the final data dict.

        Runs ``prepare()`` first.

        Returns:
            ``model_dump()`` of the item, or ``None`` if any mandatory field
            is missing or falsy.
        """
        await self.prepare()
        must_fields = (
            "video_id", "user_id", "user_name", "out_video_id", "session",
            "video_url", "cover_url", "platform", "strategy",
        )
        if any(not getattr(self, name, None) for name in must_fields):
            return None
        # Pydantic 2.x: model_dump() replaces dict().
        return self.model_dump()
|