analyze_first_step_v2.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 第一步分析脚本 v2
  5. 基于过滤后的 how 解构结果,分析哪些点最有可能是创作者的第一步(创作起点),
  6. 以及第一步的前序节点(第一步是怎么来的)。
  7. v2 新增功能:
  8. - 分析第一步的前序节点(已有人设 或 外部触发)
  9. - 提供搜索关键词(用于后续验证)
  10. - 严格约束:前序节点只能是已匹配的人设节点,搜索关键词只能来自节点名称
  11. 输入:intermediate/filtered_results/ 中的过滤结果
  12. 输出:第一步分析结果(带前序节点信息)
  13. """
  14. import asyncio
  15. import json
  16. from pathlib import Path
  17. from typing import Dict, List
  18. import sys
  19. # 添加项目根目录到路径
  20. project_root = Path(__file__).parent.parent.parent
  21. sys.path.insert(0, str(project_root))
  22. from agents import Agent, Runner, ModelSettings, trace
  23. from agents.tracing.create import custom_span
  24. from lib.client import get_model
  25. from lib.my_trace import set_trace_smith as set_trace
  26. from script.data_processing.path_config import PathConfig
  27. # 模型配置
  28. MODEL_NAME = "google/gemini-3-pro-preview"
  29. # MODEL_NAME = 'anthropic/claude-sonnet-4.5'
  30. agent = Agent(
  31. name="First Step Analyzer V2",
  32. model=get_model(MODEL_NAME),
  33. model_settings=ModelSettings(
  34. temperature=0.0,
  35. max_tokens=65536,
  36. ),
  37. tools=[],
  38. )
  39. def extract_points_from_filtered_result(how_result: Dict) -> List[Dict]:
  40. """
  41. 从过滤后的 how 解构结果中提取所有点的信息
  42. Args:
  43. how_result: how解构结果
  44. Returns:
  45. 点信息列表,不包含原始分类标签
  46. """
  47. points = []
  48. for point_type in ["灵感点", "关键点", "目的点"]:
  49. point_list_key = f"{point_type}列表"
  50. point_list = how_result.get(point_list_key, [])
  51. for point in point_list:
  52. point_name = point.get("名称", "")
  53. point_desc = point.get("描述", "")
  54. # 检查是否有匹配到人设特征
  55. has_match = False
  56. matched_feature = None
  57. similarity = None
  58. # 遍历 how 步骤列表中的特征
  59. for step in point.get("how步骤列表", []):
  60. for feature in step.get("特征列表", []):
  61. match_results = feature.get("匹配结果", [])
  62. if match_results: # 如果有匹配结果(top1)
  63. has_match = True
  64. match = match_results[0]
  65. matched_feature = match.get("人设特征名称", "")
  66. similarity = match.get("匹配结果", {}).get("相似度", 0)
  67. break
  68. if has_match:
  69. break
  70. point_info = {
  71. "名称": point_name,
  72. "描述": point_desc,
  73. "是否匹配到已有人设": has_match
  74. }
  75. if has_match:
  76. point_info["匹配的人设特征"] = matched_feature
  77. point_info["相似度"] = similarity
  78. points.append(point_info)
  79. return points
  80. def build_prompt(points: List[Dict]) -> str:
  81. """
  82. 构建分析 prompt (v2版本:增加前序节点分析)
  83. Args:
  84. points: 点信息列表
  85. Returns:
  86. prompt 文本
  87. """
  88. # 构建点的描述文本
  89. points_text = []
  90. matched_points_names = [] # 收集已匹配的点名称
  91. for i, point in enumerate(points, 1):
  92. text = f"{i}. {point['名称']}\n {point['描述']}"
  93. if point['是否匹配到已有人设']:
  94. text += f"\n [已匹配] 匹配到人设特征: {point['匹配的人设特征']} (相似度: {point['相似度']:.2f})"
  95. matched_points_names.append(point['名称'])
  96. else:
  97. text += "\n [未匹配] 未匹配到已有人设特征"
  98. points_text.append(text)
  99. points_section = "\n\n".join(points_text)
  100. matched_points_section = "\n".join([f"- {name}" for name in matched_points_names])
  101. return f'''
  102. 以下是一个内容创作的解构结果。这些点已经被分析和分类,但这个分类是分析维度,不代表真实的创作顺序。
  103. 请判断:在这些点中,哪些最有可能是创作者的"第一步"(创作起点),以及第一步的前序节点(第一步是怎么来的)。
  104. ## 判断标准
  105. **起点特征**:
  106. - 最先触发创作、不依赖其他点的节点
  107. - 可能是外部事件、时事热点、商业需求等
  108. - 起点可能有多个
  109. **前序节点规则**:
  110. - 每个第一步只有**一个**直接前序节点
  111. - 客观分析哪个前序节点最有可能引发第一步,给出前序概率
  112. - 前序节点只能是以下两种之一:
  113. 1. **已匹配的人设节点**:从下面的"已匹配的点"列表中选择
  114. 2. **外部触发**:纯外部事件触发
  115. - 选择前序概率最高的那个,不要预设倾向
  116. **搜索关键词规则**(重要!):
  117. - 如果前序是外部触发,必须提供搜索关键词
  118. - 搜索关键词**只能**从点的名称中提取,不能推导、不能扩展、不能添加任何额外的词
  119. ## 已匹配的点(可作为前序节点)
  120. {matched_points_section}
  121. ## 待分析的点
  122. {points_section}
  123. ## 输出要求
  124. 以 JSON 格式输出:
  125. {{
  126. "推理过程": "详细说明判断逻辑...",
  127. "第一步候选": [
  128. {{
  129. "点名称": "...",
  130. "第一步概率": 0.95,
  131. "推理依据": "...",
  132. "来源分析": "外部触发/人设延伸/商业驱动/其他",
  133. "前序节点": {{
  134. "类型": "已有人设" 或 "外部触发",
  135. "人设节点名称": "..." 或 null, // 如果类型是"已有人设",必须从上面的"已匹配的点"中选择
  136. "匹配的人设特征": "..." 或 null, // 如果有人设节点,填写其匹配的特征
  137. "相似度": 0.84 或 null, // 人设节点与特征的匹配相似度
  138. "前序概率": 0.75, // 前序节点引发第一步的概率(0-1之间)
  139. "搜索关键词": ["关键词1", "关键词2"] 或 null, // 只在"外部触发"时提供,且只能从点名称中提取
  140. "推理": "说明为什么这个前序节点可能引发第一步"
  141. }}
  142. }}
  143. ]
  144. }}
  145. 注意:
  146. 1. 只输出最有可能是第一步的点(通常1-3个)
  147. 2. 按第一步概率降序排列
  148. 3. 客观分析前序节点,选择前序概率最高的(无论是已匹配人设还是外部触发)
  149. 4. 搜索关键词只能从点名称中提取,不能推导
  150. 5. 前序节点不需要能完全推导出第一步,只要可能引发创作者关注即可
  151. '''.strip()
  152. async def analyze_post(post_data: Dict) -> Dict:
  153. """
  154. 分析单个帖子
  155. Args:
  156. post_data: 帖子数据(包含 how解构结果)
  157. Returns:
  158. 分析结果
  159. """
  160. post_id = post_data.get("帖子id", "")
  161. how_result = post_data.get("how解构结果", {})
  162. # 提取所有点的信息
  163. points = extract_points_from_filtered_result(how_result)
  164. if not points:
  165. return {
  166. "帖子id": post_id,
  167. "模型": MODEL_NAME,
  168. "输入": {"点列表": []},
  169. "输出": None,
  170. "错误": "没有可分析的点"
  171. }
  172. # 构建 prompt
  173. prompt = build_prompt(points)
  174. # 使用 custom_span 标识单个帖子的分析流程
  175. with custom_span(
  176. name=f"分析第一步 - 帖子 {post_id}",
  177. data={
  178. "帖子id": post_id,
  179. "点数量": len(points),
  180. "模型": MODEL_NAME
  181. }
  182. ):
  183. # 调用 agent
  184. result = await Runner.run(agent, input=prompt)
  185. output = result.final_output
  186. # 解析 JSON
  187. try:
  188. if "```json" in output:
  189. json_start = output.find("```json") + 7
  190. json_end = output.find("```", json_start)
  191. json_str = output[json_start:json_end].strip()
  192. elif "{" in output and "}" in output:
  193. json_start = output.find("{")
  194. json_end = output.rfind("}") + 1
  195. json_str = output[json_start:json_end]
  196. else:
  197. json_str = output
  198. analysis_result = json.loads(json_str)
  199. return {
  200. "帖子id": post_id,
  201. "模型": MODEL_NAME,
  202. "输入": {
  203. "点列表": points,
  204. "prompt": prompt
  205. },
  206. "输出": analysis_result
  207. }
  208. except Exception as e:
  209. return {
  210. "帖子id": post_id,
  211. "模型": MODEL_NAME,
  212. "输入": {
  213. "点列表": points,
  214. "prompt": prompt
  215. },
  216. "输出": None,
  217. "错误": str(e),
  218. "原始输出": output
  219. }
  220. async def main(current_time: str = None, log_url: str = None):
  221. """主函数
  222. Args:
  223. current_time: 当前时间戳(从外部传入)
  224. log_url: 日志链接(从外部传入)
  225. """
  226. # 使用路径配置
  227. config = PathConfig()
  228. # 确保输出目录存在
  229. config.ensure_dirs()
  230. # 获取路径
  231. input_dir = config.intermediate_dir / "filtered_results"
  232. output_dir = config.intermediate_dir / "first_step_analysis_v2"
  233. # 确保输出目录存在
  234. output_dir.mkdir(parents=True, exist_ok=True)
  235. print(f"账号: {config.account_name}")
  236. print(f"输入目录: {input_dir}")
  237. print(f"输出目录: {output_dir}")
  238. print(f"使用模型: {MODEL_NAME}")
  239. if log_url:
  240. print(f"Trace URL: {log_url}")
  241. print()
  242. # 读取所有过滤后的文件
  243. input_files = list(input_dir.glob("*_filtered.json"))
  244. if not input_files:
  245. print(f"错误: 在 {input_dir} 中没有找到任何 *_filtered.json 文件")
  246. return
  247. print(f"找到 {len(input_files)} 个文件待分析\n")
  248. # 批量分析
  249. results = []
  250. for i, input_file in enumerate(input_files, 1):
  251. print(f"[{i}/{len(input_files)}] 分析文件: {input_file.name}")
  252. # 读取文件
  253. with open(input_file, "r", encoding="utf-8") as f:
  254. post_data = json.load(f)
  255. # 分析
  256. result = await analyze_post(post_data)
  257. results.append(result)
  258. # 立即保存单个帖子的结果
  259. post_id = result.get("帖子id", "unknown")
  260. single_output_file = output_dir / f"{post_id}_first_step.json"
  261. single_result = {
  262. "元数据": {
  263. "current_time": current_time,
  264. "log_url": log_url,
  265. "model": MODEL_NAME
  266. },
  267. "帖子id": post_id,
  268. "分析结果": result
  269. }
  270. with open(single_output_file, "w", encoding="utf-8") as f:
  271. json.dump(single_result, f, ensure_ascii=False, indent=2)
  272. # 显示结果
  273. output = result.get("输出", {})
  274. if output:
  275. first_steps = output.get("第一步候选", [])
  276. print(f" 第一步候选:")
  277. for step in first_steps:
  278. print(f" - {step.get('点名称', 'N/A')} (概率: {step.get('第一步概率', 0):.2f})")
  279. print(f" ✓ 已保存: {single_output_file.name}")
  280. else:
  281. print(f" 分析失败: {result.get('错误', 'N/A')}")
  282. print()
  283. print(f"✓ 所有分析完成,结果已保存到: {output_dir}")
  284. if log_url:
  285. print(f"Trace: {log_url}")
  286. # 打印汇总
  287. print("\n" + "=" * 80)
  288. print("分析汇总")
  289. print("=" * 80)
  290. for result in results:
  291. post_id = result["帖子id"]
  292. output = result.get("输出", {})
  293. if output:
  294. first_steps = output.get("第一步候选", [])
  295. print(f"\n帖子 {post_id}:")
  296. for step in first_steps:
  297. print(f" - {step.get('点名称', 'N/A')} ({step.get('来源分析', 'N/A')}, 概率: {step.get('第一步概率', 0):.2f})")
  298. else:
  299. print(f"\n帖子 {post_id}: 分析失败")
  300. if __name__ == "__main__":
  301. # 设置 trace
  302. current_time, log_url = set_trace()
  303. # 使用 trace 上下文包裹整个执行流程
  304. with trace("第一步分析"):
  305. asyncio.run(main(current_time, log_url))