demand_build_agent_tools.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. import json
  2. from pathlib import Path
  3. from typing import Any, Dict, List, Optional
  4. from agent import tool
  5. from examples.piaoquan_demand.topic_build_agent_context import TopicBuildAgentContext
  6. from examples.piaoquan_demand.topic_build_pattern_tools import _log_tool_output, _log_tool_input
  7. def _get_result_base_dir() -> Path:
  8. """输出到“当前工作目录/result/”下。"""
  9. return Path.cwd() / "result"
  10. @tool(
  11. "存储需求到结果集。 - element_names - reason(原因)- desc(需求描述)"
  12. )
  13. def create_demand_item(
  14. element_names: List[str] = None,
  15. reason: str = None,
  16. desc: str = None) -> str:
  17. """
  18. 每次调用向“execution_id 对应的本地 JSON 文件”追加一条记录。
  19. 写入对象仅包含三个字段:
  20. - element_names
  21. - reason(原因)
  22. - desc(需求描述)
  23. """
  24. execution_id: Optional[int] = TopicBuildAgentContext.get_execution_id()
  25. params: Dict[str, Any] = {
  26. "execution_id": execution_id,
  27. "element_names": element_names,
  28. "reason": reason,
  29. "desc": desc,
  30. }
  31. _log_tool_input("create_demand_item", params)
  32. if not execution_id:
  33. return _log_tool_output("create_demand_item", "错误: 未设置 execution_id")
  34. record: Dict[str, Any] = {
  35. "element_names": element_names,
  36. "reason": reason,
  37. "desc": desc,
  38. }
  39. # 按 execution_id 区分文件,避免不同执行互相污染。
  40. # 例如:result/{execution_id}/execution_id_{execution_id}_demand_items.json
  41. output_dir = _get_result_base_dir() / f"{execution_id}"
  42. output_path = output_dir / f"execution_id_{execution_id}_demand_items.json"
  43. output_path.parent.mkdir(parents=True, exist_ok=True)
  44. items: List[Dict[str, Any]] = []
  45. if output_path.exists():
  46. try:
  47. with open(output_path, "r", encoding="utf-8") as f:
  48. loaded = json.load(f)
  49. if isinstance(loaded, list):
  50. items = loaded
  51. elif isinstance(loaded, dict) and isinstance(loaded.get("items"), list):
  52. # 兼容可能的包装格式:{"items":[...]}
  53. items = loaded["items"]
  54. else:
  55. # 兜底:把已有内容当作单条记录追加
  56. items = [loaded]
  57. except json.JSONDecodeError:
  58. # 文件内容损坏时,不阻断执行;从空列表开始追加
  59. items = []
  60. items.append(record)
  61. with open(output_path, "w", encoding="utf-8") as f:
  62. json.dump(items, f, ensure_ascii=False, indent=2)
  63. result = json.dumps(
  64. {"success": True, "execution_id": execution_id, "written_to": str(output_path)},
  65. ensure_ascii=False,
  66. )
  67. return _log_tool_output("create_demand_item", result)
  68. @tool(
  69. "批量存储需求到结果集。 - element_names - reason(原因)- desc(需求描述)"
  70. )
  71. def create_demand_items(demand_items: List[Dict[str, Any]] = None) -> str:
  72. """
  73. 一次调用追加多条记录到“execution_id 对应的本地 JSON 文件”(JSON 数组)。
  74. 每条记录字段:
  75. - element_names
  76. - reason(原因)
  77. - desc(需求描述)
  78. """
  79. execution_id: Optional[int] = TopicBuildAgentContext.get_execution_id()
  80. params: Dict[str, Any] = {"execution_id": execution_id, "count": len(demand_items or []),
  81. "demand_items": demand_items}
  82. _log_tool_input("create_demand_items", params)
  83. if not execution_id:
  84. return _log_tool_output("create_demand_items", "错误: 未设置 execution_id")
  85. if not demand_items or not isinstance(demand_items, list):
  86. return _log_tool_output("create_demand_items", "错误: demand_items 必须为非空列表")
  87. output_dir = _get_result_base_dir() / f"{execution_id}"
  88. output_path = output_dir / f"execution_id_{execution_id}_demand_items.json"
  89. output_path.parent.mkdir(parents=True, exist_ok=True)
  90. items: List[Dict[str, Any]] = []
  91. if output_path.exists():
  92. try:
  93. with open(output_path, "r", encoding="utf-8") as f:
  94. loaded = json.load(f)
  95. if isinstance(loaded, list):
  96. items = loaded
  97. elif isinstance(loaded, dict) and isinstance(loaded.get("items"), list):
  98. items = loaded["items"]
  99. else:
  100. items = [loaded]
  101. except json.JSONDecodeError:
  102. items = []
  103. written_records: List[Dict[str, Any]] = []
  104. for i, di in enumerate(demand_items):
  105. if not isinstance(di, dict):
  106. return _log_tool_output("create_demand_items", f"错误: demand_items[{i}] 必须为对象(dict)")
  107. record = {
  108. "element_names": di.get("element_names"),
  109. "reason": di.get("reason"),
  110. "desc": di.get("desc"),
  111. }
  112. written_records.append(record)
  113. items.extend(written_records)
  114. with open(output_path, "w", encoding="utf-8") as f:
  115. json.dump(items, f, ensure_ascii=False, indent=2)
  116. result = json.dumps(
  117. {
  118. "success": True,
  119. "execution_id": execution_id,
  120. "written_to": str(output_path),
  121. "written_count": len(written_records),
  122. },
  123. ensure_ascii=False,
  124. )
  125. return _log_tool_output("create_demand_items", result)
  126. @tool(
  127. "写入本次执行总结(在所有分类完成后调用)。"
  128. "\n\n该工具用于把最终总结记录到本地/trace输出中(框架侧通过返回值与日志落盘)。"
  129. )
  130. def write_execution_summary(summary: str) -> str:
  131. """写入本次执行总结。在所有分类完成后调用。
  132. Args:
  133. summary: 执行总结(Markdown 格式)。
  134. Returns:
  135. JSON 字符串:
  136. - 成功:`{"success": True, "execution_id": execution_id}`
  137. - 失败:`"错误: 未设置 execution_id"`
  138. """
  139. execution_id: Optional[int] = TopicBuildAgentContext.get_execution_id()
  140. params: Dict[str, str] = {"summary": summary}
  141. _log_tool_input("write_execution_summary", params)
  142. if not execution_id:
  143. return _log_tool_output("write_execution_summary", "错误: 未设置 execution_id")
  144. result = json.dumps({"success": True, "execution_id": execution_id}, ensure_ascii=False)
  145. return _log_tool_output("write_execution_summary", result)