#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Inspiration-point feature matching script.

Extracts the features of every inspiration / key / purpose point from the
deconstruction task list and matches them against the persona features.
Pairwise scores come from ``lib.hybrid_similarity.compare_phrases_cartesian``.
"""

import json
import asyncio
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import sys
from datetime import datetime

# Add the project root to the import path so the project-local imports
# below resolve when this file is run as a script.
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

from lib.hybrid_similarity import compare_phrases_cartesian
from script.data_processing.path_config import PathConfig

# The three point types handled throughout this script. These strings are
# also used as dictionary keys in the input and output data.
_POINT_TYPES = ("灵感点", "关键点", "目的点")


class ProgressTracker:
    """Console progress bar with a simple ETA estimate."""

    def __init__(self, total: int):
        """Start tracking ``total`` units of work; the clock starts now."""
        self.total = total
        self.completed = 0
        self.start_time = datetime.now()
        self.last_update_time = datetime.now()
        self.last_completed = 0

    def update(self, count: int = 1):
        """Record ``count`` finished units and redraw the bar if due.

        The bar is redrawn when at least one second has passed since the
        previous redraw, or when the total has been reached.
        """
        self.completed += count
        now = datetime.now()

        since_last = (now - self.last_update_time).total_seconds()
        if since_last >= 1.0 or self.completed >= self.total:
            self.display()
            self.last_update_time = now
            self.last_completed = self.completed

    def display(self):
        """Print the bar on the current console line (no-op if total is 0)."""
        if self.total == 0:
            return

        percentage = (self.completed / self.total) * 100
        elapsed = (datetime.now() - self.start_time).total_seconds()

        # Estimate the remaining time from the average speed so far.
        eta_str = ""
        if elapsed > 0:
            speed = self.completed / elapsed
            if speed > 0:
                remaining = (self.total - self.completed) / speed
                eta_str = f", 预计剩余: {int(remaining)}秒"

        bar_length = 40
        filled_length = int(bar_length * self.completed / self.total)
        bar = '█' * filled_length + '░' * (bar_length - filled_length)

        print(
            f"\r 进度: [{bar}] {self.completed}/{self.total} ({percentage:.1f}%){eta_str}",
            end='',
            flush=True,
        )

        # Terminate the carriage-return line once everything is done.
        if self.completed >= self.total:
            print()


# Module-wide tracker; created in process_task_list, read by
# process_single_point.
progress_tracker = None


def _classify_persona_feature(
    persona_name: str,
    category_mapping: Optional[Dict],
) -> Tuple[str, List]:
    """Return ``(feature type, de-duplicated categories)`` for a persona feature.

    A name found as a key in any point type's mapping is a tag ("标签") and
    carries that entry's "所属分类" list with duplicates removed (order
    kept). Every other name is a category ("分类") with no categories.
    """
    if category_mapping:
        # The first point type whose mapping contains the name wins.
        for point_type in _POINT_TYPES:
            type_mapping = category_mapping.get(point_type, {})
            if persona_name in type_mapping:
                categories = type_mapping[persona_name].get("所属分类", [])
                return "标签", list(dict.fromkeys(categories))
    return "分类", []


async def process_single_point(
    point: Dict,
    point_type: str,
    persona_features: List[Dict],
    category_mapping: Optional[Dict] = None,
    model_name: Optional[str] = None
) -> Dict:
    """
    Match every feature of one point against every persona feature.

    All M x N pairs are scored with a single call to
    ``compare_phrases_cartesian``.

    Args:
        point: Point data; "名称" and "特征列表" are read from it.
        point_type: Point type ("灵感点" / "关键点" / "目的点").
        persona_features: Persona features; each needs "特征名称" and
            "人设特征层级".
        category_mapping: Optional feature-to-category mapping used to label
            each persona feature as tag or category.
        model_name: Not used by this function; kept for interface
            compatibility.

    Returns:
        A shallow copy of ``point`` with a "how步骤列表" entry added. The
        list is empty when there is nothing to match or the similarity call
        fails.
    """
    feature_list = point.get("特征列表", [])

    # Nothing to match: return the point with an empty step list.
    if not feature_list or not persona_features:
        result = point.copy()
        result["how步骤列表"] = []
        return result

    feature_names = [f.get("特征名称", "") for f in feature_list]
    persona_names = [pf["特征名称"] for pf in persona_features]

    try:
        # similarity_results[i][j] is the result for feature i vs persona j.
        # NOTE(review): the original comment describes each cell as
        # {"相似度": float, "说明": str} - confirm against the module.
        similarity_results = await compare_phrases_cartesian(
            feature_names,       # M point features
            persona_names,       # N persona features
            max_concurrent=100   # concurrency cap passed to the module
        )
    except Exception as e:
        # Deliberate best-effort: report the failure and keep going with an
        # empty result for this point.
        print(f"\n⚠️ 混合模型调用失败: {e}")
        # These pairs were counted in the tracker's total, so mark them as
        # processed; otherwise the bar can never reach 100%.
        if progress_tracker:
            progress_tracker.update(len(feature_list) * len(persona_features))
        result = point.copy()
        result["how步骤列表"] = []
        return result

    # Type and categories depend only on the persona feature, so compute
    # them once instead of once per (feature, persona) pair.
    persona_meta = [
        (
            pf["特征名称"],
            pf["人设特征层级"],
            *_classify_persona_feature(pf["特征名称"], category_mapping),
        )
        for pf in persona_features
    ]

    feature_match_results = []
    for i, feature_item in enumerate(feature_list):
        match_results = [
            {
                "人设特征名称": persona_name,
                "人设特征层级": persona_level,
                "特征类型": feature_type,
                # Fresh list per entry so results never share a list object.
                "特征分类": list(categories),
                "匹配结果": similarity_results[i][j],
            }
            for j, (persona_name, persona_level, feature_type, categories)
            in enumerate(persona_meta)
        ]

        # One progress update per feature row covers all of its N pairs.
        if progress_tracker:
            progress_tracker.update(len(match_results))

        feature_match_results.append({
            "特征名称": feature_item.get("特征名称", ""),
            "权重": feature_item.get("权重", 1.0),
            "匹配结果": match_results
        })

    step_name_mapping = {
        "灵感点": "灵感特征分别匹配人设特征",
        "关键点": "关键特征分别匹配人设特征",
        "目的点": "目的特征分别匹配人设特征"
    }
    how_step = {
        "步骤名称": step_name_mapping.get(point_type, f"{point_type}特征分别匹配人设特征"),
        "特征列表": feature_match_results
    }

    result = point.copy()
    result["how步骤列表"] = [how_step]
    return result


async def process_single_task(
    task: Dict,
    task_index: int,
    total_tasks: int,
    all_persona_features: List[Dict],
    category_mapping: Optional[Dict] = None,
    model_name: Optional[str] = None
) -> Dict:
    """
    Build the "how" deconstruction result for one task.

    Args:
        task: Task data; "帖子id" and "what解构结果" are read from it.
        task_index: Index printed in the progress message (callers in this
            file start at 1).
        total_tasks: Total number of tasks, printed next to the index.
        all_persona_features: Persona features of all three levels.
        category_mapping: Optional feature-to-category mapping.
        model_name: Forwarded unchanged to ``process_single_point``.

    Returns:
        A shallow copy of ``task`` with "how解构结果" added.
    """
    post_id = task.get("帖子id", "")
    print(f"\n[{task_index}/{total_tasks}] 处理帖子: {post_id}")

    what_result = task.get("what解构结果", {})
    how_result = {}

    for point_type in _POINT_TYPES:
        point_list_key = f"{point_type}列表"
        point_list = what_result.get(point_list_key, [])
        if not point_list:
            # Empty or missing lists produce no key in the result.
            continue

        # All points of one type are processed concurrently.
        updated_point_list = await asyncio.gather(*(
            process_single_point(
                point=point,
                point_type=point_type,
                persona_features=all_persona_features,
                category_mapping=category_mapping,
                model_name=model_name
            )
            for point in point_list
        ))
        how_result[point_list_key] = list(updated_point_list)

    updated_task = task.copy()
    updated_task["how解构结果"] = how_result
    return updated_task


async def process_task_list(
    task_list: List[Dict],
    persona_features_dict: Dict,
    category_mapping: Optional[Dict] = None,
    model_name: Optional[str] = None
) -> List[Dict]:
    """
    Process the whole deconstruction task list concurrently.

    Args:
        task_list: Deconstruction tasks.
        persona_features_dict: Persona features keyed by point type.
        category_mapping: Optional feature-to-category mapping; its category
            names are added to the persona features as well.
        model_name: Forwarded unchanged to ``process_single_task``.

    Returns:
        The tasks, each with a "how解构结果" entry, in input order.
    """
    global progress_tracker

    # Merge tag features and category features of all three point types,
    # labelling each with the level it came from.
    all_features = []
    for feature_type in _POINT_TYPES:
        type_features = persona_features_dict.get(feature_type, [])
        for feature in type_features:
            feature_with_level = feature.copy()
            feature_with_level["人设特征层级"] = feature_type
            all_features.append(feature_with_level)
        print(f"人设{feature_type}标签特征数量: {len(type_features)}")

        if category_mapping and feature_type in category_mapping:
            type_categories = set()
            for feature_data in category_mapping[feature_type].values():
                type_categories.update(feature_data.get("所属分类", []))

            # Sorted so the feature order is deterministic across runs.
            for cat in sorted(type_categories):
                all_features.append({
                    "特征名称": cat,
                    "人设特征层级": feature_type
                })
            print(f"人设{feature_type}分类特征数量: {len(type_categories)}")

    print(f"总特征数量(三种类型的标签+分类): {len(all_features)}")

    # One unit of progress per (point feature, persona feature) pair.
    total_match_count = 0
    for task in task_list:
        what_result = task.get("what解构结果", {})
        for point_type in _POINT_TYPES:
            for point in what_result.get(f"{point_type}列表", []):
                feature_count = len(point.get("特征列表", []))
                total_match_count += feature_count * len(all_features)

    print("处理灵感点、关键点和目的点特征")
    print(f"总匹配任务数: {total_match_count:,}")
    print()

    progress_tracker = ProgressTracker(total_match_count)

    updated_task_list = await asyncio.gather(*(
        process_single_task(
            task=task,
            task_index=i,
            total_tasks=len(task_list),
            all_persona_features=all_features,
            category_mapping=category_mapping,
            model_name=model_name
        )
        for i, task in enumerate(task_list, 1)
    ))

    return list(updated_task_list)


async def main():
    """Load the inputs, run the matching, save one file per post, print stats."""
    config = PathConfig()
    config.ensure_dirs()

    task_list_file = config.task_list_file
    persona_features_file = config.feature_source_mapping_file
    category_mapping_file = config.feature_category_mapping_file
    output_dir = config.how_results_dir

    print(f"账号: {config.account_name}")
    print(f"任务列表文件: {task_list_file}")
    print(f"人设特征文件: {persona_features_file}")
    print(f"分类映射文件: {category_mapping_file}")
    print(f"输出目录: {output_dir}")
    print()

    print(f"读取解构任务列表: {task_list_file}")
    with open(task_list_file, "r", encoding="utf-8") as f:
        task_list_data = json.load(f)

    print(f"读取人设特征: {persona_features_file}")
    with open(persona_features_file, "r", encoding="utf-8") as f:
        persona_features_data = json.load(f)

    print(f"读取特征分类映射: {category_mapping_file}")
    with open(category_mapping_file, "r", encoding="utf-8") as f:
        category_mapping = json.load(f)

    task_list = task_list_data.get("解构任务列表", [])
    print(f"总任务数: {len(task_list)}")

    updated_task_list = await process_task_list(
        task_list=task_list,
        persona_features_dict=persona_features_data,
        category_mapping=category_mapping,
        model_name=None  # no model name is specified by this script
    )

    # One output file per post, named after the post id.
    print(f"\n保存结果到: {output_dir}")
    for task in updated_task_list:
        post_id = task.get("帖子id", "unknown")
        output_file = output_dir / f"{post_id}_how.json"

        print(f" 保存: {output_file.name}")
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(task, f, ensure_ascii=False, indent=4)

    print("\n完成!")

    # Count points and their features per point type.
    point_counts = {point_type: 0 for point_type in _POINT_TYPES}
    feature_counts = {point_type: 0 for point_type in _POINT_TYPES}
    for task in updated_task_list:
        how_result = task.get("how解构结果", {})
        for point_type in _POINT_TYPES:
            point_list = how_result.get(f"{point_type}列表", [])
            point_counts[point_type] += len(point_list)
            feature_counts[point_type] += sum(
                len(point.get("特征列表", [])) for point in point_list
            )

    print("\n统计:")
    print(f" 处理的帖子数: {len(updated_task_list)}")
    for point_type in _POINT_TYPES:
        print(f" 处理的{point_type}数: {point_counts[point_type]}")
        print(f" 处理的{point_type}特征数: {feature_counts[point_type]}")


if __name__ == "__main__":
    asyncio.run(main())