| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 |
- from typing import Optional, List, TYPE_CHECKING
- from pydantic import BaseModel, Field
- from typing_extensions import Annotated
- from utils.sync_mysql_help import mysql
- if TYPE_CHECKING:
- from utils.params import ContentParam
class WorkflowDecodeTaskResult(BaseModel):
    # Result record for a workflow decode task, persisted through the shared
    # `mysql` helper. One instance corresponds to one row to insert.
    #
    # NOTE: the class is documented with comments rather than a docstring on
    # purpose: pydantic copies a model's docstring into its JSON schema, and
    # the original model had none.

    # Destination table used by save(); excluded from model_dump() output.
    table_name: Annotated[Optional[str], Field(description='表名', exclude=True)] = 'workflow_decode_task_result'
    # Task identifier (required).
    task_id: Annotated[str, Field(description='任务ID')]
    # Content identifier (required).
    channel_content_id: Annotated[str, Field(description='内容ID')]
    # Comma-separated URL string; see get_images_list() for the parsed form.
    images: Annotated[str, Field(description='内容url', default='')]
    # Content title.
    title: Annotated[str, Field(description='内容title', default='')]
    # Failure code and message.
    error_code: Annotated[Optional[str], Field(description='失败码', default=None)]
    error_message: Annotated[Optional[str], Field(description='失败信息', default=None)]
    # Execution result and its size.
    result_payload: Annotated[Optional[str], Field(description='执行结果', default=None)]
    result_size: Annotated[Optional[int], Field(description='执行结果大小', default=None)]
    # Author context.
    channel_account_id: Annotated[Optional[str], Field(description='作者上下文', default=None)]
    channel_account_name: Annotated[Optional[str], Field(description='作者上下文', default=None)]
    # Content body text.
    body_text: Annotated[Optional[str], Field(description='内容上下文', default=None)]
    # Content video address.
    video_url: Annotated[Optional[str], Field(description='内容视频地址', default=None)]

    def save(self):
        """Insert this record as a new row into ``self.table_name``.

        Every model field except ``table_name`` becomes a column; values are
        passed to ``mysql.execute`` as ``%s`` parameters.
        """
        row = self.model_dump(exclude={'table_name'})
        columns = list(row)
        column_clause = ", ".join(columns)
        placeholder_clause = ", ".join(["%s"] * len(columns))
        # The table and column names are interpolated into the statement
        # text; only the values travel as bound parameters.
        sql = f'''INSERT INTO {self.table_name} ({column_clause}) VALUES ({placeholder_clause})'''
        mysql.execute(sql, tuple(row[column] for column in columns))

    def get_images_list(self) -> List[str]:
        """Parse the comma-separated ``images`` field into a list of URLs.

        :return: URLs with surrounding whitespace stripped and empty entries
            dropped; an empty list when ``images`` is empty.
        """
        if not self.images:
            return []
        urls = []
        for piece in self.images.split(','):
            cleaned = piece.strip()
            if cleaned:
                urls.append(cleaned)
        return urls

    @staticmethod
    def create_result(task_id: str, content: 'ContentParam', table_name: Optional[str] = None) -> 'WorkflowDecodeTaskResult':
        """Build a result record from ``content``, save it and return it.

        :param task_id: task identifier
        :param content: content parameters the record is populated from
        :param table_name: optional custom result table; when empty the
            default ``workflow_decode_task_result`` is used
        :return: the record that was just inserted
        """
        # `images` is stored as one comma-separated string (a TEXT column is
        # suggested so that many URLs fit).
        media_urls = list(content.images or [])
        if content.video_url:
            # The decode stage requires the video link to be listed among
            # the images as well.
            media_urls.append(content.video_url)

        # Length limits: 64 characters for title and account name, 100 for
        # the video URL; missing values become empty strings.
        short_title = (content.title or '')[:64]
        short_account_name = (content.channel_account_name or '')[:64]
        short_video_url = (content.video_url or '')[:100]

        result = WorkflowDecodeTaskResult(
            task_id=task_id,
            channel_content_id=content.channel_content_id,
            images=','.join(media_urls),
            title=short_title,
            channel_account_id=content.channel_account_id,
            channel_account_name=short_account_name,
            body_text=content.body_text or '',
            video_url=short_video_url,
            result_payload=None,
            result_size=0,
        )
        # A caller-supplied table name overrides the default one.
        if table_name:
            result.table_name = table_name
        result.save()
        return result
|