| 12345678910111213141516171819202122232425262728293031323334353637 |
- from typing import Optional
- from pydantic import BaseModel, Field
- from typing_extensions import Annotated
- from utils.sync_mysql_help import mysql
- from utils.params import SceneEnum, CapabilityEnum
- from utils.general import generate_task_id
- class WorkflowTask(BaseModel):
- table_name: Annotated[Optional[str], Field(description='表名', exclude=True)] = 'workflow_task'
- task_id: Annotated[str, Field(description='任务ID')]
- root_task_id: Annotated[str, Field(description='任务根节点', default='')]
- status: Annotated[int, Field(description='任务状态', default=0)] # 0:待处理 1:执行中 2:成功 3:失败
- scene: Annotated[int, Field(description='业务场景')] # 0:选题 1:创作 2:制作
- capability: Annotated[int, Field(description='能力类型')] # 0:解构 1:聚类
- def save(self):
- """保存任务到数据库"""
- record = self.model_dump(exclude={'table_name'})
- keys = record.keys()
- sql = f'''INSERT INTO {self.table_name} ({", ".join(keys)}) VALUES ({", ".join(["%s"] * len(keys))})'''
- mysql.execute(sql, tuple([record[key] for key in keys]))
- @staticmethod
- def create_task(scene: SceneEnum, capability: CapabilityEnum, root_task_id: str = '') -> 'WorkflowTask':
- """创建新任务"""
- task = WorkflowTask(
- task_id=generate_task_id(),
- root_task_id=root_task_id,
- status=0, # 待处理
- scene=scene.value,
- capability=capability.value
- )
- task.save()
- return task
|