# process_sources.py
"""
Phase 1.6: Use Claude to structure the raw posts stored in source.json.

Reads raw post data from source.json, calls the Claude API to extract
workflow (process) steps, and writes the result to case_detailed.json
(or updates the existing case_*.json files).
"""
import asyncio
import json
from pathlib import Path
from typing import Any, Dict, List, Optional
# Prompt template for workflow extraction. It instructs the LLM (in Chinese)
# to summarize a post into AI image-generation workflow steps and to emit
# strict JSON with a "工序步骤" (workflow steps) array. The doubled braces
# ({{ }}) survive str.format(); only {title} and {body_text} are substituted.
WORKFLOW_EXTRACTION_PROMPT = """将以下帖子内容总结为AI图片生成的工序,以JSON格式输出。
## 工序提取规则
- 步骤粒度是"做了什么",而非"怎么做"
- 若涉及输入内容,描述其目的而非具体内容(如"输入将照片转化为X风格的提示词",而非"输入提示词包含A/B/C参数")
- 若涉及上传素材,说明素材类型和用途
- 同一工具内的参数配置不拆分为多步
- 若本质上只有一步,就输出一步
## 输出格式
{{
"工序步骤": [
{{
"步骤序号": 1,
"步骤描述": "string"
}}
]
}}
## 帖子内容
标题:{title}
正文:
{body_text}
请严格按照上述格式输出JSON,不要包含其他内容。"""
  32. async def extract_workflow_from_post(
  33. post: Dict[str, Any],
  34. llm_call: Any,
  35. model: str = "anthropic/claude-sonnet-4-5"
  36. ) -> tuple[Optional[Dict[str, Any]], float]:
  37. """
  38. 使用 LLM 从单个帖子中提取工序步骤
  39. Args:
  40. post: 原始帖子数据(包含 title、body_text 等)
  41. llm_call: LLM 调用函数
  42. model: 模型名称
  43. Returns:
  44. (提取的工序数据, 成本)
  45. """
  46. title = post.get("title", "")
  47. body_text = post.get("body_text", "")
  48. if not body_text:
  49. return None, 0.0
  50. # 构造 prompt
  51. prompt = WORKFLOW_EXTRACTION_PROMPT.format(
  52. title=title,
  53. body_text=body_text[:2000] # 限制长度避免超出 token 限制
  54. )
  55. messages = [{"role": "user", "content": prompt}]
  56. try:
  57. # 调用 LLM
  58. response = await llm_call(
  59. messages=messages,
  60. model=model,
  61. temperature=0.1,
  62. max_tokens=2000
  63. )
  64. # 计算成本
  65. usage = response.get("usage", {})
  66. input_tokens = usage.get("input_tokens", 0) or usage.get("prompt_tokens", 0)
  67. output_tokens = usage.get("output_tokens", 0) or usage.get("completion_tokens", 0)
  68. # Claude Sonnet 4.5 定价(通过 OpenRouter)
  69. cost = (input_tokens / 1e6 * 3.0) + (output_tokens / 1e6 * 15.0)
  70. # 解析响应
  71. content = response.get("content", "")
  72. if isinstance(content, list):
  73. content = content[0].get("text", "") if content else ""
  74. # 尝试提取 JSON
  75. import re
  76. json_match = re.search(r'\{[\s\S]*\}', content)
  77. if json_match:
  78. workflow_data = json.loads(json_match.group())
  79. return workflow_data, cost
  80. else:
  81. print(f"Warning: Failed to extract JSON from response for post: {title}")
  82. return None, cost
  83. except Exception as e:
  84. print(f"Error processing post '{title}': {e}")
  85. return None, 0.0
  86. async def process_sources(
  87. source_file: Path,
  88. output_file: Path,
  89. llm_call: Any,
  90. model: str = "anthropic/claude-sonnet-4-5",
  91. max_concurrent: int = 3
  92. ) -> Dict[str, Any]:
  93. """
  94. 处理 source.json 中的所有帖子,提取工序步骤
  95. Args:
  96. source_file: source.json 文件路径
  97. output_file: 输出文件路径
  98. llm_call: LLM 调用函数
  99. model: 模型名称
  100. max_concurrent: 最大并发数
  101. Returns:
  102. 统计信息
  103. """
  104. # 读取 source.json
  105. with open(source_file, "r", encoding="utf-8") as f:
  106. source_data = json.load(f)
  107. sources = source_data.get("sources", [])
  108. print(f"Processing {len(sources)} posts...")
  109. # 创建信号量限制并发
  110. semaphore = asyncio.Semaphore(max_concurrent)
  111. async def process_with_semaphore(source_item):
  112. async with semaphore:
  113. post = source_item.get("post", {})
  114. workflow, cost = await extract_workflow_from_post(post, llm_call, model)
  115. return {
  116. "case_file": source_item.get("case_file"),
  117. "platform": source_item.get("platform"),
  118. "channel_content_id": source_item.get("channel_content_id"),
  119. "source_url": source_item.get("source_url"),
  120. "title": post.get("title", ""),
  121. "workflow": workflow,
  122. }, cost
  123. # 并发处理所有帖子
  124. tasks = [process_with_semaphore(src) for src in sources]
  125. results_with_costs = await asyncio.gather(*tasks)
  126. # 分离结果和成本
  127. results = [r[0] for r in results_with_costs]
  128. costs = [r[1] for r in results_with_costs]
  129. total_cost = sum(costs)
  130. # 统计
  131. success_count = sum(1 for r in results if r.get("workflow"))
  132. failed_count = len(results) - success_count
  133. # 输出结果
  134. output_data = {
  135. "total": len(results),
  136. "success": success_count,
  137. "failed": failed_count,
  138. "cases": results
  139. }
  140. output_file.parent.mkdir(parents=True, exist_ok=True)
  141. with open(output_file, "w", encoding="utf-8") as f:
  142. json.dump(output_data, f, ensure_ascii=False, indent=2)
  143. return {
  144. "total": len(results),
  145. "success": success_count,
  146. "failed": failed_count,
  147. "total_cost": total_cost,
  148. "output_file": str(output_file)
  149. }
  150. async def main():
  151. """命令行入口"""
  152. import sys
  153. if len(sys.argv) < 2:
  154. print("Usage: python process_sources.py <raw_cases_dir>")
  155. sys.exit(1)
  156. raw_cases_dir = Path(sys.argv[1])
  157. source_file = raw_cases_dir / "source.json"
  158. output_file = raw_cases_dir / "case_detailed.json"
  159. if not source_file.exists():
  160. print(f"Error: {source_file} not found")
  161. sys.exit(1)
  162. # 导入 LLM 调用函数
  163. sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent.parent))
  164. from agent.llm.openrouter import create_openrouter_llm_call
  165. llm_call = create_openrouter_llm_call(model="anthropic/claude-sonnet-4-5")
  166. print(f"Processing sources from: {source_file}")
  167. stats = await process_sources(source_file, output_file, llm_call)
  168. print(f"\n[OK] Total: {stats['total']}, Success: {stats['success']}, Failed: {stats['failed']}")
  169. print(f" Output: {stats['output_file']}")
  170. if __name__ == "__main__":
  171. asyncio.run(main())