- """
- Phase 1.6: 使用 Claude 对 source.json 中的原始帖子进行结构化处理
- 从 source.json 读取原始帖子数据,调用 Claude API 提取工序步骤,
- 输出到 case_detailed.json 或更新原有的 case_*.json
- """
- import asyncio
- import json
- from pathlib import Path
- from typing import Any, Dict, List, Optional
# Prompt template for workflow extraction
WORKFLOW_EXTRACTION_PROMPT = """Summarize the following post as an AI image-generation workflow and output it as JSON.

## Extraction rules
- Step granularity is "what was done", not "how it was done"
- For text inputs, describe their purpose rather than their content (e.g. "entered a prompt to convert the photo to style X", not "entered a prompt containing parameters A/B/C")
- For uploaded assets, state the asset type and its purpose
- Do not split parameter tweaks within the same tool into separate steps
- If there is essentially only one step, output one step

## Output format
{{
  "工序步骤": [
    {{
      "步骤序号": 1,
      "步骤描述": "string"
    }}
  ]
}}

## Post content
Title: {title}
Body:
{body_text}

Output JSON strictly in the format above, with no other content."""
async def extract_workflow_from_post(
    post: Dict[str, Any],
    llm_call: Any,
    model: str = "anthropic/claude-sonnet-4-5"
) -> tuple[Optional[Dict[str, Any]], float]:
    """
    Extract workflow steps from a single post via the LLM.

    Args:
        post: Raw post data (contains title, body_text, etc.)
        llm_call: LLM call function
        model: Model name

    Returns:
        (extracted workflow data, cost in USD)
    """
    title = post.get("title", "")
    body_text = post.get("body_text", "")
    if not body_text:
        return None, 0.0

    # Build the prompt; truncate the body to stay within token limits
    prompt = WORKFLOW_EXTRACTION_PROMPT.format(
        title=title,
        body_text=body_text[:2000]
    )
    messages = [{"role": "user", "content": prompt}]

    try:
        response = await llm_call(
            messages=messages,
            model=model,
            temperature=0.1,
            max_tokens=2000
        )

        # Cost accounting: usage field names differ between Anthropic-style
        # and OpenAI-style responses, so try both.
        usage = response.get("usage", {})
        input_tokens = usage.get("input_tokens", 0) or usage.get("prompt_tokens", 0)
        output_tokens = usage.get("output_tokens", 0) or usage.get("completion_tokens", 0)
        # Claude Sonnet 4.5 pricing via OpenRouter: $3/M input, $15/M output
        cost = (input_tokens / 1e6 * 3.0) + (output_tokens / 1e6 * 15.0)

        # Normalize the response content to a string
        content = response.get("content", "")
        if isinstance(content, list):
            content = content[0].get("text", "") if content else ""

        # Pull the first {...} block out of the response text
        import re
        json_match = re.search(r'\{[\s\S]*\}', content)
        if json_match:
            workflow_data = json.loads(json_match.group())
            return workflow_data, cost
        print(f"Warning: failed to extract JSON from response for post: {title}")
        return None, cost
    except Exception as e:
        print(f"Error processing post '{title}': {e}")
        return None, 0.0
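The regex-based JSON recovery used above can be sketched as a standalone helper (the name `extract_json_block` is hypothetical, not part of this module; a minimal sketch of the technique, not a definitive implementation):

```python
import json
import re


def extract_json_block(text: str):
    """Return the first {...} span in text parsed as JSON, or None."""
    match = re.search(r'\{[\s\S]*\}', text)
    if not match:
        return None
    try:
        return json.loads(match.group())
    except json.JSONDecodeError:
        return None
```

The greedy `[\s\S]*` spans from the first `{` to the last `}`, which tolerates prose before and after the JSON but will mis-capture if the reply contains stray braces after the object.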
async def process_sources(
    source_file: Path,
    output_file: Path,
    llm_call: Any,
    model: str = "anthropic/claude-sonnet-4-5",
    max_concurrent: int = 3
) -> Dict[str, Any]:
    """
    Process all posts in source.json and extract workflow steps.

    Args:
        source_file: Path to source.json
        output_file: Output file path
        llm_call: LLM call function
        model: Model name
        max_concurrent: Maximum number of concurrent requests

    Returns:
        Statistics about the run
    """
    with open(source_file, "r", encoding="utf-8") as f:
        source_data = json.load(f)

    sources = source_data.get("sources", [])
    print(f"Processing {len(sources)} posts...")

    # Semaphore caps the number of in-flight LLM calls
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_with_semaphore(source_item):
        async with semaphore:
            post = source_item.get("post", {})
            workflow, cost = await extract_workflow_from_post(post, llm_call, model)
            return {
                "case_file": source_item.get("case_file"),
                "platform": source_item.get("platform"),
                "channel_content_id": source_item.get("channel_content_id"),
                "source_url": source_item.get("source_url"),
                "title": post.get("title", ""),
                "workflow": workflow,
            }, cost

    # Process all posts concurrently
    tasks = [process_with_semaphore(src) for src in sources]
    results_with_costs = await asyncio.gather(*tasks)

    # Split results from costs
    results = [r[0] for r in results_with_costs]
    total_cost = sum(r[1] for r in results_with_costs)

    success_count = sum(1 for r in results if r.get("workflow"))
    failed_count = len(results) - success_count

    # Write the output file
    output_data = {
        "total": len(results),
        "success": success_count,
        "failed": failed_count,
        "cases": results
    }
    output_file.parent.mkdir(parents=True, exist_ok=True)
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(output_data, f, ensure_ascii=False, indent=2)

    return {
        "total": len(results),
        "success": success_count,
        "failed": failed_count,
        "total_cost": total_cost,
        "output_file": str(output_file)
    }
async def main():
    """Command-line entry point."""
    import sys

    if len(sys.argv) < 2:
        print("Usage: python process_sources.py <raw_cases_dir>")
        sys.exit(1)

    raw_cases_dir = Path(sys.argv[1])
    source_file = raw_cases_dir / "source.json"
    output_file = raw_cases_dir / "case_detailed.json"

    if not source_file.exists():
        print(f"Error: {source_file} not found")
        sys.exit(1)

    # Make the repo root importable so agent.llm resolves
    sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent.parent))
    from agent.llm.openrouter import create_openrouter_llm_call
    llm_call = create_openrouter_llm_call(model="anthropic/claude-sonnet-4-5")

    print(f"Processing sources from: {source_file}")
    stats = await process_sources(source_file, output_file, llm_call)
    print(f"\n[OK] Total: {stats['total']}, Success: {stats['success']}, Failed: {stats['failed']}")
    print(f"     Cost: ${stats['total_cost']:.4f}")
    print(f"     Output: {stats['output_file']}")


if __name__ == "__main__":
    asyncio.run(main())