execution_collector.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. """
  2. 执行记录收集器
  3. 从各个模块的缓存目录收集执行详情,汇总成完整的execution_record.json
  4. """
import os
import json
import hashlib
from typing import Dict, Any, List, Optional

from loguru import logger
  10. class ExecutionCollector:
  11. """执行记录收集器"""
  12. def __init__(self, base_cache_dir: str = None):
  13. """
  14. 初始化
  15. Args:
  16. base_cache_dir: 缓存基础目录,默认为当前目录下的.cache
  17. """
  18. if base_cache_dir is None:
  19. current_dir = os.path.dirname(os.path.abspath(__file__))
  20. base_cache_dir = os.path.join(current_dir, '.cache')
  21. self.base_cache_dir = base_cache_dir
  22. def collect_execution_record(self, cache_key: str, input_info: Dict[str, Any]) -> Dict[str, Any]:
  23. """
  24. 收集完整的执行记录
  25. Args:
  26. cache_key: 缓存键(通常是combined_question)
  27. input_info: 输入信息 {"question": ..., "post_info": ..., "persona_info": ...}
  28. Returns:
  29. dict: 完整的执行记录
  30. """
  31. logger.info("=" * 60)
  32. logger.info("开始收集执行记录...")
  33. # 计算hash
  34. question_hash = hashlib.md5(cache_key.encode('utf-8')).hexdigest()[:12]
  35. cache_dir = os.path.join(self.base_cache_dir, question_hash)
  36. if not os.path.exists(cache_dir):
  37. logger.warning(f"缓存目录不存在: {cache_dir}")
  38. return self._create_empty_record(input_info)
  39. # 初始化执行记录
  40. execution_record = {
  41. "input": input_info,
  42. "execution": {
  43. "modules": {}
  44. },
  45. "result": {
  46. "type": None,
  47. "content": None,
  48. "raw_data": None
  49. },
  50. "metadata": {
  51. "execution_time": 0,
  52. "cache_hits": [],
  53. "errors": []
  54. }
  55. }
  56. # 收集各模块的执行详情
  57. try:
  58. # 1. 收集 function_knowledge 的详情
  59. function_detail = self._collect_function_knowledge_detail(cache_dir)
  60. if function_detail:
  61. execution_record["execution"]["modules"]["function_knowledge"] = function_detail
  62. # 2. 收集 multi_search 的详情
  63. multi_detail = self._collect_multi_search_detail(cache_dir)
  64. if multi_detail:
  65. execution_record["execution"]["modules"]["multi_search"] = multi_detail
  66. # 3. 收集 llm_search 的详情
  67. llm_detail = self._collect_llm_search_detail(cache_dir)
  68. if llm_detail:
  69. execution_record["execution"]["modules"]["llm_search"] = llm_detail
  70. # 4.设置结果信息
  71. result_detail = self._collect_result_detail(cache_dir)
  72. if result_detail:
  73. execution_record["result"] = result_detail
  74. # 5. 计算总结信息
  75. self._calculate_summary(execution_record)
  76. logger.info("✓ 执行记录收集完成")
  77. logger.info("=" * 60)
  78. except Exception as e:
  79. logger.error(f"✗ 收集执行记录失败: {e}")
  80. execution_record["metadata"]["errors"].append(str(e))
  81. return execution_record
  82. def _collect_function_knowledge_detail(self, cache_dir: str) -> Dict[str, Any]:
  83. """收集function_knowledge模块的详情"""
  84. detail_file = os.path.join(cache_dir, 'function_knowledge', 'execution_detail.json')
  85. if os.path.exists(detail_file):
  86. try:
  87. with open(detail_file, 'r', encoding='utf-8') as f:
  88. detail = json.load(f)
  89. logger.info(" ✓ 收集 function_knowledge 详情")
  90. return detail
  91. except Exception as e:
  92. logger.error(f" ✗ 读取 function_knowledge 详情失败: {e}")
  93. return None
  94. def _collect_multi_search_detail(self, cache_dir: str) -> Dict[str, Any]:
  95. """收集multi_search模块的详情"""
  96. detail_file = os.path.join(cache_dir, 'multi_search', 'execution_detail.json')
  97. if os.path.exists(detail_file):
  98. try:
  99. with open(detail_file, 'r', encoding='utf-8') as f:
  100. detail = json.load(f)
  101. logger.info(" ✓ 收集 multi_search 详情")
  102. return detail
  103. except Exception as e:
  104. logger.error(f" ✗ 读取 multi_search 详情失败: {e}")
  105. return None
  106. def _collect_llm_search_detail(self, cache_dir: str) -> Dict[str, Any]:
  107. """收集llm_search模块的详情"""
  108. detail_file = os.path.join(cache_dir, 'llm_search', 'execution_detail.json')
  109. if os.path.exists(detail_file):
  110. try:
  111. with open(detail_file, 'r', encoding='utf-8') as f:
  112. detail = json.load(f)
  113. logger.info(" ✓ 收集 llm_search 详情")
  114. return detail
  115. except Exception as e:
  116. logger.error(f" ✗ 读取 llm_search 详情失败: {e}")
  117. return None
  118. def _collect_result_detail(self, cache_dir: str) -> Dict[str, Any]:
  119. """收集result模块的详情"""
  120. detail_file = os.path.join(cache_dir, 'function_knowledge', 'tool_result.json')
  121. if os.path.exists(detail_file):
  122. try:
  123. with open(detail_file, 'r', encoding='utf-8') as f:
  124. detail = json.load(f)
  125. logger.info(" ✓ 收集 result 详情")
  126. return detail
  127. except Exception as e:
  128. logger.error(f" ✗ 读取 result 详情失败: {e}")
  129. return None
  130. def _calculate_summary(self, execution_record: Dict[str, Any]):
  131. """计算总结信息"""
  132. total_time = 0
  133. cache_hits = []
  134. # 遍历所有模块
  135. for module_name, module_detail in execution_record["execution"]["modules"].items():
  136. if "execution_time" in module_detail:
  137. total_time += module_detail["execution_time"]
  138. if "cache_hits" in module_detail:
  139. cache_hits.extend([f"{module_name}/{hit}" for hit in module_detail["cache_hits"]])
  140. execution_record["metadata"]["execution_time"] = total_time
  141. execution_record["metadata"]["cache_hits"] = cache_hits
  142. def _create_empty_record(self, input_info: Dict[str, Any]) -> Dict[str, Any]:
  143. """创建空的执行记录"""
  144. return {
  145. "input": input_info,
  146. "execution": {
  147. "steps": [],
  148. "modules": {}
  149. },
  150. "result": {
  151. "type": "error",
  152. "content": "缓存目录不存在",
  153. "raw_data": None
  154. },
  155. "metadata": {
  156. "execution_time": 0,
  157. "cache_hits": [],
  158. "errors": ["缓存目录不存在"]
  159. }
  160. }
  161. def save_execution_record(self, cache_key: str, execution_record: Dict[str, Any]) -> str:
  162. """
  163. 保存执行记录到文件
  164. Args:
  165. cache_key: 缓存键
  166. execution_record: 执行记录
  167. Returns:
  168. str: 保存的文件路径
  169. """
  170. question_hash = hashlib.md5(cache_key.encode('utf-8')).hexdigest()[:12]
  171. cache_dir = os.path.join(self.base_cache_dir, question_hash)
  172. os.makedirs(cache_dir, exist_ok=True)
  173. output_file = os.path.join(cache_dir, 'execution_record.json')
  174. try:
  175. with open(output_file, 'w', encoding='utf-8') as f:
  176. json.dump(execution_record, f, ensure_ascii=False, indent=2)
  177. logger.info(f"✓ 执行记录已保存: {output_file}")
  178. return output_file
  179. except Exception as e:
  180. logger.error(f"✗ 保存执行记录失败: {e}")
  181. raise
  182. def collect_and_save_execution_record(cache_key: str, input_info: Dict[str, Any]) -> Dict[str, Any]:
  183. """
  184. 便捷函数:收集并保存执行记录
  185. Args:
  186. cache_key: 缓存键
  187. input_info: 输入信息
  188. Returns:
  189. dict: 完整的执行记录
  190. """
  191. collector = ExecutionCollector()
  192. execution_record = collector.collect_execution_record(cache_key, input_info)
  193. collector.save_execution_record(cache_key, execution_record)
  194. return execution_record
  195. if __name__ == "__main__":
  196. # 测试
  197. import time
  198. cache_key = "测试问题||无||测试人设"
  199. input_info = {
  200. "question": "测试问题",
  201. "post_info": "无",
  202. "persona_info": "测试人设",
  203. "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
  204. }
  205. record = collect_and_save_execution_record(cache_key, input_info)
  206. print(json.dumps(record, ensure_ascii=False, indent=2))