# video_item.py (revised)
import time
import uuid
from typing import ClassVar, Optional, Tuple, Union

from pydantic import BaseModel, Field, field_validator, ConfigDict

from services.clean_title import clean_title
  7. class VideoItem(BaseModel):
  8. # Pydantic 2.x 配置
  9. model_config = ConfigDict(arbitrary_types_allowed=True)
  10. video_id: Optional[Union[int, str]] = Field(default_factory=lambda: str(uuid.uuid4()))
  11. user_id: Union[int, str]
  12. user_name: str
  13. out_video_id: Union[int, str]
  14. out_user_id: Optional[Union[int, str]]
  15. video_url: str
  16. cover_url: str
  17. platform: str
  18. strategy: str
  19. session: Optional[str] = None
  20. video_title: Optional[str] = None
  21. publish_time_stamp: Optional[Union[int, str]] = None
  22. update_time_stamp: Optional[Union[int, str]] = None
  23. duration: int = 0
  24. play_cnt: int = 0
  25. like_cnt: int = 0
  26. comment_cnt: int = 0
  27. share_cnt: int = 0
  28. width: int = 0
  29. height: int = 0
  30. publish_time_str: Optional[str] = None
  31. publish_time: Optional[str] = None
  32. # Pydantic 2.x 验证器语法
  33. @field_validator('duration', 'play_cnt', 'like_cnt', 'comment_cnt', 'share_cnt', 'width', 'height')
  34. @classmethod
  35. def validate_non_negative(cls, v: int) -> int:
  36. if v < 0:
  37. raise ValueError('Value must be non-negative')
  38. return v
  39. @field_validator('video_url', 'cover_url')
  40. @classmethod
  41. def validate_url(cls, v: str) -> str:
  42. if v and not (v.startswith('http://') or v.startswith('https://')):
  43. raise ValueError('URL must start with http:// or https://')
  44. return v
  45. async def prepare(self):
  46. """异步预处理(保持不变)"""
  47. if self.video_title:
  48. self.video_title = await clean_title(self.video_title)
  49. if self.publish_time_stamp:
  50. if isinstance(self.publish_time_stamp, str):
  51. try:
  52. if len(self.publish_time_stamp) == 13:
  53. self.publish_time_stamp = int(self.publish_time_stamp) // 1000
  54. else:
  55. self.publish_time_stamp = int(self.publish_time_stamp)
  56. except ValueError:
  57. self.publish_time_stamp = int(time.time())
  58. elif isinstance(self.publish_time_stamp, int) and len(str(self.publish_time_stamp)) == 13:
  59. self.publish_time_stamp = self.publish_time_stamp // 1000
  60. else:
  61. self.publish_time_stamp = int(time.time())
  62. self.publish_time_str = time.strftime(
  63. "%Y-%m-%d %H:%M:%S", time.localtime(self.publish_time_stamp)
  64. )
  65. self.publish_time = self.publish_time_str
  66. if not self.update_time_stamp:
  67. self.update_time_stamp = int(time.time())
  68. else:
  69. if isinstance(self.update_time_stamp, str):
  70. try:
  71. if len(self.update_time_stamp) == 13:
  72. self.update_time_stamp = int(self.update_time_stamp) // 1000
  73. else:
  74. self.update_time_stamp = int(self.update_time_stamp)
  75. except ValueError:
  76. self.update_time_stamp = int(time.time())
  77. elif isinstance(self.update_time_stamp, int) and len(str(self.update_time_stamp)) == 13:
  78. self.update_time_stamp = self.update_time_stamp // 1000
  79. if not self.session:
  80. self.session = str(f"{self.platform}_{int(time.time())}")
  81. async def produce_item(self) -> Optional[dict]:
  82. """异步生成最终数据字典"""
  83. await self.prepare()
  84. must_fields = [
  85. "video_id", "user_id", "user_name", "out_video_id", "session",
  86. "video_url", "cover_url", "platform", "strategy"
  87. ]
  88. for f in must_fields:
  89. if not getattr(self, f, None):
  90. return None
  91. # Pydantic 2.x: 使用 model_dump() 替代 dict()
  92. return self.model_dump()