analyze_first_step.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 第一步分析脚本
  5. 基于过滤后的 how 解构结果,分析哪些点最有可能是创作者的第一步(创作起点)。
  6. 输入:intermediate/filtered_results/ 中的过滤结果
  7. 输出:第一步分析结果
  8. """
  9. import asyncio
  10. import json
  11. from pathlib import Path
  12. from typing import Dict, List
  13. import sys
  14. # 添加项目根目录到路径
  15. project_root = Path(__file__).parent.parent.parent
  16. sys.path.insert(0, str(project_root))
  17. from agents import Agent, Runner, ModelSettings, trace
  18. from agents.tracing.create import custom_span
  19. from lib.client import get_model
  20. from lib.my_trace import set_trace_smith as set_trace
  21. from script.data_processing.path_config import PathConfig
  22. # 模型配置
  23. MODEL_NAME = "google/gemini-3-pro-preview"
  24. # MODEL_NAME = 'anthropic/claude-sonnet-4.5'
  25. agent = Agent(
  26. name="First Step Analyzer",
  27. model=get_model(MODEL_NAME),
  28. model_settings=ModelSettings(
  29. temperature=0.0,
  30. max_tokens=65536,
  31. ),
  32. tools=[],
  33. )
  34. def extract_points_from_filtered_result(how_result: Dict) -> List[Dict]:
  35. """
  36. 从过滤后的 how 解构结果中提取所有点的信息
  37. Args:
  38. how_result: how解构结果
  39. Returns:
  40. 点信息列表,不包含原始分类标签
  41. """
  42. points = []
  43. for point_type in ["灵感点", "关键点", "目的点"]:
  44. point_list_key = f"{point_type}列表"
  45. point_list = how_result.get(point_list_key, [])
  46. for point in point_list:
  47. point_name = point.get("名称", "")
  48. point_desc = point.get("描述", "")
  49. # 检查是否有匹配到人设特征
  50. has_match = False
  51. matched_feature = None
  52. similarity = None
  53. # 遍历 how 步骤列表中的特征
  54. for step in point.get("how步骤列表", []):
  55. for feature in step.get("特征列表", []):
  56. match_results = feature.get("匹配结果", [])
  57. if match_results: # 如果有匹配结果(top1)
  58. has_match = True
  59. match = match_results[0]
  60. matched_feature = match.get("人设特征名称", "")
  61. similarity = match.get("匹配结果", {}).get("相似度", 0)
  62. break
  63. if has_match:
  64. break
  65. point_info = {
  66. "名称": point_name,
  67. "描述": point_desc,
  68. "是否匹配到已有人设": has_match
  69. }
  70. if has_match:
  71. point_info["匹配的人设特征"] = matched_feature
  72. point_info["相似度"] = similarity
  73. points.append(point_info)
  74. return points
  75. def build_prompt(points: List[Dict]) -> str:
  76. """
  77. 构建分析 prompt
  78. Args:
  79. points: 点信息列表
  80. Returns:
  81. prompt 文本
  82. """
  83. # 构建点的描述文本
  84. points_text = []
  85. for i, point in enumerate(points, 1):
  86. text = f"{i}. {point['名称']}\n {point['描述']}"
  87. if point['是否匹配到已有人设']:
  88. text += f"\n [已匹配] 匹配到人设特征: {point['匹配的人设特征']} (相似度: {point['相似度']:.2f})"
  89. else:
  90. text += "\n [未匹配] 未匹配到已有人设特征"
  91. points_text.append(text)
  92. points_section = "\n\n".join(points_text)
  93. return f'''
  94. 以下是一个内容创作的解构结果。这些点已经被分析和分类,但这个分类是分析维度,不代表真实的创作顺序。
  95. 请判断:在这些点中,哪些最有可能是创作者的"第一步"(创作起点)?
  96. ## 判断标准
  97. **起点特征**:
  98. - 最先触发创作、不依赖其他点的节点
  99. - 可能是外部事件、时事热点、商业需求等
  100. - 起点可能有多个
  101. **参考信息**:
  102. - **已匹配到人设的点**:来源于创作者已有的人设/风格/习惯
  103. - **未匹配的点**:可能来自外部触发、人设推导、或新尝试
  104. ## 待分析的点
  105. {points_section}
  106. ## 输出要求
  107. 以 JSON 格式输出:
  108. {{
  109. "推理过程": "详细说明判断逻辑...",
  110. "第一步候选": [
  111. {{
  112. "点名称": "...",
  113. "第一步概率": 0.95, // 0-1之间的数值
  114. "推理依据": "...",
  115. "来源分析": "外部触发/人设延伸/商业驱动/其他"
  116. }}
  117. ]
  118. }}
  119. 注意:
  120. 1. 只输出最有可能是第一步的点(通常1-3个)
  121. 2. 按第一步概率降序排列
  122. 3. 不要被点的呈现顺序影响判断
  123. '''.strip()
  124. async def analyze_post(post_data: Dict) -> Dict:
  125. """
  126. 分析单个帖子
  127. Args:
  128. post_data: 帖子数据(包含 how解构结果)
  129. Returns:
  130. 分析结果
  131. """
  132. post_id = post_data.get("帖子id", "")
  133. how_result = post_data.get("how解构结果", {})
  134. # 提取所有点的信息
  135. points = extract_points_from_filtered_result(how_result)
  136. if not points:
  137. return {
  138. "帖子id": post_id,
  139. "模型": MODEL_NAME,
  140. "输入": {"点列表": []},
  141. "输出": None,
  142. "错误": "没有可分析的点"
  143. }
  144. # 构建 prompt
  145. prompt = build_prompt(points)
  146. # 使用 custom_span 标识单个帖子的分析流程
  147. with custom_span(
  148. name=f"分析第一步 - 帖子 {post_id}",
  149. data={
  150. "帖子id": post_id,
  151. "点数量": len(points),
  152. "模型": MODEL_NAME
  153. }
  154. ):
  155. # 调用 agent
  156. result = await Runner.run(agent, input=prompt)
  157. output = result.final_output
  158. # 解析 JSON
  159. try:
  160. if "```json" in output:
  161. json_start = output.find("```json") + 7
  162. json_end = output.find("```", json_start)
  163. json_str = output[json_start:json_end].strip()
  164. elif "{" in output and "}" in output:
  165. json_start = output.find("{")
  166. json_end = output.rfind("}") + 1
  167. json_str = output[json_start:json_end]
  168. else:
  169. json_str = output
  170. analysis_result = json.loads(json_str)
  171. return {
  172. "帖子id": post_id,
  173. "模型": MODEL_NAME,
  174. "输入": {
  175. "点列表": points,
  176. "prompt": prompt
  177. },
  178. "输出": analysis_result
  179. }
  180. except Exception as e:
  181. return {
  182. "帖子id": post_id,
  183. "模型": MODEL_NAME,
  184. "输入": {
  185. "点列表": points,
  186. "prompt": prompt
  187. },
  188. "输出": None,
  189. "错误": str(e),
  190. "原始输出": output
  191. }
  192. async def main(current_time: str = None, log_url: str = None):
  193. """主函数
  194. Args:
  195. current_time: 当前时间戳(从外部传入)
  196. log_url: 日志链接(从外部传入)
  197. """
  198. # 使用路径配置
  199. config = PathConfig()
  200. # 确保输出目录存在
  201. config.ensure_dirs()
  202. # 获取路径
  203. input_dir = config.intermediate_dir / "filtered_results"
  204. output_dir = config.intermediate_dir / "first_step_analysis"
  205. # 确保输出目录存在
  206. output_dir.mkdir(parents=True, exist_ok=True)
  207. print(f"账号: {config.account_name}")
  208. print(f"输入目录: {input_dir}")
  209. print(f"输出目录: {output_dir}")
  210. print(f"使用模型: {MODEL_NAME}")
  211. if log_url:
  212. print(f"Trace URL: {log_url}")
  213. print()
  214. # 读取所有过滤后的文件
  215. input_files = list(input_dir.glob("*_filtered.json"))
  216. if not input_files:
  217. print(f"错误: 在 {input_dir} 中没有找到任何 *_filtered.json 文件")
  218. return
  219. print(f"找到 {len(input_files)} 个文件待分析\n")
  220. # 批量分析
  221. results = []
  222. for i, input_file in enumerate(input_files, 1):
  223. print(f"[{i}/{len(input_files)}] 分析文件: {input_file.name}")
  224. # 读取文件
  225. with open(input_file, "r", encoding="utf-8") as f:
  226. post_data = json.load(f)
  227. # 分析
  228. result = await analyze_post(post_data)
  229. results.append(result)
  230. # 立即保存单个帖子的结果
  231. post_id = result.get("帖子id", "unknown")
  232. single_output_file = output_dir / f"{post_id}_first_step.json"
  233. single_result = {
  234. "元数据": {
  235. "current_time": current_time,
  236. "log_url": log_url,
  237. "model": MODEL_NAME
  238. },
  239. "帖子id": post_id,
  240. "分析结果": result
  241. }
  242. with open(single_output_file, "w", encoding="utf-8") as f:
  243. json.dump(single_result, f, ensure_ascii=False, indent=2)
  244. # 显示结果
  245. output = result.get("输出", {})
  246. if output:
  247. first_steps = output.get("第一步候选", [])
  248. print(f" 第一步候选:")
  249. for step in first_steps:
  250. print(f" - {step.get('点名称', 'N/A')} (概率: {step.get('第一步概率', 0):.2f})")
  251. print(f" ✓ 已保存: {single_output_file.name}")
  252. else:
  253. print(f" 分析失败: {result.get('错误', 'N/A')}")
  254. print()
  255. print(f"✓ 所有分析完成,结果已保存到: {output_dir}")
  256. if log_url:
  257. print(f"Trace: {log_url}")
  258. # 打印汇总
  259. print("\n" + "=" * 80)
  260. print("分析汇总")
  261. print("=" * 80)
  262. for result in results:
  263. post_id = result["帖子id"]
  264. output = result.get("输出", {})
  265. if output:
  266. first_steps = output.get("第一步候选", [])
  267. print(f"\n帖子 {post_id}:")
  268. for step in first_steps:
  269. print(f" - {step.get('点名称', 'N/A')} ({step.get('来源分析', 'N/A')}, 概率: {step.get('第一步概率', 0):.2f})")
  270. else:
  271. print(f"\n帖子 {post_id}: 分析失败")
  272. if __name__ == "__main__":
  273. # 设置 trace
  274. current_time, log_url = set_trace()
  275. # 使用 trace 上下文包裹整个执行流程
  276. with trace("第一步分析"):
  277. asyncio.run(main(current_time, log_url))