aggregate_queries.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. # 聚合query.json中的数据,生成所有可能的组合并存储到数据库
  2. # 参考set_querys.py的逻辑
  3. import json
  4. import os
  5. import sys
  6. from datetime import datetime
  7. # 添加项目根目录到Python路径
  8. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  9. from utils.mysql_db import MysqlHelper
  10. from loguru import logger
  11. def load_query_json():
  12. """加载query.json文件"""
  13. try:
  14. query_file_path = os.path.join(os.path.dirname(__file__), "query.json")
  15. with open(query_file_path, 'r', encoding='utf-8') as f:
  16. data = json.load(f)
  17. logger.info("成功加载query.json文件")
  18. return data
  19. except Exception as e:
  20. logger.error(f"加载query.json文件失败: {e}")
  21. return None
  22. def generate_combinations(query_data):
  23. """生成所有可能的组合"""
  24. combinations = []
  25. # 获取query.json中的各种类型
  26. content_formats = [item["name"] for item in query_data.get("content_format", [])]
  27. stages = [item["name"] for item in query_data.get("stage", [])]
  28. content_types = [item["name"] for item in query_data.get("content_type", [])]
  29. logger.info(f"发现 {len(content_formats)} 种内容格式: {content_formats}")
  30. logger.info(f"发现 {len(stages)} 个阶段: {stages}")
  31. logger.info(f"发现 {len(content_types)} 种内容类型: {content_types}")
  32. # 生成所有可能的组合
  33. for content_format in content_formats:
  34. for stage in stages:
  35. for content_type in content_types:
  36. # 生成拼接的字符串
  37. query_word = f"{content_format}{stage}{content_type}"
  38. # 创建JSON结构
  39. combination = {
  40. "content_format": content_format,
  41. "stage": stage,
  42. "content_type": content_type,
  43. "query_word": query_word,
  44. "create_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  45. }
  46. combinations.append(combination)
  47. logger.info(f"生成了 {len(combinations)} 个组合")
  48. return combinations
  49. def save_to_database(combinations):
  50. """将组合结果保存到knowledge_content_query表中"""
  51. try:
  52. # 先清空表(可选,根据需求决定)
  53. # clear_sql = "DELETE FROM knowledge_content_query"
  54. # MysqlHelper.update_values(clear_sql)
  55. # 插入数据的SQL语句
  56. insert_sql = """
  57. INSERT INTO knowledge_content_query
  58. (stage, content_type, content_format, query_word, status, create_time)
  59. VALUES (%s, %s, %s, %s, %s, %s)
  60. """
  61. success_count = 0
  62. total_count = len(combinations)
  63. logger.info(f"开始插入 {total_count} 条记录到数据库...")
  64. for i, combo in enumerate(combinations, 1):
  65. params = (
  66. combo["stage"],
  67. combo["content_type"],
  68. combo["content_format"],
  69. combo["query_word"],
  70. 0, # status设为0
  71. combo["create_time"]
  72. )
  73. result = MysqlHelper.update_values(insert_sql, params)
  74. if result is not None:
  75. success_count += 1
  76. # 每插入100条记录输出一次进度
  77. if i % 100 == 0:
  78. logger.info(f"已插入 {i}/{total_count} 条记录")
  79. else:
  80. logger.error(f"插入失败: {combo}")
  81. logger.info(f"成功插入 {success_count}/{total_count} 条记录到数据库")
  82. return success_count
  83. except Exception as e:
  84. logger.error(f"保存到数据库失败: {e}")
  85. return 0
  86. def print_combinations(combinations):
  87. """打印所有组合结果"""
  88. try:
  89. print(f"\n=== 生成了 {len(combinations)} 个组合 ===\n")
  90. # 打印前10个组合的JSON结构作为示例
  91. for i, combo in enumerate(combinations[:10], 1):
  92. print(f"{i:3d}. {json.dumps(combo, ensure_ascii=False, indent=2)}")
  93. if len(combinations) > 10:
  94. print(f"... 还有 {len(combinations) - 10} 个组合")
  95. print(f"\n=== 总共 {len(combinations)} 个组合 ===")
  96. return True
  97. except Exception as e:
  98. logger.error(f"打印组合结果失败: {e}")
  99. return False
  100. def print_statistics(combinations):
  101. """打印统计信息"""
  102. try:
  103. print(f"\n=== 统计信息 ===")
  104. # 按内容格式统计
  105. content_format_stats = {}
  106. stage_stats = {}
  107. content_type_stats = {}
  108. for combo in combinations:
  109. content_format_stats[combo["content_format"]] = content_format_stats.get(combo["content_format"], 0) + 1
  110. stage_stats[combo["stage"]] = stage_stats.get(combo["stage"], 0) + 1
  111. content_type_stats[combo["content_type"]] = content_type_stats.get(combo["content_type"], 0) + 1
  112. print(f"内容格式分布:")
  113. for fmt, count in content_format_stats.items():
  114. print(f" {fmt}: {count} 个组合")
  115. print(f"\n阶段分布:")
  116. for stage, count in stage_stats.items():
  117. print(f" {stage}: {count} 个组合")
  118. print(f"\n内容类型分布:")
  119. for ctype, count in content_type_stats.items():
  120. print(f" {ctype}: {count} 个组合")
  121. return True
  122. except Exception as e:
  123. logger.error(f"打印统计信息失败: {e}")
  124. return False
  125. def main():
  126. """主函数"""
  127. logger.info("开始执行query.json聚合")
  128. # 1. 加载query.json文件
  129. query_data = load_query_json()
  130. if not query_data:
  131. logger.error("无法加载query.json文件,程序退出")
  132. return
  133. # 2. 生成组合
  134. combinations = generate_combinations(query_data)
  135. # 3. 打印组合结果示例
  136. if print_combinations(combinations):
  137. logger.info("组合生成成功")
  138. # 4. 打印统计信息
  139. print_statistics(combinations)
  140. # 5. 保存到数据库
  141. logger.info("开始保存到数据库...")
  142. saved_count = save_to_database(combinations)
  143. if saved_count > 0:
  144. logger.info(f"程序执行成功完成,共保存 {saved_count} 条记录到数据库")
  145. else:
  146. logger.error("数据库保存失败")
  147. else:
  148. logger.error("程序执行失败")
  149. if __name__ == "__main__":
  150. main()