#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Inspiration-point feature matching script, v4 (unified matching).

A single prompt evaluates tag matching and category matching together instead
of running them as separate steps: one LLM call covers every matching level.
"""
import json
import asyncio
from pathlib import Path
from typing import Dict, List, Optional
import sys

# Add the project root to the import path
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

from agents import trace
from agents.tracing.create import custom_span
from lib.my_trace import set_trace
from lib.unified_match_analyzer import unified_match

# Global concurrency limit
MAX_CONCURRENT_REQUESTS = 20
semaphore = None


def get_semaphore():
    """Return the global semaphore, creating it lazily on first use."""
    global semaphore
    if semaphore is None:
        semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
    return semaphore
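
# Note: the semaphore is created lazily in get_semaphore() rather than at import
# time, presumably so it is constructed while the event loop started by
# asyncio.run() is running (older asyncio versions bind synchronization
# primitives to the loop that exists at construction time).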


def load_feature_categories(categories_file: Path) -> Dict:
    """Load the feature-name to category mapping."""
    with open(categories_file, "r", encoding="utf-8") as f:
        return json.load(f)


def enrich_persona_combinations_with_categories(
    persona_combinations: List[Dict],
    feature_categories: Dict,
    point_type: str
) -> List[Dict]:
    """Attach category information to each persona feature combination."""
    enriched_combinations = []
    type_categories = feature_categories.get(point_type, {})
    for combo in persona_combinations:
        feature_list = combo.get("特征组合", [])
        # Attach category info to every feature in the combination
        enriched_features = []
        for feature_name in feature_list:
            categories = type_categories.get(feature_name, {}).get("所属分类", [])
            enriched_features.append({
                "特征名称": feature_name,
                "所属分类": categories
            })
        enriched_combo = {
            "特征组合": enriched_features,
            "原始特征组合": feature_list,
            "特征来源": combo.get("特征来源", [])
        }
        enriched_combinations.append(enriched_combo)
    return enriched_combinations
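
# Illustrative example of the enrichment above (the category and source values
# are made up; real values come from the JSON inputs):
#   input combo:
#       {"特征组合": ["立冬"], "特征来源": ["post_001"]}
#   enriched combo:
#       {"特征组合": [{"特征名称": "立冬", "所属分类": ["节气"]}],
#        "原始特征组合": ["立冬"],
#        "特征来源": ["post_001"]}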


async def match_tag_list_with_combination(
    current_tag_list: List[str],
    persona_combination: Dict,
    model_name: Optional[str] = None
) -> Dict:
    """
    Match the current point's tag list against one historical persona
    combination using unified matching: a single LLM call evaluates both the
    tag-level and the category-level match.

    Returns:
        {
            "人设标签组合": [...],
            "当前标签匹配结果": [
                {"当前标签": "立冬", "最终得分": 0.7, "匹配层级": "...", ...},
                {"当前标签": "教资查分", "最终得分": 0.6, ...},
                ...
            ],
            "人设标签来源": [...]
        }
    """
    sem = get_semaphore()
    async with sem:
        # Call the unified matching module (returns one match result per current tag)
        tag_match_results = await unified_match(
            current_tags=current_tag_list,
            persona_combination=persona_combination["特征组合"],
            model_name=model_name
        )
        # Build the return value
        result = {
            "人设标签组合": persona_combination["原始特征组合"],
            "当前标签匹配结果": tag_match_results,  # one match result per current tag
            "人设标签来源": persona_combination["特征来源"]
        }
        return result
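
# Shape of a single entry in "当前标签匹配结果" as consumed by the scoring code
# in match_inspiration_point_with_combinations below (inferred from that code;
# unified_match may return additional fields such as "最终得分"):
#   {"当前标签": "立冬",
#    "匹配结果": {"匹配类型": "标签匹配", "语义相似度": 0.7, "匹配到": "..."}}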


async def match_inspiration_point_with_combinations(
    current_feature_list: List[str],
    persona_combinations: List[Dict],
    model_name: Optional[str] = None
) -> List[Dict]:
    """Match the current point's feature list against every persona feature combination."""
    print(f" 批量匹配: 当前{len(current_feature_list)}个标签 {current_feature_list} vs {len(persona_combinations)}个人设组合")
    # Match all combinations concurrently
    tasks = [
        match_tag_list_with_combination(
            current_tag_list=current_feature_list,
            persona_combination=combo,
            model_name=model_name
        )
        for combo in persona_combinations
    ]
    match_results = await asyncio.gather(*tasks)
    # Filter out invalid results
    valid_results = []
    for result in match_results:
        # The result must be a dict
        if not isinstance(result, dict):
            print(f"警告: 跳过无效结果 (不是字典): {type(result)}")
            continue
        # It must carry the per-tag match results
        tag_results = result.get("当前标签匹配结果")
        if tag_results is None:
            print("警告: 结果缺少当前标签匹配结果字段")
            continue
        # The per-tag match results must be a list
        if not isinstance(tag_results, list):
            print(f"警告: 当前标签匹配结果不是列表: {type(tag_results)}")
            continue
        # Compute the weighted average score for this persona combination and
        # collect per-tag score details for the simplified result.
        weighted_scores = []
        tag_score_details = []
        for tag_result in tag_results:
            if not isinstance(tag_result, dict):
                continue
            match_result = tag_result.get("匹配结果", {})
            match_type = match_result.get("匹配类型")
            similarity = match_result.get("语义相似度", 0)
            # Weight by match type
            if match_type == "标签匹配":
                weight = 1.0
            elif match_type == "分类匹配":
                weight = 0.5
            else:
                # No match: weight 1.0 is fine because the similarity is already 0
                weight = 1.0
            weighted_score = similarity * weight
            weighted_scores.append(weighted_score)
            tag_score_details.append({
                "标签": tag_result.get("当前标签"),
                "原始相似度": similarity,
                "匹配类型": match_type,
                "权重": weight,
                "加权得分": weighted_score,
                "匹配到": match_result.get("匹配到")
            })
        avg_score = sum(weighted_scores) / len(weighted_scores) if weighted_scores else 0
        result["组合平均得分"] = avg_score
        # Attach the simplified result
        result["精简结果"] = {
            "人设标签组合": result.get("人设标签组合", []),
            "组合平均得分": avg_score,
            "各标签得分": tag_score_details
        }
        valid_results.append(result)
    # Sort persona combinations by weighted average score, descending
    valid_results.sort(
        key=lambda x: x.get("组合平均得分", 0),
        reverse=True
    )
    return valid_results
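
# Worked example of the weighted averaging above (numbers are illustrative):
# with two tags, one "标签匹配" at similarity 0.8 (weight 1.0) and one
# "分类匹配" at similarity 0.6 (weight 0.5), the 组合平均得分 is
#   (0.8 * 1.0 + 0.6 * 0.5) / 2 = 0.55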


async def process_single_inspiration_point(
    inspiration_point: Dict,
    persona_combinations: List[Dict],
    model_name: Optional[str] = None
) -> Dict:
    """Run the feature-combination matching for a single inspiration point."""
    point_name = inspiration_point.get("名称", "")
    feature_list = inspiration_point.get("特征列表", [])
    print(f" 处理灵感点: {point_name}")
    print(f" 特征列表: {feature_list}")
    with custom_span(
        name=f"处理灵感点: {point_name}",
        data={
            "灵感点": point_name,
            "特征列表": feature_list,
            "人设组合数量": len(persona_combinations)
        }
    ):
        # Match the feature list against all persona combinations
        match_results = await match_inspiration_point_with_combinations(
            current_feature_list=feature_list,
            persona_combinations=persona_combinations,
            model_name=model_name
        )
        # Build the full "how" step
        how_step = {
            "步骤名称": "灵感特征列表统一匹配人设特征组合 (v4)",
            "当前特征列表": feature_list,
            "匹配结果": match_results
        }
        # Build the simplified "how" step (simplified results only)
        how_step_simplified = {
            "步骤名称": "灵感特征列表统一匹配人设特征组合 (v4) - 精简版",
            "当前特征列表": feature_list,
            "匹配结果": [
                match.get("精简结果", {})
                for match in match_results
            ]
        }
        # Return the updated inspiration point
        result = inspiration_point.copy()
        result["how步骤列表"] = [how_step]
        result["how步骤列表_精简版"] = [how_step_simplified]
        return result


async def process_single_task(
    task: Dict,
    task_index: int,
    total_tasks: int,
    persona_combinations: List[Dict],
    model_name: Optional[str] = None
) -> Dict:
    """Process a single deconstruction task (one post)."""
    post_id = task.get("帖子id", "")
    print(f"\n处理任务 [{task_index}/{total_tasks}]: {post_id}")
    what_result = task.get("what解构结果", {})
    inspiration_list = what_result.get("灵感点列表", [])
    print(f" 灵感点数量: {len(inspiration_list)}")
    # Process all inspiration points concurrently
    tasks = [
        process_single_inspiration_point(
            inspiration_point=inspiration_point,
            persona_combinations=persona_combinations,
            model_name=model_name
        )
        for inspiration_point in inspiration_list
    ]
    updated_inspiration_list = await asyncio.gather(*tasks)
    # Build the "how" deconstruction result
    how_result = {
        "灵感点列表": list(updated_inspiration_list)
    }
    # Update the task
    updated_task = task.copy()
    updated_task["how解构结果"] = how_result
    return updated_task
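
# Note: asyncio.gather in process_single_task preserves input order, so the
# updated inspiration points come back in the same order as the original
# 灵感点列表 even though they are processed concurrently.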


async def process_task_list(
    task_list: List[Dict],
    persona_combinations: List[Dict],
    model_name: Optional[str] = None,
    current_time: Optional[str] = None,
    log_url: Optional[str] = None
) -> List[Dict]:
    """Process the whole deconstruction task list (concurrently)."""
    print(f"人设灵感特征组合数量: {len(persona_combinations)}")
    with custom_span(
        name="统一匹配 v4 - 所有任务",
        data={
            "任务总数": len(task_list),
            "人设组合数量": len(persona_combinations),
            "current_time": current_time,
            "log_url": log_url
        }
    ):
        # Process all tasks concurrently
        tasks = [
            process_single_task(
                task=task,
                task_index=i,
                total_tasks=len(task_list),
                persona_combinations=persona_combinations,
                model_name=model_name
            )
            for i, task in enumerate(task_list, 1)
        ]
        updated_task_list = await asyncio.gather(*tasks)
        return list(updated_task_list)
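
# Note: although every task and every inspiration point is scheduled
# concurrently, the number of in-flight unified_match calls stays bounded by
# MAX_CONCURRENT_REQUESTS, because each call acquires the shared semaphore in
# match_tag_list_with_combination.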


async def main(current_time: Optional[str] = None, log_url: Optional[str] = None):
    """Entry point."""
    # Input / output paths
    script_dir = Path(__file__).parent
    project_root = script_dir.parent.parent
    data_dir = project_root / "data" / "data_1118"
    task_list_file = data_dir / "当前帖子_解构任务列表.json"
    persona_combinations_file = data_dir / "特征组合_帖子来源.json"
    feature_categories_file = data_dir / "特征名称_分类映射.json"
    output_dir = data_dir / "当前帖子_how解构结果_v4"
    # Create the output directory
    output_dir.mkdir(parents=True, exist_ok=True)
    # Get the model name
    from lib.client import MODEL_NAME
    model_name_short = MODEL_NAME.replace("google/", "").replace("/", "_")
    print(f"读取解构任务列表: {task_list_file}")
    with open(task_list_file, "r", encoding="utf-8") as f:
        task_list_data = json.load(f)
    print(f"读取人设特征组合: {persona_combinations_file}")
    with open(persona_combinations_file, "r", encoding="utf-8") as f:
        persona_combinations_data = json.load(f)
    print(f"读取特征分类映射: {feature_categories_file}")
    feature_categories = load_feature_categories(feature_categories_file)
    # Get the task list (all posts are processed)
    task_list = task_list_data.get("解构任务列表", [])
    print(f"\n总任务数: {len(task_list)}")
    print(f"使用模型: {MODEL_NAME}\n")
    # Attach category info to the persona feature combinations
    # (inspiration points only, using all combinations)
    persona_inspiration_combinations_raw = persona_combinations_data.get("灵感点", [])
    persona_inspiration_combinations = enrich_persona_combinations_with_categories(
        persona_combinations=persona_inspiration_combinations_raw,
        feature_categories=feature_categories,
        point_type="灵感点"
    )
    print(f"灵感点特征组合数量: {len(persona_inspiration_combinations)}")
    print("示例组合 (前2个):")
    for i, combo in enumerate(persona_inspiration_combinations[:2], 1):
        print(f" {i}. 原始组合: {combo['原始特征组合']}")
        print(f" 带分类: {combo['特征组合'][:2]}...")  # show only the first 2 features
    print()
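    # model_name is passed as None below, so unified_match presumably falls back
    # to its own default model; MODEL_NAME from lib.client is only used in this
    # script for the output filename and the saved metadata.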
    # Process the task list
    updated_task_list = await process_task_list(
        task_list=task_list,
        persona_combinations=persona_inspiration_combinations,
        model_name=None,
        current_time=current_time,
        log_url=log_url
    )
    # Save the results, one file per task
    print(f"\n保存结果到: {output_dir}")
    for task in updated_task_list:
        post_id = task.get("帖子id", "unknown")
        output_file = output_dir / f"{post_id}_how_v4_{model_name_short}.json"
        # Attach metadata to each task
        task["元数据"] = {
            "current_time": current_time,
            "log_url": log_url,
            "version": "v4_unified_match",
            "model": MODEL_NAME,
            "说明": "v4版本: 使用单个prompt统一完成标签匹配和分类匹配"
        }
        print(f" 保存: {output_file.name}")
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(task, f, ensure_ascii=False, indent=4)
    print("\n完成!")
    # Print summary statistics
    total_inspiration_points = sum(
        len(task["how解构结果"]["灵感点列表"])
        for task in updated_task_list
    )
    total_matches = sum(
        len(point["how步骤列表"][0]["匹配结果"])
        for task in updated_task_list
        for point in task["how解构结果"]["灵感点列表"]
    )
    print("\n统计:")
    print(f" 处理的帖子数: {len(updated_task_list)}")
    print(f" 处理的灵感点数: {total_inspiration_points}")
    print(f" 生成的匹配结果数: {total_matches}")
    if log_url:
        print(f"\nTrace: {log_url}\n")


if __name__ == "__main__":
    # Set up tracing
    current_time, log_url = set_trace()
    # Wrap the whole run in a trace context
    with trace("灵感特征统一匹配 v4"):
        asyncio.run(main(current_time, log_url))