aggregate_queries.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. # 聚合query.json中的数据,生成所有可能的组合并存储到数据库
  2. # 参考set_querys.py的逻辑
  3. import json
  4. import os
  5. import sys
  6. from datetime import datetime
  7. # 添加项目根目录到Python路径
  8. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  9. from utils.mysql_db import MysqlHelper
  10. from loguru import logger
  11. def load_query_json():
  12. """加载query.json文件"""
  13. try:
  14. query_file_path = os.path.join(os.path.dirname(__file__), "query.json")
  15. with open(query_file_path, 'r', encoding='utf-8') as f:
  16. data = json.load(f)
  17. logger.info("成功加载query.json文件")
  18. return data
  19. except Exception as e:
  20. logger.error(f"加载query.json文件失败: {e}")
  21. return None
  22. def generate_combinations(query_data):
  23. """生成所有可能的组合"""
  24. combinations = []
  25. # 获取query.json中的各种类型
  26. content_formats = [item["name"] for item in query_data.get("content_format", [])]
  27. stages = [item["name"] for item in query_data.get("stage", [])]
  28. content_types = [item["name"] for item in query_data.get("content_type", [])]
  29. logger.info(f"发现 {len(content_formats)} 种内容格式: {content_formats}")
  30. logger.info(f"发现 {len(stages)} 个阶段: {stages}")
  31. logger.info(f"发现 {len(content_types)} 种内容类型: {content_types}")
  32. # 生成所有可能的组合
  33. for content_format in content_formats:
  34. for stage in stages:
  35. for content_type in content_types:
  36. # 生成拼接的字符串
  37. query_word = f"{content_format}{stage}{content_type}"
  38. # 创建JSON结构
  39. combination = {
  40. "content_format": content_format,
  41. "stage": stage,
  42. "content_type": content_type,
  43. "query_word": query_word,
  44. "create_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  45. }
  46. combinations.append(combination)
  47. logger.info(f"生成了 {len(combinations)} 个组合")
  48. return combinations
  49. def save_to_database(combinations):
  50. """将组合结果保存到knowledge_content_query表中"""
  51. try:
  52. # 先清空表(可选,根据需求决定)
  53. # clear_sql = "DELETE FROM knowledge_content_query"
  54. # MysqlHelper.update_values(clear_sql)
  55. # 插入数据的SQL语句,允许重复记录
  56. insert_sql = """
  57. INSERT INTO knowledge_content_query
  58. (stage, content_type, content_format, query_word, status, channel, create_time)
  59. VALUES (%s, %s, %s, %s, %s, %s, %s)
  60. """
  61. success_count = 0
  62. total_count = len(combinations)
  63. logger.info(f"开始插入 {total_count} 条记录到数据库...")
  64. for i, combo in enumerate(combinations, 1):
  65. params = (
  66. combo["stage"],
  67. combo["content_type"],
  68. combo["content_format"],
  69. combo["query_word"],
  70. 1, # 1: 未处理 0: 已处理
  71. 2, # 1: 小红书 2: 抖音
  72. combo["create_time"]
  73. )
  74. print(params)
  75. result = MysqlHelper.update_values(insert_sql, params)
  76. if result is not None:
  77. success_count += 1
  78. # 每插入100条记录输出一次进度
  79. if i % 100 == 0:
  80. logger.info(f"已插入 {i}/{total_count} 条记录")
  81. else:
  82. logger.error(f"插入失败: {combo}")
  83. logger.info(f"成功插入 {success_count}/{total_count} 条记录到数据库")
  84. return success_count
  85. except Exception as e:
  86. logger.error(f"保存到数据库失败: {e}")
  87. return 0
  88. def print_combinations(combinations):
  89. """打印所有组合结果"""
  90. try:
  91. print(f"\n=== 生成了 {len(combinations)} 个组合 ===\n")
  92. # 打印前10个组合的JSON结构作为示例
  93. for i, combo in enumerate(combinations[:10], 1):
  94. print(f"{i:3d}. {json.dumps(combo, ensure_ascii=False, indent=2)}")
  95. if len(combinations) > 10:
  96. print(f"... 还有 {len(combinations) - 10} 个组合")
  97. print(f"\n=== 总共 {len(combinations)} 个组合 ===")
  98. return True
  99. except Exception as e:
  100. logger.error(f"打印组合结果失败: {e}")
  101. return False
  102. def print_statistics(combinations):
  103. """打印统计信息"""
  104. try:
  105. print(f"\n=== 统计信息 ===")
  106. # 按内容格式统计
  107. content_format_stats = {}
  108. stage_stats = {}
  109. content_type_stats = {}
  110. for combo in combinations:
  111. content_format_stats[combo["content_format"]] = content_format_stats.get(combo["content_format"], 0) + 1
  112. stage_stats[combo["stage"]] = stage_stats.get(combo["stage"], 0) + 1
  113. content_type_stats[combo["content_type"]] = content_type_stats.get(combo["content_type"], 0) + 1
  114. print(f"内容格式分布:")
  115. for fmt, count in content_format_stats.items():
  116. print(f" {fmt}: {count} 个组合")
  117. print(f"\n阶段分布:")
  118. for stage, count in stage_stats.items():
  119. print(f" {stage}: {count} 个组合")
  120. print(f"\n内容类型分布:")
  121. for ctype, count in content_type_stats.items():
  122. print(f" {ctype}: {count} 个组合")
  123. return True
  124. except Exception as e:
  125. logger.error(f"打印统计信息失败: {e}")
  126. return False
  127. def main():
  128. """主函数"""
  129. logger.info("开始执行query.json聚合")
  130. # 1. 加载query.json文件
  131. query_data = load_query_json()
  132. if not query_data:
  133. logger.error("无法加载query.json文件,程序退出")
  134. return
  135. # 2. 生成组合
  136. combinations = generate_combinations(query_data)
  137. # 3. 打印组合结果示例
  138. if print_combinations(combinations):
  139. logger.info("组合生成成功")
  140. # 4. 打印统计信息
  141. print_statistics(combinations)
  142. # 5. 保存到数据库
  143. logger.info("开始保存到数据库...")
  144. saved_count = save_to_database(combinations)
  145. if saved_count > 0:
  146. logger.info(f"程序执行成功完成,共保存 {saved_count} 条记录到数据库")
  147. else:
  148. logger.error("数据库保存失败")
  149. else:
  150. logger.error("程序执行失败")
  151. if __name__ == "__main__":
  152. main()