# _util.py (3.6 KB)
  1. import json
  2. from typing import Dict, List
  3. from app.infra.internal import DecodeServer
  4. from ._const import InnerArticlesDecodeConst
  5. class InnerArticlesDecodeUtil(InnerArticlesDecodeConst):
  6. decode_server = DecodeServer()
  @staticmethod
  def format_images(images: str) -> List[str]:
      """Extract the image URLs from a JSON-encoded image list.

      The payload is expected to decode to a list of objects that each carry
      an ``image_url`` key. Anything that does not fit that shape is dropped
      rather than raising.

      Args:
          images: JSON text (``str`` or ``bytes``) encoding the image list.
              An already-decoded ``list`` is accepted too and used as is.

      Returns:
          The truthy ``image_url`` values in input order. An empty list is
          returned when the input is empty, blank, not valid JSON, does not
          decode to a list, or is of an unsupported type.
      """
      if isinstance(images, list):
          # Already decoded by the caller; skip the JSON step.
          image_list = images
      elif isinstance(images, (str, bytes, bytearray)):
          if not images.strip():
              return []
          try:
              image_list = json.loads(images)
          except ValueError:
              # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
              return []
          if not isinstance(image_list, list):
              return []
      else:
          # None, numbers, dicts, ...: there is nothing to extract.
          return []

      return [
          entry["image_url"]
          for entry in image_list
          if isinstance(entry, dict) and entry.get("image_url")
      ]
  async def create_decode_task(self, article: Dict, article_produce_info: List[Dict]):
      """Fill in the article's images and text, then submit a decode task.

      Side effect: ``article`` is modified in place. ``article["images"]`` and
      ``article["article_text"]`` are overwritten before the request body is
      built from it.

      Args:
          article: Article record that is updated and then passed to
              ``prepare_extract_body``.
          article_produce_info: Production records; each one is read through
              its ``produce_module_type`` and ``output`` keys.

      Returns:
          Whatever ``self.decode_server.create_decode_task`` returns for the
          prepared request body.
      """
      # Outputs of the cover and image modules become the article's images.
      image_outputs = []
      for record in article_produce_info:
          module_type = record["produce_module_type"]
          if module_type in (
              self.ProduceModuleType.COVER,
              self.ProduceModuleType.IMAGE,
          ):
              image_outputs.append(record["output"])
      article["images"] = image_outputs

      # Outputs of the content module are joined, one per line, into the text.
      paragraphs = []
      for record in article_produce_info:
          if record["produce_module_type"] == self.ProduceModuleType.CONTENT:
              paragraphs.append(record["output"])
      article["article_text"] = "\n".join(paragraphs)

      payload = self.prepare_extract_body(article)
      task = await self.decode_server.create_decode_task(payload)
      return task
  async def fetch_decode_result(self, task_id: str):
      """Fetch the result of a decode task from the decode server.

      Args:
          task_id: Identifier passed through to ``decode_server.fetch_result``.

      Returns:
          Whatever ``self.decode_server.fetch_result`` returns for ``task_id``.
      """
      decode_result = await self.decode_server.fetch_result(task_id)
      return decode_result
  def prepare_extract_body(self, article: Dict) -> Dict:
      """Build the decode request body for one article.

      Args:
          article: Article record. The keys ``wx_sn``, ``images``,
              ``article_text``, ``title``, ``gh_id`` and ``account_name`` are
              read; a missing key falls back to an empty string (an empty
              list for ``images``).

      Returns:
          A dict with ``scene``, ``content_type`` and a nested ``content``
          dict. ``video_url`` is always sent as an empty string.
      """
      scene = self.BusinessScene.POINT_PICK
      content_type = self.ContentType.LONG_ARTICLE

      # Keys are inserted in the order the request body lists them.
      content = {}
      content["channel_content_id"] = article.get("wx_sn", "")
      content["video_url"] = ""
      content["images"] = article.get("images", [])
      content["body_text"] = article.get("article_text", "")
      content["title"] = article.get("title", "")
      content["channel_account_id"] = article.get("gh_id", "")
      content["channel_account_name"] = article.get("account_name", "")

      return {"scene": scene, "content_type": content_type, "content": content}
  @staticmethod
  def extract_decode_result(result: Dict) -> Dict:
      """Pull the inspiration, purpose, key-point and topic text out of a decode result.

      Every level of the nested result is checked before it is read, so a
      section that is missing, ``None`` or of the wrong type yields an empty
      string for its field instead of raising.

      Args:
          result: Decode result; only its ``final_normalization_rebuild``
              entry is read.

      Returns:
          ``{"error": ...}`` when ``final_normalization_rebuild`` is missing,
          empty or not a dict. Otherwise a dict with ``inspiration``,
          ``purpose`` and ``key_point`` (each a comma-joined string of the
          non-empty point values) and ``topic`` (the stored topic value, or
          ``""`` when it is absent).
      """

      def _as_dict(value) -> dict:
          # A key that is present but holds null/non-dict data counts as empty.
          return value if isinstance(value, dict) else {}

      def _join_points(items, key: str) -> str:
          # Only list-like containers of dicts can carry points.
          if not isinstance(items, (list, tuple)):
              return ""
          parts = [str(p[key]) for p in items if isinstance(p, dict) and p.get(key)]
          return ",".join(parts)

      final_result = _as_dict(result).get("final_normalization_rebuild")
      if not final_result or not isinstance(final_result, dict):
          return {"error": "解构结果中无 final_normalization_rebuild 信息"}

      # Inspiration points
      inspiration_list = _as_dict(final_result.get("inspiration_final_result")).get(
          "最终灵感点列表"
      )
      # Purpose points
      purpose_list = _as_dict(final_result.get("purpose_final_result")).get(
          "最终目的点列表"
      )
      # Key points
      keypoint_list = _as_dict(final_result.get("keypoint_final")).get("最终关键点列表")

      # Topic
      topic_fusion = _as_dict(final_result.get("topic_fusion_result"))
      topic_text = _as_dict(topic_fusion.get("最终选题")).get("选题", "")

      return {
          "inspiration": _join_points(inspiration_list, "灵感点"),
          "purpose": _join_points(purpose_list, "目的点"),
          "key_point": _join_points(keypoint_list, "关键点"),
          "topic": topic_text,
      }