| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- import json
- from pathlib import Path
- from typing import Any, Dict, List, Optional
- from agent import tool
- from examples.piaoquan_demand.topic_build_agent_context import TopicBuildAgentContext
- from examples.piaoquan_demand.topic_build_pattern_tools import _log_tool_output, _log_tool_input
def _get_result_base_dir() -> Path:
    """Return the base output directory: ``result`` under the current working directory."""
    working_dir = Path.cwd()
    return working_dir.joinpath("result")
@tool(
    "存储需求到结果集。 - element_names - reason(原因)- desc(需求描述)"
)
def create_demand_item(
        element_names: List[str] = None,
        reason: str = None,
        desc: str = None) -> str:
    """Append one demand record to the JSON file of the current execution.

    The record written contains exactly three fields: ``element_names``,
    ``reason`` and ``desc``. The target file is
    ``result/{execution_id}/execution_id_{execution_id}_demand_items.json``
    under the current working directory and is rewritten as a JSON array on
    every call.

    Args:
        element_names: Element names stored under the ``element_names`` key.
        reason: Reason stored under the ``reason`` key.
        desc: Demand description stored under the ``desc`` key.

    Returns:
        Whatever ``_log_tool_output`` returns (presumably the message it is
        given - confirm against that helper). The message is a JSON string
        with ``success``, ``execution_id`` and ``written_to`` on success, or
        an error text when no execution id is set.
    """
    execution_id: Optional[int] = TopicBuildAgentContext.get_execution_id()
    params: Dict[str, Any] = {
        "execution_id": execution_id,
        "element_names": element_names,
        "reason": reason,
        "desc": desc,
    }
    _log_tool_input("create_demand_item", params)

    if not execution_id:
        return _log_tool_output("create_demand_item", "错误: 未设置 execution_id")

    record: Dict[str, Any] = {
        "element_names": element_names,
        "reason": reason,
        "desc": desc,
    }

    # One directory and file per execution id, so separate executions never
    # write into each other's results.
    output_dir = _get_result_base_dir() / f"{execution_id}"
    output_path = output_dir / f"execution_id_{execution_id}_demand_items.json"
    output_dir.mkdir(parents=True, exist_ok=True)

    items: List[Dict[str, Any]] = []
    if output_path.exists():
        try:
            with open(output_path, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except json.JSONDecodeError:
            # Best effort: a corrupted file does not block execution; start
            # again from an empty list.
            items = []
        else:
            if isinstance(loaded, list):
                items = loaded
            elif isinstance(loaded, dict) and isinstance(loaded.get("items"), list):
                # Accept the wrapped layout {"items": [...]} as well.
                items = loaded["items"]
            else:
                # Fallback: keep the existing content as a single record.
                items = [loaded]

    items.append(record)

    # Serialize before opening the file for writing: mode "w" truncates
    # immediately, so a serialization error raised afterwards would wipe
    # every record stored so far.
    payload = json.dumps(items, ensure_ascii=False, indent=2)
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(payload)

    result = json.dumps(
        {"success": True, "execution_id": execution_id, "written_to": str(output_path)},
        ensure_ascii=False,
    )
    return _log_tool_output("create_demand_item", result)
@tool(
    "批量存储需求到结果集。 - element_names - reason(原因)- desc(需求描述)"
)
def create_demand_items(demand_items: List[Dict[str, Any]] = None) -> str:
    """Append several demand records to the JSON file of the current execution.

    Each stored record contains exactly three fields taken from the matching
    input dict: ``element_names``, ``reason`` and ``desc`` (missing keys are
    stored as ``None``). The target file is
    ``result/{execution_id}/execution_id_{execution_id}_demand_items.json``
    under the current working directory and is rewritten as a JSON array.

    The batch is all-or-nothing: every entry is validated before the
    filesystem is touched, so a rejected batch writes nothing.

    Args:
        demand_items: Non-empty list of dicts describing the records to add.

    Returns:
        Whatever ``_log_tool_output`` returns (presumably the message it is
        given - confirm against that helper). The message is a JSON string
        with ``success``, ``execution_id``, ``written_to`` and
        ``written_count`` on success, or an error text when the execution id
        is missing or ``demand_items`` is invalid.
    """
    execution_id: Optional[int] = TopicBuildAgentContext.get_execution_id()

    # The argument has not been validated yet, so it may be a truthy value
    # without a length (e.g. an int); never let the logging step raise.
    try:
        count = len(demand_items or [])
    except TypeError:
        count = 0
    params: Dict[str, Any] = {
        "execution_id": execution_id,
        "count": count,
        "demand_items": demand_items,
    }
    _log_tool_input("create_demand_items", params)

    if not execution_id:
        return _log_tool_output("create_demand_items", "错误: 未设置 execution_id")
    if not demand_items or not isinstance(demand_items, list):
        return _log_tool_output("create_demand_items", "错误: demand_items 必须为非空列表")

    # Validate and normalize every entry first, so an invalid batch leaves
    # no directory or file behind.
    written_records: List[Dict[str, Any]] = []
    for i, di in enumerate(demand_items):
        if not isinstance(di, dict):
            return _log_tool_output("create_demand_items", f"错误: demand_items[{i}] 必须为对象(dict)")
        written_records.append(
            {
                "element_names": di.get("element_names"),
                "reason": di.get("reason"),
                "desc": di.get("desc"),
            }
        )

    output_dir = _get_result_base_dir() / f"{execution_id}"
    output_path = output_dir / f"execution_id_{execution_id}_demand_items.json"
    output_dir.mkdir(parents=True, exist_ok=True)

    items: List[Dict[str, Any]] = []
    if output_path.exists():
        try:
            with open(output_path, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except json.JSONDecodeError:
            # Best effort: a corrupted file does not block execution; start
            # again from an empty list.
            items = []
        else:
            if isinstance(loaded, list):
                items = loaded
            elif isinstance(loaded, dict) and isinstance(loaded.get("items"), list):
                # Accept the wrapped layout {"items": [...]} as well.
                items = loaded["items"]
            else:
                # Fallback: keep the existing content as a single record.
                items = [loaded]

    items.extend(written_records)

    # Serialize before opening the file for writing: mode "w" truncates
    # immediately, so a serialization error raised afterwards would wipe
    # every record stored so far.
    payload = json.dumps(items, ensure_ascii=False, indent=2)
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(payload)

    result = json.dumps(
        {
            "success": True,
            "execution_id": execution_id,
            "written_to": str(output_path),
            "written_count": len(written_records),
        },
        ensure_ascii=False,
    )
    return _log_tool_output("create_demand_items", result)
@tool(
    "写入本次执行总结(在所有分类完成后调用)。"
    "\n\n该工具用于把最终总结记录到本地/trace输出中(框架侧通过返回值与日志落盘)。"
)
def write_execution_summary(summary: str) -> str:
    """Record the summary of the current execution.

    Intended to be called once all classification work is finished. This
    function itself writes no file: the summary is only handed to
    ``_log_tool_input`` and the outcome to ``_log_tool_output``.

    Args:
        summary: Execution summary (Markdown format).

    Returns:
        Whatever ``_log_tool_output`` returns (presumably the message it is
        given - confirm against that helper). The message is:
        - on success: the JSON string ``{"success": true, "execution_id": <id>}``
        - on failure: the error text used when no execution id is set
    """
    tool_name = "write_execution_summary"
    execution_id: Optional[int] = TopicBuildAgentContext.get_execution_id()
    _log_tool_input(tool_name, {"summary": summary})

    if execution_id:
        outcome = json.dumps(
            {"success": True, "execution_id": execution_id},
            ensure_ascii=False,
        )
        return _log_tool_output(tool_name, outcome)

    return _log_tool_output(tool_name, "错误: 未设置 execution_id")
|