# decode_task_result.py
from typing import TYPE_CHECKING, ClassVar, List, Optional

from pydantic import BaseModel, Field
from typing_extensions import Annotated

from utils.sync_mysql_help import mysql

if TYPE_CHECKING:
    from utils.params import ContentParam
  7. class WorkflowDecodeTaskResult(BaseModel):
  8. table_name: Annotated[Optional[str], Field(description='表名', exclude=True)] = 'workflow_decode_task_result'
  9. task_id: Annotated[str, Field(description='任务ID')]
  10. channel_content_id: Annotated[str, Field(description='内容ID')]
  11. images: Annotated[str, Field(description='内容url', default='')]
  12. title: Annotated[str, Field(description='内容title', default='')]
  13. error_code: Annotated[Optional[str], Field(description='失败码', default=None)]
  14. error_message: Annotated[Optional[str], Field(description='失败信息', default=None)]
  15. result_payload: Annotated[Optional[str], Field(description='执行结果', default=None)]
  16. result_size: Annotated[Optional[int], Field(description='执行结果大小', default=None)]
  17. channel_account_id: Annotated[Optional[str], Field(description='作者上下文', default=None)]
  18. channel_account_name: Annotated[Optional[str], Field(description='作者上下文', default=None)]
  19. body_text: Annotated[Optional[str], Field(description='内容上下文', default=None)]
  20. video_url: Annotated[Optional[str], Field(description='内容视频地址', default=None)]
  21. def save(self):
  22. """保存结果到数据库"""
  23. record = self.model_dump(exclude={'table_name'})
  24. keys = record.keys()
  25. sql = f'''INSERT INTO {self.table_name} ({", ".join(keys)}) VALUES ({", ".join(["%s"] * len(keys))})'''
  26. mysql.execute(sql, tuple([record[key] for key in keys]))
  27. def get_images_list(self) -> List[str]:
  28. """解析 images 字段,返回 URL 列表(逗号分隔格式)"""
  29. if not self.images:
  30. return []
  31. return [url.strip() for url in self.images.split(',') if url.strip()]
  32. @staticmethod
  33. def create_result(task_id: str, content: 'ContentParam') -> 'WorkflowDecodeTaskResult':
  34. """创建并初始化结果记录"""
  35. # 处理 images 列表,使用逗号分隔的字符串格式存储
  36. # 数据库字段建议使用 TEXT 类型,可存储大量 URL
  37. # 如果有 video_url,也将其加入到 images 字段中
  38. images_list = []
  39. if content.images and len(content.images) > 0:
  40. images_list.extend(content.images)
  41. if content.video_url:
  42. ## 解构阶段要求视频链接也插入到images里面
  43. images_list.append(content.video_url)
  44. images_str = ','.join(images_list) if images_list else ''
  45. result = WorkflowDecodeTaskResult(
  46. task_id=task_id,
  47. channel_content_id=content.channel_content_id,
  48. images=images_str,
  49. title=content.title[:64] if content.title and len(content.title) > 64 else (content.title or ''), # 限制长度
  50. channel_account_id=content.channel_account_id,
  51. channel_account_name=content.channel_account_name[:64] if content.channel_account_name and len(content.channel_account_name) > 64 else (content.channel_account_name or ''),
  52. body_text=content.body_text or '',
  53. video_url=content.video_url[:100] if content.video_url and len(content.video_url) > 100 else (content.video_url or ''),
  54. result_payload=None,
  55. result_size=0
  56. )
  57. result.save()
  58. return result