match_inspiration_features.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 灵感点特征匹配脚本
  5. 从解构任务列表中提取灵感点的特征,与人设灵感特征进行匹配,
  6. 使用 relation_analyzer 模块分析特征之间的语义关系。
  7. """
  8. import json
  9. import asyncio
  10. from pathlib import Path
  11. from typing import Dict, List
  12. import sys
  13. # 添加项目根目录到路径
  14. project_root = Path(__file__).parent.parent.parent
  15. sys.path.insert(0, str(project_root))
  16. from lib.semantic_similarity import compare_phrases
  17. # 全局并发限制
  18. MAX_CONCURRENT_REQUESTS = 20
  19. semaphore = None
  20. def get_semaphore():
  21. """获取全局信号量"""
  22. global semaphore
  23. if semaphore is None:
  24. semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
  25. return semaphore
  26. async def match_single_pair(
  27. feature_name: str,
  28. persona_name: str,
  29. category_mapping: Dict = None,
  30. model_name: str = None
  31. ) -> Dict:
  32. """
  33. 匹配单个特征对(带并发限制)
  34. Args:
  35. feature_name: 要匹配的特征名称
  36. persona_name: 人设特征名称
  37. category_mapping: 特征分类映射字典
  38. model_name: 使用的模型名称
  39. Returns:
  40. 单个匹配结果,格式:
  41. {
  42. "人设特征名称": "xxx",
  43. "特征类型": "标签",
  44. "特征分类": ["分类1", "分类2"],
  45. "匹配结果": {
  46. "相似度": 0.75,
  47. "说明": "..."
  48. }
  49. }
  50. """
  51. sem = get_semaphore()
  52. async with sem:
  53. print(f" 匹配: {feature_name} <-> {persona_name}")
  54. similarity_result = await compare_phrases(
  55. phrase_a=feature_name,
  56. phrase_b=persona_name,
  57. )
  58. # 判断该特征是标签还是分类
  59. feature_type = "分类" # 默认为分类
  60. categories = []
  61. if category_mapping:
  62. # 先在标签特征中查找(灵感点、关键点、目的点)
  63. is_tag_feature = False
  64. for ft in ["灵感点", "关键点", "目的点"]:
  65. if ft in category_mapping:
  66. type_mapping = category_mapping[ft]
  67. if persona_name in type_mapping:
  68. # 找到了,说明是标签特征
  69. feature_type = "标签"
  70. categories = type_mapping[persona_name].get("所属分类", [])
  71. is_tag_feature = True
  72. break
  73. # 如果不是标签特征,检查是否是分类特征
  74. if not is_tag_feature:
  75. # 收集所有分类
  76. all_categories = set()
  77. for ft in ["灵感点", "关键点", "目的点"]:
  78. if ft in category_mapping:
  79. for fname, fdata in category_mapping[ft].items():
  80. cats = fdata.get("所属分类", [])
  81. all_categories.update(cats)
  82. # 如果当前特征名在分类列表中,则是分类特征
  83. if persona_name in all_categories:
  84. feature_type = "分类"
  85. categories = [] # 分类特征本身没有所属分类
  86. # 去重分类
  87. unique_categories = list(dict.fromkeys(categories))
  88. return {
  89. "人设特征名称": persona_name,
  90. "特征类型": feature_type,
  91. "特征分类": unique_categories,
  92. "匹配结果": similarity_result
  93. }
  94. async def match_feature_with_persona(
  95. feature_name: str,
  96. persona_features: List[Dict],
  97. category_mapping: Dict = None,
  98. model_name: str = None
  99. ) -> List[Dict]:
  100. """
  101. 将一个特征与人设特征列表进行匹配(并发执行)
  102. Args:
  103. feature_name: 要匹配的特征名称
  104. persona_features: 人设特征列表
  105. category_mapping: 特征分类映射字典
  106. model_name: 使用的模型名称
  107. Returns:
  108. 匹配结果列表
  109. """
  110. # 创建所有匹配任务
  111. tasks = [
  112. match_single_pair(feature_name, persona_feature["特征名称"], category_mapping, model_name)
  113. for persona_feature in persona_features
  114. ]
  115. # 并发执行所有匹配
  116. match_results = await asyncio.gather(*tasks)
  117. return list(match_results)
  118. async def match_single_feature(
  119. feature_item: Dict,
  120. persona_features: List[Dict],
  121. category_mapping: Dict = None,
  122. model_name: str = None
  123. ) -> Dict:
  124. """
  125. 匹配单个特征与所有人设特征
  126. Args:
  127. feature_item: 特征信息(包含"特征名称"和"权重")
  128. persona_features: 人设特征列表
  129. category_mapping: 特征分类映射字典
  130. model_name: 使用的模型名称
  131. Returns:
  132. 特征匹配结果
  133. """
  134. feature_name = feature_item.get("特征名称", "")
  135. feature_weight = feature_item.get("权重", 1.0)
  136. print(f" 特征: {feature_name} (权重: {feature_weight})")
  137. match_results = await match_feature_with_persona(
  138. feature_name=feature_name,
  139. persona_features=persona_features,
  140. category_mapping=category_mapping,
  141. model_name=model_name
  142. )
  143. return {
  144. "特征名称": feature_name,
  145. "权重": feature_weight,
  146. "匹配结果": match_results
  147. }
  148. async def process_single_inspiration_point(
  149. inspiration_point: Dict,
  150. persona_features: List[Dict],
  151. category_mapping: Dict = None,
  152. model_name: str = None
  153. ) -> Dict:
  154. """
  155. 处理单个灵感点的特征匹配(并发执行)
  156. Args:
  157. inspiration_point: 灵感点数据
  158. persona_features: 人设灵感特征列表
  159. category_mapping: 特征分类映射字典
  160. model_name: 使用的模型名称
  161. Returns:
  162. 包含 how 步骤列表的灵感点数据
  163. """
  164. point_name = inspiration_point.get("名称", "")
  165. feature_list = inspiration_point.get("特征列表", [])
  166. print(f" 处理灵感点: {point_name}")
  167. print(f" 特征数量: {len(feature_list)}")
  168. # 并发匹配所有特征
  169. tasks = [
  170. match_single_feature(feature_item, persona_features, category_mapping, model_name)
  171. for feature_item in feature_list
  172. ]
  173. feature_match_results = await asyncio.gather(*tasks)
  174. # 构建 how 步骤
  175. how_step = {
  176. "步骤名称": "灵感特征分别匹配人设特征",
  177. "特征列表": list(feature_match_results)
  178. }
  179. # 返回更新后的灵感点
  180. result = inspiration_point.copy()
  181. result["how步骤列表"] = [how_step]
  182. return result
  183. async def process_single_task(
  184. task: Dict,
  185. task_index: int,
  186. total_tasks: int,
  187. persona_inspiration_features: List[Dict],
  188. category_mapping: Dict = None,
  189. model_name: str = None
  190. ) -> Dict:
  191. """
  192. 处理单个任务
  193. Args:
  194. task: 任务数据
  195. task_index: 任务索引(从1开始)
  196. total_tasks: 总任务数
  197. persona_inspiration_features: 人设灵感特征列表
  198. category_mapping: 特征分类映射字典
  199. model_name: 使用的模型名称
  200. Returns:
  201. 包含 how 解构结果的任务
  202. """
  203. post_id = task.get("帖子id", "")
  204. print(f"\n处理任务 [{task_index}/{total_tasks}]: {post_id}")
  205. # 获取灵感点列表
  206. what_result = task.get("what解构结果", {})
  207. inspiration_list = what_result.get("灵感点列表", [])
  208. print(f" 灵感点数量: {len(inspiration_list)}")
  209. # 并发处理所有灵感点
  210. tasks = [
  211. process_single_inspiration_point(
  212. inspiration_point=inspiration_point,
  213. persona_features=persona_inspiration_features,
  214. category_mapping=category_mapping,
  215. model_name=model_name
  216. )
  217. for inspiration_point in inspiration_list
  218. ]
  219. updated_inspiration_list = await asyncio.gather(*tasks)
  220. # 构建 how 解构结果
  221. how_result = {
  222. "灵感点列表": list(updated_inspiration_list)
  223. }
  224. # 更新任务
  225. updated_task = task.copy()
  226. updated_task["how解构结果"] = how_result
  227. return updated_task
  228. async def process_task_list(
  229. task_list: List[Dict],
  230. persona_features_dict: Dict,
  231. category_mapping: Dict = None,
  232. model_name: str = None
  233. ) -> List[Dict]:
  234. """
  235. 处理整个解构任务列表(并发执行)
  236. Args:
  237. task_list: 解构任务列表
  238. persona_features_dict: 人设特征字典(包含灵感点、目的点、关键点)
  239. category_mapping: 特征分类映射字典
  240. model_name: 使用的模型名称
  241. Returns:
  242. 包含 how 解构结果的任务列表
  243. """
  244. # 获取标签特征列表
  245. persona_inspiration_features = persona_features_dict.get("灵感点", [])
  246. print(f"人设标签特征数量: {len(persona_inspiration_features)}")
  247. # 从分类映射中提取所有唯一的分类作为分类特征(仅从灵感点中提取)
  248. category_features = []
  249. if category_mapping:
  250. all_categories = set()
  251. # 只从灵感点中提取分类
  252. if "灵感点" in category_mapping:
  253. for _, feature_data in category_mapping["灵感点"].items():
  254. categories = feature_data.get("所属分类", [])
  255. all_categories.update(categories)
  256. # 转换为特征格式
  257. category_features = [{"特征名称": cat} for cat in sorted(all_categories)]
  258. print(f"人设分类特征数量: {len(category_features)}")
  259. # 合并标签特征和分类特征
  260. all_features = persona_inspiration_features + category_features
  261. print(f"总特征数量(标签+分类): {len(all_features)}")
  262. # 并发处理所有任务
  263. tasks = [
  264. process_single_task(
  265. task=task,
  266. task_index=i,
  267. total_tasks=len(task_list),
  268. persona_inspiration_features=all_features,
  269. category_mapping=category_mapping,
  270. model_name=model_name
  271. )
  272. for i, task in enumerate(task_list, 1)
  273. ]
  274. updated_task_list = await asyncio.gather(*tasks)
  275. return list(updated_task_list)
  276. async def main():
  277. """主函数"""
  278. # 输入输出路径
  279. script_dir = Path(__file__).parent
  280. project_root = script_dir.parent.parent
  281. data_dir = project_root / "data" / "data_1118"
  282. task_list_file = data_dir / "当前帖子_解构任务列表.json"
  283. persona_features_file = data_dir / "特征名称_帖子来源.json"
  284. category_mapping_file = data_dir / "特征名称_分类映射.json"
  285. output_dir = data_dir / "当前帖子_how解构结果"
  286. # 创建输出目录
  287. output_dir.mkdir(parents=True, exist_ok=True)
  288. print(f"读取解构任务列表: {task_list_file}")
  289. with open(task_list_file, "r", encoding="utf-8") as f:
  290. task_list_data = json.load(f)
  291. print(f"读取人设特征: {persona_features_file}")
  292. with open(persona_features_file, "r", encoding="utf-8") as f:
  293. persona_features_data = json.load(f)
  294. print(f"读取特征分类映射: {category_mapping_file}")
  295. with open(category_mapping_file, "r", encoding="utf-8") as f:
  296. category_mapping = json.load(f)
  297. # 获取任务列表
  298. task_list = task_list_data.get("解构任务列表", [])
  299. print(f"\n总任务数: {len(task_list)}")
  300. # 处理任务列表
  301. updated_task_list = await process_task_list(
  302. task_list=task_list,
  303. persona_features_dict=persona_features_data,
  304. category_mapping=category_mapping,
  305. model_name=None # 使用默认模型
  306. )
  307. # 分文件保存结果
  308. print(f"\n保存结果到: {output_dir}")
  309. for task in updated_task_list:
  310. post_id = task.get("帖子id", "unknown")
  311. output_file = output_dir / f"{post_id}_how.json"
  312. print(f" 保存: {output_file.name}")
  313. with open(output_file, "w", encoding="utf-8") as f:
  314. json.dump(task, f, ensure_ascii=False, indent=4)
  315. print("\n完成!")
  316. # 打印统计信息
  317. total_inspiration_points = sum(
  318. len(task["how解构结果"]["灵感点列表"])
  319. for task in updated_task_list
  320. )
  321. total_features = sum(
  322. len(point["特征列表"])
  323. for task in updated_task_list
  324. for point in task["how解构结果"]["灵感点列表"]
  325. )
  326. print(f"\n统计:")
  327. print(f" 处理的帖子数: {len(updated_task_list)}")
  328. print(f" 处理的灵感点数: {total_inspiration_points}")
  329. print(f" 处理的特征数: {total_features}")
  330. if __name__ == "__main__":
  331. asyncio.run(main())