"""
Execution record collector.

Gathers the execution details that individual modules wrote into their cache
directories and merges them into one complete ``execution_record.json``.
"""

import os
import json
import hashlib
from typing import Dict, Any, List, Optional

from loguru import logger


class ExecutionCollector:
    """Collect, summarize and persist the execution record for one cache key."""

    def __init__(self, base_cache_dir: Optional[str] = None):
        """
        Initialize the collector.

        Args:
            base_cache_dir: Base cache directory. Defaults to ``.cache`` next
                to this source file.
        """
        if base_cache_dir is None:
            current_dir = os.path.dirname(os.path.abspath(__file__))
            base_cache_dir = os.path.join(current_dir, '.cache')

        self.base_cache_dir = base_cache_dir

    def _cache_dir_for(self, cache_key: str) -> str:
        """Return the cache directory derived from *cache_key*.

        The directory name is the first 12 hex characters of the MD5 digest
        of the UTF-8 encoded key (used as a short identifier, not for
        security).
        """
        question_hash = hashlib.md5(cache_key.encode('utf-8')).hexdigest()[:12]
        return os.path.join(self.base_cache_dir, question_hash)

    def collect_execution_record(self, cache_key: str, input_info: Dict[str, Any]) -> Dict[str, Any]:
        """
        Collect the complete execution record.

        Args:
            cache_key: Cache key (the ``__main__`` example uses a
                ``question||post||persona`` style string).
            input_info: Input information, stored verbatim under ``"input"``.

        Returns:
            dict: The execution record. When the cache directory does not
            exist, the placeholder from ``_create_empty_record`` is returned.
            Any exception raised while collecting is logged and appended to
            ``metadata.errors`` instead of propagating.
        """
        logger.info("=" * 60)
        logger.info("开始收集执行记录...")

        cache_dir = self._cache_dir_for(cache_key)

        if not os.path.exists(cache_dir):
            logger.warning(f"缓存目录不存在: {cache_dir}")
            return self._create_empty_record(input_info)

        # Skeleton of the record; sections are filled in below.
        execution_record = {
            "input": input_info,
            "execution": {
                "modules": {}
            },
            "result": {
                "type": None,
                "content": None,
                "raw_data": None
            },
            "metadata": {
                "execution_time": 0,
                "cache_hits": [],
                "errors": []
            }
        }

        try:
            # 1-3. Per-module details, collected in a fixed order. Falsy
            # results (missing file, read failure, empty JSON) are omitted.
            module_collectors = (
                ("function_knowledge", self._collect_function_knowledge_detail),
                ("multi_search", self._collect_multi_search_detail),
                ("llm_search", self._collect_llm_search_detail),
            )
            modules = execution_record["execution"]["modules"]
            for module_name, collect in module_collectors:
                detail = collect(cache_dir)
                if detail:
                    modules[module_name] = detail

            # 4. Result information replaces the default "result" section.
            result_detail = self._collect_result_detail(cache_dir)
            if result_detail:
                execution_record["result"] = result_detail

            # 5. Aggregate timings and cache hits into the metadata.
            self._calculate_summary(execution_record)

            logger.info("✓ 执行记录收集完成")
            logger.info("=" * 60)

        except Exception as e:
            # Best-effort boundary: report the failure but still return
            # whatever was collected so far.
            logger.error(f"✗ 收集执行记录失败: {e}")
            execution_record["metadata"]["errors"].append(str(e))

        return execution_record

    def _load_detail(self, detail_file: str, label: str) -> Optional[Any]:
        """Load one JSON detail file, returning None when missing or unreadable.

        A missing file is an expected condition and is silent; any other
        failure (I/O error, invalid JSON, bad encoding) is logged.

        Args:
            detail_file: Path of the JSON file to read.
            label: Name inserted into the log messages.
        """
        try:
            with open(detail_file, 'r', encoding='utf-8') as f:
                detail = json.load(f)
        except FileNotFoundError:
            return None
        except Exception as e:
            logger.error(f" ✗ 读取 {label} 详情失败: {e}")
            return None

        logger.info(f" ✓ 收集 {label} 详情")
        return detail

    def _collect_function_knowledge_detail(self, cache_dir: str) -> Optional[Dict[str, Any]]:
        """Load ``function_knowledge/execution_detail.json`` from *cache_dir*."""
        detail_file = os.path.join(cache_dir, 'function_knowledge', 'execution_detail.json')
        return self._load_detail(detail_file, 'function_knowledge')

    def _collect_multi_search_detail(self, cache_dir: str) -> Optional[Dict[str, Any]]:
        """Load ``multi_search/execution_detail.json`` from *cache_dir*."""
        detail_file = os.path.join(cache_dir, 'multi_search', 'execution_detail.json')
        return self._load_detail(detail_file, 'multi_search')

    def _collect_llm_search_detail(self, cache_dir: str) -> Optional[Dict[str, Any]]:
        """Load ``llm_search/execution_detail.json`` from *cache_dir*."""
        detail_file = os.path.join(cache_dir, 'llm_search', 'execution_detail.json')
        return self._load_detail(detail_file, 'llm_search')

    def _collect_result_detail(self, cache_dir: str) -> Optional[Dict[str, Any]]:
        """Load ``function_knowledge/tool_result.json`` from *cache_dir*."""
        detail_file = os.path.join(cache_dir, 'function_knowledge', 'tool_result.json')
        return self._load_detail(detail_file, 'result')

    def _calculate_summary(self, execution_record: Dict[str, Any]) -> None:
        """Aggregate per-module timings and cache hits into the metadata.

        Sums every module's ``execution_time`` and gathers every module's
        ``cache_hits`` (each prefixed with ``"<module_name>/"``), then writes
        both into ``execution_record["metadata"]``.

        A malformed module entry is skipped and reported (logged and appended
        to ``metadata.errors``) so that one bad module does not discard the
        totals of the valid ones.
        """
        metadata = execution_record["metadata"]
        total_time = 0
        cache_hits: List[str] = []

        def report(message: str) -> None:
            logger.warning(f" ✗ {message}")
            metadata.setdefault("errors", []).append(message)

        for module_name, module_detail in execution_record["execution"]["modules"].items():
            if not isinstance(module_detail, dict):
                report(f"模块 {module_name} 的执行详情不是字典,已跳过")
                continue

            if "execution_time" in module_detail:
                elapsed = module_detail["execution_time"]
                if isinstance(elapsed, (int, float)):
                    total_time += elapsed
                else:
                    report(f"模块 {module_name} 的 execution_time 无效: {elapsed!r}")

            if "cache_hits" in module_detail:
                hits = module_detail["cache_hits"]
                if isinstance(hits, (list, tuple)):
                    cache_hits.extend(f"{module_name}/{hit}" for hit in hits)
                else:
                    report(f"模块 {module_name} 的 cache_hits 无效: {hits!r}")

        metadata["execution_time"] = total_time
        metadata["cache_hits"] = cache_hits

    def _create_empty_record(self, input_info: Dict[str, Any]) -> Dict[str, Any]:
        """Build the placeholder record returned when the cache directory is missing."""
        # NOTE(review): this record carries an extra "steps" key under
        # "execution" that the normal record built in
        # collect_execution_record does not have — confirm which shape
        # consumers expect.
        return {
            "input": input_info,
            "execution": {
                "steps": [],
                "modules": {}
            },
            "result": {
                "type": "error",
                "content": "缓存目录不存在",
                "raw_data": None
            },
            "metadata": {
                "execution_time": 0,
                "cache_hits": [],
                "errors": ["缓存目录不存在"]
            }
        }

    def save_execution_record(self, cache_key: str, execution_record: Dict[str, Any]) -> str:
        """
        Save the execution record to ``execution_record.json``.

        Args:
            cache_key: Cache key; determines the target cache directory.
            execution_record: The record to serialize.

        Returns:
            str: Path of the written file.

        Raises:
            Exception: Any error raised while writing or serializing is
                logged and re-raised unchanged.
        """
        cache_dir = self._cache_dir_for(cache_key)
        os.makedirs(cache_dir, exist_ok=True)

        output_file = os.path.join(cache_dir, 'execution_record.json')

        try:
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(execution_record, f, ensure_ascii=False, indent=2)

            logger.info(f"✓ 执行记录已保存: {output_file}")
            return output_file

        except Exception as e:
            logger.error(f"✗ 保存执行记录失败: {e}")
            raise


def collect_and_save_execution_record(cache_key: str, input_info: Dict[str, Any]) -> Dict[str, Any]:
    """
    Convenience function: collect the execution record and save it.

    Uses an ``ExecutionCollector`` with the default cache directory.

    Args:
        cache_key: Cache key.
        input_info: Input information.

    Returns:
        dict: The complete execution record.
    """
    collector = ExecutionCollector()
    execution_record = collector.collect_execution_record(cache_key, input_info)
    collector.save_execution_record(cache_key, execution_record)
    return execution_record


if __name__ == "__main__":
    # Manual smoke test.
    import time

    cache_key = "测试问题||无||测试人设"
    input_info = {
        "question": "测试问题",
        "post_info": "无",
        "persona_info": "测试人设",
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
    }

    record = collect_and_save_execution_record(cache_key, input_info)
    print(json.dumps(record, ensure_ascii=False, indent=2))