""" 执行记录收集器 从各个模块的缓存目录收集执行详情,汇总成完整的execution_record.json """ import os import json import hashlib from typing import Dict, Any, List from loguru import logger class ExecutionCollector: """执行记录收集器""" def __init__(self, base_cache_dir: str = None): """ 初始化 Args: base_cache_dir: 缓存基础目录,默认为当前目录下的.cache """ if base_cache_dir is None: current_dir = os.path.dirname(os.path.abspath(__file__)) base_cache_dir = os.path.join(current_dir, '.cache') self.base_cache_dir = base_cache_dir def collect_execution_record(self, cache_key: str, input_info: Dict[str, Any]) -> Dict[str, Any]: """ 收集完整的执行记录 Args: cache_key: 缓存键(通常是combined_question) input_info: 输入信息 {"question": ..., "post_info": ..., "persona_info": ...} Returns: dict: 完整的执行记录 """ logger.info("=" * 60) logger.info("开始收集执行记录...") # 计算hash question_hash = hashlib.md5(cache_key.encode('utf-8')).hexdigest()[:12] cache_dir = os.path.join(self.base_cache_dir, question_hash) if not os.path.exists(cache_dir): logger.warning(f"缓存目录不存在: {cache_dir}") return self._create_empty_record(input_info) # 初始化执行记录 execution_record = { "input": input_info, "execution": {}, "output": { "result": None, }, "metadata": { "execution_time": input_info.get("execution_time", 0), "cache_hits": [], "errors": [] } } try: # 1. Generate Query query_detail = self._read_json(cache_dir, 'function_knowledge', 'generated_query.json') if query_detail: execution_record["execution"]["generate_query"] = query_detail if query_detail.get("cached"): execution_record["metadata"]["cache_hits"].append("generate_query") # 2. Select Tool tool_detail = self._read_json(cache_dir, 'function_knowledge', 'selected_tool.json') if tool_detail: execution_record["execution"]["select_tool"] = tool_detail if tool_detail.get("cached"): execution_record["metadata"]["cache_hits"].append("select_tool") # 3. Check for Search or Tool Call tool_call_detail = self._read_json(cache_dir, 'function_knowledge', 'tool_call.json') if tool_call_detail: # Flow A: Tool Call # Extract Params params_detail = self._read_json(cache_dir, 'function_knowledge', 'extracted_params.json') if params_detail: execution_record["execution"]["extract_params"] = params_detail if params_detail.get("cached"): execution_record["metadata"]["cache_hits"].append("extract_params") # Tool Call execution_record["execution"]["tool_call"] = tool_call_detail if tool_call_detail.get("cached"): execution_record["metadata"]["cache_hits"].append("tool_call") # Result execution_record["output"]["result"] = tool_call_detail.get("result", "") execution_record["output"]["tool"] = tool_call_detail.get("tool_name", "") else: # Flow B: Search (Multi/LLM) search_detail = self._collect_search_detail(cache_dir) if search_detail: execution_record["execution"]["knowledge_search"] = search_detail filter_tools = self._collect_filter_tools(cache_dir) if filter_tools: execution_record["execution"]["knowledge_search"]["extra_tools"] = filter_tools # Clean up metadata # execution_time is retrieved from input_info if provided logger.info("✓ 执行记录收集完成") logger.info("=" * 60) except Exception as e: logger.error(f"✗ 收集执行记录失败: {e}") execution_record["metadata"]["errors"].append(str(e)) return execution_record def _read_json(self, base_dir: str, *paths) -> Dict[str, Any]: """读取JSON文件""" file_path = os.path.join(base_dir, *paths) if os.path.exists(file_path): try: with open(file_path, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: logger.warning(f"读取JSON文件失败 {file_path}: {e}") return None def _collect_search_detail(self, cache_dir: str) -> Dict[str, Any]: """收集搜索流程详情""" search_detail = {} # 1. LLM Search llm_detail = {} # Generated Queries queries_detail = self._read_json(cache_dir, 'llm_search', 'generated_queries.json') if queries_detail: llm_detail["generated_queries"] = queries_detail # Search Results # Search for search_result_XXX.json search_results = [] llm_search_dir = os.path.join(cache_dir, 'llm_search', 'search_results') if os.path.exists(llm_search_dir): for filename in sorted(os.listdir(llm_search_dir)): if filename.endswith('.json') and filename.startswith('search_result_'): res = self._read_json(llm_search_dir, filename) if res: search_results.append(res) if search_results: llm_detail["search_results"] = search_results # Merge merge_detail_llm = self._read_json(cache_dir, 'llm_search', 'merged_knowledge_detail.json') if merge_detail_llm: llm_detail["merge"] = merge_detail_llm if llm_detail: search_detail["llm_search"] = llm_detail # 2. Multi Search Merge merge_detail_multi = self._read_json(cache_dir, 'multi_search', 'merged_knowledge_detail.json') if merge_detail_multi: search_detail["multi_search_merge"] = merge_detail_multi return search_detail if search_detail else None def _collect_filter_tools(self, cache_dir: str) -> str: """收集筛选工具""" filter_tools = self._read_json(cache_dir, 'multi_search', 'match_tools.json') return filter_tools if filter_tools else None def _create_empty_record(self, input_info: Dict[str, Any]) -> Dict[str, Any]: """创建空的执行记录""" return { "input": input_info, "execution": {}, "result": { "type": "error", "content": "缓存目录不存在", "raw_data": None }, "metadata": { "execution_time": 0, "cache_hits": [], "errors": ["缓存目录不存在"] } } def save_execution_record(self, cache_key: str, execution_record: Dict[str, Any]) -> str: """ 保存执行记录到文件 Args: cache_key: 缓存键 execution_record: 执行记录 Returns: str: 保存的文件路径 """ question_hash = hashlib.md5(cache_key.encode('utf-8')).hexdigest()[:12] cache_dir = os.path.join(self.base_cache_dir, question_hash) os.makedirs(cache_dir, exist_ok=True) output_file = os.path.join(cache_dir, 'execution_record.json') try: with open(output_file, 'w', encoding='utf-8') as f: json.dump(execution_record, f, ensure_ascii=False, indent=2) logger.info(f"✓ 执行记录已保存: {output_file}") return output_file except Exception as e: logger.error(f"✗ 保存执行记录失败: {e}") raise def collect_and_save_execution_record(cache_key: str, input_info: Dict[str, Any]) -> Dict[str, Any]: """ 便捷函数:收集并保存执行记录 Args: cache_key: 缓存键 input_info: 输入信息 Returns: dict: 完整的执行记录 """ collector = ExecutionCollector() execution_record = collector.collect_execution_record(cache_key, input_info) collector.save_execution_record(cache_key, execution_record) return execution_record if __name__ == "__main__": # 测试 import time cache_key = "测试问题||无||测试人设" input_info = { "question": "测试问题", "post_info": "无", "persona_info": "测试人设", "timestamp": time.strftime("%Y-%m-%d %H:%M:%S") } # 注意:测试时需要确保缓存目录有数据,否则会返回空记录 try: record = collect_and_save_execution_record(cache_key, input_info) print(json.dumps(record, ensure_ascii=False, indent=2)) except Exception as e: print(f"Test failed: {e}")