# video_item.py
  1. import time
  2. import uuid
  3. from typing import Optional, Union
  4. from pydantic import BaseModel, Field, validator
  5. from services.clean_title import clean_title
  6. class VideoItem(BaseModel):
  7. """
  8. 视频数据结构,支持字段校验和预处理逻辑
  9. - 字段初始化后可通过 `prepare()` 异步方法补全和清洗数据
  10. - 使用 `produce_item()` 返回最终有效数据 dict
  11. """
  12. video_id: Optional[str] = Field(default_factory=lambda: str(uuid.uuid4()))
  13. user_id: str
  14. user_name: str
  15. out_video_id: str
  16. out_user_id: Optional[str]
  17. video_url: str
  18. cover_url: str
  19. platform: str
  20. strategy: str
  21. session: Optional[str]
  22. video_title: Optional[str]
  23. publish_time_stamp: Optional[Union[int, str]] = None
  24. update_time_stamp: Optional[Union[int, str]] = None
  25. duration: int = 0
  26. play_cnt: int = 0
  27. like_cnt: int = 0
  28. comment_cnt: int = 0
  29. share_cnt: int = 0
  30. width: int = 0
  31. height: int = 0
  32. publish_time_str: Optional[str] = None
  33. publish_time: Optional[str] = None
  34. # 添加验证器确保数值字段非负
  35. @validator('duration', 'play_cnt', 'like_cnt', 'comment_cnt', 'share_cnt', 'width', 'height')
  36. def validate_non_negative(cls, v, field):
  37. if v < 0:
  38. raise ValueError(f'{field.name} must be non-negative')
  39. return v
  40. @validator('video_url', 'cover_url')
  41. def validate_url(cls, v, field):
  42. if v and not (v.startswith('http://') or v.startswith('https://')):
  43. raise ValueError(f'{field.name} must be a valid URL')
  44. return v
  45. async def prepare(self):
  46. """
  47. 异步预处理:清洗标题、补全发布时间和更新时间
  48. """
  49. # 标题清洗
  50. if self.video_title:
  51. self.video_title = await clean_title(self.video_title)
  52. # 发布时间处理
  53. if self.publish_time_stamp:
  54. # 确保publish_time_stamp是整数类型
  55. if isinstance(self.publish_time_stamp, str):
  56. try:
  57. if len(self.publish_time_stamp) == 13:
  58. self.publish_time_stamp = int(self.publish_time_stamp) // 1000
  59. else:
  60. self.publish_time_stamp = int(self.publish_time_stamp)
  61. except ValueError:
  62. self.publish_time_stamp = int(time.time())
  63. elif isinstance(self.publish_time_stamp, int) and len(str(self.publish_time_stamp)) == 13:
  64. self.publish_time_stamp = self.publish_time_stamp // 1000
  65. else:
  66. self.publish_time_stamp = int(time.time())
  67. self.publish_time_str = time.strftime(
  68. "%Y-%m-%d %H:%M:%S", time.localtime(self.publish_time_stamp)
  69. )
  70. self.publish_time = self.publish_time_str
  71. # 更新时间戳默认当前时间
  72. if not self.update_time_stamp:
  73. self.update_time_stamp = int(time.time())
  74. else:
  75. # 确保update_time_stamp是整数类型
  76. if isinstance(self.update_time_stamp, str):
  77. try:
  78. if len(self.update_time_stamp) == 13:
  79. self.update_time_stamp = int(self.update_time_stamp) // 1000
  80. else:
  81. self.update_time_stamp = int(self.update_time_stamp)
  82. except ValueError:
  83. self.update_time_stamp = int(time.time())
  84. elif isinstance(self.update_time_stamp, int) and len(str(self.update_time_stamp)) == 13:
  85. self.update_time_stamp = self.update_time_stamp // 1000
  86. if not self.session:
  87. self.session = str(f"{self.platform}_{int(time.time())}")
  88. async def produce_item(self) -> Optional[dict]:
  89. """
  90. 异步生成最终数据字典,校验必要字段是否存在,返回 None 则不合格
  91. """
  92. await self.prepare()
  93. must_fields = [
  94. "video_id", "user_id", "user_name", "out_video_id", "session",
  95. "video_url", "cover_url", "platform", "strategy"
  96. ]
  97. for f in must_fields:
  98. if not getattr(self, f, None):
  99. return None
  100. return self.dict()