#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Inspiration-point feature matching script, v3 (feature-combination matching).

Extracts the feature list of every inspiration point from the deconstruction
task list and matches it against the persona's historical feature
combinations. Each feature inside a combination is annotated with its category
information before being handed to the combination-matching module.
"""

import json
import asyncio
from pathlib import Path
from typing import Dict, List, Optional
import sys

# Add the project root directory to the import path (must happen before the
# project imports below).
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

from agents import trace
from agents.tracing.create import custom_span
from lib.my_trace import set_trace
from lib.hierarchical_match_analyzer import hierarchical_match

# Global concurrency limit for matcher calls.
MAX_CONCURRENT_REQUESTS = 20
semaphore = None
# Event loop that was running when `semaphore` was created by get_semaphore();
# stays None when the semaphore was created outside a running loop or was
# assigned externally.
_semaphore_loop = None


def get_semaphore():
    """Return the shared semaphore that caps concurrent matcher calls.

    The semaphore is created lazily with ``MAX_CONCURRENT_REQUESTS`` permits.
    An asyncio semaphore cannot be shared between event loops, so when this
    function created the cached semaphore under one running loop and is now
    called under a different one (e.g. a second ``asyncio.run(main())`` in the
    same process), a fresh semaphore is created for the new loop.

    Returns:
        The ``asyncio.Semaphore`` to use in the current context.
    """
    global semaphore, _semaphore_loop

    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        # Called outside of any running event loop.
        running_loop = None

    # Only replace a semaphore that we know belongs to another loop; a
    # semaphore whose owning loop is unknown is kept untouched.
    stale = _semaphore_loop is not None and _semaphore_loop is not running_loop
    if semaphore is None or stale:
        semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
        _semaphore_loop = running_loop
    return semaphore


def load_feature_categories(categories_file: Path) -> Dict:
    """Load the feature-name -> category mapping.

    Args:
        categories_file: Path to the ``特征名称_分类映射.json`` file.

    Returns:
        The parsed JSON content (the feature category dictionary).
    """
    with open(categories_file, "r", encoding="utf-8") as f:
        return json.load(f)


def enrich_persona_combinations_with_categories(
    persona_combinations: List[Dict],
    feature_categories: Dict,
    point_type: str
) -> List[Dict]:
    """Attach category information to every persona feature combination.

    Args:
        persona_combinations: Persona feature combinations; each item is read
            through its ``"特征组合"`` and ``"特征来源"`` keys.
        feature_categories: Feature category mapping, keyed by point type.
        point_type: Point type ("灵感点", "目的点", "关键点").

    Returns:
        A new list with one dict per input combination, holding:
        ``"特征组合"`` (list of ``{"特征名称", "所属分类"}`` dicts),
        ``"原始特征组合"`` (the original feature-name list) and
        ``"特征来源"``. Features missing from the mapping get an empty
        category list.
    """
    # Category mapping for the requested point type only.
    type_categories = feature_categories.get(point_type, {})

    enriched_combinations = []
    for combo in persona_combinations:
        feature_list = combo.get("特征组合", [])

        # Pair every feature name with its categories.
        enriched_features = [
            {
                "特征名称": feature_name,
                "所属分类": type_categories.get(feature_name, {}).get("所属分类", []),
            }
            for feature_name in feature_list
        ]

        enriched_combinations.append({
            "特征组合": enriched_features,   # features with categories
            "原始特征组合": feature_list,     # original feature names
            "特征来源": combo.get("特征来源", []),
        })

    return enriched_combinations


async def match_feature_list_with_combination(
    current_feature_list: List[str],
    persona_combination: Dict,
    model_name: Optional[str] = None
) -> Dict:
    """Match the current point's feature list against one persona combination.

    The actual scoring is delegated to ``hierarchical_match``; the call is
    made while holding the global semaphore so that at most
    ``MAX_CONCURRENT_REQUESTS`` matcher calls run at once.

    NOTE(review): per the original author's notes the matcher works in tiers
    (feature-name/tag match first, then first-level category, then the
    higher-level category, followed by an inference-difficulty score) --
    that logic lives in ``lib.hierarchical_match_analyzer``; confirm there.

    Args:
        current_feature_list: Feature list of the current point, e.g.
            ``["立冬", "教资查分", "时间巧合"]``.
        persona_combination: Enriched persona combination, e.g.::

                {
                    "特征组合": [
                        {"特征名称": "猫孩子", "所属分类": ["宠物亲子化", "宠物情感", "实质"]},
                        {"特征名称": "被拿捏住的无奈感", "所属分类": ["宠物关系主导", "宠物情感", "实质"]}
                    ],
                    "原始特征组合": ["猫孩子", "被拿捏住的无奈感"],
                    "特征来源": [...]
                }

        model_name: Model name forwarded to the matcher.

    Returns:
        A dict of the form::

                {
                    "人设特征组合": [...],
                    "匹配结果": {
                        "最终得分": ...,
                        "匹配层级": ...,
                        "匹配结果": ...,
                        "综合说明": ...,
                        "分层详情": {...}
                    },
                    "人设特征来源": [...]
                }

    Raises:
        KeyError: If the combination or the matcher result lacks one of the
            keys read below.
    """
    sem = get_semaphore()
    async with sem:
        # Delegate to the hierarchical matching module.
        match_result = await hierarchical_match(
            current_features=current_feature_list,
            persona_combination=persona_combination["特征组合"],
            model_name=model_name
        )

    # Re-shape the matcher output; note "分层结果" is exposed as "分层详情".
    result = {
        "人设特征组合": persona_combination["原始特征组合"],
        "匹配结果": {
            "最终得分": match_result["最终得分"],
            "匹配层级": match_result["匹配层级"],
            "匹配结果": match_result["匹配结果"],
            "综合说明": match_result["综合说明"],
            "分层详情": match_result["分层结果"]
        },
        "人设特征来源": persona_combination["特征来源"]
    }

    return result


async def match_inspiration_point_with_combinations(
    current_feature_list: List[str],
    persona_combinations: List[Dict],
    model_name: Optional[str] = None
) -> List[Dict]:
    """Match the current point's feature list against all persona combinations.

    Args:
        current_feature_list: Feature list of the current point.
        persona_combinations: Persona combinations (already enriched with
            category information).
        model_name: Model name forwarded to the matcher.

    Returns:
        One match result per combination, sorted by ``"最终得分"`` in
        descending order.
    """
    print(f" 批量匹配: {current_feature_list} <-> {len(persona_combinations)}个人设特征组合")

    # Match all combinations concurrently (throttled by the global semaphore).
    tasks = [
        match_feature_list_with_combination(
            current_feature_list=current_feature_list,
            persona_combination=combo,
            model_name=model_name
        )
        for combo in persona_combinations
    ]
    match_results = await asyncio.gather(*tasks)

    # Highest final score first.
    match_results.sort(key=lambda x: x["匹配结果"]["最终得分"], reverse=True)

    return match_results


async def process_single_inspiration_point(
    inspiration_point: Dict,
    persona_combinations: List[Dict],
    model_name: Optional[str] = None
) -> Dict:
    """Run feature-combination matching for a single inspiration point.

    Args:
        inspiration_point: Inspiration point data; ``"名称"`` and
            ``"特征列表"`` are read from it.
        persona_combinations: Persona combinations (already enriched with
            category information).
        model_name: Model name forwarded to the matcher.

    Returns:
        A shallow copy of ``inspiration_point`` with a ``"how步骤列表"`` entry
        containing one step that holds the sorted match results.
    """
    point_name = inspiration_point.get("名称", "")
    feature_list = inspiration_point.get("特征列表", [])

    print(f" 处理灵感点: {point_name}")
    print(f" 特征列表: {feature_list}")

    # Wrap the work for this inspiration point in its own trace span.
    with custom_span(
        name=f"处理灵感点: {point_name}",
        data={
            "灵感点": point_name,
            "特征列表": feature_list,
            "人设组合数量": len(persona_combinations)
        }
    ):
        # Match the feature list against every persona combination.
        match_results = await match_inspiration_point_with_combinations(
            current_feature_list=feature_list,
            persona_combinations=persona_combinations,
            model_name=model_name
        )

    # Build the single "how" step.
    how_step = {
        "步骤名称": "灵感特征列表批量匹配人设特征组合",
        "当前特征列表": feature_list,
        "匹配结果": match_results
    }

    # Return an updated copy instead of mutating the caller's dict.
    result = inspiration_point.copy()
    result["how步骤列表"] = [how_step]

    return result


async def process_single_task(
    task: Dict,
    task_index: int,
    total_tasks: int,
    persona_combinations: List[Dict],
    model_name: Optional[str] = None
) -> Dict:
    """Process a single deconstruction task.

    Args:
        task: Task data; ``"帖子id"`` and ``"what解构结果"`` are read from it.
        task_index: Task index (1-based), used for progress output only.
        total_tasks: Total number of tasks, used for progress output only.
        persona_combinations: Persona combinations (already enriched with
            category information).
        model_name: Model name forwarded to the matcher.

    Returns:
        A shallow copy of ``task`` with a ``"how解构结果"`` entry whose
        ``"灵感点列表"`` holds the processed inspiration points.
    """
    post_id = task.get("帖子id", "")
    print(f"\n处理任务 [{task_index}/{total_tasks}]: {post_id}")

    # Inspiration points produced by the earlier "what" stage.
    what_result = task.get("what解构结果", {})
    inspiration_list = what_result.get("灵感点列表", [])

    print(f" 灵感点数量: {len(inspiration_list)}")

    # Process all inspiration points concurrently.
    tasks = [
        process_single_inspiration_point(
            inspiration_point=inspiration_point,
            persona_combinations=persona_combinations,
            model_name=model_name
        )
        for inspiration_point in inspiration_list
    ]
    updated_inspiration_list = await asyncio.gather(*tasks)

    # Build the "how" deconstruction result.
    how_result = {
        "灵感点列表": list(updated_inspiration_list)
    }

    # Return an updated copy of the task.
    updated_task = task.copy()
    updated_task["how解构结果"] = how_result

    return updated_task


async def process_task_list(
    task_list: List[Dict],
    persona_combinations: List[Dict],
    model_name: Optional[str] = None,
    current_time: Optional[str] = None,
    log_url: Optional[str] = None
) -> List[Dict]:
    """Process the whole deconstruction task list concurrently.

    Args:
        task_list: Deconstruction task list.
        persona_combinations: Persona combinations (already enriched with
            category information).
        model_name: Model name forwarded to the matcher.
        current_time: Timestamp recorded in the trace span data.
        log_url: Log link recorded in the trace span data.

    Returns:
        The tasks, each extended with its ``"how解构结果"``, in input order.
    """
    print(f"人设灵感特征组合数量: {len(persona_combinations)}")

    # One span around the complete run.
    with custom_span(
        name="特征组合批量匹配 v3 - 所有任务",
        data={
            "任务总数": len(task_list),
            "人设组合数量": len(persona_combinations),
            "current_time": current_time,
            "log_url": log_url
        }
    ):
        # Process all tasks concurrently; indices are 1-based for display.
        tasks = [
            process_single_task(
                task=task,
                task_index=i,
                total_tasks=len(task_list),
                persona_combinations=persona_combinations,
                model_name=model_name
            )
            for i, task in enumerate(task_list, 1)
        ]
        updated_task_list = await asyncio.gather(*tasks)

    return list(updated_task_list)


async def main(current_time: Optional[str] = None, log_url: Optional[str] = None):
    """Script entry point: load inputs, run the matching, save the results.

    Args:
        current_time: Timestamp (passed in from outside), stored in the
            output metadata.
        log_url: Log link (passed in from outside), stored in the output
            metadata and printed at the end when set.
    """
    # Input / output paths, all relative to the project root.
    data_dir = project_root / "data" / "data_1118"

    task_list_file = data_dir / "当前帖子_解构任务列表.json"
    persona_combinations_file = data_dir / "特征组合_帖子来源.json"
    feature_categories_file = data_dir / "特征名称_分类映射.json"
    output_dir = data_dir / "当前帖子_how解构结果_v3"

    # Create the output directory.
    output_dir.mkdir(parents=True, exist_ok=True)

    # Model name; the shortened form is used in output file names.
    from lib.client import MODEL_NAME
    model_name_short = MODEL_NAME.replace("google/", "").replace("/", "_")

    print(f"读取解构任务列表: {task_list_file}")
    with open(task_list_file, "r", encoding="utf-8") as f:
        task_list_data = json.load(f)

    print(f"读取人设特征组合: {persona_combinations_file}")
    with open(persona_combinations_file, "r", encoding="utf-8") as f:
        persona_combinations_data = json.load(f)

    print(f"读取特征分类映射: {feature_categories_file}")
    feature_categories = load_feature_categories(feature_categories_file)

    # Task list.
    task_list = task_list_data.get("解构任务列表", [])
    print(f"\n总任务数: {len(task_list)}")
    print(f"使用模型: {MODEL_NAME}\n")

    # Enrich the persona combinations with categories (inspiration points only).
    persona_inspiration_combinations_raw = persona_combinations_data.get("灵感点", [])
    persona_inspiration_combinations = enrich_persona_combinations_with_categories(
        persona_combinations=persona_inspiration_combinations_raw,
        feature_categories=feature_categories,
        point_type="灵感点"
    )

    print(f"灵感点特征组合数量: {len(persona_inspiration_combinations)}")
    print("示例组合 (前3个):")
    for i, combo in enumerate(persona_inspiration_combinations[:3], 1):
        print(f" {i}. 原始组合: {combo['原始特征组合']}")
        print(f" 带分类: {combo['特征组合']}")
    print()

    # Process the task list.
    updated_task_list = await process_task_list(
        task_list=task_list,
        persona_combinations=persona_inspiration_combinations,
        model_name=None,  # let the matcher use its default model
        current_time=current_time,
        log_url=log_url
    )

    # Save one file per task.
    # NOTE(review): tasks without "帖子id" all map to "unknown_..." and would
    # overwrite each other -- confirm every task carries an id.
    print(f"\n保存结果到: {output_dir}")
    for task in updated_task_list:
        post_id = task.get("帖子id", "unknown")
        output_file = output_dir / f"{post_id}_how_v3_{model_name_short}.json"

        # Attach run metadata to every task.
        task["元数据"] = {
            "current_time": current_time,
            "log_url": log_url,
            "version": "v3_combination_match",
            "model": MODEL_NAME,
            "说明": "v3版本: 使用特征列表匹配人设特征组合(带分类信息)"
        }

        print(f" 保存: {output_file.name}")
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(task, f, ensure_ascii=False, indent=4)

    print("\n完成!")

    # Summary statistics.
    total_inspiration_points = sum(
        len(task["how解构结果"]["灵感点列表"])
        for task in updated_task_list
    )
    total_matches = sum(
        len(point["how步骤列表"][0]["匹配结果"])
        for task in updated_task_list
        for point in task["how解构结果"]["灵感点列表"]
    )
    print("\n统计:")
    print(f" 处理的帖子数: {len(updated_task_list)}")
    print(f" 处理的灵感点数: {total_inspiration_points}")
    print(f" 生成的匹配结果数: {total_matches}")

    if log_url:
        print(f"\nTrace: {log_url}\n")


if __name__ == "__main__":
    # Set up tracing.
    current_time, log_url = set_trace()

    # Wrap the whole run in a trace context.
    with trace("灵感特征组合批量匹配 v3"):
        asyncio.run(main(current_time, log_url))