task.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738
  1. from typing import Optional
  2. from pydantic import BaseModel, Field
  3. from typing_extensions import Annotated
  4. from utils.sync_mysql_help import mysql
  5. from utils.params import SceneEnum, CapabilityEnum, ContentTypeEnum
  6. from utils.general import generate_task_id
  7. class WorkflowTask(BaseModel):
  8. table_name: Annotated[Optional[str], Field(description='表名', exclude=True)] = 'workflow_task'
  9. task_id: Annotated[str, Field(description='任务ID')]
  10. root_task_id: Annotated[str, Field(description='任务根节点', default='')]
  11. status: Annotated[int, Field(description='任务状态', default=0)] # 0:待处理 1:执行中 2:成功 3:失败
  12. scene: Annotated[int, Field(description='业务场景')] # 0:选题 1:创作 2:制作
  13. capability: Annotated[int, Field(description='能力类型')] # 0:解构 1:聚类
  14. content_type: Annotated[int, Field(description='内容类型')] # 1:文本 2:图片 3:视频
  15. def save(self):
  16. """保存任务到数据库"""
  17. record = self.model_dump(exclude={'table_name'})
  18. keys = record.keys()
  19. sql = f'''INSERT INTO {self.table_name} ({", ".join(keys)}) VALUES ({", ".join(["%s"] * len(keys))})'''
  20. mysql.execute(sql, tuple([record[key] for key in keys]))
  21. @staticmethod
  22. def create_task(scene: SceneEnum, capability: CapabilityEnum, content_type: ContentTypeEnum, root_task_id: str = '') -> 'WorkflowTask':
  23. """创建新任务"""
  24. task = WorkflowTask(
  25. task_id=generate_task_id(),
  26. root_task_id=root_task_id,
  27. status=0, # 待处理
  28. scene=scene.value,
  29. capability=capability.value,
  30. content_type=content_type.value
  31. )
  32. task.save()
  33. return task