#!/usr/bin/env python3
# -*- coding: utf-8 -*-
- """
- 为关联特征生成检索词并去重
- 读取 associated_tags_results.json,为每个特征生成组合检索词,
- 并在同一结果项内去重。
- """

import argparse
import json
import logging
from typing import Any, Dict, List, Set

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)


class SearchWordGenerator:
    """Generates combined search words for associated features."""

    def __init__(self, input_path: str):
        """
        Initialize the generator.

        Args:
            input_path: Path to the input JSON file.
        """
        self.input_path = input_path
        self.data = self._load_json(input_path)
        # Keys stay in Chinese: they are written verbatim to the stats JSON
        # file and looked up again in main().
        self.stats = {
            '处理的结果项数': 0,    # result items processed
            '生成的总组合词数': 0,  # total combined words generated
            '唯一组合词数': 0,      # unique combined words
            '重复过滤的词数': 0,    # duplicates filtered out
            '每项详情': []          # per-item details
        }

    def _load_json(self, file_path: str) -> List[Dict]:
        """Load a JSON file."""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            logger.error(f"Failed to load file {file_path}: {e}")
            raise

    def generate_search_words(self) -> List[Dict[str, Any]]:
        """
        Generate search words for every result item.

        Returns:
            The enhanced data list.
        """
        logger.info("=" * 60)
        logger.info("Starting search word generation")
        logger.info("=" * 60)
        enhanced_data = []
        for idx, result in enumerate(self.data, 1):
            logger.info(f"\nProcessing result item {idx}/{len(self.data)}")
            # Base word: the persona feature name of the best match
            base_word = result.get('最高匹配信息', {}).get('人设特征名称', '')
            original_feature = result.get('原始特征名称', '')
            logger.info(f"  Original feature: {original_feature}")
            logger.info(f"  Persona feature name (base word): {base_word}")
            if not base_word:
                logger.warning("  Warning: no persona feature name found, skipping")
                enhanced_data.append(result)
                continue
            # Set used for deduplication, scoped to the current result item
            seen_words: Set[str] = set()
            item_stats = {
                '原始特征': original_feature,   # original feature
                '人设特征名称': base_word,      # persona feature name
                '总特征数': 0,                  # total features
                '唯一组合词数': 0,              # unique combined words
                '重复词数': 0,                  # duplicates
                '组合词列表': []                # list of combined words
            }
            # Walk every association found for this result item
            associations = result.get('找到的关联', [])
            for assoc_idx, assoc in enumerate(associations):
                target_path = assoc.get('目标分类路径', '')
                features = assoc.get('特征列表', [])
                logger.info(f"  Processing association {assoc_idx + 1}/{len(associations)}: {target_path}")
                logger.info(f"    Feature count: {len(features)}")
                # Walk the feature list
                for feature in features:
                    feature_name = feature.get('特征名称', '')
                    item_stats['总特征数'] += 1
                    if not feature_name:
                        feature['search_word'] = None
                        continue
                    # Build the combined search word
                    search_word = f"{base_word} {feature_name}"
                    # Check for duplicates
                    if search_word not in seen_words:
                        # First occurrence: fill it in
                        feature['search_word'] = search_word
                        seen_words.add(search_word)
                        item_stats['唯一组合词数'] += 1
                        item_stats['组合词列表'].append(search_word)
                        logger.debug(f"    + Added: {search_word}")
                    else:
                        # Duplicate: leave it empty
                        feature['search_word'] = None
                        item_stats['重复词数'] += 1
                        logger.debug(f"    - Duplicate (left empty): {search_word}")
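
            # Example of the dedup above: with base_word '医生' and feature
            # names ['白大褂', '听诊器', '白大褂'], the search_word values
            # become '医生 白大褂', '医生 听诊器', None; the repeated
            # '白大褂' is left empty. (Illustrative values only.)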
            # Record per-item statistics
            logger.info(f"  Done: {item_stats['总特征数']} features, "
                        f"{item_stats['唯一组合词数']} unique combined words, "
                        f"{item_stats['重复词数']} duplicates")
            self.stats['处理的结果项数'] += 1
            self.stats['生成的总组合词数'] += item_stats['总特征数']
            self.stats['唯一组合词数'] += item_stats['唯一组合词数']
            self.stats['重复过滤的词数'] += item_stats['重复词数']
            self.stats['每项详情'].append(item_stats)
            enhanced_data.append(result)
- logger.info("\n" + "=" * 60)
- logger.info("生成完成")
- logger.info("=" * 60)
- logger.info(f"处理的结果项数: {self.stats['处理的结果项数']}")
- logger.info(f"生成的总组合词数: {self.stats['生成的总组合词数']}")
- logger.info(f"唯一组合词数: {self.stats['唯一组合词数']}")
- logger.info(f"重复过滤的词数: {self.stats['重复过滤的词数']}")
- return enhanced_data

    def save_results(self, enhanced_data: List[Dict[str, Any]], output_path: str):
        """Save the enhanced data."""
        try:
            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(enhanced_data, f, ensure_ascii=False, indent=2)
            logger.info(f"Enhanced data saved to: {output_path}")
        except Exception as e:
            logger.error(f"Failed to save results: {e}")
            raise

    def save_stats(self, stats_path: str):
        """Save the statistics."""
        try:
            with open(stats_path, 'w', encoding='utf-8') as f:
                json.dump(self.stats, f, ensure_ascii=False, indent=2)
            logger.info(f"Statistics saved to: {stats_path}")
        except Exception as e:
            logger.error(f"Failed to save statistics: {e}")
            raise


def main():
    """Entry point."""
    parser = argparse.ArgumentParser(
        description='Generate search words for associated features and deduplicate them')
    parser.add_argument(
        '--input',
        default='associated_tags_results.json',
        help='Input JSON file path (default: associated_tags_results.json)'
    )
    parser.add_argument(
        '--output',
        default='associated_tags_results_with_search.json',
        help='Output JSON file path (default: associated_tags_results_with_search.json)'
    )
    parser.add_argument(
        '--stats',
        default='search_words_stats.json',
        help='Statistics output path (default: search_words_stats.json)'
    )
    parser.add_argument(
        '--debug',
        action='store_true',
        help='Enable debug logging'
    )
    args = parser.parse_args()
    # Set the log level
    if args.debug:
        logger.setLevel(logging.DEBUG)
    # Create the generator
    generator = SearchWordGenerator(input_path=args.input)
    # Generate the search words
    enhanced_data = generator.generate_search_words()
    # Save results and statistics
    generator.save_results(enhanced_data, args.output)
    generator.save_stats(args.stats)
    # Print the final summary
    logger.info("\n" + "=" * 60)
    logger.info("Processing summary")
    logger.info("=" * 60)
    logger.info(f"Input file: {args.input}")
    logger.info(f"Output file: {args.output}")
    logger.info(f"Stats file: {args.stats}")
    logger.info("")
    logger.info("Results:")
    logger.info(f"  - Result items: {generator.stats['处理的结果项数']}")
    logger.info(f"  - Total features: {generator.stats['生成的总组合词数']}")
    logger.info(f"  - Unique combined words: {generator.stats['唯一组合词数']}")
    logger.info(f"  - Duplicates filtered: {generator.stats['重复过滤的词数']}")
    logger.info(f"  - Dedup rate: {generator.stats['重复过滤的词数'] / max(generator.stats['生成的总组合词数'], 1) * 100:.1f}%")


if __name__ == '__main__':
    main()
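
# Example invocation (the script filename here is an assumption; use the
# actual file name):
#   python generate_search_words.py --input associated_tags_results.json \
#       --output associated_tags_results_with_search.json --debug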