# task.py

from typing import Optional

from pydantic import BaseModel, Field
from typing_extensions import Annotated

from utils.general import generate_task_id
from utils.params import SceneEnum, CapabilityEnum
from utils.sync_mysql_help import mysql
  7. class WorkflowTask(BaseModel):
  8. table_name: Annotated[Optional[str], Field(description='表名', exclude=True)] = 'workflow_task'
  9. task_id: Annotated[str, Field(description='任务ID')]
  10. root_task_id: Annotated[str, Field(description='任务根节点', default='')]
  11. status: Annotated[int, Field(description='任务状态', default=0)] # 0:待处理 1:执行中 2:成功 3:失败
  12. scene: Annotated[int, Field(description='业务场景')] # 0:选题 1:创作 2:制作
  13. capability: Annotated[int, Field(description='能力类型')] # 0:解构 1:聚类
  14. def save(self):
  15. """保存任务到数据库"""
  16. record = self.model_dump(exclude={'table_name'})
  17. keys = record.keys()
  18. sql = f'''INSERT INTO {self.table_name} ({", ".join(keys)}) VALUES ({", ".join(["%s"] * len(keys))})'''
  19. mysql.execute(sql, tuple([record[key] for key in keys]))
  20. @staticmethod
  21. def create_task(scene: SceneEnum, capability: CapabilityEnum, root_task_id: str = '') -> 'WorkflowTask':
  22. """创建新任务"""
  23. task = WorkflowTask(
  24. task_id=generate_task_id(),
  25. root_task_id=root_task_id,
  26. status=0, # 待处理
  27. scene=scene.value,
  28. capability=capability.value
  29. )
  30. task.save()
  31. return task