#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# match_inspiration_features_v3.py
"""
Inspiration-point feature matching script, v3 (feature-combination matching).

Extracts each inspiration point's feature list from the deconstruction task
list and matches it against the feature combinations of the persona history.
Matching takes the category information of every feature in a combination
into account, using the combination matching module that is still to be
designed.
"""
  8. import json
  9. import asyncio
  10. from pathlib import Path
  11. from typing import Dict, List, Optional
  12. import sys
  13. # 添加项目根目录到路径
  14. project_root = Path(__file__).parent.parent.parent
  15. sys.path.insert(0, str(project_root))
  16. from agents import trace
  17. from agents.tracing.create import custom_span
  18. from lib.my_trace import set_trace
  19. from lib.hierarchical_match_analyzer import hierarchical_match
  20. # 全局并发限制
  21. MAX_CONCURRENT_REQUESTS = 20
  22. semaphore = None
  23. def get_semaphore():
  24. """获取全局信号量"""
  25. global semaphore
  26. if semaphore is None:
  27. semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
  28. return semaphore
  29. def load_feature_categories(categories_file: Path) -> Dict:
  30. """
  31. 加载特征分类映射
  32. Args:
  33. categories_file: 特征名称_分类映射.json 文件路径
  34. Returns:
  35. 特征分类字典
  36. """
  37. with open(categories_file, "r", encoding="utf-8") as f:
  38. return json.load(f)
  39. def enrich_persona_combinations_with_categories(
  40. persona_combinations: List[Dict],
  41. feature_categories: Dict,
  42. point_type: str
  43. ) -> List[Dict]:
  44. """
  45. 为人设特征组合添加分类信息
  46. Args:
  47. persona_combinations: 人设特征组合列表
  48. feature_categories: 特征分类映射字典
  49. point_type: 点类型 ("灵感点", "目的点", "关键点")
  50. Returns:
  51. enriched_combinations: 增强后的组合列表,每个组合包含特征及其分类
  52. """
  53. enriched_combinations = []
  54. # 获取该点类型的分类映射
  55. type_categories = feature_categories.get(point_type, {})
  56. for combo in persona_combinations:
  57. feature_list = combo.get("特征组合", [])
  58. # 为每个特征添加分类信息
  59. enriched_features = []
  60. for feature_name in feature_list:
  61. categories = type_categories.get(feature_name, {}).get("所属分类", [])
  62. enriched_features.append({
  63. "特征名称": feature_name,
  64. "所属分类": categories
  65. })
  66. # 构建增强后的组合
  67. enriched_combo = {
  68. "特征组合": enriched_features, # 带分类的特征列表
  69. "原始特征组合": feature_list, # 保留原始特征名称列表
  70. "特征来源": combo.get("特征来源", [])
  71. }
  72. enriched_combinations.append(enriched_combo)
  73. return enriched_combinations
  74. async def match_feature_list_with_combination(
  75. current_feature_list: List[str],
  76. persona_combination: Dict,
  77. model_name: Optional[str] = None
  78. ) -> Dict:
  79. """
  80. 将当前点的特征列表与一个人设历史组合进行分层匹配
  81. 使用分层匹配策略:
  82. 1. 优先匹配灵感点标签(特征名称)
  83. 2. 无标签匹配时,匹配第一层分类
  84. 3. 仍无结果时,匹配第二层上位分类
  85. 4. 对每个候选进行推理难度打分
  86. Args:
  87. current_feature_list: 当前点的特征列表,如 ["立冬", "教资查分", "时间巧合"]
  88. persona_combination: 人设历史组合(带分类信息),格式如:
  89. {
  90. "特征组合": [
  91. {"特征名称": "猫孩子", "所属分类": ["宠物亲子化", "宠物情感", "实质"]},
  92. {"特征名称": "被拿捏住的无奈感", "所属分类": ["宠物关系主导", "宠物情感", "实质"]}
  93. ],
  94. "原始特征组合": ["猫孩子", "被拿捏住的无奈感"],
  95. "特征来源": [...]
  96. }
  97. model_name: 使用的模型名称
  98. Returns:
  99. {
  100. "人设特征组合": [...],
  101. "匹配结果": {
  102. "最终得分": 0.85,
  103. "匹配层级": "第一层分类匹配",
  104. "匹配结果": "宠物情感",
  105. "综合说明": "...",
  106. "分层详情": {...}
  107. },
  108. "人设特征来源": [...]
  109. }
  110. """
  111. sem = get_semaphore()
  112. async with sem:
  113. # 调用分层匹配模块
  114. match_result = await hierarchical_match(
  115. current_features=current_feature_list,
  116. persona_combination=persona_combination["特征组合"],
  117. model_name=model_name
  118. )
  119. # 构建返回结果
  120. result = {
  121. "人设特征组合": persona_combination["原始特征组合"],
  122. "匹配结果": {
  123. "最终得分": match_result["最终得分"],
  124. "匹配层级": match_result["匹配层级"],
  125. "匹配结果": match_result["匹配结果"],
  126. "综合说明": match_result["综合说明"],
  127. "分层详情": match_result["分层结果"]
  128. },
  129. "人设特征来源": persona_combination["特征来源"]
  130. }
  131. return result
  132. async def match_inspiration_point_with_combinations(
  133. current_feature_list: List[str],
  134. persona_combinations: List[Dict],
  135. model_name: Optional[str] = None
  136. ) -> List[Dict]:
  137. """
  138. 将当前点的特征列表与所有人设特征组合进行匹配
  139. Args:
  140. current_feature_list: 当前点的特征列表
  141. persona_combinations: 人设特征组合列表(已包含分类信息)
  142. model_name: 使用的模型名称
  143. Returns:
  144. 匹配结果列表(按分数降序排序)
  145. """
  146. print(f" 批量匹配: {current_feature_list} <-> {len(persona_combinations)}个人设特征组合")
  147. # 并发匹配所有组合
  148. tasks = [
  149. match_feature_list_with_combination(
  150. current_feature_list=current_feature_list,
  151. persona_combination=combo,
  152. model_name=model_name
  153. )
  154. for combo in persona_combinations
  155. ]
  156. match_results = await asyncio.gather(*tasks)
  157. # 按最终得分降序排序
  158. match_results.sort(key=lambda x: x["匹配结果"]["最终得分"], reverse=True)
  159. return match_results
  160. async def process_single_inspiration_point(
  161. inspiration_point: Dict,
  162. persona_combinations: List[Dict],
  163. model_name: Optional[str] = None
  164. ) -> Dict:
  165. """
  166. 处理单个灵感点的特征组合匹配
  167. Args:
  168. inspiration_point: 灵感点数据,包含特征列表
  169. persona_combinations: 人设特征组合列表(已包含分类信息)
  170. model_name: 使用的模型名称
  171. Returns:
  172. 包含 how 步骤列表的灵感点数据
  173. """
  174. point_name = inspiration_point.get("名称", "")
  175. feature_list = inspiration_point.get("特征列表", [])
  176. print(f" 处理灵感点: {point_name}")
  177. print(f" 特征列表: {feature_list}")
  178. # 使用 custom_span 标识灵感点处理
  179. with custom_span(
  180. name=f"处理灵感点: {point_name}",
  181. data={
  182. "灵感点": point_name,
  183. "特征列表": feature_list,
  184. "人设组合数量": len(persona_combinations)
  185. }
  186. ):
  187. # 将特征列表与所有人设组合进行匹配
  188. match_results = await match_inspiration_point_with_combinations(
  189. current_feature_list=feature_list,
  190. persona_combinations=persona_combinations,
  191. model_name=model_name
  192. )
  193. # 构建 how 步骤
  194. how_step = {
  195. "步骤名称": "灵感特征列表批量匹配人设特征组合",
  196. "当前特征列表": feature_list,
  197. "匹配结果": match_results
  198. }
  199. # 返回更新后的灵感点
  200. result = inspiration_point.copy()
  201. result["how步骤列表"] = [how_step]
  202. return result
  203. async def process_single_task(
  204. task: Dict,
  205. task_index: int,
  206. total_tasks: int,
  207. persona_combinations: List[Dict],
  208. model_name: Optional[str] = None
  209. ) -> Dict:
  210. """
  211. 处理单个任务
  212. Args:
  213. task: 任务数据
  214. task_index: 任务索引(从1开始)
  215. total_tasks: 总任务数
  216. persona_combinations: 人设特征组合列表(已包含分类信息)
  217. model_name: 使用的模型名称
  218. Returns:
  219. 包含 how 解构结果的任务
  220. """
  221. post_id = task.get("帖子id", "")
  222. print(f"\n处理任务 [{task_index}/{total_tasks}]: {post_id}")
  223. # 获取灵感点列表
  224. what_result = task.get("what解构结果", {})
  225. inspiration_list = what_result.get("灵感点列表", [])
  226. print(f" 灵感点数量: {len(inspiration_list)}")
  227. # 并发处理所有灵感点
  228. tasks = [
  229. process_single_inspiration_point(
  230. inspiration_point=inspiration_point,
  231. persona_combinations=persona_combinations,
  232. model_name=model_name
  233. )
  234. for inspiration_point in inspiration_list
  235. ]
  236. updated_inspiration_list = await asyncio.gather(*tasks)
  237. # 构建 how 解构结果
  238. how_result = {
  239. "灵感点列表": list(updated_inspiration_list)
  240. }
  241. # 更新任务
  242. updated_task = task.copy()
  243. updated_task["how解构结果"] = how_result
  244. return updated_task
  245. async def process_task_list(
  246. task_list: List[Dict],
  247. persona_combinations: List[Dict],
  248. model_name: Optional[str] = None,
  249. current_time: Optional[str] = None,
  250. log_url: Optional[str] = None
  251. ) -> List[Dict]:
  252. """
  253. 处理整个解构任务列表(并发执行)
  254. Args:
  255. task_list: 解构任务列表
  256. persona_combinations: 人设特征组合列表(已包含分类信息)
  257. model_name: 使用的模型名称
  258. current_time: 当前时间戳
  259. log_url: 日志链接
  260. Returns:
  261. 包含 how 解构结果的任务列表
  262. """
  263. print(f"人设灵感特征组合数量: {len(persona_combinations)}")
  264. # 使用 custom_span 标识整个处理流程
  265. with custom_span(
  266. name="特征组合批量匹配 v3 - 所有任务",
  267. data={
  268. "任务总数": len(task_list),
  269. "人设组合数量": len(persona_combinations),
  270. "current_time": current_time,
  271. "log_url": log_url
  272. }
  273. ):
  274. # 并发处理所有任务
  275. tasks = [
  276. process_single_task(
  277. task=task,
  278. task_index=i,
  279. total_tasks=len(task_list),
  280. persona_combinations=persona_combinations,
  281. model_name=model_name
  282. )
  283. for i, task in enumerate(task_list, 1)
  284. ]
  285. updated_task_list = await asyncio.gather(*tasks)
  286. return list(updated_task_list)
  287. async def main(current_time: Optional[str] = None, log_url: Optional[str] = None):
  288. """主函数
  289. Args:
  290. current_time: 当前时间戳(从外部传入)
  291. log_url: 日志链接(从外部传入)
  292. """
  293. # 输入输出路径
  294. script_dir = Path(__file__).parent
  295. project_root = script_dir.parent.parent
  296. data_dir = project_root / "data" / "data_1118"
  297. task_list_file = data_dir / "当前帖子_解构任务列表.json"
  298. persona_combinations_file = data_dir / "特征组合_帖子来源.json"
  299. feature_categories_file = data_dir / "特征名称_分类映射.json"
  300. output_dir = data_dir / "当前帖子_how解构结果_v3"
  301. # 创建输出目录
  302. output_dir.mkdir(parents=True, exist_ok=True)
  303. # 获取模型名称
  304. from lib.client import MODEL_NAME
  305. model_name_short = MODEL_NAME.replace("google/", "").replace("/", "_")
  306. print(f"读取解构任务列表: {task_list_file}")
  307. with open(task_list_file, "r", encoding="utf-8") as f:
  308. task_list_data = json.load(f)
  309. print(f"读取人设特征组合: {persona_combinations_file}")
  310. with open(persona_combinations_file, "r", encoding="utf-8") as f:
  311. persona_combinations_data = json.load(f)
  312. print(f"读取特征分类映射: {feature_categories_file}")
  313. feature_categories = load_feature_categories(feature_categories_file)
  314. # 获取任务列表
  315. task_list = task_list_data.get("解构任务列表", [])
  316. print(f"\n总任务数: {len(task_list)}")
  317. print(f"使用模型: {MODEL_NAME}\n")
  318. # 为人设特征组合添加分类信息(只处理灵感点)
  319. persona_inspiration_combinations_raw = persona_combinations_data.get("灵感点", [])
  320. persona_inspiration_combinations = enrich_persona_combinations_with_categories(
  321. persona_combinations=persona_inspiration_combinations_raw,
  322. feature_categories=feature_categories,
  323. point_type="灵感点"
  324. )
  325. print(f"灵感点特征组合数量: {len(persona_inspiration_combinations)}")
  326. print(f"示例组合 (前3个):")
  327. for i, combo in enumerate(persona_inspiration_combinations[:3], 1):
  328. print(f" {i}. 原始组合: {combo['原始特征组合']}")
  329. print(f" 带分类: {combo['特征组合']}")
  330. print()
  331. # 处理任务列表
  332. updated_task_list = await process_task_list(
  333. task_list=task_list,
  334. persona_combinations=persona_inspiration_combinations,
  335. model_name=None, # 使用默认模型
  336. current_time=current_time,
  337. log_url=log_url
  338. )
  339. # 分文件保存结果
  340. print(f"\n保存结果到: {output_dir}")
  341. for task in updated_task_list:
  342. post_id = task.get("帖子id", "unknown")
  343. output_file = output_dir / f"{post_id}_how_v3_{model_name_short}.json"
  344. # 在每个任务中添加元数据
  345. task["元数据"] = {
  346. "current_time": current_time,
  347. "log_url": log_url,
  348. "version": "v3_combination_match",
  349. "model": MODEL_NAME,
  350. "说明": "v3版本: 使用特征列表匹配人设特征组合(带分类信息)"
  351. }
  352. print(f" 保存: {output_file.name}")
  353. with open(output_file, "w", encoding="utf-8") as f:
  354. json.dump(task, f, ensure_ascii=False, indent=4)
  355. print("\n完成!")
  356. # 打印统计信息
  357. total_inspiration_points = sum(
  358. len(task["how解构结果"]["灵感点列表"])
  359. for task in updated_task_list
  360. )
  361. total_matches = sum(
  362. len(point["how步骤列表"][0]["匹配结果"])
  363. for task in updated_task_list
  364. for point in task["how解构结果"]["灵感点列表"]
  365. )
  366. print(f"\n统计:")
  367. print(f" 处理的帖子数: {len(updated_task_list)}")
  368. print(f" 处理的灵感点数: {total_inspiration_points}")
  369. print(f" 生成的匹配结果数: {total_matches}")
  370. if log_url:
  371. print(f"\nTrace: {log_url}\n")
  372. if __name__ == "__main__":
  373. # 设置 trace
  374. current_time, log_url = set_trace()
  375. # 使用 trace 上下文包裹整个执行流程
  376. with trace("灵感特征组合批量匹配 v3"):
  377. asyncio.run(main(current_time, log_url))