| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 灵感点特征匹配脚本
- 从解构任务列表中提取灵感点的特征,与人设灵感特征进行匹配,
- 使用 relation_analyzer 模块分析特征之间的语义关系。
- """
- import json
- import asyncio
- from pathlib import Path
- from typing import Dict, List
- import sys
- from datetime import datetime
- # 添加项目根目录到路径
- project_root = Path(__file__).parent.parent.parent
- sys.path.insert(0, str(project_root))
- from lib.semantic_similarity import compare_phrases
- # 全局并发限制
- MAX_CONCURRENT_REQUESTS = 100
- semaphore = None
- # 进度跟踪
- class ProgressTracker:
- """进度跟踪器"""
- def __init__(self, total: int):
- self.total = total
- self.completed = 0
- self.start_time = datetime.now()
- self.last_update_time = datetime.now()
- self.last_completed = 0
- def update(self, count: int = 1):
- """更新进度"""
- self.completed += count
- current_time = datetime.now()
- # 每秒最多更新一次,或者达到总数时更新
- if (current_time - self.last_update_time).total_seconds() >= 1.0 or self.completed >= self.total:
- self.display()
- self.last_update_time = current_time
- self.last_completed = self.completed
- def display(self):
- """显示进度"""
- if self.total == 0:
- return
- percentage = (self.completed / self.total) * 100
- elapsed = (datetime.now() - self.start_time).total_seconds()
- # 计算速度和预估剩余时间
- if elapsed > 0:
- speed = self.completed / elapsed
- if speed > 0:
- remaining = (self.total - self.completed) / speed
- eta_str = f", 预计剩余: {int(remaining)}秒"
- else:
- eta_str = ""
- else:
- eta_str = ""
- bar_length = 40
- filled_length = int(bar_length * self.completed / self.total)
- bar = '█' * filled_length + '░' * (bar_length - filled_length)
- print(f"\r 进度: [{bar}] {self.completed}/{self.total} ({percentage:.1f}%){eta_str}", end='', flush=True)
- # 完成时换行
- if self.completed >= self.total:
- print()
- # 全局进度跟踪器
- progress_tracker = None
- def get_semaphore():
- """获取全局信号量"""
- global semaphore
- if semaphore is None:
- semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
- return semaphore
- async def match_single_pair(
- feature_name: str,
- persona_name: str,
- persona_feature_level: str,
- category_mapping: Dict = None,
- model_name: str = None
- ) -> Dict:
- """
- 匹配单个特征对(带并发限制)
- Args:
- feature_name: 要匹配的特征名称
- persona_name: 人设特征名称
- persona_feature_level: 人设特征层级(灵感点/关键点/目的点)
- category_mapping: 特征分类映射字典
- model_name: 使用的模型名称
- Returns:
- 单个匹配结果,格式:
- {
- "人设特征名称": "xxx",
- "人设特征层级": "灵感点",
- "特征类型": "标签",
- "特征分类": ["分类1", "分类2"],
- "匹配结果": {
- "相似度": 0.75,
- "说明": "..."
- }
- }
- """
- global progress_tracker
- sem = get_semaphore()
- async with sem:
- similarity_result = await compare_phrases(
- phrase_a=feature_name,
- phrase_b=persona_name,
- )
- # 更新进度
- if progress_tracker:
- progress_tracker.update(1)
- # 判断该特征是标签还是分类
- feature_type = "分类" # 默认为分类
- categories = []
- if category_mapping:
- # 先在标签特征中查找(灵感点、关键点、目的点)
- is_tag_feature = False
- for ft in ["灵感点", "关键点", "目的点"]:
- if ft in category_mapping:
- type_mapping = category_mapping[ft]
- if persona_name in type_mapping:
- # 找到了,说明是标签特征
- feature_type = "标签"
- categories = type_mapping[persona_name].get("所属分类", [])
- is_tag_feature = True
- break
- # 如果不是标签特征,检查是否是分类特征
- if not is_tag_feature:
- # 收集所有分类
- all_categories = set()
- for ft in ["灵感点", "关键点", "目的点"]:
- if ft in category_mapping:
- for fname, fdata in category_mapping[ft].items():
- cats = fdata.get("所属分类", [])
- all_categories.update(cats)
- # 如果当前特征名在分类列表中,则是分类特征
- if persona_name in all_categories:
- feature_type = "分类"
- categories = [] # 分类特征本身没有所属分类
- # 去重分类
- unique_categories = list(dict.fromkeys(categories))
- return {
- "人设特征名称": persona_name,
- "人设特征层级": persona_feature_level,
- "特征类型": feature_type,
- "特征分类": unique_categories,
- "匹配结果": similarity_result
- }
- async def match_feature_with_persona(
- feature_name: str,
- persona_features: List[Dict],
- category_mapping: Dict = None,
- model_name: str = None
- ) -> List[Dict]:
- """
- 将一个特征与人设特征列表进行匹配(并发执行)
- Args:
- feature_name: 要匹配的特征名称
- persona_features: 人设特征列表(包含"特征名称"和"人设特征层级")
- category_mapping: 特征分类映射字典
- model_name: 使用的模型名称
- Returns:
- 匹配结果列表
- """
- # 创建所有匹配任务
- tasks = [
- match_single_pair(
- feature_name,
- persona_feature["特征名称"],
- persona_feature["人设特征层级"],
- category_mapping,
- model_name
- )
- for persona_feature in persona_features
- ]
- # 并发执行所有匹配
- match_results = await asyncio.gather(*tasks)
- return list(match_results)
- async def match_single_feature(
- feature_item: Dict,
- persona_features: List[Dict],
- category_mapping: Dict = None,
- model_name: str = None
- ) -> Dict:
- """
- 匹配单个特征与所有人设特征
- Args:
- feature_item: 特征信息(包含"特征名称"和"权重")
- persona_features: 人设特征列表
- category_mapping: 特征分类映射字典
- model_name: 使用的模型名称
- Returns:
- 特征匹配结果
- """
- feature_name = feature_item.get("特征名称", "")
- feature_weight = feature_item.get("权重", 1.0)
- match_results = await match_feature_with_persona(
- feature_name=feature_name,
- persona_features=persona_features,
- category_mapping=category_mapping,
- model_name=model_name
- )
- return {
- "特征名称": feature_name,
- "权重": feature_weight,
- "匹配结果": match_results
- }
- async def process_single_point(
- point: Dict,
- point_type: str,
- persona_features: List[Dict],
- category_mapping: Dict = None,
- model_name: str = None
- ) -> Dict:
- """
- 处理单个点(灵感点/关键点/目的点)的特征匹配(并发执行)
- Args:
- point: 点数据(灵感点/关键点/目的点)
- point_type: 点类型("灵感点"/"关键点"/"目的点")
- persona_features: 人设特征列表
- category_mapping: 特征分类映射字典
- model_name: 使用的模型名称
- Returns:
- 包含 how 步骤列表的点数据
- """
- point_name = point.get("名称", "")
- feature_list = point.get("特征列表", [])
- # 并发匹配所有特征
- tasks = [
- match_single_feature(feature_item, persona_features, category_mapping, model_name)
- for feature_item in feature_list
- ]
- feature_match_results = await asyncio.gather(*tasks)
- # 构建 how 步骤(根据点类型生成步骤名称)
- step_name_mapping = {
- "灵感点": "灵感特征分别匹配人设特征",
- "关键点": "关键特征分别匹配人设特征",
- "目的点": "目的特征分别匹配人设特征"
- }
- how_step = {
- "步骤名称": step_name_mapping.get(point_type, f"{point_type}特征分别匹配人设特征"),
- "特征列表": list(feature_match_results)
- }
- # 返回更新后的点
- result = point.copy()
- result["how步骤列表"] = [how_step]
- return result
- async def process_single_task(
- task: Dict,
- task_index: int,
- total_tasks: int,
- all_persona_features: List[Dict],
- category_mapping: Dict = None,
- model_name: str = None
- ) -> Dict:
- """
- 处理单个任务
- Args:
- task: 任务数据
- task_index: 任务索引(从1开始)
- total_tasks: 总任务数
- all_persona_features: 所有人设特征列表(包含三种层级)
- category_mapping: 特征分类映射字典
- model_name: 使用的模型名称
- Returns:
- 包含 how 解构结果的任务
- """
- post_id = task.get("帖子id", "")
- print(f"\n[{task_index}/{total_tasks}] 处理帖子: {post_id}")
- # 获取 what 解构结果
- what_result = task.get("what解构结果", {})
- # 构建 how 解构结果
- how_result = {}
- # 处理灵感点、关键点和目的点
- for point_type in ["灵感点", "关键点", "目的点"]:
- point_list_key = f"{point_type}列表"
- point_list = what_result.get(point_list_key, [])
- if point_list:
- # 并发处理所有点
- tasks = [
- process_single_point(
- point=point,
- point_type=point_type,
- persona_features=all_persona_features,
- category_mapping=category_mapping,
- model_name=model_name
- )
- for point in point_list
- ]
- updated_point_list = await asyncio.gather(*tasks)
- # 添加到 how 解构结果
- how_result[point_list_key] = list(updated_point_list)
- # 更新任务
- updated_task = task.copy()
- updated_task["how解构结果"] = how_result
- return updated_task
- async def process_task_list(
- task_list: List[Dict],
- persona_features_dict: Dict,
- category_mapping: Dict = None,
- model_name: str = None
- ) -> List[Dict]:
- """
- 处理整个解构任务列表(并发执行)
- Args:
- task_list: 解构任务列表
- persona_features_dict: 人设特征字典(包含灵感点、目的点、关键点)
- category_mapping: 特征分类映射字典
- model_name: 使用的模型名称
- Returns:
- 包含 how 解构结果的任务列表
- """
- global progress_tracker
- # 合并三种人设特征(灵感点、关键点、目的点)
- all_features = []
- for feature_type in ["灵感点", "关键点", "目的点"]:
- # 获取该类型的标签特征
- type_features = persona_features_dict.get(feature_type, [])
- # 为每个特征添加层级信息
- for feature in type_features:
- feature_with_level = feature.copy()
- feature_with_level["人设特征层级"] = feature_type
- all_features.append(feature_with_level)
- print(f"人设{feature_type}标签特征数量: {len(type_features)}")
- # 从分类映射中提取该类型的分类特征
- if category_mapping and feature_type in category_mapping:
- type_categories = set()
- for _, feature_data in category_mapping[feature_type].items():
- categories = feature_data.get("所属分类", [])
- type_categories.update(categories)
- # 转换为特征格式并添加层级信息
- for cat in sorted(type_categories):
- all_features.append({
- "特征名称": cat,
- "人设特征层级": feature_type
- })
- print(f"人设{feature_type}分类特征数量: {len(type_categories)}")
- print(f"总特征数量(三种类型的标签+分类): {len(all_features)}")
- # 计算总匹配任务数(灵感点、关键点和目的点)
- total_match_count = 0
- for task in task_list:
- what_result = task.get("what解构结果", {})
- for point_type in ["灵感点", "关键点", "目的点"]:
- point_list = what_result.get(f"{point_type}列表", [])
- for point in point_list:
- feature_count = len(point.get("特征列表", []))
- total_match_count += feature_count * len(all_features)
- print(f"处理灵感点、关键点和目的点特征")
- print(f"总匹配任务数: {total_match_count:,}")
- print()
- # 初始化全局进度跟踪器
- progress_tracker = ProgressTracker(total_match_count)
- # 并发处理所有任务
- tasks = [
- process_single_task(
- task=task,
- task_index=i,
- total_tasks=len(task_list),
- all_persona_features=all_features,
- category_mapping=category_mapping,
- model_name=model_name
- )
- for i, task in enumerate(task_list, 1)
- ]
- updated_task_list = await asyncio.gather(*tasks)
- return list(updated_task_list)
- async def main():
- """主函数"""
- # 输入输出路径
- script_dir = Path(__file__).parent
- project_root = script_dir.parent.parent
- data_dir = project_root / "data" / "data_1118"
- task_list_file = data_dir / "当前帖子_解构任务列表.json"
- persona_features_file = data_dir / "特征名称_帖子来源.json"
- category_mapping_file = data_dir / "特征名称_分类映射.json"
- output_dir = data_dir / "当前帖子_how解构结果"
- # 创建输出目录
- output_dir.mkdir(parents=True, exist_ok=True)
- print(f"读取解构任务列表: {task_list_file}")
- with open(task_list_file, "r", encoding="utf-8") as f:
- task_list_data = json.load(f)
- print(f"读取人设特征: {persona_features_file}")
- with open(persona_features_file, "r", encoding="utf-8") as f:
- persona_features_data = json.load(f)
- print(f"读取特征分类映射: {category_mapping_file}")
- with open(category_mapping_file, "r", encoding="utf-8") as f:
- category_mapping = json.load(f)
- # 获取任务列表
- task_list = task_list_data.get("解构任务列表", [])
- print(f"\n总任务数: {len(task_list)}")
- # 处理任务列表
- updated_task_list = await process_task_list(
- task_list=task_list,
- persona_features_dict=persona_features_data,
- category_mapping=category_mapping,
- model_name=None # 使用默认模型
- )
- # 分文件保存结果
- print(f"\n保存结果到: {output_dir}")
- for task in updated_task_list:
- post_id = task.get("帖子id", "unknown")
- output_file = output_dir / f"{post_id}_how.json"
- print(f" 保存: {output_file.name}")
- with open(output_file, "w", encoding="utf-8") as f:
- json.dump(task, f, ensure_ascii=False, indent=4)
- print("\n完成!")
- # 打印统计信息
- total_inspiration_points = 0
- total_key_points = 0
- total_purpose_points = 0
- total_inspiration_features = 0
- total_key_features = 0
- total_purpose_features = 0
- for task in updated_task_list:
- how_result = task.get("how解构结果", {})
- # 统计灵感点
- inspiration_list = how_result.get("灵感点列表", [])
- total_inspiration_points += len(inspiration_list)
- for point in inspiration_list:
- total_inspiration_features += len(point.get("特征列表", []))
- # 统计关键点
- key_list = how_result.get("关键点列表", [])
- total_key_points += len(key_list)
- for point in key_list:
- total_key_features += len(point.get("特征列表", []))
- # 统计目的点
- purpose_list = how_result.get("目的点列表", [])
- total_purpose_points += len(purpose_list)
- for point in purpose_list:
- total_purpose_features += len(point.get("特征列表", []))
- print(f"\n统计:")
- print(f" 处理的帖子数: {len(updated_task_list)}")
- print(f" 处理的灵感点数: {total_inspiration_points}")
- print(f" 处理的灵感点特征数: {total_inspiration_features}")
- print(f" 处理的关键点数: {total_key_points}")
- print(f" 处理的关键点特征数: {total_key_features}")
- print(f" 处理的目的点数: {total_purpose_points}")
- print(f" 处理的目的点特征数: {total_purpose_features}")
- if __name__ == "__main__":
- asyncio.run(main())
|