| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 灵感点特征匹配脚本 v3(特征组合匹配版本)
- 从解构任务列表中提取灵感点的特征列表,与人设历史的特征组合进行匹配。
- 匹配时考虑组合中每个特征的分类信息,使用待设计的组合匹配模块。
- """
- import json
- import asyncio
- from pathlib import Path
- from typing import Dict, List, Optional
- import sys
- # 添加项目根目录到路径
- project_root = Path(__file__).parent.parent.parent
- sys.path.insert(0, str(project_root))
- from agents import trace
- from agents.tracing.create import custom_span
- from lib.my_trace import set_trace
- from lib.hierarchical_match_analyzer import hierarchical_match
- # 全局并发限制
- MAX_CONCURRENT_REQUESTS = 20
- semaphore = None
- def get_semaphore():
- """获取全局信号量"""
- global semaphore
- if semaphore is None:
- semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
- return semaphore
- def load_feature_categories(categories_file: Path) -> Dict:
- """
- 加载特征分类映射
- Args:
- categories_file: 特征名称_分类映射.json 文件路径
- Returns:
- 特征分类字典
- """
- with open(categories_file, "r", encoding="utf-8") as f:
- return json.load(f)
- def enrich_persona_combinations_with_categories(
- persona_combinations: List[Dict],
- feature_categories: Dict,
- point_type: str
- ) -> List[Dict]:
- """
- 为人设特征组合添加分类信息
- Args:
- persona_combinations: 人设特征组合列表
- feature_categories: 特征分类映射字典
- point_type: 点类型 ("灵感点", "目的点", "关键点")
- Returns:
- enriched_combinations: 增强后的组合列表,每个组合包含特征及其分类
- """
- enriched_combinations = []
- # 获取该点类型的分类映射
- type_categories = feature_categories.get(point_type, {})
- for combo in persona_combinations:
- feature_list = combo.get("特征组合", [])
- # 为每个特征添加分类信息
- enriched_features = []
- for feature_name in feature_list:
- categories = type_categories.get(feature_name, {}).get("所属分类", [])
- enriched_features.append({
- "特征名称": feature_name,
- "所属分类": categories
- })
- # 构建增强后的组合
- enriched_combo = {
- "特征组合": enriched_features, # 带分类的特征列表
- "原始特征组合": feature_list, # 保留原始特征名称列表
- "特征来源": combo.get("特征来源", [])
- }
- enriched_combinations.append(enriched_combo)
- return enriched_combinations
- async def match_feature_list_with_combination(
- current_feature_list: List[str],
- persona_combination: Dict,
- model_name: Optional[str] = None
- ) -> Dict:
- """
- 将当前点的特征列表与一个人设历史组合进行分层匹配
- 使用分层匹配策略:
- 1. 优先匹配灵感点标签(特征名称)
- 2. 无标签匹配时,匹配第一层分类
- 3. 仍无结果时,匹配第二层上位分类
- 4. 对每个候选进行推理难度打分
- Args:
- current_feature_list: 当前点的特征列表,如 ["立冬", "教资查分", "时间巧合"]
- persona_combination: 人设历史组合(带分类信息),格式如:
- {
- "特征组合": [
- {"特征名称": "猫孩子", "所属分类": ["宠物亲子化", "宠物情感", "实质"]},
- {"特征名称": "被拿捏住的无奈感", "所属分类": ["宠物关系主导", "宠物情感", "实质"]}
- ],
- "原始特征组合": ["猫孩子", "被拿捏住的无奈感"],
- "特征来源": [...]
- }
- model_name: 使用的模型名称
- Returns:
- {
- "人设特征组合": [...],
- "匹配结果": {
- "最终得分": 0.85,
- "匹配层级": "第一层分类匹配",
- "匹配结果": "宠物情感",
- "综合说明": "...",
- "分层详情": {...}
- },
- "人设特征来源": [...]
- }
- """
- sem = get_semaphore()
- async with sem:
- # 调用分层匹配模块
- match_result = await hierarchical_match(
- current_features=current_feature_list,
- persona_combination=persona_combination["特征组合"],
- model_name=model_name
- )
- # 构建返回结果
- result = {
- "人设特征组合": persona_combination["原始特征组合"],
- "匹配结果": {
- "最终得分": match_result["最终得分"],
- "匹配层级": match_result["匹配层级"],
- "匹配结果": match_result["匹配结果"],
- "综合说明": match_result["综合说明"],
- "分层详情": match_result["分层结果"]
- },
- "人设特征来源": persona_combination["特征来源"]
- }
- return result
- async def match_inspiration_point_with_combinations(
- current_feature_list: List[str],
- persona_combinations: List[Dict],
- model_name: Optional[str] = None
- ) -> List[Dict]:
- """
- 将当前点的特征列表与所有人设特征组合进行匹配
- Args:
- current_feature_list: 当前点的特征列表
- persona_combinations: 人设特征组合列表(已包含分类信息)
- model_name: 使用的模型名称
- Returns:
- 匹配结果列表(按分数降序排序)
- """
- print(f" 批量匹配: {current_feature_list} <-> {len(persona_combinations)}个人设特征组合")
- # 并发匹配所有组合
- tasks = [
- match_feature_list_with_combination(
- current_feature_list=current_feature_list,
- persona_combination=combo,
- model_name=model_name
- )
- for combo in persona_combinations
- ]
- match_results = await asyncio.gather(*tasks)
- # 按最终得分降序排序
- match_results.sort(key=lambda x: x["匹配结果"]["最终得分"], reverse=True)
- return match_results
- async def process_single_inspiration_point(
- inspiration_point: Dict,
- persona_combinations: List[Dict],
- model_name: Optional[str] = None
- ) -> Dict:
- """
- 处理单个灵感点的特征组合匹配
- Args:
- inspiration_point: 灵感点数据,包含特征列表
- persona_combinations: 人设特征组合列表(已包含分类信息)
- model_name: 使用的模型名称
- Returns:
- 包含 how 步骤列表的灵感点数据
- """
- point_name = inspiration_point.get("名称", "")
- feature_list = inspiration_point.get("特征列表", [])
- print(f" 处理灵感点: {point_name}")
- print(f" 特征列表: {feature_list}")
- # 使用 custom_span 标识灵感点处理
- with custom_span(
- name=f"处理灵感点: {point_name}",
- data={
- "灵感点": point_name,
- "特征列表": feature_list,
- "人设组合数量": len(persona_combinations)
- }
- ):
- # 将特征列表与所有人设组合进行匹配
- match_results = await match_inspiration_point_with_combinations(
- current_feature_list=feature_list,
- persona_combinations=persona_combinations,
- model_name=model_name
- )
- # 构建 how 步骤
- how_step = {
- "步骤名称": "灵感特征列表批量匹配人设特征组合",
- "当前特征列表": feature_list,
- "匹配结果": match_results
- }
- # 返回更新后的灵感点
- result = inspiration_point.copy()
- result["how步骤列表"] = [how_step]
- return result
- async def process_single_task(
- task: Dict,
- task_index: int,
- total_tasks: int,
- persona_combinations: List[Dict],
- model_name: Optional[str] = None
- ) -> Dict:
- """
- 处理单个任务
- Args:
- task: 任务数据
- task_index: 任务索引(从1开始)
- total_tasks: 总任务数
- persona_combinations: 人设特征组合列表(已包含分类信息)
- model_name: 使用的模型名称
- Returns:
- 包含 how 解构结果的任务
- """
- post_id = task.get("帖子id", "")
- print(f"\n处理任务 [{task_index}/{total_tasks}]: {post_id}")
- # 获取灵感点列表
- what_result = task.get("what解构结果", {})
- inspiration_list = what_result.get("灵感点列表", [])
- print(f" 灵感点数量: {len(inspiration_list)}")
- # 并发处理所有灵感点
- tasks = [
- process_single_inspiration_point(
- inspiration_point=inspiration_point,
- persona_combinations=persona_combinations,
- model_name=model_name
- )
- for inspiration_point in inspiration_list
- ]
- updated_inspiration_list = await asyncio.gather(*tasks)
- # 构建 how 解构结果
- how_result = {
- "灵感点列表": list(updated_inspiration_list)
- }
- # 更新任务
- updated_task = task.copy()
- updated_task["how解构结果"] = how_result
- return updated_task
- async def process_task_list(
- task_list: List[Dict],
- persona_combinations: List[Dict],
- model_name: Optional[str] = None,
- current_time: Optional[str] = None,
- log_url: Optional[str] = None
- ) -> List[Dict]:
- """
- 处理整个解构任务列表(并发执行)
- Args:
- task_list: 解构任务列表
- persona_combinations: 人设特征组合列表(已包含分类信息)
- model_name: 使用的模型名称
- current_time: 当前时间戳
- log_url: 日志链接
- Returns:
- 包含 how 解构结果的任务列表
- """
- print(f"人设灵感特征组合数量: {len(persona_combinations)}")
- # 使用 custom_span 标识整个处理流程
- with custom_span(
- name="特征组合批量匹配 v3 - 所有任务",
- data={
- "任务总数": len(task_list),
- "人设组合数量": len(persona_combinations),
- "current_time": current_time,
- "log_url": log_url
- }
- ):
- # 并发处理所有任务
- tasks = [
- process_single_task(
- task=task,
- task_index=i,
- total_tasks=len(task_list),
- persona_combinations=persona_combinations,
- model_name=model_name
- )
- for i, task in enumerate(task_list, 1)
- ]
- updated_task_list = await asyncio.gather(*tasks)
- return list(updated_task_list)
- async def main(current_time: Optional[str] = None, log_url: Optional[str] = None):
- """主函数
- Args:
- current_time: 当前时间戳(从外部传入)
- log_url: 日志链接(从外部传入)
- """
- # 输入输出路径
- script_dir = Path(__file__).parent
- project_root = script_dir.parent.parent
- data_dir = project_root / "data" / "data_1118"
- task_list_file = data_dir / "当前帖子_解构任务列表.json"
- persona_combinations_file = data_dir / "特征组合_帖子来源.json"
- feature_categories_file = data_dir / "特征名称_分类映射.json"
- output_dir = data_dir / "当前帖子_how解构结果_v3"
- # 创建输出目录
- output_dir.mkdir(parents=True, exist_ok=True)
- # 获取模型名称
- from lib.client import MODEL_NAME
- model_name_short = MODEL_NAME.replace("google/", "").replace("/", "_")
- print(f"读取解构任务列表: {task_list_file}")
- with open(task_list_file, "r", encoding="utf-8") as f:
- task_list_data = json.load(f)
- print(f"读取人设特征组合: {persona_combinations_file}")
- with open(persona_combinations_file, "r", encoding="utf-8") as f:
- persona_combinations_data = json.load(f)
- print(f"读取特征分类映射: {feature_categories_file}")
- feature_categories = load_feature_categories(feature_categories_file)
- # 获取任务列表
- task_list = task_list_data.get("解构任务列表", [])
- print(f"\n总任务数: {len(task_list)}")
- print(f"使用模型: {MODEL_NAME}\n")
- # 为人设特征组合添加分类信息(只处理灵感点)
- persona_inspiration_combinations_raw = persona_combinations_data.get("灵感点", [])
- persona_inspiration_combinations = enrich_persona_combinations_with_categories(
- persona_combinations=persona_inspiration_combinations_raw,
- feature_categories=feature_categories,
- point_type="灵感点"
- )
- print(f"灵感点特征组合数量: {len(persona_inspiration_combinations)}")
- print(f"示例组合 (前3个):")
- for i, combo in enumerate(persona_inspiration_combinations[:3], 1):
- print(f" {i}. 原始组合: {combo['原始特征组合']}")
- print(f" 带分类: {combo['特征组合']}")
- print()
- # 处理任务列表
- updated_task_list = await process_task_list(
- task_list=task_list,
- persona_combinations=persona_inspiration_combinations,
- model_name=None, # 使用默认模型
- current_time=current_time,
- log_url=log_url
- )
- # 分文件保存结果
- print(f"\n保存结果到: {output_dir}")
- for task in updated_task_list:
- post_id = task.get("帖子id", "unknown")
- output_file = output_dir / f"{post_id}_how_v3_{model_name_short}.json"
- # 在每个任务中添加元数据
- task["元数据"] = {
- "current_time": current_time,
- "log_url": log_url,
- "version": "v3_combination_match",
- "model": MODEL_NAME,
- "说明": "v3版本: 使用特征列表匹配人设特征组合(带分类信息)"
- }
- print(f" 保存: {output_file.name}")
- with open(output_file, "w", encoding="utf-8") as f:
- json.dump(task, f, ensure_ascii=False, indent=4)
- print("\n完成!")
- # 打印统计信息
- total_inspiration_points = sum(
- len(task["how解构结果"]["灵感点列表"])
- for task in updated_task_list
- )
- total_matches = sum(
- len(point["how步骤列表"][0]["匹配结果"])
- for task in updated_task_list
- for point in task["how解构结果"]["灵感点列表"]
- )
- print(f"\n统计:")
- print(f" 处理的帖子数: {len(updated_task_list)}")
- print(f" 处理的灵感点数: {total_inspiration_points}")
- print(f" 生成的匹配结果数: {total_matches}")
- if log_url:
- print(f"\nTrace: {log_url}\n")
- if __name__ == "__main__":
- # 设置 trace
- current_time, log_url = set_trace()
- # 使用 trace 上下文包裹整个执行流程
- with trace("灵感特征组合批量匹配 v3"):
- asyncio.run(main(current_time, log_url))
|