# video_item.py
  1. import time
  2. import uuid
  3. from typing import Optional
  4. from pydantic import BaseModel, Field
  5. from services import clean_title
  6. class VideoItem(BaseModel):
  7. """
  8. 视频数据结构,支持字段校验和预处理逻辑
  9. - 字段初始化后可通过 `prepare()` 异步方法补全和清洗数据
  10. - 使用 `produce_item()` 返回最终有效数据 dict
  11. """
  12. video_id: Optional[str] = Field(default_factory=lambda: str(uuid.uuid4()))
  13. user_id: str
  14. user_name: str
  15. out_video_id: str
  16. out_user_id: Optional[str]
  17. video_url: str
  18. cover_url: str
  19. video_title: str
  20. platform: str
  21. strategy: str
  22. session: Optional[str]
  23. publish_time_stamp: Optional[int] = None
  24. update_time_stamp: Optional[int] = None
  25. duration: int = 0
  26. play_cnt: int = 0
  27. like_cnt: int = 0
  28. comment_cnt: int = 0
  29. share_cnt: int = 0
  30. width: int = 0
  31. height: int = 0
  32. publish_time_str: Optional[str] = None
  33. publish_time: Optional[str] = None
  34. async def prepare(self):
  35. """
  36. 异步预处理:清洗标题、补全发布时间和更新时间
  37. """
  38. # 标题清洗
  39. self.video_title = await clean_title(self.video_title)
  40. # 发布时间处理
  41. if not self.publish_time_stamp:
  42. self.publish_time_stamp = int(time.time())
  43. self.publish_time_str = time.strftime(
  44. "%Y-%m-%d %H:%M:%S", time.localtime(self.publish_time_stamp)
  45. )
  46. self.publish_time = self.publish_time_str
  47. # 更新时间戳默认当前时间
  48. if not self.update_time_stamp:
  49. self.update_time_stamp = int(time.time())
  50. if not self.session:
  51. self.session = str(f"{self.platform}_{int(time.time())}")
  52. async def produce_item(self) -> Optional[dict]:
  53. """
  54. 异步生成最终数据字典,校验必要字段是否存在,返回 None 则不合格
  55. """
  56. await self.prepare()
  57. must_fields = [
  58. "video_id", "user_id", "user_name", "out_video_id", "session",
  59. "video_url", "cover_url", "classname", "strategy"
  60. ]
  61. for f in must_fields:
  62. if not getattr(self, f, None):
  63. return False
  64. return self.dict()