import json from pathlib import Path from typing import Any, Dict, List, Optional from agent import tool from examples.piaoquan_demand.topic_build_agent_context import TopicBuildAgentContext from examples.piaoquan_demand.topic_build_pattern_tools import _log_tool_output, _log_tool_input def _get_result_base_dir() -> Path: """输出到“当前工作目录/result/”下。""" return Path.cwd() / "result" @tool( "存储需求到结果集。 - element_names - reason(原因)- desc(需求描述)" ) def create_demand_item( element_names: List[str] = None, reason: str = None, desc: str = None) -> str: """ 每次调用向“execution_id 对应的本地 JSON 文件”追加一条记录。 写入对象仅包含三个字段: - element_names - reason(原因) - desc(需求描述) """ execution_id: Optional[int] = TopicBuildAgentContext.get_execution_id() params: Dict[str, Any] = { "execution_id": execution_id, "element_names": element_names, "reason": reason, "desc": desc, } _log_tool_input("create_demand_item", params) if not execution_id: return _log_tool_output("create_demand_item", "错误: 未设置 execution_id") record: Dict[str, Any] = { "element_names": element_names, "reason": reason, "desc": desc, } # 按 execution_id 区分文件,避免不同执行互相污染。 # 例如:result/{execution_id}/execution_id_{execution_id}_demand_items.json output_dir = _get_result_base_dir() / f"{execution_id}" output_path = output_dir / f"execution_id_{execution_id}_demand_items.json" output_path.parent.mkdir(parents=True, exist_ok=True) items: List[Dict[str, Any]] = [] if output_path.exists(): try: with open(output_path, "r", encoding="utf-8") as f: loaded = json.load(f) if isinstance(loaded, list): items = loaded elif isinstance(loaded, dict) and isinstance(loaded.get("items"), list): # 兼容可能的包装格式:{"items":[...]} items = loaded["items"] else: # 兜底:把已有内容当作单条记录追加 items = [loaded] except json.JSONDecodeError: # 文件内容损坏时,不阻断执行;从空列表开始追加 items = [] items.append(record) with open(output_path, "w", encoding="utf-8") as f: json.dump(items, f, ensure_ascii=False, indent=2) result = json.dumps( {"success": True, "execution_id": execution_id, "written_to": str(output_path)}, ensure_ascii=False, ) return _log_tool_output("create_demand_item", result) @tool( "批量存储需求到结果集。 - element_names - reason(原因)- desc(需求描述)" ) def create_demand_items(demand_items: List[Dict[str, Any]] = None) -> str: """ 一次调用追加多条记录到“execution_id 对应的本地 JSON 文件”(JSON 数组)。 每条记录字段: - element_names - reason(原因) - desc(需求描述) """ execution_id: Optional[int] = TopicBuildAgentContext.get_execution_id() params: Dict[str, Any] = {"execution_id": execution_id, "count": len(demand_items or []), "demand_items": demand_items} _log_tool_input("create_demand_items", params) if not execution_id: return _log_tool_output("create_demand_items", "错误: 未设置 execution_id") if not demand_items or not isinstance(demand_items, list): return _log_tool_output("create_demand_items", "错误: demand_items 必须为非空列表") output_dir = _get_result_base_dir() / f"{execution_id}" output_path = output_dir / f"execution_id_{execution_id}_demand_items.json" output_path.parent.mkdir(parents=True, exist_ok=True) items: List[Dict[str, Any]] = [] if output_path.exists(): try: with open(output_path, "r", encoding="utf-8") as f: loaded = json.load(f) if isinstance(loaded, list): items = loaded elif isinstance(loaded, dict) and isinstance(loaded.get("items"), list): items = loaded["items"] else: items = [loaded] except json.JSONDecodeError: items = [] written_records: List[Dict[str, Any]] = [] for i, di in enumerate(demand_items): if not isinstance(di, dict): return _log_tool_output("create_demand_items", f"错误: demand_items[{i}] 必须为对象(dict)") record = { "element_names": di.get("element_names"), "reason": di.get("reason"), "desc": di.get("desc"), } written_records.append(record) items.extend(written_records) with open(output_path, "w", encoding="utf-8") as f: json.dump(items, f, ensure_ascii=False, indent=2) result = json.dumps( { "success": True, "execution_id": execution_id, "written_to": str(output_path), "written_count": len(written_records), }, ensure_ascii=False, ) return _log_tool_output("create_demand_items", result) @tool( "写入本次执行总结(在所有分类完成后调用)。" "\n\n该工具用于把最终总结记录到本地/trace输出中(框架侧通过返回值与日志落盘)。" ) def write_execution_summary(summary: str) -> str: """写入本次执行总结。在所有分类完成后调用。 Args: summary: 执行总结(Markdown 格式)。 Returns: JSON 字符串: - 成功:`{"success": True, "execution_id": execution_id}` - 失败:`"错误: 未设置 execution_id"` """ execution_id: Optional[int] = TopicBuildAgentContext.get_execution_id() params: Dict[str, str] = {"summary": summary} _log_tool_input("write_execution_summary", params) if not execution_id: return _log_tool_output("write_execution_summary", "错误: 未设置 execution_id") result = json.dumps({"success": True, "execution_id": execution_id}, ensure_ascii=False) return _log_tool_output("write_execution_summary", result)