match_inspiration_features_v2.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 灵感点特征匹配脚本 v2(批量匹配版本)
  5. 从解构任务列表中提取灵感点的特征,与人设灵感特征进行匹配,
  6. 使用 batch_match_analyzer 模块进行批量匹配分析,确保同一特征对所有人设特征的评分可比。
  7. """
  8. import json
  9. import asyncio
  10. from pathlib import Path
  11. from typing import Dict, List
  12. import sys
  13. # 添加项目根目录到路径
  14. project_root = Path(__file__).parent.parent.parent
  15. sys.path.insert(0, str(project_root))
  16. from agents import trace
  17. from agents.tracing.create import custom_span
  18. from lib.my_trace import set_trace
  19. from lib.batch_match_analyzer import analyze_batch_match
  20. # 全局并发限制
  21. MAX_CONCURRENT_REQUESTS = 20
  22. semaphore = None
  23. def get_semaphore():
  24. """获取全局信号量"""
  25. global semaphore
  26. if semaphore is None:
  27. semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
  28. return semaphore
  29. async def match_feature_with_persona_batch(
  30. feature_name: str,
  31. persona_features: List[Dict],
  32. model_name: str = None
  33. ) -> List[Dict]:
  34. """
  35. 将一个特征与人设特征列表进行批量匹配(一次调用)
  36. Args:
  37. feature_name: 要匹配的特征名称
  38. persona_features: 人设特征列表
  39. model_name: 使用的模型名称
  40. Returns:
  41. 匹配结果列表(按分数降序排序)
  42. """
  43. sem = get_semaphore()
  44. async with sem:
  45. print(f" 批量匹配: {feature_name} <-> {len(persona_features)}个人设特征")
  46. # 提取人设特征名称列表
  47. persona_names = [pf["特征名称"] for pf in persona_features]
  48. # 批量分析匹配度
  49. batch_results = await analyze_batch_match(
  50. phrase_a=feature_name,
  51. phrase_b_list=persona_names,
  52. model_name=model_name
  53. )
  54. # 转换为原有格式
  55. match_results = [
  56. {
  57. "人设特征名称": result["特征"],
  58. "匹配结果": {
  59. "分数": result["分数"],
  60. "说明": result["说明"]
  61. }
  62. }
  63. for result in batch_results
  64. ]
  65. # 按分数降序排序
  66. match_results.sort(key=lambda x: x["匹配结果"]["分数"], reverse=True)
  67. return match_results
  68. async def match_single_feature(
  69. feature_name: str,
  70. persona_features: List[Dict],
  71. model_name: str = None
  72. ) -> Dict:
  73. """
  74. 匹配单个特征与所有人设特征
  75. Args:
  76. feature_name: 特征名称
  77. persona_features: 人设特征列表
  78. model_name: 使用的模型名称
  79. Returns:
  80. 特征匹配结果
  81. """
  82. print(f" 特征: {feature_name}")
  83. match_results = await match_feature_with_persona_batch(
  84. feature_name=feature_name,
  85. persona_features=persona_features,
  86. model_name=model_name
  87. )
  88. return {
  89. "特征名称": feature_name,
  90. "匹配结果": match_results
  91. }
  92. async def process_single_inspiration_point(
  93. inspiration_point: Dict,
  94. persona_features: List[Dict],
  95. model_name: str = None
  96. ) -> Dict:
  97. """
  98. 处理单个灵感点的特征匹配(并发执行)
  99. Args:
  100. inspiration_point: 灵感点数据
  101. persona_features: 人设灵感特征列表
  102. model_name: 使用的模型名称
  103. Returns:
  104. 包含 how 步骤列表的灵感点数据
  105. """
  106. point_name = inspiration_point.get("名称", "")
  107. feature_list = inspiration_point.get("特征列表", [])
  108. print(f" 处理灵感点: {point_name}")
  109. print(f" 特征数量: {len(feature_list)}")
  110. # 使用 custom_span 标识灵感点处理
  111. with custom_span(
  112. name=f"处理灵感点: {point_name}",
  113. data={
  114. "灵感点": point_name,
  115. "特征数量": len(feature_list),
  116. "人设特征数量": len(persona_features)
  117. }
  118. ):
  119. # 并发匹配所有特征(每个特征批量匹配所有人设特征)
  120. tasks = [
  121. match_single_feature(feature_name, persona_features, model_name)
  122. for feature_name in feature_list
  123. ]
  124. feature_match_results = await asyncio.gather(*tasks)
  125. # 构建 how 步骤
  126. how_step = {
  127. "步骤名称": "灵感特征批量匹配人设特征",
  128. "特征列表": list(feature_match_results)
  129. }
  130. # 返回更新后的灵感点
  131. result = inspiration_point.copy()
  132. result["how步骤列表"] = [how_step]
  133. return result
  134. async def process_single_task(
  135. task: Dict,
  136. task_index: int,
  137. total_tasks: int,
  138. persona_inspiration_features: List[Dict],
  139. model_name: str = None
  140. ) -> Dict:
  141. """
  142. 处理单个任务
  143. Args:
  144. task: 任务数据
  145. task_index: 任务索引(从1开始)
  146. total_tasks: 总任务数
  147. persona_inspiration_features: 人设灵感特征列表
  148. model_name: 使用的模型名称
  149. Returns:
  150. 包含 how 解构结果的任务
  151. """
  152. post_id = task.get("帖子id", "")
  153. print(f"\n处理任务 [{task_index}/{total_tasks}]: {post_id}")
  154. # 获取灵感点列表
  155. what_result = task.get("what解构结果", {})
  156. inspiration_list = what_result.get("灵感点列表", [])
  157. print(f" 灵感点数量: {len(inspiration_list)}")
  158. # 并发处理所有灵感点
  159. tasks = [
  160. process_single_inspiration_point(
  161. inspiration_point=inspiration_point,
  162. persona_features=persona_inspiration_features,
  163. model_name=model_name
  164. )
  165. for inspiration_point in inspiration_list
  166. ]
  167. updated_inspiration_list = await asyncio.gather(*tasks)
  168. # 构建 how 解构结果
  169. how_result = {
  170. "灵感点列表": list(updated_inspiration_list)
  171. }
  172. # 更新任务
  173. updated_task = task.copy()
  174. updated_task["how解构结果"] = how_result
  175. return updated_task
  176. async def process_task_list(
  177. task_list: List[Dict],
  178. persona_features_dict: Dict,
  179. model_name: str = None,
  180. current_time: str = None,
  181. log_url: str = None
  182. ) -> List[Dict]:
  183. """
  184. 处理整个解构任务列表(并发执行)
  185. Args:
  186. task_list: 解构任务列表
  187. persona_features_dict: 人设特征字典(包含灵感点、目的点、关键点)
  188. model_name: 使用的模型名称
  189. current_time: 当前时间戳
  190. log_url: 日志链接
  191. Returns:
  192. 包含 how 解构结果的任务列表
  193. """
  194. persona_inspiration_features = persona_features_dict.get("灵感点", [])
  195. print(f"人设灵感特征数量: {len(persona_inspiration_features)}")
  196. # 使用 custom_span 标识整个处理流程
  197. with custom_span(
  198. name="批量匹配分析 v2 - 所有任务",
  199. data={
  200. "任务总数": len(task_list),
  201. "人设特征数量": len(persona_inspiration_features),
  202. "current_time": current_time,
  203. "log_url": log_url
  204. }
  205. ):
  206. # 并发处理所有任务
  207. tasks = [
  208. process_single_task(
  209. task=task,
  210. task_index=i,
  211. total_tasks=len(task_list),
  212. persona_inspiration_features=persona_inspiration_features,
  213. model_name=model_name
  214. )
  215. for i, task in enumerate(task_list, 1)
  216. ]
  217. updated_task_list = await asyncio.gather(*tasks)
  218. return list(updated_task_list)
  219. async def main(current_time: str = None, log_url: str = None):
  220. """主函数
  221. Args:
  222. current_time: 当前时间戳(从外部传入)
  223. log_url: 日志链接(从外部传入)
  224. """
  225. # 输入输出路径
  226. script_dir = Path(__file__).parent
  227. project_root = script_dir.parent.parent
  228. data_dir = project_root / "data" / "data_1118"
  229. task_list_file = data_dir / "当前帖子_解构任务列表.json"
  230. persona_features_file = data_dir / "特征名称_帖子来源.json"
  231. output_dir = data_dir / "当前帖子_how解构结果_v2"
  232. # 创建输出目录
  233. output_dir.mkdir(parents=True, exist_ok=True)
  234. # 获取模型名称
  235. from lib.client import MODEL_NAME
  236. model_name_short = MODEL_NAME.replace("google/", "").replace("/", "_")
  237. print(f"读取解构任务列表: {task_list_file}")
  238. with open(task_list_file, "r", encoding="utf-8") as f:
  239. task_list_data = json.load(f)
  240. print(f"读取人设特征: {persona_features_file}")
  241. with open(persona_features_file, "r", encoding="utf-8") as f:
  242. persona_features_data = json.load(f)
  243. # 获取任务列表
  244. task_list = task_list_data.get("解构任务列表", [])
  245. print(f"\n总任务数: {len(task_list)}")
  246. print(f"使用模型: {MODEL_NAME}\n")
  247. # 处理任务列表
  248. updated_task_list = await process_task_list(
  249. task_list=task_list,
  250. persona_features_dict=persona_features_data,
  251. model_name=None, # 使用默认模型
  252. current_time=current_time,
  253. log_url=log_url
  254. )
  255. # 分文件保存结果
  256. print(f"\n保存结果到: {output_dir}")
  257. for task in updated_task_list:
  258. post_id = task.get("帖子id", "unknown")
  259. output_file = output_dir / f"{post_id}_how_v2_{model_name_short}.json"
  260. # 在每个任务中添加元数据
  261. task["元数据"] = {
  262. "current_time": current_time,
  263. "log_url": log_url,
  264. "version": "v2_batch",
  265. "model": MODEL_NAME
  266. }
  267. print(f" 保存: {output_file.name}")
  268. with open(output_file, "w", encoding="utf-8") as f:
  269. json.dump(task, f, ensure_ascii=False, indent=4)
  270. print("\n完成!")
  271. # 打印统计信息
  272. total_inspiration_points = sum(
  273. len(task["how解构结果"]["灵感点列表"])
  274. for task in updated_task_list
  275. )
  276. total_features = sum(
  277. len(point["特征列表"])
  278. for task in updated_task_list
  279. for point in task["how解构结果"]["灵感点列表"]
  280. )
  281. print(f"\n统计:")
  282. print(f" 处理的帖子数: {len(updated_task_list)}")
  283. print(f" 处理的灵感点数: {total_inspiration_points}")
  284. print(f" 处理的特征数: {total_features}")
  285. if log_url:
  286. print(f"\nTrace: {log_url}\n")
  287. if __name__ == "__main__":
  288. # 设置 trace
  289. current_time, log_url = set_trace()
  290. # 使用 trace 上下文包裹整个执行流程
  291. with trace("灵感特征批量匹配 v2"):
  292. asyncio.run(main(current_time, log_url))