| 12345678910111213141516171819202122232425262728293031323334 |
- from typing import Optional
- from pydantic import BaseModel, Field
- from typing_extensions import Annotated
- from utils.general import get_now_ts
- from utils.sync_mysql_help import mysql
- class DecodeWorkflow(BaseModel):
- table_name: Annotated[Optional[str], Field(description='表名', exclude=True)] = 'decode_workflow'
- task_id: Annotated[str, Field(description='任务ID')]
- video_id: Annotated[Optional[str], Field(description='视频ID', default=None)]
- video_url: Annotated[str, Field(description='视频地址')]
- title: Annotated[Optional[str], Field(description='视频标题', default=None)]
- task_status: Annotated[Optional[int], Field(description='任务状态', default=0)] # 0:待执行 / 1:执行中 / 2:执行成功 3:执行失败
- result: Annotated[Optional[str], Field(description='任务结果')]
- created_at: Annotated[Optional[int], Field(description='任务创建时间戳', default_factory=get_now_ts)]
- error_reason: Annotated[Optional[str], Field(description='错误原因', default=None)]
- type: Annotated[Optional[int], Field(description='任务类型', default=1)] # 0:解码任务 / 1:评估任务
- def save(self):
- record = self.model_dump(exclude={'table_name'})
- keys = record.keys()
- sql = f'''INSERT INTO {self.table_name} ({", ".join(keys)}) VALUES ({", ".join(["%s"] * len(keys))})'''
- mysql.execute(sql, tuple([record[key] for key in keys]))
- async def async_save(self):
-
- record = self.model_dump(exclude={'table_name'})
- keys = record.keys()
- sql = f'''INSERT INTO {self.table_name} ({", ".join(keys)}) VALUES ({", ".join(["%s"] * len(keys))})'''
- await mysql.execute(sql, tuple([record[key] for key in keys]))
|