from typing import Optional

from pydantic import BaseModel, Field
from typing_extensions import Annotated

from utils.sync_mysql_help import mysql
from utils.params import SceneEnum, CapabilityEnum
from utils.general import generate_task_id


class WorkflowTask(BaseModel):
    """A workflow task record that can be inserted into a MySQL table.

    Every field except ``table_name`` is written as a column of the same
    name when :meth:`save` is called.
    """

    # Target table for ``save``. Excluded from serialization so it is never
    # written as a column.
    # NOTE(review): this value is interpolated into the SQL text unescaped;
    # it should only ever come from trusted code, never from request data.
    table_name: Annotated[Optional[str], Field(description='表名', exclude=True)] = 'workflow_task'
    # Task identifier (required).
    task_id: Annotated[str, Field(description='任务ID')]
    # Identifier of the root task; empty string by default.
    # Defaults are assigned outside ``Annotated`` so that static type
    # checkers also treat these fields as optional constructor arguments.
    root_task_id: Annotated[str, Field(description='任务根节点')] = ''
    # Task status -- 0: pending, 1: running, 2: succeeded, 3: failed.
    status: Annotated[int, Field(description='任务状态')] = 0
    # Business scene -- 0: topic selection, 1: creation, 2: production.
    scene: Annotated[int, Field(description='业务场景')]
    # Capability type -- 0: deconstruction, 1: clustering.
    capability: Annotated[int, Field(description='能力类型')]

    def save(self):
        """Insert this task as a new row of ``self.table_name``.

        Column names are the model's field names; the values are handed to
        ``mysql.execute`` separately from the SQL text, one ``%s``
        placeholder per column.
        """
        # ``table_name`` is already marked ``exclude=True``; the explicit
        # exclude keeps the column list correct even if that flag changes.
        record = self.model_dump(exclude={'table_name'})
        columns = ", ".join(record)
        placeholders = ", ".join(["%s"] * len(record))
        sql = f'''INSERT INTO {self.table_name} ({columns}) VALUES ({placeholders})'''
        # Dicts preserve insertion order, so values line up with ``columns``.
        mysql.execute(sql, tuple(record.values()))

    @classmethod
    def create_task(cls, scene: SceneEnum, capability: CapabilityEnum,
                    root_task_id: str = '') -> 'WorkflowTask':
        """Create a new pending task, save it, and return it.

        Args:
            scene: Business scene; its ``.value`` is stored.
            capability: Capability type; its ``.value`` is stored.
            root_task_id: Identifier of the root task, empty by default.

        Returns:
            The newly created task, after ``save()`` has been called on it.
        """
        task = cls(
            task_id=generate_task_id(),
            root_task_id=root_task_id,
            status=0,  # pending
            scene=scene.value,
            capability=capability.value,
        )
        task.save()
        return task