# execution_collector.py
  1. """
  2. 执行记录收集器
  3. 从各个模块的缓存目录收集执行详情,汇总成完整的execution_record.json
  4. """
import hashlib
import json
import os
from typing import Any, Dict, List, Optional

from loguru import logger
  10. class ExecutionCollector:
  11. """执行记录收集器"""
  12. def __init__(self, base_cache_dir: str = None):
  13. """
  14. 初始化
  15. Args:
  16. base_cache_dir: 缓存基础目录,默认为当前目录下的.cache
  17. """
  18. if base_cache_dir is None:
  19. current_dir = os.path.dirname(os.path.abspath(__file__))
  20. base_cache_dir = os.path.join(current_dir, '.cache')
  21. self.base_cache_dir = base_cache_dir
  22. def collect_execution_record(self, cache_key: str, input_info: Dict[str, Any]) -> Dict[str, Any]:
  23. """
  24. 收集完整的执行记录
  25. Args:
  26. cache_key: 缓存键(通常是combined_question)
  27. input_info: 输入信息 {"question": ..., "post_info": ..., "persona_info": ...}
  28. Returns:
  29. dict: 完整的执行记录
  30. """
  31. logger.info("=" * 60)
  32. logger.info("开始收集执行记录...")
  33. # 计算hash
  34. question_hash = hashlib.md5(cache_key.encode('utf-8')).hexdigest()[:12]
  35. cache_dir = os.path.join(self.base_cache_dir, question_hash)
  36. if not os.path.exists(cache_dir):
  37. logger.warning(f"缓存目录不存在: {cache_dir}")
  38. return self._create_empty_record(input_info)
  39. # 初始化执行记录
  40. execution_record = {
  41. "input": input_info,
  42. "execution": {},
  43. "output": {
  44. "result": None,
  45. },
  46. "metadata": {
  47. "execution_time": input_info.get("execution_time", 0),
  48. "cache_hits": [],
  49. "errors": []
  50. }
  51. }
  52. try:
  53. # 1. Generate Query
  54. query_detail = self._read_json(cache_dir, 'function_knowledge', 'generated_query.json')
  55. if query_detail:
  56. execution_record["execution"]["generate_query"] = query_detail
  57. if query_detail.get("cached"):
  58. execution_record["metadata"]["cache_hits"].append("generate_query")
  59. # 2. Select Tool
  60. tool_detail = self._read_json(cache_dir, 'function_knowledge', 'selected_tool.json')
  61. if tool_detail:
  62. execution_record["execution"]["select_tool"] = tool_detail
  63. if tool_detail.get("cached"):
  64. execution_record["metadata"]["cache_hits"].append("select_tool")
  65. # 3. Check for Search or Tool Call
  66. tool_call_detail = self._read_json(cache_dir, 'function_knowledge', 'tool_call.json')
  67. if tool_call_detail:
  68. # Flow A: Tool Call
  69. # Extract Params
  70. params_detail = self._read_json(cache_dir, 'function_knowledge', 'extracted_params.json')
  71. if params_detail:
  72. execution_record["execution"]["extract_params"] = params_detail
  73. if params_detail.get("cached"):
  74. execution_record["metadata"]["cache_hits"].append("extract_params")
  75. # Tool Call
  76. execution_record["execution"]["tool_call"] = tool_call_detail
  77. if tool_call_detail.get("cached"):
  78. execution_record["metadata"]["cache_hits"].append("tool_call")
  79. # Result
  80. execution_record["output"]["result"] = tool_call_detail.get("result", "")
  81. execution_record["output"]["tool"] = tool_call_detail.get("tool_name", "")
  82. else:
  83. # Flow B: Search (Multi/LLM)
  84. search_detail = self._collect_search_detail(cache_dir)
  85. if search_detail:
  86. execution_record["execution"]["knowledge_search"] = search_detail
  87. # Result
  88. # merged_knowledge_detail from multi_search usually contains final response
  89. merged_detail = search_detail.get("multi_search_merge")
  90. if merged_detail:
  91. execution_record["result"]["type"] = "knowledge_search"
  92. response = merged_detail.get("response", "")
  93. execution_record["result"]["content"] = response
  94. execution_record["result"]["success"] = True
  95. if merged_detail.get("cached"):
  96. execution_record["metadata"]["cache_hits"].append("multi_search_merge")
  97. # Clean up metadata
  98. # execution_time is retrieved from input_info if provided
  99. logger.info("✓ 执行记录收集完成")
  100. logger.info("=" * 60)
  101. except Exception as e:
  102. logger.error(f"✗ 收集执行记录失败: {e}")
  103. execution_record["metadata"]["errors"].append(str(e))
  104. return execution_record
  105. def _read_json(self, base_dir: str, *paths) -> Dict[str, Any]:
  106. """读取JSON文件"""
  107. file_path = os.path.join(base_dir, *paths)
  108. if os.path.exists(file_path):
  109. try:
  110. with open(file_path, 'r', encoding='utf-8') as f:
  111. return json.load(f)
  112. except Exception as e:
  113. logger.warning(f"读取JSON文件失败 {file_path}: {e}")
  114. return None
  115. def _collect_search_detail(self, cache_dir: str) -> Dict[str, Any]:
  116. """收集搜索流程详情"""
  117. search_detail = {}
  118. # 1. LLM Search
  119. llm_detail = {}
  120. # Generated Queries
  121. queries_detail = self._read_json(cache_dir, 'llm_search', 'generated_queries.json')
  122. if queries_detail:
  123. llm_detail["generated_queries"] = queries_detail
  124. # Search Results
  125. # Search for search_result_XXX.json
  126. search_results = []
  127. llm_search_dir = os.path.join(cache_dir, 'llm_search', 'search_results')
  128. if os.path.exists(llm_search_dir):
  129. for filename in sorted(os.listdir(llm_search_dir)):
  130. if filename.endswith('.json') and filename.startswith('search_result_'):
  131. res = self._read_json(llm_search_dir, filename)
  132. if res:
  133. search_results.append(res)
  134. if search_results:
  135. llm_detail["search_results"] = search_results
  136. # Merge
  137. merge_detail_llm = self._read_json(cache_dir, 'llm_search', 'merged_knowledge_detail.json')
  138. if merge_detail_llm:
  139. llm_detail["merge"] = merge_detail_llm
  140. if llm_detail:
  141. search_detail["llm_search"] = llm_detail
  142. # 2. Multi Search Merge
  143. merge_detail_multi = self._read_json(cache_dir, 'multi_search', 'merged_knowledge_detail.json')
  144. if merge_detail_multi:
  145. search_detail["multi_search_merge"] = merge_detail_multi
  146. return search_detail if search_detail else None
  147. def _create_empty_record(self, input_info: Dict[str, Any]) -> Dict[str, Any]:
  148. """创建空的执行记录"""
  149. return {
  150. "input": input_info,
  151. "execution": {},
  152. "result": {
  153. "type": "error",
  154. "content": "缓存目录不存在",
  155. "raw_data": None
  156. },
  157. "metadata": {
  158. "execution_time": 0,
  159. "cache_hits": [],
  160. "errors": ["缓存目录不存在"]
  161. }
  162. }
  163. def save_execution_record(self, cache_key: str, execution_record: Dict[str, Any]) -> str:
  164. """
  165. 保存执行记录到文件
  166. Args:
  167. cache_key: 缓存键
  168. execution_record: 执行记录
  169. Returns:
  170. str: 保存的文件路径
  171. """
  172. question_hash = hashlib.md5(cache_key.encode('utf-8')).hexdigest()[:12]
  173. cache_dir = os.path.join(self.base_cache_dir, question_hash)
  174. os.makedirs(cache_dir, exist_ok=True)
  175. output_file = os.path.join(cache_dir, 'execution_record.json')
  176. try:
  177. with open(output_file, 'w', encoding='utf-8') as f:
  178. json.dump(execution_record, f, ensure_ascii=False, indent=2)
  179. logger.info(f"✓ 执行记录已保存: {output_file}")
  180. return output_file
  181. except Exception as e:
  182. logger.error(f"✗ 保存执行记录失败: {e}")
  183. raise
  184. def collect_and_save_execution_record(cache_key: str, input_info: Dict[str, Any]) -> Dict[str, Any]:
  185. """
  186. 便捷函数:收集并保存执行记录
  187. Args:
  188. cache_key: 缓存键
  189. input_info: 输入信息
  190. Returns:
  191. dict: 完整的执行记录
  192. """
  193. collector = ExecutionCollector()
  194. execution_record = collector.collect_execution_record(cache_key, input_info)
  195. collector.save_execution_record(cache_key, execution_record)
  196. return execution_record
  197. if __name__ == "__main__":
  198. # 测试
  199. import time
  200. cache_key = "测试问题||无||测试人设"
  201. input_info = {
  202. "question": "测试问题",
  203. "post_info": "无",
  204. "persona_info": "测试人设",
  205. "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
  206. }
  207. # 注意:测试时需要确保缓存目录有数据,否则会返回空记录
  208. try:
  209. record = collect_and_save_execution_record(cache_key, input_info)
  210. print(json.dumps(record, ensure_ascii=False, indent=2))
  211. except Exception as e:
  212. print(f"Test failed: {e}")