from typing import Optional, List, TYPE_CHECKING from pydantic import BaseModel, Field from typing_extensions import Annotated from utils.sync_mysql_help import mysql if TYPE_CHECKING: from utils.params import ContentParam class WorkflowDecodeTaskResult(BaseModel): table_name: Annotated[Optional[str], Field(description='表名', exclude=True)] = 'workflow_decode_task_result' task_id: Annotated[str, Field(description='任务ID')] channel_content_id: Annotated[str, Field(description='内容ID')] images: Annotated[str, Field(description='内容url', default='')] title: Annotated[str, Field(description='内容title', default='')] error_code: Annotated[Optional[str], Field(description='失败码', default=None)] error_message: Annotated[Optional[str], Field(description='失败信息', default=None)] result_payload: Annotated[Optional[str], Field(description='执行结果', default=None)] result_size: Annotated[Optional[int], Field(description='执行结果大小', default=None)] channel_account_id: Annotated[Optional[str], Field(description='作者上下文', default=None)] channel_account_name: Annotated[Optional[str], Field(description='作者上下文', default=None)] body_text: Annotated[Optional[str], Field(description='内容上下文', default=None)] video_url: Annotated[Optional[str], Field(description='内容视频地址', default=None)] def save(self): """保存结果到数据库""" record = self.model_dump(exclude={'table_name'}) keys = record.keys() sql = f'''INSERT INTO {self.table_name} ({", ".join(keys)}) VALUES ({", ".join(["%s"] * len(keys))})''' mysql.execute(sql, tuple([record[key] for key in keys])) def get_images_list(self) -> List[str]: """解析 images 字段,返回 URL 列表(逗号分隔格式)""" if not self.images: return [] return [url.strip() for url in self.images.split(',') if url.strip()] @staticmethod def create_result(task_id: str, content: 'ContentParam') -> 'WorkflowDecodeTaskResult': """创建并初始化结果记录""" # 处理 images 列表,使用逗号分隔的字符串格式存储 # 数据库字段建议使用 TEXT 类型,可存储大量 URL # 如果有 video_url,也将其加入到 images 字段中 images_list = [] if content.images and len(content.images) > 0: images_list.extend(content.images) if content.video_url: ## 解构阶段要求视频链接也插入到images里面 images_list.append(content.video_url) images_str = ','.join(images_list) if images_list else '' result = WorkflowDecodeTaskResult( task_id=task_id, channel_content_id=content.channel_content_id, images=images_str, title=content.title[:64] if len(content.title) > 64 else content.title, # 限制长度 channel_account_id=content.channel_account_id, channel_account_name=content.channel_account_name[:64] if content.channel_account_name and len(content.channel_account_name) > 64 else content.channel_account_name, body_text=content.body_text, video_url=content.video_url[:100] if content.video_url and len(content.video_url) > 100 else content.video_url, result_payload=None, result_size=0 ) result.save() return result