guantao 1 ماه پیش
والد
کامیت
a83f32230e

+ 33 - 29
examples/process_pipeline/prompts/researcher.prompt

@@ -4,12 +4,15 @@ temperature: 0.3
 
 $system$
 
-你是一个工序调研统筹子 Agent。你会收到一个**具体的调研渠道和方向**,你需要为你接到的方向广泛搜索相关工序案例,并统一将结果结构化写入到指定的 JSON 文件中。
-
-**你的边界**:完全独立搜索,查案子、抓原文、存入文件即可。不负责策略归纳。
+你是一个专注的渠道调研专家。你负责在指定的单个渠道(如小红书、X、youtube)进行完整的广度调研,包括多关键词搜索、适度查看内容,并输出结构化的调研结果。
 
 ---
 
+## 核心原则
+1. **渠道专注**:你只负责一个渠道的完整调研,绝不跨渠道。
+2. **结构化提取**:必须将结果严格拆解为按序执行的步骤,如果只有一个步骤,输出一步即可。
+3. **相关性过滤**:只记录与调研目标相关的**图片生成**工序(排除视频、音频及非AI桌面软件如PS)。
+
 ## 可用工具
 
 ### 搜索工具
@@ -26,18 +29,20 @@ $system$
 ---
 
 ## 执行流程
-
-### 第一步:理解定向任务并进行广泛搜索
-1. 阅读任务指令,明确专属平台和切入点。
-2. > 🚨 **【工具调用最高红线】**:**必须**优先调用 `content_search` 工具搜索真实信息。严禁在对话中输出任何形式的 Python 脚本或编程代码,系统不支持你运行代码!
-3. > 🚨 **【搜索词红线】**:搜索词严禁夹带具体的 AI 软件名或技术框架名。
-4. 搜索并用 `content_detail` 仔细阅读高赞方案。尝试更换关键词,重试 2 次找不到则结束。
-
-### 第二步:萃取原汁原味的案例信息
-从文章中重点提取:
-1. **输入与输出效果(Input & Output)**:作者给了什么提示词、图?最终产出什么?必须保留效果图的 URL!
-2. **操作过程记录(Raw Workflow)**:关键节点名、调参术语(原汁原味)。
-3. **来源信源与反馈评估**:三连数据,以及精选 1~3 条核心评论(报错、吐槽、优缺点)。
+1. **生成query词**:
+  - 将用户的业务能力需求,转换为**AI生图工序/教程**的搜索词。
+  - 关键词应偏向寻找“教程”、“方法”,而非单一软件名。
+  - 准备 3-5 个关键词。每个关键词搜索 20 条结果。
+  - 禁止搜索具体的软件名称,如 MJ、ControlNet。
+2. **搜索要求**:仅搜索/查看近半年内发布的结果,不要查看更早的过时帖子。
+3. **适度查看内容**:对点赞数高或标题符合业务需求的帖子查看详情(可看图片)。
+4. **结构化提取**:
+    提取规则
+    - 步骤粒度是"做了什么",而非"怎么做"
+    - 若涉及输入内容,描述其目的而非具体内容(如"输入将照片转化为X风格的提示词",而非"输入提示词包含A/B/C参数")
+    - 若涉及上传素材,说明素材类型和用途
+    - 同一工具内的参数配置不拆分为多步
+    - 若本质上只有一步,就输出一步
 
 ### 第三步:存储结果文件
 🚨 **绝对不能更改任务规定的 `output_file` 路径名**!
@@ -54,23 +59,22 @@ write_file("{output_file}", 更新后的完整 JSON)
 写入到 `%output_file%`:
 ```json
 {
-  "requirement": "本次的需求描述",
-  "searched_at": "ISO时间",
-  "cases": [
+  "初始关键词": ["string"],
+  "采集时间": "string - ISO 8601",
+  "工序发现": [
     {
-      "id": "case_001",
-      "title": "案例标题",
-      "platform": "xhs",
-      "source_url": "https://...",
-      "metrics": { "likes": 100, "comments": 20 },
-      "user_feedback": "核心评论反馈摘抄",
-      "images": ["https://链接"],
-      "input_details": "输入配置",
-      "output_details": "输出效果评价",
-      "workflow_process": "大白话流程和作者的术语原话"
-    }
+      "方案名称": "string - 如 Midjourney + ControlNet + 局部重绘工作流",
+      "最新提及时间": "string",
+      "工序步骤": [
+        {
+          "步骤序号": "number - 如 1, 2, 3",
+          "步骤描述": "string - 该步骤的核心目的"
+        }
+      ],
+      "帖子链接": "string"
+    }
   ]
 }
 ```
 
 $user$

+ 106 - 0
examples/process_pipeline/run_metrics.json

@@ -1740,5 +1740,111 @@
       "P3_Assembler Error: Expecting ',' delimiter: line 52 column 79 (char 2376)"
     ],
     "timestamp": "2026-04-23T18:09:34.667805"
+  },
+  {
+    "index": 24,
+    "requirement": "生成穿着完整冬季搭配的人物形象,展示黑色羽绒服、红色围巾、红色手套、宽腿裤等单品的组合穿搭效果,呈现从全身到局部细节的多角度造型展示...",
+    "duration_seconds": 1173.06,
+    "total_cost_usd": 1.8047,
+    "costs_breakdown": {
+      "P0_Router": 0.018,
+      "P1_Research_xhs": 0.9255,
+      "P1_Research_bili": 0.4288,
+      "P1_Research_x": 0.4324
+    },
+    "trace_ids": {
+      "P1_Research_xhs": "79f72cdd-33fc-47e2-a5d6-4bbee4bfa222",
+      "P1_Research_bili": "488dd728-ed99-43d2-8591-61387bb13c08",
+      "P1_Research_x": "63696f26-8a36-43fc-b9bd-b34d368c4c1a"
+    },
+    "errors": [],
+    "timestamp": "2026-04-23T20:03:55.745921"
+  },
+  {
+    "index": 24,
+    "requirement": "生成穿着完整冬季搭配的人物形象,展示黑色羽绒服、红色围巾、红色手套、宽腿裤等单品的组合穿搭效果,呈现从全身到局部细节的多角度造型展示...",
+    "duration_seconds": 0.0,
+    "total_cost_usd": 0.0,
+    "costs_breakdown": {},
+    "trace_ids": {},
+    "errors": [],
+    "timestamp": "2026-04-23T20:11:34.987393"
+  },
+  {
+    "index": 24,
+    "requirement": "生成穿着完整冬季搭配的人物形象,展示黑色羽绒服、红色围巾、红色手套、宽腿裤等单品的组合穿搭效果,呈现从全身到局部细节的多角度造型展示...",
+    "duration_seconds": 1031.16,
+    "total_cost_usd": 2.4481,
+    "costs_breakdown": {
+      "P0_Router": 0.0181,
+      "P1_Research_xhs": 0.8217,
+      "P1_Research_youtube": 0.7351,
+      "P1_Research_bili": 0.4409,
+      "P1_Research_x": 0.4323
+    },
+    "trace_ids": {
+      "P1_Research_xhs": "cfbd2681-f377-4806-b4c9-701c6aa0c22e",
+      "P1_Research_youtube": "2eb1efec-7df1-4e3c-bceb-e2a5745c2ce0",
+      "P1_Research_bili": "e327984f-bf43-4340-86fe-ef7624591f68",
+      "P1_Research_x": "7ee1572b-241a-4a04-ad3e-edbedd1ae3d3"
+    },
+    "errors": [],
+    "timestamp": "2026-04-23T20:35:24.763746"
+  },
+  {
+    "index": 24,
+    "requirement": "生成穿着完整冬季搭配的人物形象,展示黑色羽绒服、红色围巾、红色手套、宽腿裤等单品的组合穿搭效果,呈现从全身到局部细节的多角度造型展示...",
+    "duration_seconds": 1160.92,
+    "total_cost_usd": 2.9403,
+    "costs_breakdown": {
+      "P0_Router": 0.0178,
+      "P1_Research_xhs": 0.9696,
+      "P1_Research_youtube": 0.506,
+      "P1_Research_bili": 0.8028,
+      "P1_Research_x": 0.644
+    },
+    "trace_ids": {
+      "P1_Research_xhs": "c838370b-ae93-493c-97cd-eaf33022e58a",
+      "P1_Research_youtube": "98b41dff-6378-4810-a77f-84221ff6dc67",
+      "P1_Research_bili": "d470ba10-5865-4f65-8de0-bf45eddfb410",
+      "P1_Research_x": "8cbc5171-14af-4c0f-b58d-5d03e5421ae8"
+    },
+    "errors": [],
+    "timestamp": "2026-04-23T21:08:25.867900"
+  },
+  {
+    "index": 0,
+    "requirement": "生成人物在不同场景下呈现丰富面部表情的图片,例如夸张的痛苦、无奈、开心、困倦等神态,表情要生动传神、情绪感强烈...",
+    "duration_seconds": 0.02,
+    "total_cost_usd": 0.0,
+    "costs_breakdown": {},
+    "trace_ids": {},
+    "errors": [],
+    "timestamp": "2026-04-23T21:09:45.542626"
+  },
+  {
+    "index": 0,
+    "requirement": "生成人物在不同场景下呈现丰富面部表情的图片,例如夸张的痛苦、无奈、开心、困倦等神态,表情要生动传神、情绪感强烈...",
+    "duration_seconds": 0.02,
+    "total_cost_usd": 0.0,
+    "costs_breakdown": {},
+    "trace_ids": {},
+    "errors": [],
+    "timestamp": "2026-04-23T21:10:40.421852"
+  },
+  {
+    "index": 0,
+    "requirement": "生成人物在不同场景下呈现丰富面部表情的图片,例如夸张的痛苦、无奈、开心、困倦等神态,表情要生动传神、情绪感强烈...",
+    "duration_seconds": 241.55,
+    "total_cost_usd": 0.8602,
+    "costs_breakdown": {
+      "P0_Router": 0.0181,
+      "P1_Research_bili": 0.8422
+    },
+    "trace_ids": {
+      "P1_Research_bili": "1c011564-8006-455c-a6c4-e1ebdade2dda"
+    },
+    "errors": [],
+    "timestamp": "2026-04-23T21:18:54.404226"
   }
 ]

+ 79 - 12
examples/process_pipeline/run_pipeline.py

@@ -4,13 +4,20 @@ V4 Pipeline: Hardcoded Map-Reduce Orchestration for AIGC Process Research
 import argparse
 import asyncio
 import json
+import os
 import sys
 import time
 from datetime import datetime
 from pathlib import Path
 
+PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
+
+# Force runtime working directory to project root so relative trace/cache paths
+# always land in the repo root no matter where this script is launched from.
+os.chdir(PROJECT_ROOT)
+
 # Add project root to path
-sys.path.insert(0, str(Path(__file__).parent.parent.parent))
+sys.path.insert(0, str(PROJECT_ROOT))
 
 from dotenv import load_dotenv
 load_dotenv()
@@ -30,7 +37,7 @@ from examples.process_research.config import (
 )
 from agent.utils import setup_logging
 
-async def run_agent_task(runner: AgentRunner, prompt_name: str, kwargs: dict, task_name: str, model_name: str):
+async def run_agent_task(runner: AgentRunner, prompt_name: str, kwargs: dict, task_name: str, model_name: str, skip_validation: bool = False):
     from examples.process_pipeline.script.validate_schema import validate_case, validate_blueprint, validate_capabilities, validate_strategy
     base_dir = Path(__file__).parent
     prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt"
@@ -204,6 +211,10 @@ async def run_agent_task(runner: AgentRunner, prompt_name: str, kwargs: dict, ta
 
         # Schema Validation
         if out_file and Path(out_file).exists() and str(out_file).endswith(".json"):
+            if skip_validation:
+                print(f"   ⏭️ [Schema Validation Skipped] {Path(out_file).name} (Single-Step Mode)")
+                return total_task_cost, task_errors, last_trace_id
+                
             try:
                 with open(out_file, "r", encoding="utf-8") as f:
                     data = json.loads(f.read())
@@ -242,7 +253,7 @@ async def run_agent_task(runner: AgentRunner, prompt_name: str, kwargs: dict, ta
 
     return total_task_cost, task_errors, last_trace_id
 
-async def run_anthropic_sdk_task(prompt_name: str, kwargs: dict, task_name: str, model_name: str):
+async def run_anthropic_sdk_task(prompt_name: str, kwargs: dict, task_name: str, model_name: str, skip_validation: bool = False):
     """
     备用:使用纯净的官方 Anthropic SDK 驱动。
     跳过内部大架构的 trace 追踪,但保留对原有 Python 工具库(如 write_file/glob_files)的无缝调用。
@@ -406,6 +417,10 @@ async def run_anthropic_sdk_task(prompt_name: str, kwargs: dict, task_name: str,
                 
         # Schema Validation
         if out_file and Path(out_file).exists() and str(out_file).endswith(".json"):
+            if skip_validation:
+                print(f"   ⏭️ [Schema Validation Skipped] {Path(out_file).name} (Single-Step Mode)")
+                return total_task_cost, task_errors, None
+                
             try:
                 with open(out_file, "r", encoding="utf-8") as f:
                     data = json.loads(f.read())
@@ -577,6 +592,8 @@ async def main():
                     platforms = [p for p in platforms if p not in existing_platforms][:needed_count]
                     print(f"⚠️ [Router Logic Failed] Using delta default platforms ({platforms}). Error: {e}")
 
+        is_single_step = args.restart_mode.startswith("single_")
+        
         # Phase 1: MAP (Parallel Search) uses Qwen
         if not args.skip_research:
             print(f"\n--- Phase 1: Distributed Research Map ({qwen_model}) ---")
@@ -588,7 +605,7 @@ async def main():
                     "task": task_desc,
                     "output_file": out_file
                 }
-                phase1_tasks.append(run_agent_task(runner_qwen, "researcher", kwargs, f"P1_Research_{p}", qwen_model))
+                phase1_tasks.append(run_agent_task(runner_qwen, "researcher", kwargs, f"P1_Research_{p}", qwen_model, skip_validation=is_single_step))
                 
             phase1_results = await asyncio.gather(*phase1_tasks)
             phase1_trace_ids = {}
@@ -597,13 +614,61 @@ async def main():
                 costs_breakdown[f"P1_Research_{p}"] = round(task_cost, 4)
                 phase1_trace_ids[f"P1_Research_{p}"] = trace_id
                 global_errors.extend(task_errors)
-                
+
                 # Check if cases actually got written
                 expected_file = Path(raw_cases_dir / f"case_{p}.json")
                 if not expected_file.exists():
                     err_msg = f"Missing case file for {p}! Agent likely hit max_iterations without saving."
                     print(f"⚠️ [Warning] {err_msg}")
                     global_errors.append(err_msg)
+
+            # Phase 1.5: Extract raw post data from cache → raw_cases/source.json
+            try:
+                from examples.process_pipeline.script.extract_sources import extract_sources_to_json
+                # 使用本次 pipeline 的 trace_ids 精确定位 cache 文件
+                trace_id_list = [tid for tid in phase1_trace_ids.values() if tid]
+                src_stats = extract_sources_to_json(raw_cases_dir, trace_ids=trace_id_list)
+                print(
+                    f"📎 [Source Extraction] "
+                    f"existing={src_stats['total_existing']} "
+                    f"new={src_stats['total_matched']} "
+                    f"total={src_stats['total_existing'] + src_stats['total_matched']} "
+                    f"→ {raw_cases_dir / 'source.json'}"
+                )
+            except Exception as e:
+                err_msg = f"Source extraction failed: {type(e).__name__}: {e}"
+                print(f"⚠️ [Warning] {err_msg}")
+                global_errors.append(err_msg)
+
+            # Phase 1.6: Process sources with Claude → case_detailed.json
+            source_file = raw_cases_dir / "source.json"
+            if source_file.exists():
+                try:
+                    from examples.process_pipeline.script.process_sources import process_sources
+                    print(f"\n--- Phase 1.6: Workflow Extraction ({claude_model}) ---")
+
+                    detailed_file = raw_cases_dir / "case_detailed.json"
+                    workflow_stats = await process_sources(
+                        source_file,
+                        detailed_file,
+                        claude_llm_call,
+                        model=claude_model,
+                        max_concurrent=3
+                    )
+
+                    total_cost += workflow_stats.get("total_cost", 0.0)
+                    costs_breakdown["P1.6_WorkflowExtraction"] = round(workflow_stats.get("total_cost", 0.0), 4)
+
+                    print(
+                        f"🔍 [Workflow Extraction] "
+                        f"success={workflow_stats['success']} "
+                        f"failed={workflow_stats['failed']} "
+                        f"→ {detailed_file}"
+                    )
+                except Exception as e:
+                    err_msg = f"Workflow extraction failed: {type(e).__name__}: {e}"
+                    print(f"⚠️ [Warning] {err_msg}")
+                    global_errors.append(err_msg)
         else:
             print("\n⏭️  [Skip] Phase 1 Skipped via --skip-research. Using existing cases...")
 
@@ -629,14 +694,14 @@ async def main():
                         "requirement": requirement,
                         "raw_files_glob": raw_glob,
                         "output_file": blueprint_file
-                    }, "P2_FilterBlueprint", claude_model)
+                    }, "P2_FilterBlueprint", claude_model, skip_validation=is_single_step)
                 else:
                     print("   > Using [AgentRunner Core] for P2_FilterBlueprint")
                     task_a = run_agent_task(runner_claude, "filter_and_blueprint", {
                         "requirement": requirement,
                         "raw_files_glob": raw_glob,
                         "output_file": blueprint_file
-                    }, "P2_FilterBlueprint", claude_model)
+                    }, "P2_FilterBlueprint", claude_model, skip_validation=is_single_step)
                     
             if Path(capabilities_file).exists():
                 print(f"⏭️  [Skip P2] capabilities_extracted.json already exists. Skipping P2_ExtractCaps.")
@@ -648,14 +713,14 @@ async def main():
                         "requirement": requirement,
                         "raw_files_glob": raw_glob,
                         "output_file": capabilities_file
-                    }, "P2_ExtractCaps", claude_model)
+                    }, "P2_ExtractCaps", claude_model, skip_validation=is_single_step)
                 else:
                     print("   > Using [AgentRunner Core] for P2_ExtractCaps")
                     task_b = run_agent_task(runner_claude, "extract_capabilities", {
                         "requirement": requirement,
                         "raw_files_glob": raw_glob,
                         "output_file": capabilities_file
-                    }, "P2_ExtractCaps", claude_model)
+                    }, "P2_ExtractCaps", claude_model, skip_validation=is_single_step)
             
             to_await = []
             names_await = []
@@ -679,12 +744,14 @@ async def main():
             print(f"\n--- Phase 3: Final Strategy Assembly ({claude_model}) ---")
             strategy_file = str(output_dir / "strategy.json")
             
+            if args.restart_mode == "single":
+                force_strategy_rerun = False
+            
             if Path(strategy_file).exists() and not force_strategy_rerun:
                 print(f"⏭️  [Skip P3] strategy.json already exists. Skipping P3_Assembler.")
             else:
                 if Path(strategy_file).exists():
                     print(f"⚠️  [Force P3] Upstream dependencies were regenerated. Forcing re-run of P3_Assembler...")
-                
                 if args.use_claude_sdk:
                     print("   > Using [Anthropic SDK Core]")
                     phase3_cost, phase3_errs, phase3_trace_id = await run_anthropic_sdk_task("assemble_strategy", {
@@ -692,7 +759,7 @@ async def main():
                         "blueprint_file": blueprint_file,
                         "capabilities_file": capabilities_file,
                         "output_file": strategy_file
-                    }, "P3_Assembler", claude_model)
+                    }, "P3_Assembler", claude_model, skip_validation=is_single_step)
                 else:
                     print("   > Using [AgentRunner Core]")
                     phase3_cost, phase3_errs, phase3_trace_id = await run_agent_task(runner_claude, "assemble_strategy", {
@@ -700,7 +767,7 @@ async def main():
                         "blueprint_file": blueprint_file,
                         "capabilities_file": capabilities_file,
                         "output_file": strategy_file
-                    }, "P3_Assembler", claude_model)
+                    }, "P3_Assembler", claude_model, skip_validation=is_single_step)
                 total_cost += phase3_cost
                 costs_breakdown["P3_Assembler"] = round(phase3_cost, 4)
                 global_errors.extend(phase3_errs)

+ 262 - 0
examples/process_pipeline/script/extract_sources.py

@@ -0,0 +1,262 @@
+"""
+从 raw_cases/case_*.json 中提取 source_url / 帖子链接,
+解析 channel_content_id,再从 .cache/content_search 中查找对应的原始帖子数据。
+
+主函数:extract_sources_to_json(raw_cases_dir)
+  - 扫描该目录下所有 case_{platform}.json
+  - 解析每个 "工序发现[].帖子链接"(新格式)或 "cases[].source_url"(旧格式)
+  - 从项目根的 .cache/content_search/*.json 中匹配 channel_content_id
+  - 把匹配到的完整 post 写入 {raw_cases_dir}/source.json
+"""
+
+import json
+import re
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Tuple
+
+
+# ── URL → (platform, content_id) 解析 ────────────────────────────────
+
+_URL_PATTERNS = [
+    # B站: https://www.bilibili.com/video/BV1xxx
+    ("bili", re.compile(r"bilibili\.com/video/(BV[\w]+)")),
+    # 小红书: https://www.xiaohongshu.com/explore/{id} 或 /discovery/item/{id}
+    ("xhs", re.compile(r"xiaohongshu\.com/(?:explore|discovery/item)/([a-f0-9]+)")),
+    # YouTube: https://www.youtube.com/watch?v={id} 或 https://youtu.be/{id}
+    ("youtube", re.compile(r"(?:youtube\.com/watch\?v=|youtu\.be/)([\w-]+)")),
+    # X/Twitter: https://x.com/{user}/status/{id} 或 twitter.com
+    ("x", re.compile(r"(?:x\.com|twitter\.com)/[^/]+/status/(\d+)")),
+    # 知乎: https://zhuanlan.zhihu.com/p/{id} 或 zhihu.com/question/{qid}/answer/{aid}
+    ("zhihu", re.compile(r"zhuanlan\.zhihu\.com/p/(\d+)")),
+    ("zhihu", re.compile(r"zhihu\.com/question/\d+/answer/(\d+)")),
+    # 公众号: 通过 __biz 或整个 URL 作为 id(后备)
+    ("gzh", re.compile(r"mp\.weixin\.qq\.com/s[/?]([^\s\"']+)")),
+]
+
+
+def parse_url(url: str) -> Optional[Tuple[str, str]]:
+    """从 URL 解析出 (platform, content_id)。返回 None 表示无法解析。"""
+    if not url or not isinstance(url, str):
+        return None
+    for platform, pat in _URL_PATTERNS:
+        m = pat.search(url)
+        if m:
+            return platform, m.group(1)
+    return None
+
+
+# ── 从 case 文件中抽取所有链接 ────────────────────────────────
+
+def extract_urls_from_case(case_data: Any) -> List[str]:
+    """兼容新旧两种格式,返回 case 文件里出现的所有 URL。"""
+    urls: List[str] = []
+
+    if not isinstance(case_data, dict):
+        return urls
+
+    # 新格式:工序发现[].帖子链接
+    for item in case_data.get("工序发现", []) or []:
+        if isinstance(item, dict):
+            link = item.get("帖子链接") or item.get("source_url")
+            if link:
+                urls.append(link)
+
+    # 旧格式:cases[].source_url
+    for item in case_data.get("cases", []) or []:
+        if isinstance(item, dict):
+            link = item.get("source_url") or item.get("帖子链接")
+            if link:
+                urls.append(link)
+
+    return urls
+
+
+# ── 从 cache 中构建 (platform, content_id) → post 索引 ────────────────────────────────
+
+def build_cache_index(cache_dir: Path, trace_ids: Optional[List[str]] = None) -> Dict[Tuple[str, str], Dict[str, Any]]:
+    """
+    构建 (platform, channel_content_id) -> post 映射。
+
+    Args:
+        cache_dir: cache 目录路径
+        trace_ids: 可选的 trace_id 列表。如果提供,只加载这些特定的 cache 文件;
+                   否则扫描所有 cache 文件
+
+    Returns:
+        (platform, content_id) -> post 的映射字典
+    """
+    index: Dict[Tuple[str, str], Dict[str, Any]] = {}
+    if not cache_dir.exists():
+        return index
+
+    # 如果提供了 trace_ids,只加载这些特定文件
+    if trace_ids:
+        cache_files = [cache_dir / f"{tid}.json" for tid in trace_ids if tid]
+        cache_files = [f for f in cache_files if f.exists()]
+    else:
+        # 否则扫描所有 cache 文件
+        cache_files = list(cache_dir.glob("*.json"))
+
+    for cache_file in cache_files:
+        try:
+            with open(cache_file, "r", encoding="utf-8") as f:
+                data = json.load(f)
+        except Exception:
+            continue
+
+        for key, entry in data.items():
+            if not key.startswith("search:"):
+                continue
+            platform = key.split(":", 1)[1]
+
+            # 新格式:entry = {"history": [...], "latest_index": n}
+            # 旧格式:entry = {"keyword": ..., "posts": [...]}
+            if isinstance(entry, dict) and "history" in entry:
+                post_lists = [h.get("posts", []) for h in entry.get("history", [])]
+            elif isinstance(entry, dict) and "posts" in entry:
+                post_lists = [entry.get("posts", [])]
+            else:
+                continue
+
+            for posts in post_lists:
+                for post in posts or []:
+                    if not isinstance(post, dict):
+                        continue
+                    cid = post.get("channel_content_id")
+                    if cid:
+                        # 直接用 (platform, content_id) 作为索引键
+                        index[(platform, str(cid))] = post
+
+    return index
+
+
+# ── 主入口 ────────────────────────────────
+
+def extract_sources_to_json(
+    raw_cases_dir: Path,
+    cache_dir: Optional[Path] = None,
+    output_name: str = "source.json",
+    trace_ids: Optional[List[str]] = None,
+) -> Dict[str, Any]:
+    """
+    扫描 raw_cases_dir 下的 case_*.json,
+    从 cache 中找出原始帖子,输出到 {raw_cases_dir}/{output_name}。
+
+    返回统计信息 dict。
+    """
+    raw_cases_dir = Path(raw_cases_dir)
+    if cache_dir is None:
+        # 项目根目录:本文件位于 examples/process_pipeline/script/ 下,需向上取四级 parent
+        project_root = Path(__file__).resolve().parent.parent.parent.parent
+        cache_dir = project_root / ".cache" / "content_search"
+    cache_dir = Path(cache_dir)
+
+    # 1. 构建 cache 索引
+    cache_index = build_cache_index(cache_dir, trace_ids=trace_ids)
+
+    # 2. 加载已有的 source.json(如果存在)
+    output_file = raw_cases_dir / output_name
+    existing_sources = []
+    existing_ids = set()  # (platform, channel_content_id) 集合用于去重
+
+    if output_file.exists():
+        try:
+            with open(output_file, "r", encoding="utf-8") as f:
+                existing_data = json.load(f)
+                existing_sources = existing_data.get("sources", [])
+                # 构建已有的 ID 集合
+                for src in existing_sources:
+                    key = (src.get("platform"), src.get("channel_content_id"))
+                    existing_ids.add(key)
+        except Exception as e:
+            print(f"Warning: Failed to load existing source.json: {e}")
+
+    # 3. 扫描所有 case 文件
+    matched: List[Dict[str, Any]] = []
+    unmatched: List[Dict[str, Any]] = []
+    seen_keys: set = set(existing_ids)  # 从已有的 ID 开始
+
+    for case_file in sorted(raw_cases_dir.glob("case_*.json")):
+        # 跳过自己(如果 source.json 误被命名成 case_*)
+        if case_file.name == output_name:
+            continue
+        try:
+            with open(case_file, "r", encoding="utf-8") as f:
+                case_data = json.load(f)
+        except Exception as e:
+            unmatched.append({"case_file": case_file.name, "error": str(e)})
+            continue
+
+        urls = extract_urls_from_case(case_data)
+        for url in urls:
+            # 解析 URL 得到 platform 和 content_id
+            parsed = parse_url(url)
+            if not parsed:
+                unmatched.append({
+                    "case_file": case_file.name,
+                    "url": url,
+                    "reason": "url_parse_failed",
+                })
+                continue
+
+            platform, cid = parsed
+            key = (platform, cid)
+            if key in seen_keys:
+                continue
+            seen_keys.add(key)
+
+            # 直接用 (platform, content_id) 在 cache 索引中查找
+            post = cache_index.get(key)
+            if post:
+                matched.append({
+                    "case_file": case_file.name,
+                    "platform": platform,
+                    "channel_content_id": cid,
+                    "source_url": url,
+                    "post": post,
+                })
+            else:
+                unmatched.append({
+                    "case_file": case_file.name,
+                    "platform": platform,
+                    "channel_content_id": cid,
+                    "source_url": url,
+                    "reason": "not_in_cache",
+                })
+
+    # 4. 合并已有数据和新匹配的数据
+    all_sources = existing_sources + matched
+
+    # 5. 写输出
+    output = {
+        "total": len(all_sources),
+        "cache_dir": str(cache_dir),
+        "sources": all_sources,
+    }
+
+    output_file.parent.mkdir(parents=True, exist_ok=True)
+    with open(output_file, "w", encoding="utf-8") as f:
+        json.dump(output, f, ensure_ascii=False, indent=2)
+
+    # 返回统计信息(unmatched 只返回数量,不含明细)
+    return {
+        "total_matched": len(matched),
+        "total_existing": len(existing_sources),
+        "total_unmatched": len(unmatched),
+        "output_file": str(output_file),
+    }
+
+
+if __name__ == "__main__":
+    # CLI:python extract_sources.py <raw_cases_dir> [cache_dir]
+    import sys
+    if len(sys.argv) < 2:
+        print("Usage: python extract_sources.py <raw_cases_dir> [cache_dir]")
+        sys.exit(1)
+
+    raw_cases_dir = Path(sys.argv[1])
+    cache_dir = Path(sys.argv[2]) if len(sys.argv) > 2 else None
+
+    result = extract_sources_to_json(raw_cases_dir, cache_dir=cache_dir)
+    print(f"[OK] Matched: {result['total_matched']}, Unmatched: {result['total_unmatched']}")
+    print(f"     Output: {raw_cases_dir / 'source.json'}")

+ 195 - 0
examples/process_pipeline/script/fix_json_quotes.py

@@ -0,0 +1,195 @@
+"""
+修复 case JSON 文件中的引号错误
+
+常见问题:
+1. 本应使用英文引号的位置误用了中文引号(“”)
+2. 字符串值中包含未转义的英文双引号(LLM 生成时常见)
+"""
+
+import json
+import re
+from pathlib import Path
+from typing import Any, Dict, Optional, Tuple
+
+
+def fix_chinese_quotes(text: str) -> str:
+    """将中文引号替换为英文引号"""
+    return text.replace('“', '"').replace('”', '"').replace('‘', "'").replace('’', "'")
+
+
+def fix_unescaped_quotes_in_values(text: str) -> str:
+    """
+    修复 JSON 字符串值中未转义的双引号。
+    策略:找到 JSON 键值对中的字符串值,将值内部的未转义引号替换为中文引号。
+    """
+    # 匹配 "key": "value" 中的 value 部分(value 可能包含未转义的引号)
+    # 使用逐字符解析来精确处理
+    result = []
+    i = 0
+    in_string = False
+    is_value = False  # 是否在值的字符串中(而非键)
+    colon_seen = False
+
+    while i < len(text):
+        c = text[i]
+
+        if not in_string:
+            if c == '"':
+                in_string = True
+                # 判断这是键还是值
+                is_value = colon_seen
+                if colon_seen:
+                    colon_seen = False
+                result.append(c)
+            elif c == ':':
+                colon_seen = True
+                result.append(c)
+            elif c in ('{', '[', ',', '\n', ' ', '\t', '\r'):
+                if c in ('{', '[', ','):
+                    colon_seen = False
+                result.append(c)
+            else:
+                result.append(c)
+        else:
+            if c == '\\':
+                # 转义序列,直接保留
+                result.append(c)
+                i += 1
+                if i < len(text):
+                    result.append(text[i])
+            elif c == '"':
+                # 检查是否是字符串结束
+                # 向后看:如果后面跟着 ,/}/]/: 或空白+这些字符,则是结束引号
+                j = i + 1
+                while j < len(text) and text[j] in (' ', '\t', '\r', '\n'):
+                    j += 1
+                next_char = text[j] if j < len(text) else ''
+
+                if next_char in (',', '}', ']', ':') or j >= len(text):
+                    # 这是字符串结束引号
+                    in_string = False
+                    result.append(c)
+                else:
+                    # 这是值内部的未转义引号,替换为中文引号
+                    if is_value:
+                        result.append('“')  # "
+                    else:
+                        result.append(c)
+            else:
+                result.append(c)
+        i += 1
+
+    return ''.join(result)
+
+
+def try_fix_and_parse(content: str) -> Tuple[bool, Any, str]:
+    """
+    尝试多种修复策略解析 JSON
+
+    Returns:
+        (success, data, fix_description)
+    """
+    # 策略 1:直接解析
+    try:
+        return True, json.loads(content), "valid"
+    except json.JSONDecodeError:
+        pass
+
+    # 策略 2:替换中文引号
+    fixed = fix_chinese_quotes(content)
+    try:
+        return True, json.loads(fixed), "fixed_chinese_quotes"
+    except json.JSONDecodeError:
+        pass
+
+    # 策略 3:修复值中未转义的引号
+    fixed2 = fix_unescaped_quotes_in_values(fixed)
+    try:
+        return True, json.loads(fixed2), "fixed_unescaped_quotes"
+    except json.JSONDecodeError:
+        pass
+
+    # 策略 4:尝试 json_repair 库(如果可用)
+    try:
+        import json_repair
+        data = json_repair.repair_json(content, return_objects=True)
+        if data:
+            return True, data, "fixed_by_json_repair"
+    except ImportError:
+        pass
+    except Exception:
+        pass
+
+    return False, None, "unfixable"
+
+
+def fix_json_file(file_path: Path, backup: bool = True) -> Dict[str, Any]:
+    """修复 JSON 文件"""
+    result = {"success": False, "message": "", "fixed": False, "file": str(file_path)}
+
+    try:
+        with open(file_path, 'r', encoding='utf-8') as f:
+            content = f.read()
+    except Exception as e:
+        result["message"] = f"File read error: {e}"
+        return result
+
+    success, data, fix_desc = try_fix_and_parse(content)
+
+    if success:
+        if fix_desc != "valid":
+            if backup:
+                backup_path = file_path.with_suffix('.json.bak')
+                backup_path.write_text(content, encoding='utf-8')
+                result["backup"] = str(backup_path)
+
+            with open(file_path, 'w', encoding='utf-8') as f:
+                json.dump(data, f, ensure_ascii=False, indent=2)
+
+            result["fixed"] = True
+            result["message"] = fix_desc
+        else:
+            result["message"] = "Already valid JSON"
+        result["success"] = True
+    else:
+        result["message"] = "unfixable"
+
+    return result
+
+
+def fix_directory(dir_path: Path, pattern: str = "case_*.json") -> Dict[str, Any]:
+    """修复目录下所有匹配的 JSON 文件"""
+    results = []
+    total = fixed = failed = 0
+
+    for file_path in sorted(dir_path.glob(pattern)):
+        total += 1
+        result = fix_json_file(file_path, backup=True)
+        results.append(result)
+
+        if result["success"]:
+            if result["fixed"]:
+                fixed += 1
+                print(f"[FIXED] {file_path.name}: {result['message']}")
+            else:
+                print(f"[OK] {file_path.name}")
+        else:
+            failed += 1
+            print(f"[FAILED] {file_path.name}: {result['message']}")
+
+    return {"total": total, "fixed": fixed, "failed": failed, "results": results}
+
+
+if __name__ == "__main__":
+    import sys
+    if len(sys.argv) < 2:
+        print("Usage: python fix_json_quotes.py <directory>")
+        sys.exit(1)
+
+    dir_path = Path(sys.argv[1])
+    print(f"Fixing JSON files in: {dir_path}")
+    print("=" * 60)
+    summary = fix_directory(dir_path)
+    print("=" * 60)
+    print(f"Total: {summary['total']}, Fixed: {summary['fixed']}, Failed: {summary['failed']}")
+

+ 215 - 0
examples/process_pipeline/script/process_sources.py

@@ -0,0 +1,215 @@
+"""
+Phase 1.6: 使用 Claude 对 source.json 中的原始帖子进行结构化处理
+
+从 source.json 读取原始帖子数据,调用 Claude API 提取工序步骤,
+输出到 case_detailed.json 或更新原有的 case_*.json
+"""
+
+import asyncio
+import json
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+# 工序提取的 prompt 模板
+WORKFLOW_EXTRACTION_PROMPT = """将以下帖子内容总结为AI图片生成的工序,以JSON格式输出。
+
+## 工序提取规则
+- 步骤粒度是"做了什么",而非"怎么做"
+- 若涉及输入内容,描述其目的而非具体内容(如"输入将照片转化为X风格的提示词",而非"输入提示词包含A/B/C参数")
+- 若涉及上传素材,说明素材类型和用途
+- 同一工具内的参数配置不拆分为多步
+- 若本质上只有一步,就输出一步
+
+## 输出格式
+{{
+  "工序步骤": [
+    {{
+      "步骤序号": 1,
+      "步骤描述": "string"
+    }}
+  ]
+}}
+
+## 帖子内容
+标题:{title}
+
+正文:
+{body_text}
+
+请严格按照上述格式输出JSON,不要包含其他内容。"""
+
+
+async def extract_workflow_from_post(
+    post: Dict[str, Any],
+    llm_call: Any,
+    model: str = "anthropic/claude-sonnet-4-5"
+) -> tuple[Optional[Dict[str, Any]], float]:
+    """
+    使用 LLM 从单个帖子中提取工序步骤
+
+    Args:
+        post: 原始帖子数据(包含 title、body_text 等)
+        llm_call: LLM 调用函数
+        model: 模型名称
+
+    Returns:
+        (提取的工序数据, 成本)
+    """
+    title = post.get("title", "")
+    body_text = post.get("body_text", "")
+
+    if not body_text:
+        return None, 0.0
+
+    # 构造 prompt
+    prompt = WORKFLOW_EXTRACTION_PROMPT.format(
+        title=title,
+        body_text=body_text[:2000]  # 限制长度避免超出 token 限制
+    )
+
+    messages = [{"role": "user", "content": prompt}]
+
+    try:
+        # 调用 LLM
+        response = await llm_call(
+            messages=messages,
+            model=model,
+            temperature=0.1,
+            max_tokens=2000
+        )
+
+        # 计算成本
+        usage = response.get("usage", {})
+        input_tokens = usage.get("input_tokens", 0) or usage.get("prompt_tokens", 0)
+        output_tokens = usage.get("output_tokens", 0) or usage.get("completion_tokens", 0)
+
+        # Claude Sonnet 4.5 定价(通过 OpenRouter)
+        cost = (input_tokens / 1e6 * 3.0) + (output_tokens / 1e6 * 15.0)
+
+        # 解析响应
+        content = response.get("content", "")
+        if isinstance(content, list):
+            content = content[0].get("text", "") if content else ""
+
+        # 尝试提取 JSON
+        import re
+        json_match = re.search(r'\{[\s\S]*\}', content)
+        if json_match:
+            workflow_data = json.loads(json_match.group())
+            return workflow_data, cost
+        else:
+            print(f"Warning: Failed to extract JSON from response for post: {title}")
+            return None, cost
+
+    except Exception as e:
+        print(f"Error processing post '{title}': {e}")
+        return None, 0.0
+
+
+async def process_sources(
+    source_file: Path,
+    output_file: Path,
+    llm_call: Any,
+    model: str = "anthropic/claude-sonnet-4-5",
+    max_concurrent: int = 3
+) -> Dict[str, Any]:
+    """
+    处理 source.json 中的所有帖子,提取工序步骤
+
+    Args:
+        source_file: source.json 文件路径
+        output_file: 输出文件路径
+        llm_call: LLM 调用函数
+        model: 模型名称
+        max_concurrent: 最大并发数
+
+    Returns:
+        统计信息
+    """
+    # 读取 source.json
+    with open(source_file, "r", encoding="utf-8") as f:
+        source_data = json.load(f)
+
+    sources = source_data.get("sources", [])
+    print(f"Processing {len(sources)} posts...")
+
+    # 创建信号量限制并发
+    semaphore = asyncio.Semaphore(max_concurrent)
+
+    async def process_with_semaphore(source_item):
+        async with semaphore:
+            post = source_item.get("post", {})
+            workflow, cost = await extract_workflow_from_post(post, llm_call, model)
+            return {
+                "case_file": source_item.get("case_file"),
+                "platform": source_item.get("platform"),
+                "channel_content_id": source_item.get("channel_content_id"),
+                "source_url": source_item.get("source_url"),
+                "title": post.get("title", ""),
+                "workflow": workflow,
+            }, cost
+
+    # 并发处理所有帖子
+    tasks = [process_with_semaphore(src) for src in sources]
+    results_with_costs = await asyncio.gather(*tasks)
+
+    # 分离结果和成本
+    results = [r[0] for r in results_with_costs]
+    costs = [r[1] for r in results_with_costs]
+    total_cost = sum(costs)
+
+    # 统计
+    success_count = sum(1 for r in results if r.get("workflow"))
+    failed_count = len(results) - success_count
+
+    # 输出结果
+    output_data = {
+        "total": len(results),
+        "success": success_count,
+        "failed": failed_count,
+        "cases": results
+    }
+
+    output_file.parent.mkdir(parents=True, exist_ok=True)
+    with open(output_file, "w", encoding="utf-8") as f:
+        json.dump(output_data, f, ensure_ascii=False, indent=2)
+
+    return {
+        "total": len(results),
+        "success": success_count,
+        "failed": failed_count,
+        "total_cost": total_cost,
+        "output_file": str(output_file)
+    }
+
+
+async def main():
+    """命令行入口"""
+    import sys
+    if len(sys.argv) < 2:
+        print("Usage: python process_sources.py <raw_cases_dir>")
+        sys.exit(1)
+
+    raw_cases_dir = Path(sys.argv[1])
+    source_file = raw_cases_dir / "source.json"
+    output_file = raw_cases_dir / "case_detailed.json"
+
+    if not source_file.exists():
+        print(f"Error: {source_file} not found")
+        sys.exit(1)
+
+    # 导入 LLM 调用函数
+    sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent.parent))
+    from agent.llm.openrouter import create_openrouter_llm_call
+
+    llm_call = create_openrouter_llm_call(model="anthropic/claude-sonnet-4-5")
+
+    print(f"Processing sources from: {source_file}")
+    stats = await process_sources(source_file, output_file, llm_call)
+
+    print(f"\n[OK] Total: {stats['total']}, Success: {stats['success']}, Failed: {stats['failed']}")
+    print(f"     Output: {stats['output_file']}")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())

+ 31 - 11
examples/process_pipeline/server.py

@@ -96,11 +96,13 @@ def get_requirement_data(index: int):
     def safe_load_json(p: Path):
         if not p.exists():
             return None
+        content = ""
         try:
             with open(p, "r", encoding="utf-8") as f:
-                return json.load(f)
-        except Exception:
-            return {"error": "Failed to parse JSON"}
+                content = f.read()
+                return json.loads(content)
+        except Exception as e:
+            return {"error": "Failed to parse JSON", "raw_content": content, "details": str(e)}
 
     data = {
         "strategy": safe_load_json(dir_path / "strategy.json"),
@@ -111,7 +113,7 @@ def get_requirement_data(index: int):
     
     raw_cases_dir = dir_path / "raw_cases"
     if raw_cases_dir.exists():
-        for f in raw_cases_dir.glob("case_*.json"):
+        for f in raw_cases_dir.glob("*.json"):
             data["raw_cases"][f.stem] = safe_load_json(f)
             
     return data
@@ -170,16 +172,16 @@ async def run_pipeline_task(index: int, run_req: RunRequest):
     
     mode = run_req.restart_mode
     
-    if mode in ['phase1_platforms', 'phase2_capabilities', 'phase2_blueprint', 'phase2_all', 'phase3']:
+    if mode in ['phase1_platforms', 'phase2_capabilities', 'phase2_blueprint', 'phase2_all', 'phase3', 'single_strategy']:
         if (dir_path / "strategy.json").exists(): (dir_path / "strategy.json").unlink()
         
-    if mode in ['phase1_platforms', 'phase2_all', 'phase2_blueprint']:
+    if mode in ['phase1_platforms', 'phase2_all', 'phase2_blueprint', 'single_blueprint']:
         if (dir_path / "blueprint.json").exists(): (dir_path / "blueprint.json").unlink()
         
-    if mode in ['phase1_platforms', 'phase2_all', 'phase2_capabilities']:
+    if mode in ['phase1_platforms', 'phase2_all', 'phase2_capabilities', 'single_capabilities']:
         if (dir_path / "capabilities_extracted.json").exists(): (dir_path / "capabilities_extracted.json").unlink()
         
-    if mode == 'phase1_platforms':
+    if mode in ['phase1_platforms', 'single_platforms']:
         raw_cases_dir = dir_path / "raw_cases"
         if raw_cases_dir.exists() and run_req.platforms:
             plats = [p.strip() for p in run_req.platforms.split(",") if p.strip()]
@@ -188,7 +190,7 @@ async def run_pipeline_task(index: int, run_req: RunRequest):
                 if f.exists():
                     f.unlink()
         run_req.skip_research = False
-    elif mode in ['phase2_capabilities', 'phase2_blueprint', 'phase2_all', 'phase3']:
+    elif mode in ['phase2_capabilities', 'phase2_blueprint', 'phase2_all', 'phase3', 'single_blueprint', 'single_capabilities', 'single_strategy']:
         run_req.skip_research = True
     
     # build command
@@ -198,10 +200,12 @@ async def run_pipeline_task(index: int, run_req: RunRequest):
         cmd.append("--skip-research")
     if run_req.research_only:
         cmd.append("--research-only")
-    if run_req.platforms:
+    if run_req.platforms and not run_req.skip_research:
         cmd.extend(["--platforms", run_req.platforms])
     if run_req.use_claude_sdk:
         cmd.append("--use-claude-sdk")
+    if run_req.restart_mode:
+        cmd.extend(["--restart-mode", run_req.restart_mode])
         
     run_state.logs.append(f"Starting command: {' '.join(cmd)}\n")
     
@@ -243,11 +247,27 @@ async def run_pipeline_task(index: int, run_req: RunRequest):
 async def trigger_pipeline(index: int, req: RunRequest):
     if index in active_runs and active_runs[index].status == "running":
         raise HTTPException(status_code=400, detail="Pipeline already running for this index")
-        
+    
     active_runs[index] = ActiveRun()
     asyncio.create_task(run_pipeline_task(index, req))
     return {"message": "Pipeline started", "index": index}
 
+@app.post("/api/pipeline/stop/{index}")
+async def stop_pipeline(index: int):
+    if index not in active_runs or active_runs[index].status != "running":
+        raise HTTPException(status_code=400, detail="No running pipeline found for this index")
+    
+    process = active_runs[index].process
+    if process:
+        try:
+            process.terminate()
+            active_runs[index].logs.append("\n[System] 🛑 Pipeline forcefully terminated by user.\n")
+            active_runs[index].status = "failed"
+            return {"status": "stopped"}
+        except Exception as e:
+            raise HTTPException(status_code=500, detail=f"Failed to terminate: {str(e)}")
+    raise HTTPException(status_code=500, detail="Process handle missing")
+
 @app.get("/api/pipeline/status")
 def get_all_status():
     res = {}

+ 203 - 5
examples/process_pipeline/ui/app.js

@@ -2,6 +2,7 @@ let requirements = [];
 let currentSelectedIndex = null;
 let activeRuns = {};
 let statusInterval = null;
+let currentAvailablePlatforms = ['xhs', 'youtube', 'bili', 'x'];
 
 let currentPromptName = null;
 const modalPrompts = document.getElementById('prompts-modal');
@@ -41,6 +42,7 @@ if (selectForcePhase && groupPlatforms) {
 const jsonStrategy = document.getElementById('json-strategy');
 const jsonBlueprint = document.getElementById('json-blueprint');
 const jsonCaps = document.getElementById('json-caps');
+const jsonSource = document.getElementById('json-source');
 const jsonRaw = document.getElementById('json-raw');
 
 // Modals
@@ -100,6 +102,18 @@ function renderJSON(obj) {
     return String(obj);
 }
 
+function renderDataOrRaw(dataObj, renderFunc) {
+    if (!dataObj) return '<p style="color:var(--text-muted)">No data available</p>';
+    if (dataObj.error) {
+        const safeRaw = dataObj.raw_content ? dataObj.raw_content.replace(/</g, "&lt;").replace(/>/g, "&gt;") : "Empty file.";
+        return `<div style="padding:1rem; background:rgba(239, 68, 68, 0.1); border:1px solid var(--danger); border-radius:8px;">
+            <h3 style="color:var(--danger); margin-bottom:0.5rem">⚠️ JSON Parsing Failed</h3>
+            <pre style="white-space: pre-wrap; font-family: monospace; font-size: 0.85rem; color: var(--text-muted); overflow-x: auto">${safeRaw}</pre>
+        </div>`;
+    }
+    return renderFunc(dataObj);
+}
+
 function renderRawCases(rawCasesObj) {
     if (!rawCasesObj || Object.keys(rawCasesObj).length === 0) return 'No raw cases data';
     const platforms = Object.keys(rawCasesObj);
@@ -111,9 +125,22 @@ function renderRawCases(rawCasesObj) {
     html += `</div><div class="sub-tab-contents">`;
     platforms.forEach((p, i) => {
         html += `<div id="sub-tab-${p}" class="sub-tab-pane ${i === 0 ? '' : 'hidden'}">`;
+        
+        if (rawCasesObj[p].error) {
+            const safeRaw = rawCasesObj[p].raw_content ? rawCasesObj[p].raw_content.replace(/</g, "&lt;").replace(/>/g, "&gt;") : "Empty file.";
+            html += `<div style="padding:1rem; background:rgba(239, 68, 68, 0.1); border:1px solid var(--danger); border-radius:8px;">
+                <h3 style="color:var(--danger); margin-bottom:0.5rem">⚠️ JSON Parsing Failed</h3>
+                <pre style="white-space: pre-wrap; font-family: monospace; font-size: 0.85rem; color: var(--text-muted); overflow-x: auto">${safeRaw}</pre>
+            </div></div>`;
+            return;
+        }
+        
         const cases = rawCasesObj[p].cases || [];
         if (cases.length === 0) {
-            html += `<p style="color:var(--text-muted)">No cases found for this platform.</p>`;
+            html += `<div style="padding:1rem; background:rgba(255, 255, 255, 0.05); border:1px solid rgba(255,255,255,0.1); border-radius:8px;">
+                <h3 style="color:var(--text-main); margin-bottom:0.5rem">⚠️ Non-standard format (No cases array found)</h3>
+                <div style="font-family: monospace; font-size: 0.85rem; overflow-x: auto">${renderJSON(rawCasesObj[p])}</div>
+            </div>`;
         } else {
             const platCode = p.replace('case_', '');
             cases.forEach(c => {
@@ -169,6 +196,110 @@ function renderRawCases(rawCasesObj) {
     return html;
 }
 
+function renderSourceCases(sourceObj) {
+    if (!sourceObj) return '<p style="color:var(--text-muted)">No source data available</p>';
+    if (sourceObj.error) {
+        const safeRaw = sourceObj.raw_content ? sourceObj.raw_content.replace(/</g, "&lt;").replace(/>/g, "&gt;") : "Empty file.";
+        return `<div style="padding:1rem; background:rgba(239, 68, 68, 0.1); border:1px solid var(--danger); border-radius:8px;">
+            <h3 style="color:var(--danger); margin-bottom:0.5rem">⚠️ JSON Parsing Failed</h3>
+            <pre style="white-space: pre-wrap; font-family: monospace; font-size: 0.85rem; color: var(--text-muted); overflow-x: auto">${safeRaw}</pre>
+        </div>`;
+    }
+
+    if (!sourceObj.sources) {
+        return `<div style="padding:1rem; background:rgba(255, 255, 255, 0.05); border:1px solid rgba(255,255,255,0.1); border-radius:8px;">
+            <h3 style="color:var(--text-main); margin-bottom:0.5rem">⚠️ Non-standard source format</h3>
+            <div style="font-family: monospace; font-size: 0.85rem; overflow-x: auto">${renderJSON(sourceObj)}</div>
+        </div>`;
+    }
+
+    const sources = sourceObj.sources;
+    let html = `<input type="text" placeholder="🔍 Search sources by ID, title, content, or platform..." 
+        style="width: 100%; padding: 0.8rem 1rem; margin-bottom: 1.5rem; border-radius: 8px; border: 1px solid rgba(255,255,255,0.15); background: rgba(0,0,0,0.2); color: var(--text-main); font-size: 0.95rem; outline: none; transition: border-color 0.2s;"
+        onfocus="this.style.borderColor='var(--primary)'" onblur="this.style.borderColor='rgba(255,255,255,0.15)'"
+        oninput="filterSources(this.value)" />`;
+        
+    html += `<div id="sub-tab-source">`; // Container for searching
+
+    if (sources.length === 0) {
+        html += `<p style="color:var(--text-muted)">Source file is empty.</p>`;
+    } else {
+        sources.forEach((s, idx) => {
+            const post = s.post || {};
+            
+            let mediaHtml = '';
+            
+            // Handle images (XHS uses images string array, X uses image_url_list object array)
+            const images = post.images || [];
+            const xImages = post.image_url_list || [];
+            const allImages = [...images, ...xImages.map(img => img.image_url)].filter(Boolean);
+            
+            if (allImages.length > 0) {
+                mediaHtml += `<div style="margin-top: 1rem; display: flex; flex-wrap: wrap; gap: 8px;">`;
+                allImages.forEach(imgUrl => {
+                    mediaHtml += `<a href="${imgUrl}" target="_blank"><img src="${imgUrl}" style="height: 120px; border-radius: 6px; object-fit: cover; border: 1px solid rgba(255,255,255,0.1);" /></a>`;
+                });
+                mediaHtml += `</div>`;
+            }
+            
+            // Handle videos
+            const videos = post.videos || [];
+            const xVideos = post.video_url_list || [];
+            const allVideos = [...videos, ...xVideos.map(vid => vid.video_url)].filter(Boolean);
+            
+            if (allVideos.length > 0) {
+                mediaHtml += `<div style="margin-top: 1rem; display: flex; flex-wrap: wrap; gap: 8px;">`;
+                allVideos.forEach(vidUrl => {
+                    mediaHtml += `<video controls src="${vidUrl}" style="height: 200px; border-radius: 6px; border: 1px solid rgba(255,255,255,0.1);"></video>`;
+                });
+                mediaHtml += `</div>`;
+            }
+            
+            html += `<div class="data-card" id="source-card-${idx}">
+                <div class="card-header">
+                    <div class="card-title">📝 [${s.platform || post.channel || 'Unknown'}] ${post.title || 'Untitled'}</div>
+                    ${s.source_url ? `<a href="${s.source_url}" target="_blank" class="badge-emoji primary" style="text-decoration:none">🔗 View Source</a>` : ''}
+                </div>
+                <div class="card-body">
+                    <div class="tags-container" style="margin-bottom:0.8rem">
+                        <span class="badge-emoji">📱 Platform: ${s.platform || post.channel || 'N/A'}</span>
+                        <span class="badge-emoji">❤️ Likes: ${post.like_count || 0}</span>
+                        ${s.channel_content_id ? `<span class="badge-emoji">🆔 ID: ${s.channel_content_id}</span>` : ''}
+                    </div>
+                    <div style="background:rgba(255,255,255,0.03); padding:1rem; border-radius:6px; font-size:0.9rem; line-height:1.5; color:var(--text-main); white-space: pre-wrap;">${post.body_text || 'No content available.'}</div>
+                    ${mediaHtml}
+                </div>
+            </div>`;
+        });
+    }
+    html += `</div>`;
+    return html;
+}
+
+window.selectSubTab = function(p) {
+    document.querySelectorAll('#json-raw .sub-tab-btn').forEach(btn => {
+        btn.classList.remove('active');
+        if(btn.textContent === p.replace('case_', '').toUpperCase()) btn.classList.add('active');
+    });
+    document.querySelectorAll('#json-raw .sub-tab-pane').forEach(pane => {
+        pane.classList.add('hidden');
+    });
+    const target = document.getElementById(`sub-tab-${p}`);
+    if (target) target.classList.remove('hidden');
+};
+
+window.filterSources = function(query) {
+    const q = query.toLowerCase();
+    const cards = document.querySelectorAll('#sub-tab-source .data-card');
+    cards.forEach(card => {
+        if (card.textContent.toLowerCase().includes(q)) {
+            card.style.display = 'block';
+        } else {
+            card.style.display = 'none';
+        }
+    });
+};
+
 function renderCapabilities(capsObj) {
     if (!capsObj || !capsObj.extracted_capabilities) return 'No capabilities data';
     const caps = capsObj.extracted_capabilities;
@@ -447,10 +578,35 @@ async function fetchRequirementData(index) {
         const res = await fetch(`/api/requirements/${index}/data`);
         const data = await res.json();
         
-        jsonStrategy.innerHTML = data.strategy ? renderStrategy(data.strategy) : '<p style="color:var(--text-muted)">No strategy data</p>';
-        jsonBlueprint.innerHTML = data.blueprint ? renderBlueprint(data.blueprint) : '<p style="color:var(--text-muted)">No blueprint data</p>';
-        jsonCaps.innerHTML = data.capabilities ? renderCapabilities(data.capabilities) : '<p style="color:var(--text-muted)">No capabilities data</p>';
-        jsonRaw.innerHTML = data.raw_cases ? renderRawCases(data.raw_cases) : '<p style="color:var(--text-muted)">No raw cases data</p>';
+        jsonStrategy.innerHTML = renderDataOrRaw(data.strategy, renderStrategy);
+        jsonBlueprint.innerHTML = renderDataOrRaw(data.blueprint, renderBlueprint);
+        jsonCaps.innerHTML = renderDataOrRaw(data.capabilities, renderCapabilities);
+        
+        let rawCasesClone = null;
+        if (data.raw_cases) {
+            rawCasesClone = { ...data.raw_cases };
+            if (rawCasesClone['source']) {
+                jsonSource.innerHTML = renderSourceCases(rawCasesClone['source']);
+                delete rawCasesClone['source'];
+            } else {
+                jsonSource.innerHTML = '<p style="color:var(--text-muted)">No source data available</p>';
+            }
+        } else {
+            jsonSource.innerHTML = '<p style="color:var(--text-muted)">No source data available</p>';
+        }
+        
+        jsonRaw.innerHTML = renderDataOrRaw(rawCasesClone, renderRawCases);
+        
+        if (rawCasesClone && Object.keys(rawCasesClone).length > 0) {
+            currentAvailablePlatforms = Object.keys(rawCasesClone)
+                .filter(p => p.startsWith('case_'))
+                .map(p => p.replace('case_', ''));
+            if (currentAvailablePlatforms.length === 0) {
+                currentAvailablePlatforms = ['xhs', 'youtube', 'bili', 'x'];
+            }
+        } else {
+            currentAvailablePlatforms = ['xhs', 'youtube', 'bili', 'x'];
+        }
     } catch (e) {
         console.error("Failed to fetch data", e);
     }
@@ -576,20 +732,24 @@ function selectRequirement(index) {
 }
 
 function updateDetailBannerStatus(status) {
+    const btnStop = document.getElementById('btn-stop-pipeline');
     if (status === 'running') {
         elStatusBanner.classList.remove('hidden');
         elStatusBanner.style.background = 'rgba(59, 130, 246, 0.1)';
         elStatusText.textContent = 'Pipeline is currently running...';
         elStatusBanner.querySelector('.status-indicator').style.display = 'block';
         elStatusBanner.querySelector('.status-indicator').style.background = 'var(--accent-primary)';
+        if (btnStop) btnStop.style.display = 'inline-block';
     } else if (status === 'failed') {
         elStatusBanner.classList.remove('hidden');
         elStatusBanner.style.background = 'rgba(239, 68, 68, 0.1)';
         elStatusText.textContent = 'Pipeline run failed.';
         elStatusBanner.querySelector('.status-indicator').style.display = 'block';
         elStatusBanner.querySelector('.status-indicator').style.background = 'var(--danger)';
+        if (btnStop) btnStop.style.display = 'none';
     } else {
         elStatusBanner.classList.add('hidden');
+        if (btnStop) btnStop.style.display = 'none';
     }
 }
 
@@ -657,6 +817,13 @@ function setupEventListeners() {
     document.getElementById('btn-open-run-modal').addEventListener('click', () => {
         if (currentSelectedIndex !== null) {
             modalRun.classList.remove('hidden');
+            const selectForcePhase = document.getElementById('select-force-phase');
+            if (selectForcePhase) selectForcePhase.dispatchEvent(new Event('change'));
+            
+            const inputPlatforms = document.getElementById('input-platforms');
+            if (inputPlatforms && currentAvailablePlatforms && currentAvailablePlatforms.length > 0) {
+                inputPlatforms.value = currentAvailablePlatforms.join(',');
+            }
         }
     });
 
@@ -668,6 +835,19 @@ function setupEventListeners() {
         modalRun.classList.add('hidden');
     });
 
+    const selectForcePhase = document.getElementById('select-force-phase');
+    const groupPlatforms = document.getElementById('group-platforms');
+    if (selectForcePhase && groupPlatforms) {
+        selectForcePhase.addEventListener('change', (e) => {
+            const val = e.target.value;
+            if (['smart', 'phase1_platforms', 'single_platforms'].includes(val)) {
+                groupPlatforms.style.display = 'block';
+            } else {
+                groupPlatforms.style.display = 'none';
+            }
+        });
+    }
+
     document.getElementById('btn-confirm-run').addEventListener('click', triggerRun);
 
     document.getElementById('btn-view-logs').addEventListener('click', () => {
@@ -680,6 +860,24 @@ function setupEventListeners() {
         }
     });
 
+    const btnStop = document.getElementById('btn-stop-pipeline');
+    if (btnStop) {
+        btnStop.addEventListener('click', async () => {
+            if (currentSelectedIndex === null) return;
+            if (!confirm('Are you sure you want to stop the running pipeline?')) return;
+            try {
+                const res = await fetch(`/api/pipeline/stop/${currentSelectedIndex}`, { method: 'POST' });
+                if (!res.ok) {
+                    const err = await res.json();
+                    alert("Error stopping pipeline: " + err.detail);
+                }
+            } catch (e) {
+                console.error("Failed to stop pipeline", e);
+                alert("Failed to stop pipeline");
+            }
+        });
+    }
+
     document.getElementById('btn-close-logs').addEventListener('click', () => {
         modalLogs.classList.add('hidden');
     });

+ 18 - 5
examples/process_pipeline/ui/index.html

@@ -51,6 +51,7 @@
                     <div class="status-indicator"></div>
                     <span id="status-text">Running...</span>
                     <button class="btn btn-small" id="btn-view-logs">View Logs</button>
+                    <button class="btn btn-small" id="btn-stop-pipeline" style="background-color: var(--danger); border-color: var(--danger); margin-left: 10px; display: none;">🛑 Stop</button>
                 </div>
 
                 <div class="memo-container" id="memo-container">
@@ -69,6 +70,7 @@
                     <button class="tab-btn" data-target="tab-blueprint">Blueprint</button>
                     <button class="tab-btn" data-target="tab-caps">Capabilities</button>
                     <button class="tab-btn" data-target="tab-raw">Raw Cases</button>
+                    <button class="tab-btn" data-target="tab-source">Source</button>
                 </div>
 
                 <div class="tab-content-container">
@@ -84,6 +86,9 @@
                     <div class="tab-content" id="tab-raw">
                         <div class="content-viewer" id="json-raw">Loading...</div>
                     </div>
+                    <div class="tab-content" id="tab-source">
+                        <div class="content-viewer" id="json-source">Loading...</div>
+                    </div>
                 </div>
             </div>
         </main>
@@ -101,11 +106,19 @@
                     <label>Restart From</label>
                     <select id="select-force-phase" class="glass-input" style="background-color: #1e293b; color: white; margin-top: 5px;">
                         <option value="smart">Smart Resume (Continue where left off)</option>
-                        <option value="phase1_platforms">Phase 1: Regenerate Specific Platforms (Deletes chosen cases + Phase 2/3)</option>
-                        <option value="phase2_blueprint">Phase 2: Regenerate Blueprint Only (Deletes Blueprint + Strategy)</option>
-                        <option value="phase2_capabilities">Phase 2: Extract Capabilities Only (Deletes Caps + Strategy)</option>
-                        <option value="phase2_all">Phase 2: Entire Phase 2 (Deletes Blueprint + Caps + Strategy)</option>
-                        <option value="phase3">Phase 3: Strategy (Deletes Strategy only)</option>
+                        <optgroup label="Single Step (No Cascading)">
+                            <option value="single_platforms">Single Step: Re-run Platforms (Deletes chosen platforms ONLY)</option>
+                            <option value="single_blueprint">Single Step: Re-run Blueprint (Deletes Blueprint ONLY)</option>
+                            <option value="single_capabilities">Single Step: Re-run Capabilities (Deletes Capabilities ONLY)</option>
+                            <option value="single_strategy">Single Step: Re-run Strategy (Deletes Strategy ONLY)</option>
+                        </optgroup>
+                        <optgroup label="Cascading Regeneration">
+                            <option value="phase1_platforms">Phase 1: Specific Platforms -> Downstream</option>
+                            <option value="phase2_blueprint">Phase 2: Blueprint -> Strategy</option>
+                            <option value="phase2_capabilities">Phase 2: Capabilities -> Strategy</option>
+                            <option value="phase2_all">Phase 2: All Phase 2 -> Strategy</option>
+                            <option value="phase3">Phase 3: Strategy Only</option>
+                        </optgroup>
                     </select>
                 </div>
                 <div class="form-group" id="group-platforms">