#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Inspiration-point feature matching script.

Extracts the features of every inspiration point in the deconstruction task
list, pairs each of them with the persona inspiration features, and scores
every pair with ``compare_phrases`` from ``lib.semantic_similarity``.
"""

import asyncio
import json
import sys
from pathlib import Path
from typing import Dict, List, Optional

# Add the project root to the import path so that ``lib`` can be resolved.
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

from lib.semantic_similarity import compare_phrases

# Global cap on the number of in-flight ``compare_phrases`` calls.
MAX_CONCURRENT_REQUESTS = 20
semaphore = None

# Top-level keys of the category mapping whose entries are tag features.
_TAG_FEATURE_TYPES = ("灵感点", "关键点", "目的点")


def get_semaphore():
    """Return the module-wide semaphore, creating it on first use.

    The semaphore is created lazily instead of at import time and is sized
    by ``MAX_CONCURRENT_REQUESTS``.
    """
    global semaphore
    if semaphore is None:
        semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
    return semaphore


async def match_single_pair(
    feature_name: str,
    persona_name: str,
    category_mapping: Optional[Dict] = None,
    model_name: Optional[str] = None
) -> Dict:
    """Match one (feature, persona feature) pair under the concurrency limit.

    Args:
        feature_name: Name of the feature to match.
        persona_name: Name of the persona feature to match against.
        category_mapping: Feature-to-category mapping. When
            ``persona_name`` appears under one of the tag feature types it
            is reported as a tag together with its categories; otherwise it
            is reported as a category.
        model_name: Accepted for interface compatibility. It is not
            forwarded to ``compare_phrases`` by this function.

    Returns:
        A dict of the form::

            {
                "人设特征名称": "xxx",
                "特征类型": "标签",
                "特征分类": ["分类1", "分类2"],
                "匹配结果": <value returned by compare_phrases>
            }
    """
    sem = get_semaphore()
    async with sem:
        print(f" 匹配: {feature_name} <-> {persona_name}")
        similarity_result = await compare_phrases(
            phrase_a=feature_name,
            phrase_b=persona_name,
        )

    # Anything that is not a known tag feature is reported as a category,
    # and a category has no parent categories of its own.
    feature_type = "分类"
    categories = []

    if category_mapping:
        # The first feature type that knows this name wins.
        for ft in _TAG_FEATURE_TYPES:
            if ft not in category_mapping:
                continue
            type_mapping = category_mapping[ft]
            if persona_name in type_mapping:
                feature_type = "标签"
                categories = type_mapping[persona_name].get("所属分类", [])
                break

    # De-duplicate while preserving the original order.
    unique_categories = list(dict.fromkeys(categories))

    return {
        "人设特征名称": persona_name,
        "特征类型": feature_type,
        "特征分类": unique_categories,
        "匹配结果": similarity_result
    }


async def match_feature_with_persona(
    feature_name: str,
    persona_features: List[Dict],
    category_mapping: Optional[Dict] = None,
    model_name: Optional[str] = None
) -> List[Dict]:
    """Match one feature against every persona feature concurrently.

    Args:
        feature_name: Name of the feature to match.
        persona_features: Persona features; each item must carry the key
            ``"特征名称"``.
        category_mapping: Feature-to-category mapping.
        model_name: Passed through to ``match_single_pair``.

    Returns:
        One match result per persona feature, in input order.
    """
    tasks = [
        match_single_pair(
            feature_name,
            persona_feature["特征名称"],
            category_mapping,
            model_name
        )
        for persona_feature in persona_features
    ]

    match_results = await asyncio.gather(*tasks)
    return list(match_results)


async def match_single_feature(
    feature_name: str,
    persona_features: List[Dict],
    category_mapping: Optional[Dict] = None,
    model_name: Optional[str] = None
) -> Dict:
    """Match a single feature against all persona features.

    Args:
        feature_name: Name of the feature.
        persona_features: Persona feature list.
        category_mapping: Feature-to-category mapping.
        model_name: Passed through to ``match_feature_with_persona``.

    Returns:
        A dict holding the feature name under ``"特征名称"`` and the list of
        match results under ``"匹配结果"``.
    """
    print(f" 特征: {feature_name}")

    match_results = await match_feature_with_persona(
        feature_name=feature_name,
        persona_features=persona_features,
        category_mapping=category_mapping,
        model_name=model_name
    )

    return {
        "特征名称": feature_name,
        "匹配结果": match_results
    }


async def process_single_inspiration_point(
    inspiration_point: Dict,
    persona_features: List[Dict],
    category_mapping: Optional[Dict] = None,
    model_name: Optional[str] = None
) -> Dict:
    """Match every feature of one inspiration point concurrently.

    Args:
        inspiration_point: Inspiration point data; ``"名称"`` and
            ``"特征列表"`` are read and both are optional.
        persona_features: Persona feature list.
        category_mapping: Feature-to-category mapping.
        model_name: Passed through to ``match_single_feature``.

    Returns:
        A shallow copy of ``inspiration_point`` with ``"how步骤列表"`` set to
        a single-step list containing the per-feature match results.
    """
    point_name = inspiration_point.get("名称", "")
    feature_list = inspiration_point.get("特征列表", [])

    print(f" 处理灵感点: {point_name}")
    print(f" 特征数量: {len(feature_list)}")

    tasks = [
        match_single_feature(
            feature_name, persona_features, category_mapping, model_name
        )
        for feature_name in feature_list
    ]
    feature_match_results = await asyncio.gather(*tasks)

    how_step = {
        "步骤名称": "灵感特征分别匹配人设特征",
        "特征列表": list(feature_match_results)
    }

    # Shallow copy so the caller's dict does not gain the new key.
    result = inspiration_point.copy()
    result["how步骤列表"] = [how_step]
    return result


async def process_single_task(
    task: Dict,
    task_index: int,
    total_tasks: int,
    persona_inspiration_features: List[Dict],
    category_mapping: Optional[Dict] = None,
    model_name: Optional[str] = None
) -> Dict:
    """Process all inspiration points of one task concurrently.

    Args:
        task: Task data; ``"帖子id"`` and ``"what解构结果"`` are read.
        task_index: Index of the task, used in the progress output only.
        total_tasks: Total number of tasks, used in the progress output only.
        persona_inspiration_features: Persona feature list to match against.
        category_mapping: Feature-to-category mapping.
        model_name: Passed through to the per-point processing.

    Returns:
        A shallow copy of ``task`` with ``"how解构结果"`` added.
    """
    post_id = task.get("帖子id", "")
    print(f"\n处理任务 [{task_index}/{total_tasks}]: {post_id}")

    what_result = task.get("what解构结果", {})
    inspiration_list = what_result.get("灵感点列表", [])

    print(f" 灵感点数量: {len(inspiration_list)}")

    tasks = [
        process_single_inspiration_point(
            inspiration_point=inspiration_point,
            persona_features=persona_inspiration_features,
            category_mapping=category_mapping,
            model_name=model_name
        )
        for inspiration_point in inspiration_list
    ]
    updated_inspiration_list = await asyncio.gather(*tasks)

    how_result = {
        "灵感点列表": list(updated_inspiration_list)
    }

    updated_task = task.copy()
    updated_task["how解构结果"] = how_result
    return updated_task


async def process_task_list(
    task_list: List[Dict],
    persona_features_dict: Dict,
    category_mapping: Optional[Dict] = None,
    model_name: Optional[str] = None
) -> List[Dict]:
    """Process the whole deconstruction task list concurrently.

    The set of persona features to match against is the tag features stored
    under ``"灵感点"`` in ``persona_features_dict`` plus one synthetic
    feature per distinct category found under ``"灵感点"`` in
    ``category_mapping``.

    Args:
        task_list: Deconstruction task list.
        persona_features_dict: Persona features keyed by feature type.
        category_mapping: Feature-to-category mapping.
        model_name: Passed through to the per-task processing.

    Returns:
        The tasks, each extended with its how-deconstruction result, in
        input order.
    """
    persona_inspiration_features = persona_features_dict.get("灵感点", [])
    print(f"人设标签特征数量: {len(persona_inspiration_features)}")

    # Categories are taken from the inspiration-point section only.
    category_features = []
    if category_mapping:
        all_categories = set()
        if "灵感点" in category_mapping:
            for feature_data in category_mapping["灵感点"].values():
                all_categories.update(feature_data.get("所属分类", []))

        # Sorted so the resulting feature order is deterministic.
        category_features = [
            {"特征名称": cat} for cat in sorted(all_categories)
        ]
        print(f"人设分类特征数量: {len(category_features)}")

    all_features = persona_inspiration_features + category_features
    print(f"总特征数量(标签+分类): {len(all_features)}")

    tasks = [
        process_single_task(
            task=task,
            task_index=i,
            total_tasks=len(task_list),
            persona_inspiration_features=all_features,
            category_mapping=category_mapping,
            model_name=model_name
        )
        for i, task in enumerate(task_list, 1)
    ]
    updated_task_list = await asyncio.gather(*tasks)

    return list(updated_task_list)


async def main():
    """Load the inputs, run the matching, and write one JSON file per task.

    Inputs are read from ``data/data_1118`` under the project root; results
    are written to the ``当前帖子_how解构结果`` directory next to them, and
    summary counts are printed at the end.
    """
    script_dir = Path(__file__).parent
    project_root = script_dir.parent.parent
    data_dir = project_root / "data" / "data_1118"

    task_list_file = data_dir / "当前帖子_解构任务列表.json"
    persona_features_file = data_dir / "特征名称_帖子来源.json"
    category_mapping_file = data_dir / "特征名称_分类映射.json"
    output_dir = data_dir / "当前帖子_how解构结果"

    output_dir.mkdir(parents=True, exist_ok=True)

    print(f"读取解构任务列表: {task_list_file}")
    with open(task_list_file, "r", encoding="utf-8") as f:
        task_list_data = json.load(f)

    print(f"读取人设特征: {persona_features_file}")
    with open(persona_features_file, "r", encoding="utf-8") as f:
        persona_features_data = json.load(f)

    print(f"读取特征分类映射: {category_mapping_file}")
    with open(category_mapping_file, "r", encoding="utf-8") as f:
        category_mapping = json.load(f)

    task_list = task_list_data.get("解构任务列表", [])
    print(f"\n总任务数: {len(task_list)}")

    updated_task_list = await process_task_list(
        task_list=task_list,
        persona_features_dict=persona_features_data,
        category_mapping=category_mapping,
        model_name=None  # use the default model
    )

    # One output file per task, named after its post id.
    print(f"\n保存结果到: {output_dir}")
    for task in updated_task_list:
        post_id = task.get("帖子id", "unknown")
        output_file = output_dir / f"{post_id}_how.json"

        print(f" 保存: {output_file.name}")
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(task, f, ensure_ascii=False, indent=4)

    print("\n完成!")

    total_inspiration_points = sum(
        len(task["how解构结果"]["灵感点列表"])
        for task in updated_task_list
    )
    # "特征列表" is optional on an inspiration point (the processing step
    # reads it with a default), so the statistics must tolerate its absence.
    total_features = sum(
        len(point.get("特征列表", []))
        for task in updated_task_list
        for point in task["how解构结果"]["灵感点列表"]
    )
    print(f"\n统计:")
    print(f" 处理的帖子数: {len(updated_task_list)}")
    print(f" 处理的灵感点数: {total_inspiration_points}")
    print(f" 处理的特征数: {total_features}")


if __name__ == "__main__":
    asyncio.run(main())