# 聚合query.json中的数据,生成所有可能的组合并存储到数据库 # 参考set_querys.py的逻辑 import json import os import sys from datetime import datetime # 添加项目根目录到Python路径 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from utils.mysql_db import MysqlHelper from loguru import logger def load_query_json(): """加载query.json文件""" try: query_file_path = os.path.join(os.path.dirname(__file__), "query.json") with open(query_file_path, 'r', encoding='utf-8') as f: data = json.load(f) logger.info("成功加载query.json文件") return data except Exception as e: logger.error(f"加载query.json文件失败: {e}") return None def generate_combinations(query_data): """生成所有可能的组合""" combinations = [] # 获取query.json中的各种类型 content_formats = [item["name"] for item in query_data.get("content_format", [])] stages = [item["name"] for item in query_data.get("stage", [])] content_types = [item["name"] for item in query_data.get("content_type", [])] logger.info(f"发现 {len(content_formats)} 种内容格式: {content_formats}") logger.info(f"发现 {len(stages)} 个阶段: {stages}") logger.info(f"发现 {len(content_types)} 种内容类型: {content_types}") # 生成所有可能的组合 for content_format in content_formats: for stage in stages: for content_type in content_types: # 生成拼接的字符串 query_word = f"{content_format}{stage}{content_type}" # 创建JSON结构 combination = { "content_format": content_format, "stage": stage, "content_type": content_type, "query_word": query_word, "create_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } combinations.append(combination) logger.info(f"生成了 {len(combinations)} 个组合") return combinations def save_to_database(combinations): """将组合结果保存到knowledge_content_query表中""" try: # 先清空表(可选,根据需求决定) # clear_sql = "DELETE FROM knowledge_content_query" # MysqlHelper.update_values(clear_sql) # 插入数据的SQL语句 insert_sql = """ INSERT INTO knowledge_content_query (stage, content_type, content_format, query_word, status, create_time) VALUES (%s, %s, %s, %s, %s, %s) """ success_count = 0 total_count = len(combinations) logger.info(f"开始插入 {total_count} 条记录到数据库...") for i, combo in enumerate(combinations, 1): params = ( combo["stage"], combo["content_type"], combo["content_format"], combo["query_word"], 0, # status设为0 combo["create_time"] ) result = MysqlHelper.update_values(insert_sql, params) if result is not None: success_count += 1 # 每插入100条记录输出一次进度 if i % 100 == 0: logger.info(f"已插入 {i}/{total_count} 条记录") else: logger.error(f"插入失败: {combo}") logger.info(f"成功插入 {success_count}/{total_count} 条记录到数据库") return success_count except Exception as e: logger.error(f"保存到数据库失败: {e}") return 0 def print_combinations(combinations): """打印所有组合结果""" try: print(f"\n=== 生成了 {len(combinations)} 个组合 ===\n") # 打印前10个组合的JSON结构作为示例 for i, combo in enumerate(combinations[:10], 1): print(f"{i:3d}. {json.dumps(combo, ensure_ascii=False, indent=2)}") if len(combinations) > 10: print(f"... 还有 {len(combinations) - 10} 个组合") print(f"\n=== 总共 {len(combinations)} 个组合 ===") return True except Exception as e: logger.error(f"打印组合结果失败: {e}") return False def print_statistics(combinations): """打印统计信息""" try: print(f"\n=== 统计信息 ===") # 按内容格式统计 content_format_stats = {} stage_stats = {} content_type_stats = {} for combo in combinations: content_format_stats[combo["content_format"]] = content_format_stats.get(combo["content_format"], 0) + 1 stage_stats[combo["stage"]] = stage_stats.get(combo["stage"], 0) + 1 content_type_stats[combo["content_type"]] = content_type_stats.get(combo["content_type"], 0) + 1 print(f"内容格式分布:") for fmt, count in content_format_stats.items(): print(f" {fmt}: {count} 个组合") print(f"\n阶段分布:") for stage, count in stage_stats.items(): print(f" {stage}: {count} 个组合") print(f"\n内容类型分布:") for ctype, count in content_type_stats.items(): print(f" {ctype}: {count} 个组合") return True except Exception as e: logger.error(f"打印统计信息失败: {e}") return False def main(): """主函数""" logger.info("开始执行query.json聚合") # 1. 加载query.json文件 query_data = load_query_json() if not query_data: logger.error("无法加载query.json文件,程序退出") return # 2. 生成组合 combinations = generate_combinations(query_data) # 3. 打印组合结果示例 if print_combinations(combinations): logger.info("组合生成成功") # 4. 打印统计信息 print_statistics(combinations) # 5. 保存到数据库 logger.info("开始保存到数据库...") saved_count = save_to_database(combinations) if saved_count > 0: logger.info(f"程序执行成功完成,共保存 {saved_count} 条记录到数据库") else: logger.error("数据库保存失败") else: logger.error("程序执行失败") if __name__ == "__main__": main()