decode_task_result.py 4.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. from typing import Optional, List, TYPE_CHECKING
  2. from pydantic import BaseModel, Field
  3. from typing_extensions import Annotated
  4. from utils.sync_mysql_help import mysql
  5. if TYPE_CHECKING:
  6. from utils.params import ContentParam
  7. class WorkflowDecodeTaskResult(BaseModel):
  8. table_name: Annotated[Optional[str], Field(description='表名', exclude=True)] = 'workflow_decode_task_result'
  9. task_id: Annotated[str, Field(description='任务ID')]
  10. channel_content_id: Annotated[str, Field(description='内容ID')]
  11. images: Annotated[str, Field(description='内容url', default='')]
  12. title: Annotated[str, Field(description='内容title', default='')]
  13. error_code: Annotated[Optional[str], Field(description='失败码', default=None)]
  14. error_message: Annotated[Optional[str], Field(description='失败信息', default=None)]
  15. result_payload: Annotated[Optional[str], Field(description='执行结果', default=None)]
  16. result_size: Annotated[Optional[int], Field(description='执行结果大小', default=None)]
  17. channel_account_id: Annotated[Optional[str], Field(description='作者上下文', default=None)]
  18. channel_account_name: Annotated[Optional[str], Field(description='作者上下文', default=None)]
  19. body_text: Annotated[Optional[str], Field(description='内容上下文', default=None)]
  20. video_url: Annotated[Optional[str], Field(description='内容视频地址', default=None)]
  21. merge_leve2: Annotated[Optional[str], Field(description='二级品类/标签', default=None)]
  22. def save(self):
  23. """保存结果到数据库"""
  24. record = self.model_dump(exclude={'table_name'})
  25. keys = record.keys()
  26. sql = f'''INSERT INTO {self.table_name} ({", ".join(keys)}) VALUES ({", ".join(["%s"] * len(keys))})'''
  27. mysql.execute(sql, tuple([record[key] for key in keys]))
  28. def get_images_list(self) -> List[str]:
  29. """解析 images 字段,返回 URL 列表(逗号分隔格式)"""
  30. if not self.images:
  31. return []
  32. return [url.strip() for url in self.images.split(',') if url.strip()]
  33. @staticmethod
  34. def create_result(task_id: str, content: 'ContentParam', table_name: Optional[str] = None) -> 'WorkflowDecodeTaskResult':
  35. """创建并初始化结果记录
  36. :param task_id: 任务ID
  37. :param content: 内容参数
  38. :param table_name: 可选,自定义结果表名;为空时使用默认的 workflow_decode_task_result
  39. """
  40. # 处理 images 列表,使用逗号分隔的字符串格式存储
  41. # 数据库字段建议使用 TEXT 类型,可存储大量 URL
  42. # 如果有 video_url,也将其加入到 images 字段中
  43. images_list = []
  44. if content.images and len(content.images) > 0:
  45. images_list.extend(content.images)
  46. if content.video_url:
  47. ## 解构阶段要求视频链接也插入到images里面
  48. images_list.append(content.video_url)
  49. images_str = ','.join(images_list) if images_list else ''
  50. ml2 = (content.merge_leve2 or "").strip()
  51. merge_leve2_val: Optional[str] = None
  52. if ml2:
  53. merge_leve2_val = ml2[:128] if len(ml2) > 128 else ml2
  54. result = WorkflowDecodeTaskResult(
  55. task_id=task_id,
  56. channel_content_id=content.channel_content_id,
  57. images=images_str,
  58. title=content.title[:64] if content.title and len(content.title) > 64 else (content.title or ''), # 限制长度
  59. channel_account_id=content.channel_account_id,
  60. channel_account_name=content.channel_account_name[:64] if content.channel_account_name and len(content.channel_account_name) > 64 else (content.channel_account_name or ''),
  61. body_text=content.body_text or '',
  62. video_url=content.video_url[:100] if content.video_url and len(content.video_url) > 100 else (content.video_url or ''),
  63. merge_leve2=merge_leve2_val,
  64. result_payload=None,
  65. result_size=0
  66. )
  67. # 如果指定了表名,则覆盖默认表名
  68. if table_name:
  69. result.table_name = table_name
  70. result.save()
  71. return result