#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Inspiration-point feature matching script.

Extracts the features of each point (inspiration / key / purpose) from the
deconstruction task list and matches them against the persona features.
The semantic relation between each pair of features is analyzed via
compare_phrases_cartesian from lib.hybrid_similarity.
"""

import json
import asyncio
from pathlib import Path
from typing import Dict, List, Optional
import sys
from tqdm import tqdm

# Add the project root to sys.path so that lib/ and script/ are importable.
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

from lib.hybrid_similarity import compare_phrases_cartesian
from script.data_processing.path_config import PathConfig

# Global progress bar (one per post, created in process_single_task).
progress_bar = None


async def process_single_point(
    point: Dict,
    point_type: str,
    persona_features: List[Dict],
    category_mapping: Optional[Dict] = None,
    model_name: Optional[str] = None,
) -> Dict:
    """
    Process a single point using one Cartesian-product batch computation.

    Args:
        point: Point data (inspiration point / key point / purpose point).
        point_type: Point type, one of "灵感点" / "关键点" / "目的点".
        persona_features: List of persona features.
        category_mapping: Feature-to-category mapping dict.
        model_name: Name of the model to use.

    Returns:
        The point data with a "how步骤列表" (how-step list) attached.
    """
    global progress_bar

    point_name = point.get("名称", "")
    feature_list = point.get("特征列表", [])

    # No features to match: return immediately with an empty step list.
    if not feature_list or not persona_features:
        result = point.copy()
        result["how步骤列表"] = []
        return result

    # Extract the feature names and the persona feature names.
    feature_names = [f.get("特征名称", "") for f in feature_list]
    persona_names = [pf["特征名称"] for pf in persona_features]

    def on_llm_progress(count: int):
        """Progress callback invoked whenever the LLM finishes a task."""
        if progress_bar:
            progress_bar.update(count)

    # Core optimization: compute all M×N pairs in a single hybrid-model
    # Cartesian-product call. max_concurrent caps the underlying LLM's
    # global concurrency.
    similarity_results = await compare_phrases_cartesian(
        feature_names,                      # M point features
        persona_names,                      # N persona features
        max_concurrent=100,                 # max LLM concurrency (shared globally)
        progress_callback=on_llm_progress,  # forward progress updates
    )
    # similarity_results[i][j] = {"相似度": float, "说明": str}

    # Build the match results, passing through the module's full output.
    feature_match_results = []
    for i, feature_item in enumerate(feature_list):
        feature_name = feature_item.get("特征名称", "")
        feature_weight = feature_item.get("权重", 1.0)

        # Match results of this feature against every persona feature.
        match_results = []
        for j, persona_feature in enumerate(persona_features):
            persona_name = persona_feature["特征名称"]
            persona_level = persona_feature["人设特征层级"]

            # Use the module's result for this (feature, persona) pair as-is.
            similarity_result = similarity_results[i][j]

            # Determine the feature type and its categories.
            feature_type = "分类"  # default: category feature
            categories = []

            if category_mapping:
                # First look the persona feature up among the tag features.
                is_tag_feature = False
                for ft in ["灵感点", "关键点", "目的点"]:
                    if ft in category_mapping:
                        type_mapping = category_mapping[ft]
                        if persona_name in type_mapping:
                            feature_type = "标签"
                            categories = type_mapping[persona_name].get("所属分类", [])
                            is_tag_feature = True
                            break

                # Not a tag feature: check whether it is a category feature.
                if not is_tag_feature:
                    all_categories = set()
                    for ft in ["灵感点", "关键点", "目的点"]:
                        if ft in category_mapping:
                            for _fname, fdata in category_mapping[ft].items():
                                cats = fdata.get("所属分类", [])
                                all_categories.update(cats)

                    if persona_name in all_categories:
                        feature_type = "分类"
                        categories = []

            # De-duplicate categories while preserving order.
            unique_categories = list(dict.fromkeys(categories))

            match_result = {
                "人设特征名称": persona_name,
                "人设特征层级": persona_level,
                "特征类型": feature_type,
                "特征分类": unique_categories,
                "匹配结果": similarity_result,  # module result, unchanged
            }
            match_results.append(match_result)

        feature_match_results.append({
            "特征名称": feature_name,
            "权重": feature_weight,
            "匹配结果": match_results,
        })

    # Build the "how" step (structure unchanged).
    step_name_mapping = {
        "灵感点": "灵感特征分别匹配人设特征",
        "关键点": "关键特征分别匹配人设特征",
        "目的点": "目的特征分别匹配人设特征",
    }

    how_step = {
        "步骤名称": step_name_mapping.get(point_type, f"{point_type}特征分别匹配人设特征"),
        "特征列表": list(feature_match_results),
    }

    result = point.copy()
    result["how步骤列表"] = [how_step]

    return result

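# ---------------------------------------------------------------------------
# Reference sketch (assumption, not executed): process_single_point relies on
# compare_phrases_cartesian returning an M×N nested list where cell [i][j]
# carries at least {"相似度": float, "说明": str}, as the inline comment above
# states. The phrases and numbers below are hypothetical and only illustrate
# the assumed shape:
#
#   results = await compare_phrases_cartesian(
#       ["手绘风格", "清单排版"],   # M = 2 point features (illustrative values)
#       ["视觉表达", "内容结构"],   # N = 2 persona features (illustrative values)
#       max_concurrent=10,
#   )
#   results[0][1]  # -> {"相似度": 0.42, "说明": "..."}  (assumed shape)
#
# Any extra fields the module returns are passed through unchanged inside
# "匹配结果".
# ---------------------------------------------------------------------------
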
async def process_single_task(
    task: Dict,
    task_index: int,
    total_tasks: int,
    all_persona_features: List[Dict],
    category_mapping: Optional[Dict] = None,
    model_name: Optional[str] = None,
) -> Dict:
    """
    Process a single task.

    Args:
        task: Task data.
        task_index: Task index (1-based).
        total_tasks: Total number of tasks.
        all_persona_features: All persona features (covering the three levels).
        category_mapping: Feature-to-category mapping dict.
        model_name: Name of the model to use.

    Returns:
        The task with the "how" deconstruction result attached.
    """
    global progress_bar

    post_id = task.get("帖子id", "")

    # Get the "what" deconstruction result.
    what_result = task.get("what解构结果", {})

    # Count the matching tasks for this post.
    current_task_match_count = 0
    for point_type in ["灵感点", "关键点", "目的点"]:
        point_list = what_result.get(f"{point_type}列表", [])
        for point in point_list:
            feature_count = len(point.get("特征列表", []))
            current_task_match_count += feature_count * len(all_persona_features)

    # Create the progress bar for this post.
    progress_bar = tqdm(
        total=current_task_match_count,
        desc=f"[{task_index}/{total_tasks}] {post_id}",
        unit="匹配",
        ncols=100,
    )

    # Build the "how" deconstruction result.
    how_result = {}

    # Process inspiration points, key points and purpose points sequentially.
    for point_type in ["灵感点", "关键点", "目的点"]:
        point_list_key = f"{point_type}列表"
        point_list = what_result.get(point_list_key, [])

        if point_list:
            updated_point_list = []

            # Process each point one by one.
            for point in point_list:
                result = await process_single_point(
                    point=point,
                    point_type=point_type,
                    persona_features=all_persona_features,
                    category_mapping=category_mapping,
                    model_name=model_name,
                )
                updated_point_list.append(result)

            # Add to the "how" deconstruction result.
            how_result[point_list_key] = updated_point_list

    # Close this post's progress bar.
    if progress_bar:
        progress_bar.close()

    # Update the task.
    updated_task = task.copy()
    updated_task["how解构结果"] = how_result

    return updated_task


async def process_task_list(
    task_list: List[Dict],
    persona_features_dict: Dict,
    category_mapping: Optional[Dict] = None,
    model_name: Optional[str] = None,
    output_dir: Optional[Path] = None,
) -> List[Dict]:
    """
    Process the whole deconstruction task list (sequentially; each post is
    saved as soon as it has been processed).

    Args:
        task_list: Deconstruction task list.
        persona_features_dict: Persona feature dict (keyed by 灵感点 / 目的点 / 关键点).
        category_mapping: Feature-to-category mapping dict.
        model_name: Name of the model to use.
        output_dir: Output directory; if given, each post is saved immediately
            after it is processed.

    Returns:
        The task list with "how" deconstruction results attached.
    """
    # Merge the three kinds of persona features (灵感点 / 关键点 / 目的点).
    all_features = []
    for feature_type in ["灵感点", "关键点", "目的点"]:
        # Tag features of this type.
        type_features = persona_features_dict.get(feature_type, [])

        # Attach the level information to each feature.
        for feature in type_features:
            feature_with_level = feature.copy()
            feature_with_level["人设特征层级"] = feature_type
            all_features.append(feature_with_level)

        print(f"人设{feature_type}标签特征数量: {len(type_features)}")

        # Extract this type's category features from the category mapping.
        if category_mapping and feature_type in category_mapping:
            type_categories = set()
            for _, feature_data in category_mapping[feature_type].items():
                categories = feature_data.get("所属分类", [])
                type_categories.update(categories)

            # Convert them to the feature format and attach the level.
            for cat in sorted(type_categories):
                all_features.append({
                    "特征名称": cat,
                    "人设特征层级": feature_type,
                })

            print(f"人设{feature_type}分类特征数量: {len(type_categories)}")

    print(f"总特征数量(三种类型的标签+分类): {len(all_features)}")

    # Count the total number of matching tasks across the three point types.
    total_match_count = 0
    for task in task_list:
        what_result = task.get("what解构结果", {})
        for point_type in ["灵感点", "关键点", "目的点"]:
            point_list = what_result.get(f"{point_type}列表", [])
            for point in point_list:
                feature_count = len(point.get("特征列表", []))
                total_match_count += feature_count * len(all_features)

    print("处理灵感点、关键点和目的点特征")
    print(f"总匹配任务数: {total_match_count:,}")
    print()

    # Process all tasks sequentially, saving each post as soon as it is done.
    updated_task_list = []
    for i, task in enumerate(task_list, 1):
        updated_task = await process_single_task(
            task=task,
            task_index=i,
            total_tasks=len(task_list),
            all_persona_features=all_features,
            category_mapping=category_mapping,
            model_name=model_name,
        )
        updated_task_list.append(updated_task)

        # Save this post's result immediately.
        if output_dir:
            post_id = updated_task.get("帖子id", "unknown")
            output_file = output_dir / f"{post_id}_how.json"
            with open(output_file, "w", encoding="utf-8") as f:
                json.dump(updated_task, f, ensure_ascii=False, indent=4)
            print(f" ✓ 已保存: {output_file.name}")

    return updated_task_list

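# ---------------------------------------------------------------------------
# Data-shape sketch (assumption, inferred from the accesses above; not a
# schema definition). Field values are hypothetical placeholders:
#
#   task_list item:
#     {
#       "帖子id": "123456",
#       "what解构结果": {
#         "灵感点列表": [{"名称": "...", "特征列表": [{"特征名称": "...", "权重": 1.0}]}],
#         "关键点列表": [...],
#         "目的点列表": [...]
#       }
#     }
#
#   persona_features_dict:
#     {"灵感点": [{"特征名称": "..."}], "关键点": [...], "目的点": [...]}
#
#   category_mapping:
#     {"灵感点": {"<标签特征名>": {"所属分类": ["<分类名>", ...]}}, ...}
# ---------------------------------------------------------------------------
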
"""主函数""" # 使用路径配置 config = PathConfig() # 确保输出目录存在 config.ensure_dirs() # 获取路径 task_list_file = config.task_list_file persona_features_file = config.feature_source_mapping_file category_mapping_file = config.feature_category_mapping_file output_dir = config.how_results_dir print(f"账号: {config.account_name}") print(f"任务列表文件: {task_list_file}") print(f"人设特征文件: {persona_features_file}") print(f"分类映射文件: {category_mapping_file}") print(f"输出目录: {output_dir}") print() print(f"读取解构任务列表: {task_list_file}") with open(task_list_file, "r", encoding="utf-8") as f: task_list_data = json.load(f) print(f"读取人设特征: {persona_features_file}") with open(persona_features_file, "r", encoding="utf-8") as f: persona_features_data = json.load(f) print(f"读取特征分类映射: {category_mapping_file}") with open(category_mapping_file, "r", encoding="utf-8") as f: category_mapping = json.load(f) # 获取任务列表 task_list = task_list_data.get("解构任务列表", []) print(f"总任务数: {len(task_list)}") # 处理任务列表(每个帖子处理完立即保存) updated_task_list = await process_task_list( task_list=task_list, persona_features_dict=persona_features_data, category_mapping=category_mapping, model_name=None, # 使用默认模型 output_dir=output_dir # 传递输出目录,启用即时保存 ) print("\n完成!") # 打印统计信息 total_inspiration_points = 0 total_key_points = 0 total_purpose_points = 0 total_inspiration_features = 0 total_key_features = 0 total_purpose_features = 0 for task in updated_task_list: how_result = task.get("how解构结果", {}) # 统计灵感点 inspiration_list = how_result.get("灵感点列表", []) total_inspiration_points += len(inspiration_list) for point in inspiration_list: total_inspiration_features += len(point.get("特征列表", [])) # 统计关键点 key_list = how_result.get("关键点列表", []) total_key_points += len(key_list) for point in key_list: total_key_features += len(point.get("特征列表", [])) # 统计目的点 purpose_list = how_result.get("目的点列表", []) total_purpose_points += len(purpose_list) for point in purpose_list: total_purpose_features += len(point.get("特征列表", [])) print(f"\n统计:") print(f" 处理的帖子数: {len(updated_task_list)}") print(f" 处理的灵感点数: {total_inspiration_points}") print(f" 处理的灵感点特征数: {total_inspiration_features}") print(f" 处理的关键点数: {total_key_points}") print(f" 处理的关键点特征数: {total_key_features}") print(f" 处理的目的点数: {total_purpose_points}") print(f" 处理的目的点特征数: {total_purpose_features}") if __name__ == "__main__": asyncio.run(main())