decode_task_result.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. from typing import Optional, List, TYPE_CHECKING
  2. from pydantic import BaseModel, Field
  3. from typing_extensions import Annotated
  4. from utils.sync_mysql_help import mysql
  5. if TYPE_CHECKING:
  6. from utils.params import ContentParam
  7. class WorkflowDecodeTaskResult(BaseModel):
  8. table_name: Annotated[Optional[str], Field(description='表名', exclude=True)] = 'workflow_decode_task_result'
  9. task_id: Annotated[str, Field(description='任务ID')]
  10. channel_content_id: Annotated[str, Field(description='内容ID')]
  11. images: Annotated[str, Field(description='内容url', default='')]
  12. title: Annotated[str, Field(description='内容title', default='')]
  13. error_code: Annotated[Optional[str], Field(description='失败码', default=None)]
  14. error_message: Annotated[Optional[str], Field(description='失败信息', default=None)]
  15. result_payload: Annotated[Optional[str], Field(description='执行结果', default=None)]
  16. result_size: Annotated[Optional[int], Field(description='执行结果大小', default=None)]
  17. channel_account_id: Annotated[Optional[str], Field(description='作者上下文', default=None)]
  18. channel_account_name: Annotated[Optional[str], Field(description='作者上下文', default=None)]
  19. body_text: Annotated[Optional[str], Field(description='内容上下文', default=None)]
  20. video_url: Annotated[Optional[str], Field(description='内容视频地址', default=None)]
  21. def save(self):
  22. """保存结果到数据库"""
  23. record = self.model_dump(exclude={'table_name'})
  24. keys = record.keys()
  25. sql = f'''INSERT INTO {self.table_name} ({", ".join(keys)}) VALUES ({", ".join(["%s"] * len(keys))})'''
  26. mysql.execute(sql, tuple([record[key] for key in keys]))
  27. def get_images_list(self) -> List[str]:
  28. """解析 images 字段,返回 URL 列表(逗号分隔格式)"""
  29. if not self.images:
  30. return []
  31. return [url.strip() for url in self.images.split(',') if url.strip()]
  32. @staticmethod
  33. def create_result(task_id: str, content: 'ContentParam', table_name: Optional[str] = None) -> 'WorkflowDecodeTaskResult':
  34. """创建并初始化结果记录
  35. :param task_id: 任务ID
  36. :param content: 内容参数
  37. :param table_name: 可选,自定义结果表名;为空时使用默认的 workflow_decode_task_result
  38. """
  39. # 处理 images 列表,使用逗号分隔的字符串格式存储
  40. # 数据库字段建议使用 TEXT 类型,可存储大量 URL
  41. # 如果有 video_url,也将其加入到 images 字段中
  42. images_list = []
  43. if content.images and len(content.images) > 0:
  44. images_list.extend(content.images)
  45. if content.video_url:
  46. ## 解构阶段要求视频链接也插入到images里面
  47. images_list.append(content.video_url)
  48. images_str = ','.join(images_list) if images_list else ''
  49. result = WorkflowDecodeTaskResult(
  50. task_id=task_id,
  51. channel_content_id=content.channel_content_id,
  52. images=images_str,
  53. title=content.title[:64] if content.title and len(content.title) > 64 else (content.title or ''), # 限制长度
  54. channel_account_id=content.channel_account_id,
  55. channel_account_name=content.channel_account_name[:64] if content.channel_account_name and len(content.channel_account_name) > 64 else (content.channel_account_name or ''),
  56. body_text=content.body_text or '',
  57. video_url=content.video_url[:100] if content.video_url and len(content.video_url) > 100 else (content.video_url or ''),
  58. result_payload=None,
  59. result_size=0
  60. )
  61. # 如果指定了表名,则覆盖默认表名
  62. if table_name:
  63. result.table_name = table_name
  64. result.save()
  65. return result