#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Inspiration-point feature matching script.

Extracts the features of each inspiration point from the deconstruction task
list, pairs them with the persona's inspiration features, and uses the
``relation_analyzer`` module to analyze the semantic relation of every pair.
"""

import asyncio
import json
import sys
from pathlib import Path
from typing import Dict, List, Optional

# Add the project root to the import path so that ``lib`` can be imported.
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

from lib.relation_analyzer import analyze_relation  # noqa: E402

# Global concurrency limit for analyze_relation calls.
MAX_CONCURRENT_REQUESTS = 20
semaphore: Optional[asyncio.Semaphore] = None


def get_semaphore() -> asyncio.Semaphore:
    """Return the module-wide semaphore, creating it on first use.

    Returns:
        The shared semaphore sized by ``MAX_CONCURRENT_REQUESTS``.
    """
    global semaphore
    if semaphore is None:
        semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
    return semaphore


async def match_single_pair(
    feature_name: str,
    persona_name: str,
    model_name: Optional[str] = None,
) -> Dict:
    """Match one feature against one persona feature (concurrency-limited).

    Args:
        feature_name: Name of the feature to match.
        persona_name: Name of the persona feature.
        model_name: Model name forwarded to ``analyze_relation``.

    Returns:
        A dict with the persona feature name under "人设特征名称" and the
        value returned by ``analyze_relation`` under "匹配结果".
    """
    sem = get_semaphore()
    # The semaphore is held for the whole call so that at most
    # MAX_CONCURRENT_REQUESTS analyses are in flight at once.
    async with sem:
        print(f" 匹配: {feature_name} <-> {persona_name}")
        relation_result = await analyze_relation(
            phrase_a=feature_name,
            phrase_b=persona_name,
            model_name=model_name,
        )

    return {
        "人设特征名称": persona_name,
        "匹配结果": relation_result,
    }


async def match_feature_with_persona(
    feature_name: str,
    persona_features: List[Dict],
    model_name: Optional[str] = None,
) -> List[Dict]:
    """Match one feature against every persona feature concurrently.

    Args:
        feature_name: Name of the feature to match.
        persona_features: Persona feature dicts; each must contain the key
            "特征名称".
        model_name: Model name forwarded to ``analyze_relation``.

    Returns:
        One match result per persona feature, in input order.
    """
    # Build one coroutine per persona feature.
    tasks = [
        match_single_pair(feature_name, persona_feature["特征名称"], model_name)
        for persona_feature in persona_features
    ]

    # Run them all; gather keeps the results in task order.
    match_results = await asyncio.gather(*tasks)
    return list(match_results)


async def match_single_feature(
    feature_name: str,
    persona_features: List[Dict],
    model_name: Optional[str] = None,
) -> Dict:
    """Match a single feature against all persona features.

    Args:
        feature_name: Name of the feature.
        persona_features: Persona feature dicts.
        model_name: Model name forwarded to ``analyze_relation``.

    Returns:
        A dict with the feature name under "特征名称" and the list of match
        results under "匹配结果".
    """
    print(f" 特征: {feature_name}")
    match_results = await match_feature_with_persona(
        feature_name=feature_name,
        persona_features=persona_features,
        model_name=model_name,
    )

    return {
        "特征名称": feature_name,
        "匹配结果": match_results,
    }


async def process_single_inspiration_point(
    inspiration_point: Dict,
    persona_features: List[Dict],
    model_name: Optional[str] = None,
) -> Dict:
    """Match every feature of one inspiration point (concurrently).

    Args:
        inspiration_point: Inspiration point data; "名称" and "特征列表" are
            read, both optional.
        persona_features: Persona inspiration feature dicts.
        model_name: Model name forwarded to ``analyze_relation``.

    Returns:
        A shallow copy of ``inspiration_point`` with an added "how步骤列表"
        key holding a single how-step.
    """
    point_name = inspiration_point.get("名称", "")
    feature_list = inspiration_point.get("特征列表", [])

    print(f" 处理灵感点: {point_name}")
    print(f" 特征数量: {len(feature_list)}")

    # Match all features concurrently.
    tasks = [
        match_single_feature(feature_name, persona_features, model_name)
        for feature_name in feature_list
    ]
    feature_match_results = await asyncio.gather(*tasks)

    # Build the how-step.
    how_step = {
        "步骤名称": "灵感特征分别匹配人设特征",
        "特征列表": list(feature_match_results),
    }

    # Return an updated copy; the caller's dict is left untouched.
    result = inspiration_point.copy()
    result["how步骤列表"] = [how_step]
    return result


async def process_single_task(
    task: Dict,
    task_index: int,
    total_tasks: int,
    persona_inspiration_features: List[Dict],
    model_name: Optional[str] = None,
) -> Dict:
    """Process a single deconstruction task.

    Args:
        task: Task data; "帖子id" and "what解构结果" are read, both optional.
        task_index: Index of the task (starting from 1), used for logging.
        total_tasks: Total number of tasks, used for logging.
        persona_inspiration_features: Persona inspiration feature dicts.
        model_name: Model name forwarded to ``analyze_relation``.

    Returns:
        A shallow copy of ``task`` with an added "how解构结果" key.
    """
    post_id = task.get("帖子id", "")
    print(f"\n处理任务 [{task_index}/{total_tasks}]: {post_id}")

    # Fetch the inspiration-point list.
    what_result = task.get("what解构结果", {})
    inspiration_list = what_result.get("灵感点列表", [])
    print(f" 灵感点数量: {len(inspiration_list)}")

    # Process all inspiration points concurrently.
    tasks = [
        process_single_inspiration_point(
            inspiration_point=inspiration_point,
            persona_features=persona_inspiration_features,
            model_name=model_name,
        )
        for inspiration_point in inspiration_list
    ]
    updated_inspiration_list = await asyncio.gather(*tasks)

    # Build the how-deconstruction result.
    how_result = {
        "灵感点列表": list(updated_inspiration_list),
    }

    # Attach it to a copy of the task.
    updated_task = task.copy()
    updated_task["how解构结果"] = how_result
    return updated_task


async def process_task_list(
    task_list: List[Dict],
    persona_features_dict: Dict,
    model_name: Optional[str] = None,
) -> List[Dict]:
    """Process the whole deconstruction task list (concurrently).

    Args:
        task_list: Deconstruction tasks.
        persona_features_dict: Persona feature dict; only the "灵感点" entry
            is read here.
        model_name: Model name forwarded to ``analyze_relation``.

    Returns:
        The tasks, each extended with its how-deconstruction result, in
        input order.
    """
    persona_inspiration_features = persona_features_dict.get("灵感点", [])
    print(f"人设灵感特征数量: {len(persona_inspiration_features)}")

    # Process all tasks concurrently; indices are 1-based for display.
    tasks = [
        process_single_task(
            task=task,
            task_index=i,
            total_tasks=len(task_list),
            persona_inspiration_features=persona_inspiration_features,
            model_name=model_name,
        )
        for i, task in enumerate(task_list, 1)
    ]
    updated_task_list = await asyncio.gather(*tasks)
    return list(updated_task_list)


async def main():
    """Load the inputs, run the matching, and write one JSON file per post."""
    # Input and output paths, relative to the module-level project root.
    data_dir = project_root / "data" / "data_1118"

    task_list_file = data_dir / "当前帖子_解构任务列表.json"
    persona_features_file = data_dir / "特征名称_帖子来源.json"
    output_dir = data_dir / "当前帖子_how解构结果"

    # Create the output directory.
    output_dir.mkdir(parents=True, exist_ok=True)

    print(f"读取解构任务列表: {task_list_file}")
    with open(task_list_file, "r", encoding="utf-8") as f:
        task_list_data = json.load(f)

    print(f"读取人设特征: {persona_features_file}")
    with open(persona_features_file, "r", encoding="utf-8") as f:
        persona_features_data = json.load(f)

    # Fetch the task list.
    task_list = task_list_data.get("解构任务列表", [])
    print(f"\n总任务数: {len(task_list)}")

    # Process the task list.
    updated_task_list = await process_task_list(
        task_list=task_list,
        persona_features_dict=persona_features_data,
        # None is forwarded as-is; presumably relation_analyzer then picks
        # its default model (original note: "use the default model").
        model_name=None,
    )

    # Save each task to its own file.
    print(f"\n保存结果到: {output_dir}")
    for task in updated_task_list:
        post_id = task.get("帖子id", "unknown")
        output_file = output_dir / f"{post_id}_how.json"

        print(f" 保存: {output_file.name}")
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(task, f, ensure_ascii=False, indent=4)

    print("\n完成!")

    # Print summary statistics.
    total_inspiration_points = sum(
        len(task["how解构结果"]["灵感点列表"]) for task in updated_task_list
    )
    # "特征列表" is optional during processing, so it must be optional here
    # too; indexing it directly would raise KeyError after all work is done.
    total_features = sum(
        len(point.get("特征列表", []))
        for task in updated_task_list
        for point in task["how解构结果"]["灵感点列表"]
    )
    print("\n统计:")
    print(f" 处理的帖子数: {len(updated_task_list)}")
    print(f" 处理的灵感点数: {total_inspiration_points}")
    print(f" 处理的特征数: {total_features}")


if __name__ == "__main__":
    asyncio.run(main())