#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Inspiration-point feature matching script, v4 (unified matching).

A single prompt performs tag matching and category matching together, so
each (current tag list, persona combination) pair costs one LLM call
instead of one call per matching stage.
"""

import json
import asyncio
from pathlib import Path
from typing import Dict, List, Optional
import sys

# Put the directory three levels above this file on sys.path ahead of the
# project-local imports below.
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

from agents import trace
from agents.tracing.create import custom_span
from lib.my_trace import set_trace
from lib.unified_match_analyzer import unified_match

# Global cap on concurrently running unified_match calls.
MAX_CONCURRENT_REQUESTS = 20
semaphore = None


def get_semaphore():
    """Return the module-wide semaphore, creating it on first use.

    The semaphore is created lazily rather than at import time, so it is
    first constructed by whichever caller asks for it.
    """
    global semaphore
    if semaphore is None:
        semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
    return semaphore


def load_feature_categories(categories_file: Path) -> Dict:
    """Load the feature-name -> category mapping from a UTF-8 JSON file."""
    with open(categories_file, "r", encoding="utf-8") as f:
        return json.load(f)


def enrich_persona_combinations_with_categories(
    persona_combinations: List[Dict],
    feature_categories: Dict,
    point_type: str
) -> List[Dict]:
    """Attach category information to every feature of every combination.

    Args:
        persona_combinations: Combinations to enrich; each is read for the
            keys "特征组合" (list of feature names) and "特征来源".
        feature_categories: Mapping keyed by point type, then by feature
            name; the "所属分类" entry of each feature is copied over.
        point_type: Which top-level section of ``feature_categories`` to use.

    Returns:
        A new list with one dict per input combination holding the enriched
        features ("特征组合"), the untouched name list ("原始特征组合") and
        the source list ("特征来源"). Features that are missing from the
        mapping get an empty category list.
    """
    type_categories = feature_categories.get(point_type, {})

    enriched_combinations = []
    for combo in persona_combinations:
        feature_list = combo.get("特征组合", [])
        enriched_combinations.append({
            "特征组合": [
                {
                    "特征名称": feature_name,
                    "所属分类": type_categories.get(feature_name, {}).get("所属分类", []),
                }
                for feature_name in feature_list
            ],
            "原始特征组合": feature_list,
            "特征来源": combo.get("特征来源", []),
        })

    return enriched_combinations


async def match_tag_list_with_combination(
    current_tag_list: List[str],
    persona_combination: Dict,
    model_name: Optional[str] = None
) -> Dict:
    """Match the current tag list against one enriched persona combination.

    The call to ``unified_match`` is made while holding the global
    semaphore, so at most ``MAX_CONCURRENT_REQUESTS`` calls run at once.

    Args:
        current_tag_list: Tags of the point being analysed.
        persona_combination: An enriched combination; must contain
            "特征组合", "原始特征组合" and "特征来源".
        model_name: Forwarded unchanged to ``unified_match``.

    Returns:
        A dict with:
            "人设标签组合": the combination's original feature names,
            "当前标签匹配结果": whatever ``unified_match`` returned,
            "人设标签来源": the combination's source list.
        NOTE(review): the shape of "当前标签匹配结果" is defined by
        ``unified_match``. The consumer in this file expects a list of dicts
        carrying "当前标签" and a nested "匹配结果" dict with "语义相似度",
        "匹配类型" and "匹配到" - confirm against that module.

    Raises:
        KeyError: If ``persona_combination`` lacks one of the required keys.
    """
    sem = get_semaphore()
    async with sem:
        # One unified call evaluates every current tag against the combination.
        tag_match_results = await unified_match(
            current_tags=current_tag_list,
            persona_combination=persona_combination["特征组合"],
            model_name=model_name
        )

    return {
        "人设标签组合": persona_combination["原始特征组合"],
        "当前标签匹配结果": tag_match_results,
        "人设标签来源": persona_combination["特征来源"]
    }


def _match_weight(match_type) -> float:
    """Return the score weight for a match type.

    Category matches ("分类匹配") count for half. Tag matches ("标签匹配")
    and everything else, including "no match", use 1.0; weighting a
    non-match by 1.0 is harmless because its similarity is expected to be 0.
    """
    if match_type == "分类匹配":
        return 0.5
    return 1.0


def _as_score(value) -> float:
    """Coerce a similarity value to float, treating unusable values as 0."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0.0


def _summarize_tag_result(tag_result: Dict) -> Dict:
    """Build the simplified, weighted score entry for one tag result."""
    match_info = tag_result.get("匹配结果")
    if not isinstance(match_info, dict):
        # A missing or null match result is treated as "no match".
        match_info = {}

    match_type = match_info.get("匹配类型")
    raw_similarity = match_info.get("语义相似度", 0)
    weight = _match_weight(match_type)

    return {
        "标签": tag_result.get("当前标签"),
        "原始相似度": raw_similarity,
        "匹配类型": match_type,
        "权重": weight,
        "加权得分": _as_score(raw_similarity) * weight,
        "匹配到": match_info.get("匹配到")
    }


async def match_inspiration_point_with_combinations(
    current_feature_list: List[str],
    persona_combinations: List[Dict],
    model_name: Optional[str] = None
) -> List[Dict]:
    """Match the current feature list against every persona combination.

    All combinations are matched concurrently. Each usable result gets two
    extra keys: "组合平均得分" (mean of the per-tag weighted scores, 0 when
    there are none) and "精简结果" (a compact per-tag score breakdown).

    Args:
        current_feature_list: Tags of the point being analysed.
        persona_combinations: Enriched persona combinations.
        model_name: Forwarded to the matcher.

    Returns:
        The usable results sorted by "组合平均得分", highest first. Results
        that are not dicts, or whose "当前标签匹配结果" is missing or not a
        list, are skipped with a printed warning.
    """
    print(f" 批量匹配: 当前{len(current_feature_list)}个标签 {current_feature_list} vs {len(persona_combinations)}个人设组合")

    tasks = [
        match_tag_list_with_combination(
            current_tag_list=current_feature_list,
            persona_combination=combo,
            model_name=model_name
        )
        for combo in persona_combinations
    ]
    # gather() is used without return_exceptions, so a failing match
    # propagates to the caller instead of showing up as a result.
    match_results = await asyncio.gather(*tasks)

    valid_results = []
    for result in match_results:
        if not isinstance(result, dict):
            print(f"警告: 跳过无效结果 (不是字典): {type(result)}")
            continue

        tag_results = result.get("当前标签匹配结果")
        if tag_results is None:
            print("警告: 结果缺少当前标签匹配结果字段")
            continue

        if not isinstance(tag_results, list):
            print(f"警告: 当前标签匹配结果不是列表: {type(tag_results)}")
            continue

        # Score every well-formed tag result once; the average and the
        # simplified view are both derived from these entries so they
        # cannot disagree.
        tag_scores = [
            _summarize_tag_result(tag_result)
            for tag_result in tag_results
            if isinstance(tag_result, dict)
        ]
        weighted_scores = [entry["加权得分"] for entry in tag_scores]
        avg_score = sum(weighted_scores) / len(weighted_scores) if weighted_scores else 0

        result["组合平均得分"] = avg_score
        result["精简结果"] = {
            "人设标签组合": result.get("人设标签组合", []),
            "组合平均得分": avg_score,
            "各标签得分": tag_scores
        }

        valid_results.append(result)

    valid_results.sort(
        key=lambda x: x.get("组合平均得分", 0),
        reverse=True
    )

    return valid_results


async def process_single_inspiration_point(
    inspiration_point: Dict,
    persona_combinations: List[Dict],
    model_name: Optional[str] = None
) -> Dict:
    """Match one inspiration point against all persona combinations.

    Args:
        inspiration_point: Read for "名称" and "特征列表".
        persona_combinations: Enriched persona combinations.
        model_name: Forwarded to the matcher.

    Returns:
        A shallow copy of ``inspiration_point`` with "how步骤列表" (full
        match results) and "how步骤列表_精简版" (simplified results only)
        added, each holding a single step.
    """
    point_name = inspiration_point.get("名称", "")
    feature_list = inspiration_point.get("特征列表", [])

    print(f" 处理灵感点: {point_name}")
    print(f" 特征列表: {feature_list}")

    with custom_span(
        name=f"处理灵感点: {point_name}",
        data={
            "灵感点": point_name,
            "特征列表": feature_list,
            "人设组合数量": len(persona_combinations)
        }
    ):
        match_results = await match_inspiration_point_with_combinations(
            current_feature_list=feature_list,
            persona_combinations=persona_combinations,
            model_name=model_name
        )

    # Full "how" step: every match result in detail.
    how_step = {
        "步骤名称": "灵感特征列表统一匹配人设特征组合 (v4)",
        "当前特征列表": feature_list,
        "匹配结果": match_results
    }

    # Simplified "how" step: only the compact summary of each match.
    how_step_simplified = {
        "步骤名称": "灵感特征列表统一匹配人设特征组合 (v4) - 精简版",
        "当前特征列表": feature_list,
        "匹配结果": [
            match.get("精简结果", {})
            for match in match_results
        ]
    }

    result = inspiration_point.copy()
    result["how步骤列表"] = [how_step]
    result["how步骤列表_精简版"] = [how_step_simplified]

    return result


async def process_single_task(
    task: Dict,
    task_index: int,
    total_tasks: int,
    persona_combinations: List[Dict],
    model_name: Optional[str] = None
) -> Dict:
    """Process every inspiration point of one task concurrently.

    Args:
        task: Read for "帖子id" and "what解构结果" -> "灵感点列表".
        task_index: Position of the task, used only for progress output.
        total_tasks: Total number of tasks, used only for progress output.
        persona_combinations: Enriched persona combinations.
        model_name: Forwarded to the matcher.

    Returns:
        A shallow copy of ``task`` with "how解构结果" added, whose
        "灵感点列表" holds the processed inspiration points.
    """
    post_id = task.get("帖子id", "")
    print(f"\n处理任务 [{task_index}/{total_tasks}]: {post_id}")

    what_result = task.get("what解构结果", {})
    inspiration_list = what_result.get("灵感点列表", [])

    print(f" 灵感点数量: {len(inspiration_list)}")

    tasks = [
        process_single_inspiration_point(
            inspiration_point=inspiration_point,
            persona_combinations=persona_combinations,
            model_name=model_name
        )
        for inspiration_point in inspiration_list
    ]
    updated_inspiration_list = await asyncio.gather(*tasks)

    how_result = {
        "灵感点列表": list(updated_inspiration_list)
    }

    updated_task = task.copy()
    updated_task["how解构结果"] = how_result

    return updated_task


async def process_task_list(
    task_list: List[Dict],
    persona_combinations: List[Dict],
    model_name: Optional[str] = None,
    current_time: Optional[str] = None,
    log_url: Optional[str] = None
) -> List[Dict]:
    """Process the whole task list concurrently inside one trace span.

    Args:
        task_list: Tasks to process.
        persona_combinations: Enriched persona combinations.
        model_name: Forwarded to the matcher.
        current_time: Recorded in the span data only.
        log_url: Recorded in the span data only.

    Returns:
        The processed tasks, in the same order as ``task_list``.
    """
    print(f"人设灵感特征组合数量: {len(persona_combinations)}")

    with custom_span(
        name="统一匹配 v4 - 所有任务",
        data={
            "任务总数": len(task_list),
            "人设组合数量": len(persona_combinations),
            "current_time": current_time,
            "log_url": log_url
        }
    ):
        tasks = [
            process_single_task(
                task=task,
                task_index=i,
                total_tasks=len(task_list),
                persona_combinations=persona_combinations,
                model_name=model_name
            )
            for i, task in enumerate(task_list, 1)
        ]
        updated_task_list = await asyncio.gather(*tasks)

    return list(updated_task_list)


async def main(current_time: Optional[str] = None, log_url: Optional[str] = None):
    """Run the v4 unified matching over every task and save the results.

    Reads the task list, the persona combinations and the feature-category
    mapping from ``data/data_1118`` under the project root, matches every
    inspiration point, writes one JSON file per task into the output
    directory, and prints summary statistics.

    Args:
        current_time: Stored in each output file's metadata.
        log_url: Stored in each output file's metadata and printed at the
            end when set.
    """
    script_dir = Path(__file__).parent
    repo_root = script_dir.parent.parent
    data_dir = repo_root / "data" / "data_1118"

    task_list_file = data_dir / "当前帖子_解构任务列表.json"
    persona_combinations_file = data_dir / "特征组合_帖子来源.json"
    feature_categories_file = data_dir / "特征名称_分类映射.json"
    output_dir = data_dir / "当前帖子_how解构结果_v4"

    output_dir.mkdir(parents=True, exist_ok=True)

    # The model name is used for the output file names and the metadata.
    from lib.client import MODEL_NAME
    model_name_short = MODEL_NAME.replace("google/", "").replace("/", "_")

    print(f"读取解构任务列表: {task_list_file}")
    with open(task_list_file, "r", encoding="utf-8") as f:
        task_list_data = json.load(f)

    print(f"读取人设特征组合: {persona_combinations_file}")
    with open(persona_combinations_file, "r", encoding="utf-8") as f:
        persona_combinations_data = json.load(f)

    print(f"读取特征分类映射: {feature_categories_file}")
    feature_categories = load_feature_categories(feature_categories_file)

    # All posts are processed.
    task_list = task_list_data.get("解构任务列表", [])
    print(f"\n总任务数: {len(task_list)}")
    print(f"使用模型: {MODEL_NAME}\n")

    # Only the inspiration-point section is enriched; every combination in
    # it is used.
    persona_inspiration_combinations_raw = persona_combinations_data.get("灵感点", [])
    persona_inspiration_combinations = enrich_persona_combinations_with_categories(
        persona_combinations=persona_inspiration_combinations_raw,
        feature_categories=feature_categories,
        point_type="灵感点"
    )

    print(f"灵感点特征组合数量: {len(persona_inspiration_combinations)}")
    print("示例组合 (前2个):")
    for i, combo in enumerate(persona_inspiration_combinations[:2], 1):
        print(f" {i}. 原始组合: {combo['原始特征组合']}")
        # Only the first two features of each example are shown.
        print(f" 带分类: {combo['特征组合'][:2]}...")
    print()

    updated_task_list = await process_task_list(
        task_list=task_list,
        persona_combinations=persona_inspiration_combinations,
        model_name=None,
        current_time=current_time,
        log_url=log_url
    )

    # One output file per task.
    print(f"\n保存结果到: {output_dir}")
    for task in updated_task_list:
        post_id = task.get("帖子id", "unknown")
        output_file = output_dir / f"{post_id}_how_v4_{model_name_short}.json"

        task["元数据"] = {
            "current_time": current_time,
            "log_url": log_url,
            "version": "v4_unified_match",
            "model": MODEL_NAME,
            "说明": "v4版本: 使用单个prompt统一完成标签匹配和分类匹配"
        }

        print(f" 保存: {output_file.name}")
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(task, f, ensure_ascii=False, indent=4)

    print("\n完成!")

    # Summary statistics.
    total_inspiration_points = sum(
        len(task["how解构结果"]["灵感点列表"])
        for task in updated_task_list
    )
    total_matches = sum(
        len(point["how步骤列表"][0]["匹配结果"])
        for task in updated_task_list
        for point in task["how解构结果"]["灵感点列表"]
    )
    print("\n统计:")
    print(f" 处理的帖子数: {len(updated_task_list)}")
    print(f" 处理的灵感点数: {total_inspiration_points}")
    print(f" 生成的匹配结果数: {total_matches}")

    if log_url:
        print(f"\nTrace: {log_url}\n")


if __name__ == "__main__":
    # Set up tracing first, then run the whole flow inside one trace context.
    current_time, log_url = set_trace()

    with trace("灵感特征统一匹配 v4"):
        asyncio.run(main(current_time, log_url))