"""Phase 1.6: Structure raw posts from source.json using Claude.

Reads raw post data from source.json, calls the Claude API to extract
workflow steps, and writes the result to case_detailed.json (or updates
the existing case_*.json files).
"""

import asyncio
import json
import re
import sys
from pathlib import Path
from typing import Any, Dict, Optional

# Prompt template for workflow extraction (runtime string — kept verbatim).
WORKFLOW_EXTRACTION_PROMPT = """将以下帖子内容总结为AI图片生成的工序,以JSON格式输出。

## 工序提取规则
- 步骤粒度是"做了什么",而非"怎么做"
- 若涉及输入内容,描述其目的而非具体内容(如"输入将照片转化为X风格的提示词",而非"输入提示词包含A/B/C参数")
- 若涉及上传素材,说明素材类型和用途
- 同一工具内的参数配置不拆分为多步
- 若本质上只有一步,就输出一步

## 输出格式
{{
  "工序步骤": [
    {{
      "步骤序号": 1,
      "步骤描述": "string"
    }}
  ]
}}

## 帖子内容
标题:{title}
正文:
{body_text}

请严格按照上述格式输出JSON,不要包含其他内容。"""

# Claude Sonnet 4.5 pricing via OpenRouter, USD per million tokens.
_INPUT_COST_PER_MTOK = 3.0
_OUTPUT_COST_PER_MTOK = 15.0

# Truncation limit for the post body, to stay within the model's context.
_MAX_BODY_CHARS = 2000

# Grab the outermost {...} span from the model's reply; compiled once.
_JSON_BLOCK_RE = re.compile(r"\{[\s\S]*\}")


async def extract_workflow_from_post(
    post: Dict[str, Any],
    llm_call: Any,
    model: str = "anthropic/claude-sonnet-4-5",
) -> tuple[Optional[Dict[str, Any]], float]:
    """Extract workflow steps from a single post via the LLM.

    Args:
        post: Raw post data (contains ``title``, ``body_text``, ...).
        llm_call: Async LLM call function.
        model: Model identifier.

    Returns:
        Tuple of (extracted workflow dict or None on failure, cost in USD).
    """
    title = post.get("title", "")
    body_text = post.get("body_text", "")

    # A post without body text has nothing to extract from.
    if not body_text:
        return None, 0.0

    prompt = WORKFLOW_EXTRACTION_PROMPT.format(
        title=title,
        body_text=body_text[:_MAX_BODY_CHARS],  # cap length to respect token limits
    )
    messages = [{"role": "user", "content": prompt}]

    try:
        response = await llm_call(
            messages=messages,
            model=model,
            temperature=0.1,
            max_tokens=2000,
        )

        # Cost accounting: the provider may report either the Anthropic-style
        # or the OpenAI-style usage keys, so accept both.
        usage = response.get("usage", {})
        input_tokens = usage.get("input_tokens", 0) or usage.get("prompt_tokens", 0)
        output_tokens = usage.get("output_tokens", 0) or usage.get("completion_tokens", 0)
        cost = (
            input_tokens / 1e6 * _INPUT_COST_PER_MTOK
            + output_tokens / 1e6 * _OUTPUT_COST_PER_MTOK
        )

        # Content may be a plain string or an Anthropic-style list of blocks.
        content = response.get("content", "")
        if isinstance(content, list):
            content = content[0].get("text", "") if content else ""

        # Pull the JSON object out of the (possibly chatty) reply.
        json_match = _JSON_BLOCK_RE.search(content)
        if json_match:
            workflow_data = json.loads(json_match.group())
            return workflow_data, cost

        print(f"Warning: Failed to extract JSON from response for post: {title}")
        return None, cost
    except Exception as e:
        # Best-effort pipeline: one bad post must not abort the batch,
        # so log and report a zero-cost failure for this item.
        print(f"Error processing post '{title}': {e}")
        return None, 0.0


async def process_sources(
    source_file: Path,
    output_file: Path,
    llm_call: Any,
    model: str = "anthropic/claude-sonnet-4-5",
    max_concurrent: int = 3,
) -> Dict[str, Any]:
    """Process every post in source.json and extract workflow steps.

    Args:
        source_file: Path to source.json.
        output_file: Path of the output file to write.
        llm_call: Async LLM call function.
        model: Model identifier.
        max_concurrent: Maximum number of concurrent LLM calls.

    Returns:
        Statistics dict with totals, success/failure counts, total cost,
        and the output file path.
    """
    with open(source_file, "r", encoding="utf-8") as f:
        source_data = json.load(f)

    sources = source_data.get("sources", [])
    print(f"Processing {len(sources)} posts...")

    # Semaphore bounds the number of in-flight LLM requests.
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_with_semaphore(source_item):
        async with semaphore:
            post = source_item.get("post", {})
            workflow, cost = await extract_workflow_from_post(post, llm_call, model)
            return {
                "case_file": source_item.get("case_file"),
                "platform": source_item.get("platform"),
                "channel_content_id": source_item.get("channel_content_id"),
                "source_url": source_item.get("source_url"),
                "title": post.get("title", ""),
                "workflow": workflow,
            }, cost

    # Fan out over all posts concurrently (bounded by the semaphore).
    tasks = [process_with_semaphore(src) for src in sources]
    results_with_costs = await asyncio.gather(*tasks)

    results = [r[0] for r in results_with_costs]
    total_cost = sum(r[1] for r in results_with_costs)

    success_count = sum(1 for r in results if r.get("workflow"))
    failed_count = len(results) - success_count

    output_data = {
        "total": len(results),
        "success": success_count,
        "failed": failed_count,
        "cases": results,
    }
    output_file.parent.mkdir(parents=True, exist_ok=True)
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(output_data, f, ensure_ascii=False, indent=2)

    return {
        "total": len(results),
        "success": success_count,
        "failed": failed_count,
        "total_cost": total_cost,
        "output_file": str(output_file),
    }


async def main():
    """Command-line entry point."""
    if len(sys.argv) < 2:
        print("Usage: python process_sources.py <raw_cases_dir>")
        sys.exit(1)

    raw_cases_dir = Path(sys.argv[1])
    source_file = raw_cases_dir / "source.json"
    output_file = raw_cases_dir / "case_detailed.json"

    if not source_file.exists():
        print(f"Error: {source_file} not found")
        sys.exit(1)

    # Make the project root importable so the LLM helper can be loaded.
    sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent.parent))
    from agent.llm.openrouter import create_openrouter_llm_call

    llm_call = create_openrouter_llm_call(model="anthropic/claude-sonnet-4-5")

    print(f"Processing sources from: {source_file}")
    stats = await process_sources(source_file, output_file, llm_call)
    print(
        f"\n[OK] Total: {stats['total']}, "
        f"Success: {stats['success']}, Failed: {stats['failed']}"
    )
    print(f" Output: {stats['output_file']}")


if __name__ == "__main__":
    asyncio.run(main())