match_inspiration_features.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Inspiration-point feature matching script.

Extracts the features of inspiration points from the deconstruction task
list, matches them against the persona inspiration features, and uses the
relation_analyzer module to analyze the semantic relation between features.
"""
import json
import asyncio
from pathlib import Path
from typing import Dict, List
import sys
# Add the project root (three directory levels above this file) to sys.path.
# This must run before the `lib` import below so that import can resolve.
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))
from lib.relation_analyzer import analyze_relation
  17. # 全局并发限制
  18. MAX_CONCURRENT_REQUESTS = 20
  19. semaphore = None
  20. def get_semaphore():
  21. """获取全局信号量"""
  22. global semaphore
  23. if semaphore is None:
  24. semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
  25. return semaphore
  26. async def match_single_pair(
  27. feature_name: str,
  28. persona_name: str,
  29. model_name: str = None
  30. ) -> Dict:
  31. """
  32. 匹配单个特征对(带并发限制)
  33. Args:
  34. feature_name: 要匹配的特征名称
  35. persona_name: 人设特征名称
  36. model_name: 使用的模型名称
  37. Returns:
  38. 单个匹配结果
  39. """
  40. sem = get_semaphore()
  41. async with sem:
  42. print(f" 匹配: {feature_name} <-> {persona_name}")
  43. relation_result = await analyze_relation(
  44. phrase_a=feature_name,
  45. phrase_b=persona_name,
  46. model_name=model_name
  47. )
  48. return {
  49. "人设特征名称": persona_name,
  50. "匹配结果": relation_result
  51. }
  52. async def match_feature_with_persona(
  53. feature_name: str,
  54. persona_features: List[Dict],
  55. model_name: str = None
  56. ) -> List[Dict]:
  57. """
  58. 将一个特征与人设特征列表进行匹配(并发执行)
  59. Args:
  60. feature_name: 要匹配的特征名称
  61. persona_features: 人设特征列表
  62. model_name: 使用的模型名称
  63. Returns:
  64. 匹配结果列表
  65. """
  66. # 创建所有匹配任务
  67. tasks = [
  68. match_single_pair(feature_name, persona_feature["特征名称"], model_name)
  69. for persona_feature in persona_features
  70. ]
  71. # 并发执行所有匹配
  72. match_results = await asyncio.gather(*tasks)
  73. return list(match_results)
  74. async def match_single_feature(
  75. feature_name: str,
  76. persona_features: List[Dict],
  77. model_name: str = None
  78. ) -> Dict:
  79. """
  80. 匹配单个特征与所有人设特征
  81. Args:
  82. feature_name: 特征名称
  83. persona_features: 人设特征列表
  84. model_name: 使用的模型名称
  85. Returns:
  86. 特征匹配结果
  87. """
  88. print(f" 特征: {feature_name}")
  89. match_results = await match_feature_with_persona(
  90. feature_name=feature_name,
  91. persona_features=persona_features,
  92. model_name=model_name
  93. )
  94. return {
  95. "特征名称": feature_name,
  96. "匹配结果": match_results
  97. }
  98. async def process_single_inspiration_point(
  99. inspiration_point: Dict,
  100. persona_features: List[Dict],
  101. model_name: str = None
  102. ) -> Dict:
  103. """
  104. 处理单个灵感点的特征匹配(并发执行)
  105. Args:
  106. inspiration_point: 灵感点数据
  107. persona_features: 人设灵感特征列表
  108. model_name: 使用的模型名称
  109. Returns:
  110. 包含 how 步骤列表的灵感点数据
  111. """
  112. point_name = inspiration_point.get("名称", "")
  113. feature_list = inspiration_point.get("特征列表", [])
  114. print(f" 处理灵感点: {point_name}")
  115. print(f" 特征数量: {len(feature_list)}")
  116. # 并发匹配所有特征
  117. tasks = [
  118. match_single_feature(feature_name, persona_features, model_name)
  119. for feature_name in feature_list
  120. ]
  121. feature_match_results = await asyncio.gather(*tasks)
  122. # 构建 how 步骤
  123. how_step = {
  124. "步骤名称": "灵感特征分别匹配人设特征",
  125. "特征列表": list(feature_match_results)
  126. }
  127. # 返回更新后的灵感点
  128. result = inspiration_point.copy()
  129. result["how步骤列表"] = [how_step]
  130. return result
  131. async def process_single_task(
  132. task: Dict,
  133. task_index: int,
  134. total_tasks: int,
  135. persona_inspiration_features: List[Dict],
  136. model_name: str = None
  137. ) -> Dict:
  138. """
  139. 处理单个任务
  140. Args:
  141. task: 任务数据
  142. task_index: 任务索引(从1开始)
  143. total_tasks: 总任务数
  144. persona_inspiration_features: 人设灵感特征列表
  145. model_name: 使用的模型名称
  146. Returns:
  147. 包含 how 解构结果的任务
  148. """
  149. post_id = task.get("帖子id", "")
  150. print(f"\n处理任务 [{task_index}/{total_tasks}]: {post_id}")
  151. # 获取灵感点列表
  152. what_result = task.get("what解构结果", {})
  153. inspiration_list = what_result.get("灵感点列表", [])
  154. print(f" 灵感点数量: {len(inspiration_list)}")
  155. # 并发处理所有灵感点
  156. tasks = [
  157. process_single_inspiration_point(
  158. inspiration_point=inspiration_point,
  159. persona_features=persona_inspiration_features,
  160. model_name=model_name
  161. )
  162. for inspiration_point in inspiration_list
  163. ]
  164. updated_inspiration_list = await asyncio.gather(*tasks)
  165. # 构建 how 解构结果
  166. how_result = {
  167. "灵感点列表": list(updated_inspiration_list)
  168. }
  169. # 更新任务
  170. updated_task = task.copy()
  171. updated_task["how解构结果"] = how_result
  172. return updated_task
  173. async def process_task_list(
  174. task_list: List[Dict],
  175. persona_features_dict: Dict,
  176. model_name: str = None
  177. ) -> List[Dict]:
  178. """
  179. 处理整个解构任务列表(并发执行)
  180. Args:
  181. task_list: 解构任务列表
  182. persona_features_dict: 人设特征字典(包含灵感点、目的点、关键点)
  183. model_name: 使用的模型名称
  184. Returns:
  185. 包含 how 解构结果的任务列表
  186. """
  187. persona_inspiration_features = persona_features_dict.get("灵感点", [])
  188. print(f"人设灵感特征数量: {len(persona_inspiration_features)}")
  189. # 并发处理所有任务
  190. tasks = [
  191. process_single_task(
  192. task=task,
  193. task_index=i,
  194. total_tasks=len(task_list),
  195. persona_inspiration_features=persona_inspiration_features,
  196. model_name=model_name
  197. )
  198. for i, task in enumerate(task_list, 1)
  199. ]
  200. updated_task_list = await asyncio.gather(*tasks)
  201. return list(updated_task_list)
  202. async def main():
  203. """主函数"""
  204. # 输入输出路径
  205. script_dir = Path(__file__).parent
  206. project_root = script_dir.parent.parent
  207. data_dir = project_root / "data" / "data_1117"
  208. task_list_file = data_dir / "当前帖子_解构任务列表.json"
  209. persona_features_file = data_dir / "特征名称_帖子来源.json"
  210. output_dir = data_dir / "当前帖子_how解构结果"
  211. # 创建输出目录
  212. output_dir.mkdir(parents=True, exist_ok=True)
  213. print(f"读取解构任务列表: {task_list_file}")
  214. with open(task_list_file, "r", encoding="utf-8") as f:
  215. task_list_data = json.load(f)
  216. print(f"读取人设特征: {persona_features_file}")
  217. with open(persona_features_file, "r", encoding="utf-8") as f:
  218. persona_features_data = json.load(f)
  219. # 获取任务列表
  220. task_list = task_list_data.get("解构任务列表", [])
  221. print(f"\n总任务数: {len(task_list)}")
  222. # 处理任务列表
  223. updated_task_list = await process_task_list(
  224. task_list=task_list,
  225. persona_features_dict=persona_features_data,
  226. model_name=None # 使用默认模型
  227. )
  228. # 分文件保存结果
  229. print(f"\n保存结果到: {output_dir}")
  230. for task in updated_task_list:
  231. post_id = task.get("帖子id", "unknown")
  232. output_file = output_dir / f"{post_id}_how.json"
  233. print(f" 保存: {output_file.name}")
  234. with open(output_file, "w", encoding="utf-8") as f:
  235. json.dump(task, f, ensure_ascii=False, indent=4)
  236. print("\n完成!")
  237. # 打印统计信息
  238. total_inspiration_points = sum(
  239. len(task["how解构结果"]["灵感点列表"])
  240. for task in updated_task_list
  241. )
  242. total_features = sum(
  243. len(point["特征列表"])
  244. for task in updated_task_list
  245. for point in task["how解构结果"]["灵感点列表"]
  246. )
  247. print(f"\n统计:")
  248. print(f" 处理的帖子数: {len(updated_task_list)}")
  249. print(f" 处理的灵感点数: {total_inspiration_points}")
  250. print(f" 处理的特征数: {total_features}")
# Run the async entry point only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())