1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980 |
- import time
- import uuid
- from typing import Optional
- from pydantic import BaseModel, Field
- from services import clean_title
- class VideoItem(BaseModel):
- """
- 视频数据结构,支持字段校验和预处理逻辑
- - 字段初始化后可通过 `prepare()` 异步方法补全和清洗数据
- - 使用 `produce_item()` 返回最终有效数据 dict
- """
- video_id: Optional[str] = Field(default_factory=lambda: str(uuid.uuid4()))
- user_id: str
- user_name: str
- out_video_id: str
- out_user_id: Optional[str]
- video_url: str
- cover_url: str
- video_title: str
- platform: str
- strategy: str
- session: Optional[str]
- publish_time_stamp: Optional[int] = None
- update_time_stamp: Optional[int] = None
- duration: int = 0
- play_cnt: int = 0
- like_cnt: int = 0
- comment_cnt: int = 0
- share_cnt: int = 0
- width: int = 0
- height: int = 0
- publish_time_str: Optional[str] = None
- publish_time: Optional[str] = None
- async def prepare(self):
- """
- 异步预处理:清洗标题、补全发布时间和更新时间
- """
- # 标题清洗
- self.video_title = await clean_title(self.video_title)
- # 发布时间处理
- if not self.publish_time_stamp:
- self.publish_time_stamp = int(time.time())
- self.publish_time_str = time.strftime(
- "%Y-%m-%d %H:%M:%S", time.localtime(self.publish_time_stamp)
- )
- self.publish_time = self.publish_time_str
- # 更新时间戳默认当前时间
- if not self.update_time_stamp:
- self.update_time_stamp = int(time.time())
- if not self.session:
- self.session = str(f"{self.platform}_{int(time.time())}")
- async def produce_item(self) -> Optional[dict]:
- """
- 异步生成最终数据字典,校验必要字段是否存在,返回 None 则不合格
- """
- await self.prepare()
- must_fields = [
- "video_id", "user_id", "user_name", "out_video_id", "session",
- "video_url", "cover_url", "classname", "strategy"
- ]
- for f in must_fields:
- if not getattr(self, f, None):
- return False
- return self.dict()
|