decode_record.py 1.6 KB

123456789101112131415161718192021222324252627282930313233
  1. from typing import Optional
  2. from pydantic import BaseModel, Field
  3. from typing_extensions import Annotated
  4. from utils.general import get_now_ts
  5. from utils.sync_mysql_help import mysql
  6. class DecodeRecord(BaseModel):
  7. table_name: Annotated[Optional[str], Field(description='表名', exclude=True)] = 'decode_record'
  8. task_id: Annotated[str, Field(description='任务ID')]
  9. video_id: Annotated[str, Field(description='视频ID')]
  10. video_url: Annotated[str, Field(description='视频地址')]
  11. task_params: Annotated[Optional[str], Field(description='任务参数', default=None)]
  12. task_status: Annotated[Optional[int], Field(description='任务状态', default=1)] # 1: 进行中, 2: 已完成, 3: 失败
  13. decode_result: Annotated[Optional[str], Field(description='解构结果', default=None)]
  14. script_result: Annotated[Optional[str], Field(description='脚本结果', default=None)]
  15. create_timestamp: Annotated[Optional[int], Field(description='任务创建时间戳', default_factory=get_now_ts)]
  16. def save(self):
  17. record = self.model_dump(exclude={'table_name'})
  18. keys = record.keys()
  19. sql = f'''INSERT INTO {self.table_name} ({", ".join(keys)}) VALUES ({", ".join(["%s"] * len(keys))})'''
  20. mysql.execute(sql, tuple([record[key] for key in keys]))
  21. async def async_save(self):
  22. record = self.model_dump(exclude={'table_name'})
  23. keys = record.keys()
  24. sql = f'''INSERT INTO {self.table_name} ({", ".join(keys)}) VALUES ({", ".join(["%s"] * len(keys))})'''
  25. await mysql.execute(sql, tuple([record[key] for key in keys]))