- """
- 执行记录收集器
- 从各个模块的缓存目录收集执行详情,汇总成完整的execution_record.json
- """
import os
import json
import hashlib
from typing import Any, Dict, Optional

from loguru import logger


class ExecutionCollector:
    """Collects execution records from the module cache directories."""

    def __init__(self, base_cache_dir: Optional[str] = None):
        """
        Initialize the collector.

        Args:
            base_cache_dir: Base cache directory; defaults to .cache next to
                this file.
        """
        if base_cache_dir is None:
            current_dir = os.path.dirname(os.path.abspath(__file__))
            base_cache_dir = os.path.join(current_dir, '.cache')

        self.base_cache_dir = base_cache_dir
    def collect_execution_record(self, cache_key: str, input_info: Dict[str, Any]) -> Dict[str, Any]:
        """
        Collect the complete execution record for one request.

        Args:
            cache_key: Cache key (usually the combined_question)
            input_info: Input information, e.g.
                {"question": ..., "post_info": ..., "persona_info": ...}

        Returns:
            dict: The complete execution record
        """
        logger.info("=" * 60)
        logger.info("Collecting execution record...")

        # Compute the hash that names this request's cache directory
        question_hash = hashlib.md5(cache_key.encode('utf-8')).hexdigest()[:12]
        cache_dir = os.path.join(self.base_cache_dir, question_hash)

        if not os.path.exists(cache_dir):
            logger.warning(f"Cache directory does not exist: {cache_dir}")
            return self._create_empty_record(input_info)

        # Initialize the execution record skeleton
        execution_record = {
            "input": input_info,
            "execution": {},
            "output": {
                "result": None,
            },
            "metadata": {
                "execution_time": input_info.get("execution_time", 0),
                "cache_hits": [],
                "errors": []
            }
        }

        try:
            # 1. Generate Query
            query_detail = self._read_json(cache_dir, 'function_knowledge', 'generated_query.json')
            if query_detail:
                execution_record["execution"]["generate_query"] = query_detail
                if query_detail.get("cached"):
                    execution_record["metadata"]["cache_hits"].append("generate_query")

            # 2. Select Tool
            tool_detail = self._read_json(cache_dir, 'function_knowledge', 'selected_tool.json')
            if tool_detail:
                execution_record["execution"]["select_tool"] = tool_detail
                if tool_detail.get("cached"):
                    execution_record["metadata"]["cache_hits"].append("select_tool")

            # 3. Check for Search or Tool Call
            tool_call_detail = self._read_json(cache_dir, 'function_knowledge', 'tool_call.json')

            if tool_call_detail:
                # Flow A: Tool Call

                # Extract Params
                params_detail = self._read_json(cache_dir, 'function_knowledge', 'extracted_params.json')
                if params_detail:
                    execution_record["execution"]["extract_params"] = params_detail
                    if params_detail.get("cached"):
                        execution_record["metadata"]["cache_hits"].append("extract_params")

                # Tool Call
                execution_record["execution"]["tool_call"] = tool_call_detail
                if tool_call_detail.get("cached"):
                    execution_record["metadata"]["cache_hits"].append("tool_call")

                # Result
                execution_record["output"]["result"] = tool_call_detail.get("result", "")
                execution_record["output"]["tool"] = tool_call_detail.get("tool_name", "")
            else:
                # Flow B: Search (Multi/LLM)
                search_detail = self._collect_search_detail(cache_dir)
                if search_detail:
                    execution_record["execution"]["knowledge_search"] = search_detail

                    # Result: the merged_knowledge_detail from multi_search
                    # usually contains the final response
                    merged_detail = search_detail.get("multi_search_merge")
                    if merged_detail:
                        execution_record["output"]["type"] = "knowledge_search"
                        execution_record["output"]["result"] = merged_detail.get("response", "")
                        execution_record["output"]["success"] = True
                        if merged_detail.get("cached"):
                            execution_record["metadata"]["cache_hits"].append("multi_search_merge")

            # execution_time is taken from input_info when provided

            logger.info("✓ Execution record collected")
            logger.info("=" * 60)

        except Exception as e:
            logger.error(f"✗ Failed to collect execution record: {e}")
            execution_record["metadata"]["errors"].append(str(e))

        return execution_record

    def _read_json(self, base_dir: str, *paths: str) -> Optional[Dict[str, Any]]:
        """Read a JSON file, returning None if it is missing or unreadable."""
        file_path = os.path.join(base_dir, *paths)
        if os.path.exists(file_path):
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except Exception as e:
                logger.warning(f"Failed to read JSON file {file_path}: {e}")
        return None

    def _collect_search_detail(self, cache_dir: str) -> Optional[Dict[str, Any]]:
        """Collect the details of the knowledge-search flow."""
        search_detail = {}

        # 1. LLM Search
        llm_detail = {}

        # Generated Queries
        queries_detail = self._read_json(cache_dir, 'llm_search', 'generated_queries.json')
        if queries_detail:
            llm_detail["generated_queries"] = queries_detail

        # Search results: collect every search_result_*.json, in sorted order
        search_results = []
        llm_search_dir = os.path.join(cache_dir, 'llm_search', 'search_results')
        if os.path.exists(llm_search_dir):
            for filename in sorted(os.listdir(llm_search_dir)):
                if filename.startswith('search_result_') and filename.endswith('.json'):
                    res = self._read_json(llm_search_dir, filename)
                    if res:
                        search_results.append(res)
        if search_results:
            llm_detail["search_results"] = search_results

        # Merge step of the LLM search
        merge_detail_llm = self._read_json(cache_dir, 'llm_search', 'merged_knowledge_detail.json')
        if merge_detail_llm:
            llm_detail["merge"] = merge_detail_llm

        if llm_detail:
            search_detail["llm_search"] = llm_detail

        # 2. Multi Search Merge
        merge_detail_multi = self._read_json(cache_dir, 'multi_search', 'merged_knowledge_detail.json')
        if merge_detail_multi:
            search_detail["multi_search_merge"] = merge_detail_multi

        return search_detail if search_detail else None

    def _create_empty_record(self, input_info: Dict[str, Any]) -> Dict[str, Any]:
        """Create an empty execution record (same shape as a populated one)."""
        return {
            "input": input_info,
            "execution": {},
            "output": {
                "type": "error",
                "result": None,
                "success": False
            },
            "metadata": {
                "execution_time": 0,
                "cache_hits": [],
                "errors": ["Cache directory does not exist"]
            }
        }

    def save_execution_record(self, cache_key: str, execution_record: Dict[str, Any]) -> str:
        """
        Save an execution record to a file.

        Args:
            cache_key: Cache key
            execution_record: The execution record

        Returns:
            str: Path of the saved file
        """
        question_hash = hashlib.md5(cache_key.encode('utf-8')).hexdigest()[:12]
        cache_dir = os.path.join(self.base_cache_dir, question_hash)
        os.makedirs(cache_dir, exist_ok=True)

        output_file = os.path.join(cache_dir, 'execution_record.json')

        try:
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(execution_record, f, ensure_ascii=False, indent=2)

            logger.info(f"✓ Execution record saved: {output_file}")
            return output_file

        except Exception as e:
            logger.error(f"✗ Failed to save execution record: {e}")
            raise


def collect_and_save_execution_record(cache_key: str, input_info: Dict[str, Any]) -> Dict[str, Any]:
    """
    Convenience function: collect an execution record and save it.

    Args:
        cache_key: Cache key
        input_info: Input information

    Returns:
        dict: The complete execution record
    """
    collector = ExecutionCollector()
    execution_record = collector.collect_execution_record(cache_key, input_info)
    collector.save_execution_record(cache_key, execution_record)
    return execution_record


if __name__ == "__main__":
    # Smoke test
    import time

    cache_key = "test question||none||test persona"
    input_info = {
        "question": "test question",
        "post_info": "none",
        "persona_info": "test persona",
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
    }

    # Note: the cache directory must already contain data for this key;
    # otherwise an empty record is returned.
    try:
        record = collect_and_save_execution_record(cache_key, input_info)
        print(json.dumps(record, ensure_ascii=False, indent=2))
    except Exception as e:
        print(f"Test failed: {e}")
|