فهرست منبع

docs: mode_workflow 实施计划(9 任务,含完整代码)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
刘文武 4 روز پیش
والد
کامیت
c9a4e46a7d
1فایلهای تغییر یافته به همراه1838 افزوده شده و 0 حذف شده
  1. 1838 0
      docs/superpowers/plans/2026-06-12-mode-workflow.md

+ 1838 - 0
docs/superpowers/plans/2026-06-12-mode-workflow.md

@@ -0,0 +1,1838 @@
+# mode_workflow Implementation Plan
+
+> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
+
+**Goal:** 在 `examples/mode_workflow/` 建一套"搜索评估 + 工序/工具大模型解构"系统:MySQL 三张新表为唯一事实源,单 server(8772) + 单页 HTML(Dashboard / Dataset / 聚类库)。
+
+**Architecture:** DB-first(区别于旧系统文件为主双写)。搜索/解构引擎函数从 `examples/process_pipeline/script/` 只读 import(沿用 `run_search.py` 先例),mode_workflow 只持有编排脚本、prompts、db 层、server、前端。解构由 server 起子进程跑 pipeline 脚本,结果直接写库,前端轮询任务状态。
+
+**Tech Stack:** Python(http.server + pymysql + asyncio)、MySQL(.env `MYSQL_*`)、OpenRouter LLM(经 `call_llm_with_retry`)、单文件 HTML + 原生 JS + ECharts(CDN)。
+
+**Spec:** `docs/superpowers/specs/2026-06-12-mode-workflow-design.md`
+
+**前置条件(执行前确认):**
+- `.env` 含 `MYSQL_HOST/MYSQL_PORT/MYSQL_USER/MYSQL_PASSWORD/MYSQL_DATABASE`(Task 2 起必需)
+- `.env` 含 `OPEN_ROUTER_API_KEY`(Task 4-6 的冒烟测试需要,会产生 ~$0.01-0.05 真实费用;没有 key 则跳过冒烟,只做代码审查)
+- `pip install -e .` 已装(`agent.llm.openrouter` 等可 import)
+
+**Spec 修正(已核实):** 评估 rubric 固化在 `llm_evaluate_sources` 的 `eval_prompt_template.md`(见 `search_and_evaluate.py:424` 注释),**不需要**复制 `eval_prompt.md` 到 prompts/。prompts/ 只放工序、工具两个解构 prompt。
+
+---
+
+## 既有代码事实(执行者必读,均已核实)
+
+| 事实 | 位置 |
+|---|---|
+| MySQL 连接模式:pymysql + DictCursor + autocommit,`_enabled()` 检查 `MYSQL_HOST` | `examples/process_pipeline/script/search_eval/fixed_query_eval/db.py:32-47` |
+| `search_all(platforms, queries, max_count, max_concurrent, query_overrides=None)` 返回 source 列表(case_id/platform/source_url/post/found_by_queries) | `search_and_evaluate.py:212` |
+| `evaluate_posts(sources, requirement, llm_call, model, max_concurrent, include_images, max_images, image_mode, query)` 返回 `(sources, total_cost)`,评估挂在 `source["llm_evaluation"]` | `search_and_evaluate.py:403` |
+| `transcribe_video_posts(sources, concurrency)`、`build_query_overrides(platforms, phrasings, llm, model)`、`_attach_image_refs(sources, max_images, concurrency, mode)` | `search_and_evaluate.py` |
+| `build_eval_llm_call(choice)` 返回 `(llm_call, model_id)`;`EVAL_MODELS`/`DEFAULT_EVAL_MODEL`;`_format_post_for_eval(source)`;`_evaluate_one` | `examples/process_pipeline/script/llm_evaluate_sources.py` |
+| `call_llm_with_retry(llm_call, messages, model, temperature, max_tokens, validate_fn, task_name)` 返回 `(data_dict, cost)`,只认 JSON 对象 | `examples/process_pipeline/script/llm_helper.py` |
+| 任意 OpenRouter 模型:`from agent.llm.openrouter import create_openrouter_llm_call; llm_call = create_openrouter_llm_call(model=...)` | `tool_extract.py:121-122` |
+| 工具解构返回 `{"tools":[{工具名称,实质作用域,形式作用域,创作层级,来源链接,输入,输出,用法,案例,缺点,最新更新时间}]}` | `fixed_query_eval/prompts/tool_extract_system.md` |
+| 工序解构返回 `{"source":{...},"procedures":[{id,name,purpose,category,declarations,type_registry,steps:[{id,kind,via,effect,action,substance,form,directive,intent,inputs,outputs}]}]}` | `mode_procedure/mode-dsl/prompts/procedure_extract_system.md` |
+| 工序解构的图片走 base64(绕防盗链),`_detect_image_mime`/`_fetch_data_url`/`_collect_images`/`_sanitize_workflow` 可整段复制 | `mode_procedure/mode-dsl/procedure_model_extract.py:34-126` |
+| judged_matrix:`{actions:[{name,l1,l2}×27], types:[{name,l1,l2}×50], matrix:[27行×50格,每格{tier,r,q}], stats:{valid:643}}`,有效节点 = tier≥1 | `search_eval/evaluation/judged_matrix.json` |
+| 历史搜索数据:`fixed_query_eval/runs_full/q0000-q0003/form_A.json`,顶层 `{form,query,original_q,platforms,total,failed,results:[...]}` | 同目录 |
+| 仓库无 pytest 体系(CLAUDE.md),验证用独立命令/冒烟脚本 | `CLAUDE.md` |
+
+路径深度:`examples/mode_workflow/x.py` 的 `PROJECT_ROOT = Path(__file__).resolve().parents[2]`;`examples/mode_workflow/pipeline/x.py` 是 `parents[3]`。
+
+---
+
+### Task 1: 脚手架与资产同步
+
+**Files:**
+- Create: `examples/mode_workflow/.gitignore`
+- Create: `examples/mode_workflow/prompts/`(拷贝 2 个 prompt)
+- Create: `examples/mode_workflow/reference/judged_matrix.json`(拷贝)
+- Create: `examples/mode_workflow/pipeline/`、`examples/mode_workflow/runs/` 目录
+
+- [ ] **Step 1: 建目录并同步资产**
+
+```bash
+cd /Users/max_liu/max_liu/company/Agent
+SE=examples/process_pipeline/script/search_eval
+mkdir -p examples/mode_workflow/{pipeline,prompts,reference,runs}
+cp "$SE/mode_procedure/mode-dsl/prompts/procedure_extract_system.md" examples/mode_workflow/prompts/
+cp "$SE/fixed_query_eval/prompts/tool_extract_system.md" examples/mode_workflow/prompts/
+cp "$SE/evaluation/judged_matrix.json" examples/mode_workflow/reference/
+```
+
+- [ ] **Step 2: 写 .gitignore**
+
+`examples/mode_workflow/.gitignore`:
+
+```gitignore
+runs/
+__pycache__/
+*.pyc
+```
+
+- [ ] **Step 3: 验证**
+
+```bash
+ls examples/mode_workflow/prompts/ examples/mode_workflow/reference/
+python3 -c "import json; d=json.load(open('examples/mode_workflow/reference/judged_matrix.json')); print(d['stats'])"
+```
+Expected: 两个 prompt 文件名 + judged_matrix.json;stats 行含 `'valid': 643`。
+
+- [ ] **Step 4: Commit**
+
+```bash
+git add examples/mode_workflow
+git commit -m "feat(mode_workflow): 脚手架与 prompt/内容树资产同步"
+```
+
+---
+
+### Task 2: db.py — 三表 DDL + 读写层
+
+**Files:**
+- Create: `examples/mode_workflow/db.py`
+
+**设计要点:**
+- 风格沿用 `fixed_query_eval/db.py`(pymysql/DictCursor/autocommit),但本系统 **DB 是主存储**:写失败要 `raise`(由调用方决定怎么处理),不再静默吞掉。读侧保留防御。
+- `mode_process`/`mode_tools` 写入语义:删掉 `(case_id, version)` 同版本旧行再插(同版本重跑幂等,跨版本保留历史)。
+- `cost_usd`/`duration_s` 是**每次解构调用**的值,同一 `(case_id, version)` 的多行重复存同一个值;聚合时必须按 `(case_id, version)` 去重(见 Task 7)。
+
+- [ ] **Step 1: 写 `examples/mode_workflow/db.py`(完整文件)**
+
+```python
+# -*- coding: utf-8 -*-
+"""mode_workflow · MySQL 持久化(DB 为唯一事实源)
+================================================================================
+读 .env 的 MYSQL_* 连接 MySQL。三张表:
+  search_data  —— 每行一个 (query, 帖子):搜索 + llm 评估结果
+  mode_process —— 每行一个解构出的工序(steps 等嵌套结构存 JSON 列)
+  mode_tools   —— 每行一个解构出的工具
+
+与旧 fixed_query_eval/db.py 的关键差异:本系统 DB 是主存储,写入失败直接 raise,
+不做"失败不阻断"。读侧保留防御(返回空/None)。
+
+用法:
+  python db.py init    # 建表(幂等)
+  python db.py check   # 打印三表行数
+"""
+import json
+import os
+import sys
+from pathlib import Path
+
+PROJECT_ROOT = Path(__file__).resolve().parents[2]
+sys.path.insert(0, str(PROJECT_ROOT))
+
+from dotenv import load_dotenv
+load_dotenv()
+
+import pymysql
+from pymysql.cursors import DictCursor
+
+
+def _conn():
+    if not os.getenv("MYSQL_HOST"):
+        raise RuntimeError("缺 MYSQL_HOST:检查 .env 的 MYSQL_* 配置")
+    return pymysql.connect(
+        host=os.getenv("MYSQL_HOST"),
+        port=int(os.getenv("MYSQL_PORT", 3306)),
+        user=os.getenv("MYSQL_USER"),
+        password=os.getenv("MYSQL_PASSWORD"),
+        database=os.getenv("MYSQL_DATABASE"),
+        charset="utf8mb4", cursorclass=DictCursor,
+        autocommit=True, connect_timeout=10,
+    )
+
+
+# ── DDL ──────────────────────────────────────────────────────────────────────
+
+DDL_SEARCH = """
+CREATE TABLE IF NOT EXISTS search_data (
+  id            BIGINT AUTO_INCREMENT PRIMARY KEY,
+  query_id      VARCHAR(32)   NOT NULL COMMENT 'q0000',
+  query_text    VARCHAR(512)  NULL,
+  case_id       VARCHAR(128)  NOT NULL COMMENT 'platform_channelContentId',
+  platform      VARCHAR(32)   NULL,
+  channel_content_id VARCHAR(128) NULL,
+  title         VARCHAR(512)  NULL,
+  url           VARCHAR(1024) NULL,
+  content_type  VARCHAR(32)   NULL,
+  body          LONGTEXT      NULL,
+  images        JSON          NULL,
+  videos        JSON          NULL,
+  like_count    INT           NULL,
+  publish_time  VARCHAR(64)   NULL,
+  quality_score FLOAT         NULL COMMENT 'post._quality_score',
+  quality_grade VARCHAR(8)    NULL,
+  found_by      JSON          NULL COMMENT '命中的措辞数组',
+  knowledge_type JSON         NULL COMMENT '["能力","工序","工具"] 子集',
+  overall_score FLOAT         NULL COMMENT '(相关均值+质量均值)/2',
+  llm_evaluation JSON         NULL COMMENT '评估全量 blob',
+  created_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+  updated_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+  UNIQUE KEY uk_qid_case (query_id, case_id),
+  KEY idx_platform (platform)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='搜索+评估结果';
+"""
+
+DDL_PROCESS = """
+CREATE TABLE IF NOT EXISTS mode_process (
+  id            BIGINT AUTO_INCREMENT PRIMARY KEY,
+  query_id      VARCHAR(32)   NOT NULL,
+  case_id       VARCHAR(128)  NOT NULL,
+  platform      VARCHAR(32)   NULL,
+  post_title    VARCHAR(512)  NULL,
+  source        JSON          NULL COMMENT '解构返回的 source 块',
+  procedure_id  VARCHAR(16)   NULL COMMENT 'p1,p2…',
+  name          VARCHAR(255)  NULL,
+  purpose       TEXT          NULL,
+  category      VARCHAR(32)   NULL COMMENT '产物创造/资产建设/自动化/分析/学习',
+  declarations  JSON          NULL,
+  type_registry JSON          NULL,
+  steps         JSON          NULL COMMENT '步骤数组全量',
+  step_count    INT           NULL,
+  tools_used    JSON          NULL COMMENT '从 steps[].via 去重提取',
+  model         VARCHAR(64)   NULL,
+  version       VARCHAR(16)   NULL COMMENT 'v_MMDDHHMM,保留历史',
+  cost_usd      DECIMAL(10,6) NULL COMMENT '本次解构调用成本(同版本各行相同,聚合需按 case+version 去重)',
+  duration_s    FLOAT         NULL,
+  created_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+  KEY idx_case_ver (case_id, version),
+  KEY idx_qid (query_id)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工序解构结果(每行一个工序)';
+"""
+
+DDL_TOOLS = """
+CREATE TABLE IF NOT EXISTS mode_tools (
+  id            BIGINT AUTO_INCREMENT PRIMARY KEY,
+  query_id      VARCHAR(32)   NOT NULL,
+  case_id       VARCHAR(128)  NOT NULL,
+  platform      VARCHAR(32)   NULL,
+  post_title    VARCHAR(512)  NULL,
+  tool_name     VARCHAR(255)  NULL,
+  substance_scope JSON        NULL COMMENT '实质作用域(数组)',
+  form_scope    JSON          NULL COMMENT '形式作用域(数组或null)',
+  creation_layer VARCHAR(32)  NULL COMMENT '制作层/创作层',
+  source_link   VARCHAR(1024) NULL,
+  input_desc    TEXT          NULL,
+  output_desc   TEXT          NULL,
+  usage_json    JSON          NULL,
+  cases_json    JSON          NULL,
+  defects_json  JSON          NULL,
+  updated_time  VARCHAR(64)   NULL COMMENT '工具最新更新时间',
+  model         VARCHAR(64)   NULL,
+  version       VARCHAR(16)   NULL,
+  cost_usd      DECIMAL(10,6) NULL COMMENT '同 mode_process,聚合按 case+version 去重',
+  duration_s    FLOAT         NULL,
+  created_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+  KEY idx_case_ver (case_id, version),
+  KEY idx_qid (query_id),
+  KEY idx_tool_name (tool_name)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工具解构结果(每行一个工具)';
+"""
+
+
+def init_tables():
+    conn = _conn()
+    try:
+        with conn.cursor() as cur:
+            cur.execute(DDL_SEARCH)
+            cur.execute(DDL_PROCESS)
+            cur.execute(DDL_TOOLS)
+        print("✅ 建表完成:search_data, mode_process, mode_tools")
+    finally:
+        conn.close()
+
+
+# ── 工具函数 ──────────────────────────────────────────────────────────────────
+
+def _loads(v, default=None):
+    """pymysql 的 JSON 列可能返回字符串,统一解析。"""
+    if v is None:
+        return default
+    if isinstance(v, (list, dict)):
+        return v
+    try:
+        return json.loads(v)
+    except Exception:
+        return default
+
+
+def _j(v):
+    """写入 JSON 列:None 保持 NULL,其余 dumps。"""
+    return None if v is None else json.dumps(v, ensure_ascii=False)
+
+
+def _collect_scores(node):
+    """递归收集嵌套评估里所有数值「得分」。"""
+    out = []
+    if isinstance(node, dict):
+        for k, v in node.items():
+            if k == "得分" and isinstance(v, (int, float)):
+                out.append(float(v))
+            else:
+                out.extend(_collect_scores(v))
+    elif isinstance(node, list):
+        for v in node:
+            out.extend(_collect_scores(v))
+    return out
+
+
+def overall_score(e):
+    """综合分 = (相关性各项均值 + 质量各项均值) / 可得部分数。算不出返回 None。"""
+    parts = []
+    for key in ("相关性", "质量"):
+        scores = _collect_scores((e or {}).get(key))
+        if scores:
+            parts.append(sum(scores) / len(scores))
+    return round(sum(parts) / len(parts), 2) if parts else None
+
+
+# ── search_data ──────────────────────────────────────────────────────────────
+
+def upsert_search_posts(query_id, query_text, results):
+    """一组搜索结果写入 search_data(按 (query_id, case_id) upsert)。返回写入条数。"""
+    if not results:
+        return 0
+    rows = []
+    for r in results:
+        post = r.get("post") or {}
+        e = r.get("llm_evaluation") or {}
+        rows.append((
+            query_id, query_text, r.get("case_id"), r.get("platform"),
+            r.get("channel_content_id"),
+            (post.get("title") or post.get("desc") or "")[:500],
+            r.get("source_url"), post.get("content_type"),
+            post.get("body_text") or post.get("desc") or "",
+            _j(post.get("images") or []), _j(post.get("videos") or []),
+            post.get("like_count"),
+            str(post.get("publish_time") or post.get("publish_timestamp") or "")[:64],
+            post.get("_quality_score"), post.get("_quality_grade"),
+            _j(r.get("found_by_queries") or []),
+            _j(e.get("知识类型") or []),
+            overall_score(e),
+            _j(e),
+        ))
+    sql = """
+    INSERT INTO search_data
+      (query_id, query_text, case_id, platform, channel_content_id, title, url,
+       content_type, body, images, videos, like_count, publish_time,
+       quality_score, quality_grade, found_by, knowledge_type, overall_score, llm_evaluation)
+    VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
+    ON DUPLICATE KEY UPDATE
+      query_text=VALUES(query_text), platform=VALUES(platform),
+      channel_content_id=VALUES(channel_content_id), title=VALUES(title), url=VALUES(url),
+      content_type=VALUES(content_type), body=VALUES(body), images=VALUES(images),
+      videos=VALUES(videos), like_count=VALUES(like_count), publish_time=VALUES(publish_time),
+      quality_score=VALUES(quality_score), quality_grade=VALUES(quality_grade),
+      found_by=VALUES(found_by), knowledge_type=VALUES(knowledge_type),
+      overall_score=VALUES(overall_score), llm_evaluation=VALUES(llm_evaluation);
+    """
+    conn = _conn()
+    try:
+        with conn.cursor() as cur:
+            cur.executemany(sql, rows)
+        return len(rows)
+    finally:
+        conn.close()
+
+
+def fetch_queries():
+    """query 列表 + 帖子数 + 解构进度。"""
+    conn = _conn()
+    try:
+        with conn.cursor() as cur:
+            cur.execute("""SELECT query_id, MAX(query_text) AS query_text,
+                                  COUNT(*) AS post_count
+                           FROM search_data GROUP BY query_id ORDER BY query_id""")
+            queries = cur.fetchall()
+            cur.execute("SELECT query_id, COUNT(DISTINCT case_id) AS n FROM mode_process GROUP BY query_id")
+            np = {r["query_id"]: r["n"] for r in cur.fetchall()}
+            cur.execute("SELECT query_id, COUNT(DISTINCT case_id) AS n FROM mode_tools GROUP BY query_id")
+            nt = {r["query_id"]: r["n"] for r in cur.fetchall()}
+    finally:
+        conn.close()
+    for q in queries:
+        q["process_done"] = np.get(q["query_id"], 0)
+        q["tools_done"] = nt.get(q["query_id"], 0)
+    return queries
+
+
+def fetch_posts(query_id):
+    """某 query 下全部帖子(JSON 列已解析),带 has_process/has_tools 标记。"""
+    conn = _conn()
+    try:
+        with conn.cursor() as cur:
+            cur.execute("""SELECT * FROM search_data WHERE query_id=%s
+                           ORDER BY overall_score DESC, id""", (query_id,))
+            rows = cur.fetchall()
+            cur.execute("SELECT DISTINCT case_id FROM mode_process WHERE query_id=%s", (query_id,))
+            hp = {r["case_id"] for r in cur.fetchall()}
+            cur.execute("SELECT DISTINCT case_id FROM mode_tools WHERE query_id=%s", (query_id,))
+            ht = {r["case_id"] for r in cur.fetchall()}
+    finally:
+        conn.close()
+    for r in rows:
+        for col in ("images", "videos", "found_by", "knowledge_type", "llm_evaluation"):
+            r[col] = _loads(r[col])
+        r["has_process"] = r["case_id"] in hp
+        r["has_tools"] = r["case_id"] in ht
+        r.pop("created_at", None); r.pop("updated_at", None)
+    return rows
+
+
+def fetch_post(query_id, case_id):
+    """单帖完整行(给 pipeline 脚本重建 source 用)。无则 None。"""
+    conn = _conn()
+    try:
+        with conn.cursor() as cur:
+            cur.execute("SELECT * FROM search_data WHERE query_id=%s AND case_id=%s",
+                        (query_id, case_id))
+            row = cur.fetchone()
+    finally:
+        conn.close()
+    if not row:
+        return None
+    for col in ("images", "videos", "found_by", "knowledge_type", "llm_evaluation"):
+        row[col] = _loads(row[col])
+    return row
+
+
+# ── mode_process ─────────────────────────────────────────────────────────────
+
+def replace_process(query_id, case_id, platform, post_title, payload,
+                    model, version, cost_usd, duration_s):
+    """写入一帖某版本的工序解构结果(payload = {source, procedures})。
+    删 (case_id, version) 旧行再插,同版本重跑幂等、跨版本保留历史。返回工序条数。"""
+    source = payload.get("source")
+    procedures = payload.get("procedures") or []
+    conn = _conn()
+    try:
+        with conn.cursor() as cur:
+            cur.execute("DELETE FROM mode_process WHERE case_id=%s AND version=%s",
+                        (case_id, version))
+            if procedures:
+                rows = []
+                for p in procedures:
+                    steps = p.get("steps") or []
+                    vias = []
+                    for s in steps:
+                        v = s.get("via")
+                        if v and v not in vias:
+                            vias.append(v)
+                    rows.append((
+                        query_id, case_id, platform, (post_title or "")[:500],
+                        _j(source), p.get("id"), (p.get("name") or "")[:250],
+                        p.get("purpose"), p.get("category"),
+                        _j(p.get("declarations")), _j(p.get("type_registry")),
+                        _j(steps), len(steps), _j(vias),
+                        model, version, cost_usd, duration_s,
+                    ))
+                cur.executemany("""
+                INSERT INTO mode_process
+                  (query_id, case_id, platform, post_title, source, procedure_id, name,
+                   purpose, category, declarations, type_registry, steps, step_count,
+                   tools_used, model, version, cost_usd, duration_s)
+                VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
+                """, rows)
+        return len(procedures)
+    finally:
+        conn.close()
+
+
+def fetch_process_versions(case_id):
+    conn = _conn()
+    try:
+        with conn.cursor() as cur:
+            cur.execute("""SELECT version, COUNT(*) AS n, MAX(model) AS model
+                           FROM mode_process WHERE case_id=%s
+                           GROUP BY version ORDER BY version DESC""", (case_id,))
+            return cur.fetchall()
+    finally:
+        conn.close()
+
+
+def fetch_process(case_id, version=None):
+    """重建 {case_id, version, model, source, procedures:[...]}。version=None 取最新。"""
+    conn = _conn()
+    try:
+        with conn.cursor() as cur:
+            if version is None:
+                cur.execute("""SELECT version FROM mode_process WHERE case_id=%s
+                               ORDER BY version DESC, id DESC LIMIT 1""", (case_id,))
+                row = cur.fetchone()
+                if not row:
+                    return None
+                version = row["version"]
+            cur.execute("""SELECT * FROM mode_process WHERE case_id=%s AND version=%s
+                           ORDER BY id""", (case_id, version))
+            rows = cur.fetchall()
+    finally:
+        conn.close()
+    if not rows:
+        return None
+    procedures = [{
+        "id": r["procedure_id"], "name": r["name"], "purpose": r["purpose"],
+        "category": r["category"], "declarations": _loads(r["declarations"]),
+        "type_registry": _loads(r["type_registry"]), "steps": _loads(r["steps"], []),
+        "tools_used": _loads(r["tools_used"], []),
+    } for r in rows]
+    return {"case_id": case_id, "version": version, "platform": rows[0]["platform"],
+            "title": rows[0]["post_title"], "model": rows[0]["model"],
+            "cost_usd": float(rows[0]["cost_usd"]) if rows[0]["cost_usd"] is not None else None,
+            "duration_s": rows[0]["duration_s"],
+            "source": _loads(rows[0]["source"]), "procedures": procedures}
+
+
+# ── mode_tools ───────────────────────────────────────────────────────────────
+
+def replace_tools(query_id, case_id, platform, post_title, tools,
+                  model, version, cost_usd, duration_s):
+    """写入一帖某版本的工具解构结果。语义同 replace_process。返回工具条数。"""
+    conn = _conn()
+    try:
+        with conn.cursor() as cur:
+            cur.execute("DELETE FROM mode_tools WHERE case_id=%s AND version=%s",
+                        (case_id, version))
+            if tools:
+                rows = [(
+                    query_id, case_id, platform, (post_title or "")[:500],
+                    (t.get("工具名称") or "")[:250],
+                    _j(t.get("实质作用域")), _j(t.get("形式作用域")),
+                    t.get("创作层级"), t.get("来源链接"), t.get("输入"), t.get("输出"),
+                    _j(t.get("用法")), _j(t.get("案例")), _j(t.get("缺点")),
+                    t.get("最新更新时间"), model, version, cost_usd, duration_s,
+                ) for t in tools]
+                cur.executemany("""
+                INSERT INTO mode_tools
+                  (query_id, case_id, platform, post_title, tool_name, substance_scope,
+                   form_scope, creation_layer, source_link, input_desc, output_desc,
+                   usage_json, cases_json, defects_json, updated_time, model, version,
+                   cost_usd, duration_s)
+                VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
+                """, rows)
+        return len(tools)
+    finally:
+        conn.close()
+
+
+def fetch_tools_versions(case_id):
+    conn = _conn()
+    try:
+        with conn.cursor() as cur:
+            cur.execute("""SELECT version, COUNT(*) AS n, MAX(model) AS model
+                           FROM mode_tools WHERE case_id=%s
+                           GROUP BY version ORDER BY version DESC""", (case_id,))
+            return cur.fetchall()
+    finally:
+        conn.close()
+
+
+def fetch_tools(case_id, version=None):
+    """重建 {case_id, version, model, tool_count, tools:[...]}。version=None 取最新。"""
+    conn = _conn()
+    try:
+        with conn.cursor() as cur:
+            if version is None:
+                cur.execute("""SELECT version FROM mode_tools WHERE case_id=%s
+                               ORDER BY version DESC, id DESC LIMIT 1""", (case_id,))
+                row = cur.fetchone()
+                if not row:
+                    return None
+                version = row["version"]
+            cur.execute("""SELECT * FROM mode_tools WHERE case_id=%s AND version=%s
+                           ORDER BY id""", (case_id, version))
+            rows = cur.fetchall()
+    finally:
+        conn.close()
+    if not rows:
+        return None
+    tools = [{
+        "工具名称": r["tool_name"], "实质作用域": _loads(r["substance_scope"]),
+        "形式作用域": _loads(r["form_scope"]), "创作层级": r["creation_layer"],
+        "来源链接": r["source_link"], "输入": r["input_desc"], "输出": r["output_desc"],
+        "用法": _loads(r["usage_json"]), "案例": _loads(r["cases_json"]),
+        "缺点": _loads(r["defects_json"]), "最新更新时间": r["updated_time"],
+    } for r in rows]
+    return {"case_id": case_id, "version": version, "platform": rows[0]["platform"],
+            "title": rows[0]["post_title"], "model": rows[0]["model"],
+            "cost_usd": float(rows[0]["cost_usd"]) if rows[0]["cost_usd"] is not None else None,
+            "duration_s": rows[0]["duration_s"],
+            "tool_count": len(tools), "tools": tools}
+
+
+# ── Dashboard 原始行(指标计算在 server.py)─────────────────────────────────────
+
+def fetch_dashboard_rows():
+    """拉 Dashboard 计算所需的轻量行。数据量级:百~千行,Python 聚合足够。"""
+    conn = _conn()
+    try:
+        with conn.cursor() as cur:
+            cur.execute("SELECT query_id, case_id, knowledge_type FROM search_data")
+            posts = cur.fetchall()
+            cur.execute("""SELECT case_id, version, steps, tools_used, cost_usd,
+                                  duration_s, created_at FROM mode_process""")
+            procs = cur.fetchall()
+            cur.execute("""SELECT case_id, version, tool_name, substance_scope,
+                                  form_scope, cost_usd, duration_s, created_at
+                           FROM mode_tools""")
+            tools = cur.fetchall()
+    finally:
+        conn.close()
+    for p in posts:
+        p["knowledge_type"] = _loads(p["knowledge_type"], [])
+    for r in procs:
+        r["steps"] = _loads(r["steps"], [])
+        r["tools_used"] = _loads(r["tools_used"], [])
+        r["cost_usd"] = float(r["cost_usd"]) if r["cost_usd"] is not None else None
+        r["created_at"] = str(r["created_at"]) if r["created_at"] else None
+    for r in tools:
+        r["substance_scope"] = _loads(r["substance_scope"], [])
+        r["form_scope"] = _loads(r["form_scope"], [])
+        r["cost_usd"] = float(r["cost_usd"]) if r["cost_usd"] is not None else None
+        r["created_at"] = str(r["created_at"]) if r["created_at"] else None
+    return posts, procs, tools
+
+
+def check():
+    conn = _conn()
+    try:
+        with conn.cursor() as cur:
+            for t in ("search_data", "mode_process", "mode_tools"):
+                cur.execute(f"SELECT COUNT(*) AS n FROM {t}")
+                print(f"{t}: {cur.fetchone()['n']} 行")
+    finally:
+        conn.close()
+
+
+if __name__ == "__main__":
+    cmd = sys.argv[1] if len(sys.argv) > 1 else ""
+    if cmd == "init":
+        init_tables()
+    elif cmd == "check":
+        check()
+    else:
+        print("用法:\n  python db.py init    # 建表\n  python db.py check   # 三表行数")
+```
+
+- [ ] **Step 2: 建表并验证**
+
+```bash
+cd /Users/max_liu/max_liu/company/Agent/examples/mode_workflow
+python db.py init && python db.py check
+```
+Expected: `✅ 建表完成:search_data, mode_process, mode_tools`,三表各 0 行。失败提示缺 `MYSQL_HOST` 则确认 `.env`。
+
+- [ ] **Step 3: 单元自检(overall_score)**
+
+```bash
+python3 -c "
+import db
+e = {'相关性': {'和内容制作知识相关': {'得分': 8}, '和 query 相关': {'得分': 6}},
+     '质量': {'固定维度': {'热度性': {'得分': 4}}}}
+s = db.overall_score(e)
+assert s == round((7.0 + 4.0) / 2, 2), s
+assert db.overall_score({}) is None
+print('overall_score OK:', s)
+"
+```
+Expected: `overall_score OK: 5.5`
+
+- [ ] **Step 4: Commit**
+
+```bash
+git add examples/mode_workflow/db.py
+git commit -m "feat(mode_workflow): 三表 DDL 与 MySQL 读写层"
+```
+
+---
+
+### Task 3: import_history.py — 历史搜索结果入库
+
+**Files:**
+- Create: `examples/mode_workflow/import_history.py`
+
+- [ ] **Step 1: 写完整文件**
+
+```python
+# -*- coding: utf-8 -*-
+"""一次性导入:fixed_query_eval/runs_full/*/form_A.json → search_data 表。
+幂等(upsert),可反复执行。
+
+用法:
+  python import_history.py
+  python import_history.py --runs-dir /path/to/runs_full
+"""
+import argparse
+import json
+import sys
+from pathlib import Path
+
+HERE = Path(__file__).resolve().parent
+sys.path.insert(0, str(HERE))
+import db
+
+DEFAULT_RUNS = (HERE.parent / "process_pipeline" / "script" / "search_eval"
+                / "fixed_query_eval" / "runs_full")
+
+
+def main():
+    p = argparse.ArgumentParser(description="历史搜索结果导入 search_data")
+    p.add_argument("--runs-dir", default=str(DEFAULT_RUNS))
+    args = p.parse_args()
+
+    runs = Path(args.runs_dir)
+    files = sorted(runs.glob("q*/form_A.json"))
+    if not files:
+        print(f"❌ {runs} 下没有 q*/form_A.json"); return 1
+
+    total = 0
+    for f in files:
+        data = json.loads(f.read_text(encoding="utf-8"))
+        qid = f.parent.name
+        results = data.get("results", [])
+        n = db.upsert_search_posts(qid, data.get("query") or data.get("original_q"), results)
+        print(f"  {qid}: 文件 {len(results)} 条 → 入库 {n} 条")
+        total += n
+    print(f"✅ 共导入 {total} 条 → search_data")
+    return 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
+```
+
+- [ ] **Step 2: 执行并核对行数**
+
+```bash
+cd /Users/max_liu/max_liu/company/Agent/examples/mode_workflow
+python import_history.py
+python db.py check
+python3 -c "
+import json, glob
+n = sum(len(json.load(open(f))['results'])
+        for f in glob.glob('../process_pipeline/script/search_eval/fixed_query_eval/runs_full/q*/form_A.json'))
+print('JSON 总条数:', n)
+"
+```
+Expected: 每个 q 打一行;`search_data` 行数 == JSON 总条数(q0000-q0003)。再跑一次 `python import_history.py`,行数不变(幂等)。
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add examples/mode_workflow/import_history.py
+git commit -m "feat(mode_workflow): 历史搜索结果导入脚本"
+```
+
+---
+
+### Task 4: pipeline/tool_extract.py — 工具解构(读库→LLM→写库)
+
+**Files:**
+- Create: `examples/mode_workflow/pipeline/tool_extract.py`
+
+**与旧版差异:** 帖子源从 `search_data` 表读(不再读 form_A.json);结果直接写 `mode_tools`(不再写本地 json 为主),`runs/tools/` 只留调试副本;记录 cost/duration。
+
+- [ ] **Step 1: 写完整文件**
+
+```python
+# -*- coding: utf-8 -*-
+"""工具解构 · search_data 帖子 → 结构化工具条目 → mode_tools 表
+================================================================================
+- 帖子源:search_data 表(--query-id + --case-ids 定位)
+- 模型默认 google/gemini-3.1-flash-lite,可 --model 传任意 OpenRouter id
+- 多模态:复用 search_and_evaluate._attach_image_refs(URL 直传)
+- 写库:db.replace_tools(同版本幂等,跨版本保留);runs/tools/ 留调试副本
+
+用法(一般由 server.py 起子进程调):
+  python pipeline/tool_extract.py --query-id q0000 --case-ids xhs_abc,gzh_def
+  python pipeline/tool_extract.py --query-id q0000 --case-ids xhs_abc --model anthropic/claude-sonnet-4-6
+"""
+import argparse
+import asyncio
+import json
+import sys
+import time
+from datetime import datetime
+from pathlib import Path
+
+PROJECT_ROOT = Path(__file__).resolve().parents[3]   # …/Agent
+sys.path.insert(0, str(PROJECT_ROOT))
+
+from dotenv import load_dotenv
+load_dotenv()
+
+from examples.process_pipeline.script.search_eval.search_and_evaluate import _attach_image_refs
+from examples.process_pipeline.script.llm_evaluate_sources import _format_post_for_eval, build_eval_llm_call
+from examples.process_pipeline.script.llm_helper import call_llm_with_retry
+
+HERE = Path(__file__).resolve().parent          # pipeline/
+MW = HERE.parent                                 # mode_workflow/
+sys.path.insert(0, str(MW))
+import db
+
+TOOL_SYSTEM = (MW / "prompts" / "tool_extract_system.md").read_text(encoding="utf-8")
+DEFAULT_MODEL_CHOICE = "gemini-flash-lite"
+MAX_IMAGES = 6
+
+
+def _row_to_source(row):
+    """search_data 行 → 引擎函数认的 source dict。"""
+    return {
+        "case_id": row["case_id"], "platform": row["platform"],
+        "channel_content_id": row["channel_content_id"],
+        "source_url": row["url"],
+        "post": {
+            "title": row["title"], "body_text": row["body"],
+            "images": row["images"] or [], "like_count": row["like_count"],
+            "publish_timestamp": row["publish_time"], "link": row["url"],
+        },
+    }
+
+
+def _validate_tools(data):
+    if not isinstance(data, dict) or "tools" not in data:
+        return '缺少顶层 "tools" 字段'
+    if not isinstance(data["tools"], list):
+        return '"tools" 必须是数组'
+    return None
+
+
+async def extract_one(source, llm_call, model):
+    """对一条 source 抽工具,返回 (tools_list, cost)。失败返回 ([], cost)。"""
+    user_text = "【内容】\n" + _format_post_for_eval(source)
+    image_urls = source.get("_image_data_urls") or None
+    if image_urls:
+        user_content = [{"type": "text", "text": user_text}]
+        for u in image_urls:
+            user_content.append({"type": "image_url", "image_url": {"url": u}})
+        messages = [{"role": "system", "content": TOOL_SYSTEM},
+                    {"role": "user", "content": user_content}]
+    else:
+        messages = [{"role": "system", "content": TOOL_SYSTEM},
+                    {"role": "user", "content": user_text}]
+    data, cost = await call_llm_with_retry(
+        llm_call=llm_call, messages=messages, model=model,
+        temperature=0.1, max_tokens=4000,
+        validate_fn=_validate_tools,
+        task_name=f"ToolExtract[{source.get('case_id', '?')}]",
+    )
+    if not data:
+        return [], cost
+    return data.get("tools", []), cost
+
+
+async def run(args):
+    qid = args.query_id
+    case_ids = [c.strip() for c in args.case_ids.split(",") if c.strip()]
+    sources = []
+    for cid in case_ids:
+        row = db.fetch_post(qid, cid)
+        if row is None:
+            print(f"⚠️ {qid}/{cid} 不在 search_data,跳过")
+            continue
+        sources.append(_row_to_source(row))
+    if not sources:
+        print("❌ 没有可解构的帖子"); return 1
+
+    if args.model and "/" in args.model:
+        from agent.llm.openrouter import create_openrouter_llm_call
+        llm_call, model_id = create_openrouter_llm_call(model=args.model), args.model
+    else:
+        llm_call, model_id = build_eval_llm_call(args.model or DEFAULT_MODEL_CHOICE)
+    version = args.version or ("v_" + datetime.now().strftime("%m%d%H%M"))
+    print(f"🔧 工具解构 {len(sources)} 帖 · 模型 {model_id} · 版本 {version}")
+
+    await _attach_image_refs(sources, MAX_IMAGES, max(2, args.max_concurrent * 2), "url")
+    sem = asyncio.Semaphore(args.max_concurrent)
+    out_dir = MW / "runs" / "tools"
+    out_dir.mkdir(parents=True, exist_ok=True)
+
+    async def _work(s):
+        t0 = time.monotonic()
+        async with sem:
+            tools, cost = await extract_one(s, llm_call, model_id)
+        dur = round(time.monotonic() - t0, 1)
+        n = db.replace_tools(qid, s["case_id"], s.get("platform"),
+                             (s.get("post") or {}).get("title", ""),
+                             tools, model_id, version, cost, dur)
+        (out_dir / f"{s['case_id']}_{version}.json").write_text(json.dumps({
+            "case_id": s["case_id"], "version": version, "model": model_id,
+            "cost_usd": cost, "duration_s": dur, "tools": tools,
+        }, ensure_ascii=False, indent=2), encoding="utf-8")
+        print(f"   ✅ {s['case_id']} → {n} 个工具 · ${cost:.4f} · {dur}s")
+        return cost
+
+    costs = await asyncio.gather(*[_work(s) for s in sources])
+    print(f"\n📊 完成 {len(sources)} 帖 · 总成本 ${sum(costs):.4f}")
+    return 0
+
+
+def main():
+    p = argparse.ArgumentParser(description="工具解构:search_data 帖子 → mode_tools")
+    p.add_argument("--query-id", required=True)
+    p.add_argument("--case-ids", required=True, help="逗号分隔 case_id 列表")
+    p.add_argument("--model", default=None, help="默认 gemini-flash-lite,可传 OpenRouter id")
+    p.add_argument("--max-concurrent", type=int, default=3)
+    p.add_argument("--version", default=None, help="默认自动 v_月日时分")
+    args = p.parse_args()
+    raise SystemExit(asyncio.run(run(args)))
+
+
+if __name__ == "__main__":
+    main()
+```
+
+- [ ] **Step 2: 冒烟(需 OPEN_ROUTER_API_KEY;无 key 跳过本步)**
+
+先从库里挑一个真实 case_id:
+
+```bash
+cd /Users/max_liu/max_liu/company/Agent/examples/mode_workflow
+CID=$(python3 -c "import db; print(db.fetch_posts('q0000')[0]['case_id'])")
+python pipeline/tool_extract.py --query-id q0000 --case-ids "$CID"
+python3 -c "import db; r=db.fetch_tools('$CID'); print(r['version'], r['tool_count'], r['cost_usd'], r['duration_s'])"
+```
+Expected: 解构日志含 `✅ <case_id> → N 个工具`;fetch_tools 打出版本号、工具数 ≥0、cost/duration 非 None。
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add examples/mode_workflow/pipeline/tool_extract.py
+git commit -m "feat(mode_workflow): 工具解构 pipeline(读库→LLM→mode_tools)"
+```
+
+---
+
+### Task 5: pipeline/procedure_extract.py — 工序解构
+
+**Files:**
+- Create: `examples/mode_workflow/pipeline/procedure_extract.py`
+
+**与旧版差异:** 帖子源从库读;**不渲染 HTML**(前端直接渲染 DB 数据);写 `mode_process`;支持一次多帖。`_detect_image_mime`/`_fetch_data_url`/`_collect_images`/`_sanitize_workflow` 从 `procedure_model_extract.py:34-126` 原样复制(已验证可用,不重新发明)。
+
+- [ ] **Step 1: 写完整文件**
+
+```python
+# -*- coding: utf-8 -*-
+"""工序解构 · search_data 帖子 → workflow JSON → mode_process 表
+================================================================================
+单次大模型直出(无 agent / 无 validate 循环),prompt 见 prompts/procedure_extract_system.md。
+配图下载转 base64(绕防盗链)随文本一起发。结果按工序拆行写 mode_process。
+
+用法(一般由 server.py 起子进程调):
+  python pipeline/procedure_extract.py --query-id q0000 --case-ids xhs_abc
+  python pipeline/procedure_extract.py --query-id q0000 --case-ids xhs_abc --model google/gemini-3.1-flash-lite
+"""
+import argparse
+import asyncio
+import base64
+import json
+import sys
+import time
+from datetime import datetime
+from pathlib import Path
+
+PROJECT_ROOT = Path(__file__).resolve().parents[3]   # …/Agent
+sys.path.insert(0, str(PROJECT_ROOT))
+
+from dotenv import load_dotenv
+load_dotenv()
+
+from examples.process_pipeline.script.llm_helper import call_llm_with_retry
+
+HERE = Path(__file__).resolve().parent
+MW = HERE.parent
+sys.path.insert(0, str(MW))
+import db
+
+PROMPT_FILE = MW / "prompts" / "procedure_extract_system.md"
+DEFAULT_MODEL = "anthropic/claude-sonnet-4-6"
+MAX_IMAGES = 8
+
+
+# ── 以下 4 个助手原样取自 mode_procedure/mode-dsl/procedure_model_extract.py ──
+
+def _detect_image_mime(data: bytes):
+    if not data or len(data) < 12:
+        return None
+    if data[:3] == b"\xff\xd8\xff":
+        return "image/jpeg"
+    if data[:8] == b"\x89PNG\r\n\x1a\n":
+        return "image/png"
+    if data[:4] == b"RIFF" and data[8:12] == b"WEBP":
+        return "image/webp"
+    if data[:6] in (b"GIF87a", b"GIF89a"):
+        return "image/gif"
+    return None
+
+
+async def _fetch_data_url(url, sem):
+    from agent.tools.builtin.file.image_cdn import _download_image
+    async with sem:
+        try:
+            data = await _download_image(url)
+        except Exception:
+            return None
+    mime = _detect_image_mime(data)
+    if mime is None:
+        return None
+    return f"data:{mime};base64,{base64.b64encode(data).decode()}"
+
+
+async def _collect_images(urls, max_images, concurrency):
+    urls = [u for u in urls if isinstance(u, str) and u][:max_images]
+    if not urls:
+        return []
+    sem = asyncio.Semaphore(concurrency)
+    results = await asyncio.gather(*[_fetch_data_url(u, sem) for u in urls])
+    return [d for d in results if d]
+
+
+def _validate_wf(data):
+    if not isinstance(data, dict):
+        return "顶层必须是 JSON 对象"
+    if "procedures" not in data:
+        return '缺少 "procedures" 字段'
+    if not isinstance(data["procedures"], list):
+        return '"procedures" 必须是数组'
+    return None
+
+
+def _sanitize_workflow(data):
+    dropped = {"procedures": 0, "steps": 0, "io": 0}
+    procs = data.get("procedures")
+    if not isinstance(procs, list):
+        return data, dropped
+    clean_procs = []
+    for p in procs:
+        if not isinstance(p, dict):
+            dropped["procedures"] += 1
+            continue
+        steps = p.get("steps")
+        if isinstance(steps, list):
+            kept = []
+            for s in steps:
+                if not isinstance(s, dict):
+                    dropped["steps"] += 1
+                    continue
+                for io in ("inputs", "outputs"):
+                    if isinstance(s.get(io), list):
+                        before = len(s[io])
+                        s[io] = [x for x in s[io] if isinstance(x, dict)]
+                        dropped["io"] += before - len(s[io])
+                kept.append(s)
+            p["steps"] = kept
+        if not isinstance(p.get("declarations"), dict):
+            p.pop("declarations", None)
+        if not isinstance(p.get("type_registry"), dict):
+            p.pop("type_registry", None)
+        clean_procs.append(p)
+    data["procedures"] = clean_procs
+    return data, dropped
+
+# ── 助手复制结束 ──────────────────────────────────────────────────────────────
+
+
+async def extract_one(row, system, llm_call, model, args):
+    """单帖工序解构 → 写 mode_process。返回 cost。"""
+    cid = row["case_id"]
+    t0 = time.monotonic()
+    post_text = (f"【标题】{row['title'] or ''}\n【来源】{row['url'] or ''}\n"
+                 f"【正文】\n{row['body'] or ''}")
+    data_urls = [] if args.no_images else await _collect_images(
+        row["images"] or [], args.max_images, args.max_concurrent)
+    print(f"🖼️  {cid} 配图 {len(data_urls)}/{len(row['images'] or [])} 张")
+
+    if data_urls:
+        user_content = [{"type": "text", "text": post_text}]
+        for u in data_urls:
+            user_content.append({"type": "image_url", "image_url": {"url": u}})
+        messages = [{"role": "system", "content": system},
+                    {"role": "user", "content": user_content}]
+    else:
+        messages = [{"role": "system", "content": system},
+                    {"role": "user", "content": post_text}]
+
+    data, cost = await call_llm_with_retry(
+        llm_call=llm_call, messages=messages, model=model,
+        temperature=0.2, max_tokens=args.max_tokens,
+        validate_fn=_validate_wf, task_name=f"ProcExtract[{cid}]",
+    )
+    if not data:
+        print(f"❌ {cid} 解构失败(重试耗尽)")
+        return cost
+
+    data, dropped = _sanitize_workflow(data)
+    if any(dropped.values()):
+        print(f"🧹 {cid} 清洗:丢弃 procedure {dropped['procedures']} / "
+              f"step {dropped['steps']} / io {dropped['io']}")
+
+    dur = round(time.monotonic() - t0, 1)
+    n = db.replace_process(args.query_id, cid, row["platform"], row["title"],
+                           data, model, args.version, cost, dur)
+    out_dir = MW / "runs" / "procedures"
+    out_dir.mkdir(parents=True, exist_ok=True)
+    (out_dir / f"{cid}_{args.version}.json").write_text(
+        json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
+    print(f"   ✅ {cid} → {n} 个工序 · ${cost:.4f} · {dur}s")
+    return cost
+
+
+async def run(args):
+    case_ids = [c.strip() for c in args.case_ids.split(",") if c.strip()]
+    rows = []
+    for cid in case_ids:
+        row = db.fetch_post(args.query_id, cid)
+        if row is None:
+            print(f"⚠️ {args.query_id}/{cid} 不在 search_data,跳过")
+            continue
+        rows.append(row)
+    if not rows:
+        print("❌ 没有可解构的帖子"); return 1
+
+    system = PROMPT_FILE.read_text(encoding="utf-8")
+    from agent.llm.openrouter import create_openrouter_llm_call
+    llm_call = create_openrouter_llm_call(model=args.model)
+    args.version = args.version or ("v_" + datetime.now().strftime("%m%d%H%M"))
+    print(f"🤖 工序解构 {len(rows)} 帖 · 模型 {args.model} · 版本 {args.version}")
+
+    costs = []
+    for row in rows:   # 工序解构 token 重,串行跑,避免 OpenRouter 限流
+        costs.append(await extract_one(row, system, llm_call, args.model, args))
+    print(f"\n📊 完成 {len(rows)} 帖 · 总成本 ${sum(costs):.4f}")
+    return 0
+
+
+def main():
+    p = argparse.ArgumentParser(description="工序解构:search_data 帖子 → mode_process")
+    p.add_argument("--query-id", required=True)
+    p.add_argument("--case-ids", required=True, help="逗号分隔 case_id 列表")
+    p.add_argument("--model", default=DEFAULT_MODEL)
+    p.add_argument("--version", default=None, help="默认自动 v_月日时分")
+    p.add_argument("--max-images", type=int, default=MAX_IMAGES)
+    p.add_argument("--max-concurrent", type=int, default=4)
+    p.add_argument("--max-tokens", type=int, default=8000)
+    p.add_argument("--no-images", action="store_true")
+    args = p.parse_args()
+    raise SystemExit(asyncio.run(run(args)))
+
+
+if __name__ == "__main__":
+    main()
+```
+
+- [ ] **Step 2: 冒烟(需 API key;工序解构默认模型是 claude-sonnet,单帖约 $0.03-0.1;省钱可加 `--model google/gemini-3.1-flash-lite`)**
+
+```bash
+cd /Users/max_liu/max_liu/company/Agent/examples/mode_workflow
+CID=$(python3 -c "import db; print(db.fetch_posts('q0000')[0]['case_id'])")
+python pipeline/procedure_extract.py --query-id q0000 --case-ids "$CID" --model google/gemini-3.1-flash-lite
+python3 -c "
+import db
+r = db.fetch_process('$CID')
+print(r['version'], len(r['procedures']), '个工序')
+p = r['procedures'][0]
+print('steps:', len(p['steps']), 'tools_used:', p['tools_used'])
+"
+```
+Expected: `✅ <cid> → N 个工序`;fetch_process 返回 version、工序数 ≥1、steps 非空、tools_used 列出 via。
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add examples/mode_workflow/pipeline/procedure_extract.py
+git commit -m "feat(mode_workflow): 工序解构 pipeline(读库→LLM→mode_process)"
+```
+
+---
+
+### Task 6: pipeline/search_eval.py — 搜索 + 评估入库
+
+**Files:**
+- Create: `examples/mode_workflow/pipeline/search_eval.py`
+
+**与 run_search.py 的差异:** 不写死 4 组 query,接受任意 `--query`(可带 `--synonyms` 同义措辞);产出直接入库 `search_data`,`runs/search/` 留调试副本。
+
+- [ ] **Step 1: 写完整文件**
+
+```python
+# -*- coding: utf-8 -*-
+"""搜索 + 评估 · 任意 query → 多渠道搜索去重 → LLM 逐帖评估 → search_data 表
+================================================================================
+引擎函数全部只读复用 search_and_evaluate.py(搜索/去重/转写/评估/英平台翻译)。
+
+用法(一般由 server.py 起子进程调):
+  python pipeline/search_eval.py --query-id q0004 --query "AI 人像 图片 生成 怎么做"
+  python pipeline/search_eval.py --query-id q0005 --query "GPT image2 评测" \
+      --synonyms "GPT image2 测评,GPT image2 实测" --platforms xhs,gzh --max-count 10
+"""
+import argparse
+import asyncio
+import json
+import sys
+from pathlib import Path
+
+PROJECT_ROOT = Path(__file__).resolve().parents[3]   # …/Agent
+sys.path.insert(0, str(PROJECT_ROOT))
+
+from dotenv import load_dotenv
+load_dotenv()
+
+from examples.process_pipeline.script.search_eval.search_and_evaluate import (
+    search_all, evaluate_posts, transcribe_video_posts, build_query_overrides,
+)
+from examples.process_pipeline.script.llm_evaluate_sources import (
+    build_eval_llm_call, EVAL_MODELS, DEFAULT_EVAL_MODEL,
+)
+
+HERE = Path(__file__).resolve().parent
+MW = HERE.parent
+sys.path.insert(0, str(MW))
+import db
+
+
+async def run(args):
+    phrasings = [args.query] + [s.strip() for s in (args.synonyms or "").split(",") if s.strip()]
+    # 去重保序
+    seen, uniq = set(), []
+    for q in phrasings:
+        if q not in seen:
+            seen.add(q); uniq.append(q)
+    phrasings = uniq
+    platforms = [p.strip() for p in args.platforms.split(",") if p.strip()]
+
+    eval_llm, eval_model_id = build_eval_llm_call(args.eval_model)
+    print(f"▶ {args.query_id}  query={args.query!r}  措辞={phrasings}  渠道={platforms}")
+
+    overrides = await build_query_overrides(platforms, phrasings, eval_llm, eval_model_id)
+    sources = await search_all(platforms, phrasings, args.max_count,
+                               args.max_concurrent, query_overrides=overrides)
+    print(f"🔎 去重后 {len(sources)} 帖")
+    if not sources:
+        print("❌ 搜索无结果"); return 1
+
+    try:
+        from examples.process_pipeline.script.extract_sources import _convert_timestamps
+        _convert_timestamps(sources)
+    except Exception:
+        pass
+
+    if not args.no_transcribe:
+        n = await transcribe_video_posts(sources, concurrency=args.max_concurrent)
+        if n:
+            print(f"🎙️  视频转写 {n} 条")
+
+    cost = 0.0
+    if not args.no_eval:
+        sources, cost = await evaluate_posts(
+            sources, "", eval_llm, eval_model_id, args.max_concurrent,
+            include_images=not args.no_images, max_images=args.max_images,
+            image_mode=args.image_mode, query=args.query,
+        )
+    for s in sources:
+        s.pop("_image_data_urls", None)
+
+    n = db.upsert_search_posts(args.query_id, args.query, sources)
+    print(f"🗄️  search_data 入库 {n} 行 · 评估成本 ${cost:.4f}")
+
+    out_dir = MW / "runs" / "search"
+    out_dir.mkdir(parents=True, exist_ok=True)
+    (out_dir / f"{args.query_id}.json").write_text(json.dumps({
+        "query_id": args.query_id, "query": args.query, "phrasings": phrasings,
+        "platforms": platforms, "total": len(sources), "results": sources,
+    }, ensure_ascii=False, indent=2), encoding="utf-8")
+    return 0
+
+
+def main():
+    p = argparse.ArgumentParser(description="搜索+评估 → search_data")
+    p.add_argument("--query-id", required=True, help="如 q0004(server 自动分配)")
+    p.add_argument("--query", required=True, help="基准 query(评估锚点)")
+    p.add_argument("--synonyms", default="", help="逗号分隔的同义措辞(可选)")
+    p.add_argument("--platforms", default="xhs,gzh")
+    p.add_argument("--max-count", type=int, default=10)
+    p.add_argument("--eval-model", default=DEFAULT_EVAL_MODEL, choices=list(EVAL_MODELS))
+    p.add_argument("--max-concurrent", type=int, default=3)
+    p.add_argument("--max-images", type=int, default=4)
+    p.add_argument("--image-mode", choices=["url", "base64"], default="url")
+    p.add_argument("--no-transcribe", action="store_true")
+    p.add_argument("--no-eval", action="store_true")
+    p.add_argument("--no-images", action="store_true")
+    args = p.parse_args()
+    raise SystemExit(asyncio.run(run(args)))
+
+
+if __name__ == "__main__":
+    main()
+```
+
+- [ ] **Step 2: 冒烟(需 API key + 搜索通道可用;成本与时长都高,可只验 import 与参数解析)**
+
+最小验证(不花钱):
+
+```bash
+cd /Users/max_liu/max_liu/company/Agent/examples/mode_workflow
+python pipeline/search_eval.py --help
+```
+Expected: 正常打出 usage(证明 import 链 OK——这条会触发 agent.tools 注册链,import 错误在这暴露)。
+
+完整冒烟(可选,~1-3 分钟、~$0.05):
+
+```bash
+python pipeline/search_eval.py --query-id q9999 --query "GPT image2 评测" --platforms xhs --max-count 3
+python3 -c "import db; print(len(db.fetch_posts('q9999')), '帖')"
+```
+Expected: 入库 ≥1 行。验完可清理:`python3 -c "import db; c=db._conn(); c.cursor().execute(\"DELETE FROM search_data WHERE query_id='q9999'\"); c.close()"`
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add examples/mode_workflow/pipeline/search_eval.py
+git commit -m "feat(mode_workflow): 搜索评估 pipeline(任意 query → search_data)"
+```
+
+---
+
+### Task 7: server.py — API + 任务管理 + Dashboard 聚合
+
+**Files:**
+- Create: `examples/mode_workflow/server.py`
+
+**API 契约(index.html 依赖,Task 8 不得偏离):**
+
+| Method+Path | 入参 | 返回 |
+|---|---|---|
+| GET `/` | - | index.html |
+| GET `/api/dashboard` | - | 见下方 `_dashboard()` 返回结构 |
+| GET `/api/queries` | - | `[{query_id, query_text, post_count, process_done, tools_done}]` |
+| GET `/api/posts` | `query_id` | `[{...search_data 行, has_process, has_tools}]` |
+| GET `/api/process_versions` | `case_id` | `[{version, n, model}]` |
+| GET `/api/process` | `case_id`, `version?` | `{case_id, version, model, source, procedures:[...]}` 或 404 |
+| GET `/api/tools_versions` | `case_id` | `[{version, n, model}]` |
+| GET `/api/tools` | `case_id`, `version?` | `{case_id, version, model, tool_count, tools:[...]}` 或 404 |
+| POST `/api/extract_process` | `{query_id, case_ids:[..], model?}` | `{task_id}` |
+| POST `/api/extract_tools` | `{query_id, case_ids:[..], model?}` | `{task_id}` |
+| POST `/api/run_search` | `{query, synonyms?, platforms?, max_count?}` | `{task_id, query_id}`(query_id 自动分配下一个 qNNNN) |
+| GET `/api/task_status` | `task_id` | `{status: running\|done\|failed, log_tail}` |
+
+- [ ] **Step 1: 写完整文件**
+
+```python
+# -*- coding: utf-8 -*-
+"""mode_workflow server · 页面 + API + 解构任务管理
+================================================================================
+单服务(默认 8772):
+  - GET  /                    index.html
+  - GET  /api/dashboard       Dashboard 全部聚合指标(含内容树覆盖)
+  - GET  /api/queries|posts|process|tools(+_versions)   Dataset 数据
+  - POST /api/run_search|extract_process|extract_tools  起子进程跑 pipeline
+  - GET  /api/task_status     轮询任务状态(读日志尾部)
+
+用法:python server.py [port]
+"""
+import json
+import subprocess
+import sys
+import threading
+from collections import Counter
+from datetime import datetime
+from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
+from pathlib import Path
+from urllib.parse import urlparse, parse_qs
+
+try:
+    sys.stdout.reconfigure(encoding="utf-8")
+except Exception:
+    pass
+
+HERE = Path(__file__).resolve().parent
+sys.path.insert(0, str(HERE))
+import db
+
+PORT = int(sys.argv[1]) if len(sys.argv) > 1 else 8772
+MATRIX_FILE = HERE / "reference" / "judged_matrix.json"
+LOG_DIR = HERE / "runs" / "logs"
+
+# ── 任务管理:task_id → {proc, log, status} ──────────────────────────────────
+TASKS = {}
+_TASK_LOCK = threading.Lock()
+
+
+def _spawn_task(kind, cmd):
+    LOG_DIR.mkdir(parents=True, exist_ok=True)
+    task_id = f"{kind}_{datetime.now().strftime('%m%d%H%M%S%f')}"
+    log_path = LOG_DIR / f"{task_id}.log"
+    f = open(log_path, "w", encoding="utf-8")
+    proc = subprocess.Popen(cmd, stdout=f, stderr=subprocess.STDOUT,
+                            cwd=str(HERE), text=True)
+    with _TASK_LOCK:
+        TASKS[task_id] = {"proc": proc, "log": log_path, "status": "running"}
+
+    def _wait():
+        rc = proc.wait()
+        f.close()
+        with _TASK_LOCK:
+            TASKS[task_id]["status"] = "done" if rc == 0 else "failed"
+
+    threading.Thread(target=_wait, daemon=True).start()
+    return task_id
+
+
+def _task_status(task_id):
+    with _TASK_LOCK:
+        t = TASKS.get(task_id)
+    if not t:
+        return None
+    tail = ""
+    try:
+        text = t["log"].read_text(encoding="utf-8", errors="replace")
+        tail = text[-3000:]
+    except Exception:
+        pass
+    return {"status": t["status"], "log_tail": tail}
+
+
+def _next_query_id():
+    qs = [q["query_id"] for q in db.fetch_queries()]
+    nums = [int(q[1:]) for q in qs if q.startswith("q") and q[1:].isdigit()]
+    return f"q{(max(nums) + 1 if nums else 0):04d}"
+
+
+# ── Dashboard 聚合 ────────────────────────────────────────────────────────────
+
+def _split_values(v):
+    """substance/form 字段:数组直接用;字符串按 、,/ 分割;None 丢弃。"""
+    out = []
+    items = v if isinstance(v, list) else [v]
+    for it in items:
+        if not it or not isinstance(it, str):
+            continue
+        for piece in it.replace(",", "、").replace("/", "、").split("、"):
+            piece = piece.strip()
+            if piece:
+                out.append(piece)
+    return out
+
+
+def _dashboard():
+    posts, procs, tools = db.fetch_dashboard_rows()
+
+    # 最新版本行集(覆盖度/Top10 用最新版,成本/耗时按全部版本累计)
+    def latest(rows):
+        best = {}
+        for r in rows:
+            cid = r["case_id"]
+            if cid not in best or (r["version"] or "") > (best[cid][0] or ""):
+                best[cid] = (r["version"], [])
+        out = []
+        for r in rows:
+            if r["version"] == best[r["case_id"]][0]:
+                out.append(r)
+        return out
+
+    latest_procs = latest(procs)
+    latest_tools = latest(tools)
+
+    # 内容树覆盖:steps 的 (action 叶子 × 输入/输出 type) ∩ 有效节点(tier≥1)
+    jm = json.loads(MATRIX_FILE.read_text(encoding="utf-8"))
+    a_idx = {a["name"]: i for i, a in enumerate(jm["actions"])}
+    t_idx = {t["name"]: i for i, t in enumerate(jm["types"])}
+    valid = set()
+    for ai, row in enumerate(jm["matrix"]):
+        for ti, cell in enumerate(row):
+            if isinstance(cell, dict) and cell.get("tier", 0) >= 1:
+                valid.add((ai, ti))
+    covered = set()
+    via_counter = Counter()
+    substance_counter = Counter()
+    form_counter = Counter()
+    for r in latest_procs:
+        for s in r["steps"]:
+            leaf = (s.get("action") or "").split("/")[-1].strip()
+            types = []
+            for io in ("inputs", "outputs"):
+                for x in s.get(io) or []:
+                    if isinstance(x, dict) and x.get("type"):
+                        types.append(str(x["type"]).strip())
+            if leaf in a_idx:
+                for tp in types:
+                    if tp in t_idx and (a_idx[leaf], t_idx[tp]) in valid:
+                        covered.add((a_idx[leaf], t_idx[tp]))
+            via = (s.get("via") or "").strip()
+            if via:
+                via_counter[via] += 1
+            for v in _split_values(s.get("substance")):
+                substance_counter[v] += 1
+            for v in _split_values(s.get("form")):
+                form_counter[v] += 1
+    for r in latest_tools:
+        for v in _split_values(r["substance_scope"]):
+            substance_counter[v] += 1
+        for v in _split_values(r["form_scope"]):
+            form_counter[v] += 1
+
+    # 成本/耗时:同一 (case_id, version) 只计一次(各行重复存同一次调用的值)
+    def cost_groups(rows):
+        g = {}
+        for r in rows:
+            key = (r["case_id"], r["version"])
+            if key not in g and r["cost_usd"] is not None:
+                g[key] = (r["cost_usd"], r["duration_s"] or 0.0, r["created_at"])
+        return list(g.values())
+
+    runs = cost_groups(procs) + cost_groups(tools)
+    total_cost = round(sum(c for c, _, _ in runs), 4)
+    total_dur = round(sum(d for _, d, _ in runs), 1)
+    # 按日成本趋势
+    daily = Counter()
+    for c, _, ts in runs:
+        if ts:
+            daily[ts[:10]] += c
+    cost_trend = [{"date": d, "cost": round(v, 4)} for d, v in sorted(daily.items())]
+
+    # 进度:分母 = knowledge_type 含对应类型的帖子(distinct case)
+    proc_targets = {p["case_id"] for p in posts if "工序" in (p["knowledge_type"] or [])}
+    tool_targets = {p["case_id"] for p in posts if "工具" in (p["knowledge_type"] or [])}
+    proc_done = {r["case_id"] for r in procs}
+    tool_done = {r["case_id"] for r in tools}
+
+    return {
+        "result": {
+            "matrix_covered": len(covered), "matrix_valid": len(valid),
+            "matrix_cells": sorted([ai, ti] for ai, ti in covered),
+            "matrix_actions": [a["name"] for a in jm["actions"]],
+            "matrix_types": [t["name"] for t in jm["types"]],
+            "substance_count": len(substance_counter),
+            "substance_top": substance_counter.most_common(15),
+            "form_count": len(form_counter),
+            "form_top": form_counter.most_common(15),
+            "post_count": len(posts),
+            "extracted_post_count": len(proc_done | tool_done),
+            "tool_count": len({r["tool_name"] for r in latest_tools if r["tool_name"]}),
+            "via_top10": via_counter.most_common(10),
+        },
+        "process_data": {
+            "run_count": len(runs),
+            "avg_cost": round(total_cost / len(runs), 4) if runs else 0,
+            "total_cost": total_cost,
+            "avg_duration": round(total_dur / len(runs), 1) if runs else 0,
+            "total_duration": total_dur,
+            "cost_trend": cost_trend,
+            "process_progress": {"done": len(proc_done), "total": len(proc_targets)},
+            "tools_progress": {"done": len(tool_done), "total": len(tool_targets)},
+        },
+    }
+
+
+# ── HTTP handler ─────────────────────────────────────────────────────────────
+
+class Handler(BaseHTTPRequestHandler):
+
+    def _json(self, data, code=200):
+        body = json.dumps(data, ensure_ascii=False, default=str).encode("utf-8")
+        self.send_response(code)
+        self.send_header("Content-Type", "application/json; charset=utf-8")
+        self.send_header("Content-Length", str(len(body)))
+        self.end_headers()
+        self.wfile.write(body)
+
+    def _err(self, msg, code=400):
+        self._json({"error": msg}, code)
+
+    def do_GET(self):
+        u = urlparse(self.path)
+        qs = {k: v[0] for k, v in parse_qs(u.query).items()}
+        try:
+            if u.path == "/" or u.path == "/index.html":
+                body = (HERE / "index.html").read_bytes()
+                self.send_response(200)
+                self.send_header("Content-Type", "text/html; charset=utf-8")
+                self.send_header("Content-Length", str(len(body)))
+                self.end_headers()
+                self.wfile.write(body)
+            elif u.path == "/api/dashboard":
+                self._json(_dashboard())
+            elif u.path == "/api/queries":
+                self._json(db.fetch_queries())
+            elif u.path == "/api/posts":
+                self._json(db.fetch_posts(qs.get("query_id", "")))
+            elif u.path == "/api/process_versions":
+                self._json(db.fetch_process_versions(qs.get("case_id", "")))
+            elif u.path == "/api/process":
+                r = db.fetch_process(qs.get("case_id", ""), qs.get("version"))
+                self._json(r) if r else self._err("无解构记录", 404)
+            elif u.path == "/api/tools_versions":
+                self._json(db.fetch_tools_versions(qs.get("case_id", "")))
+            elif u.path == "/api/tools":
+                r = db.fetch_tools(qs.get("case_id", ""), qs.get("version"))
+                self._json(r) if r else self._err("无解构记录", 404)
+            elif u.path == "/api/task_status":
+                r = _task_status(qs.get("task_id", ""))
+                self._json(r) if r else self._err("未知 task_id", 404)
+            else:
+                self._err("not found", 404)
+        except Exception as e:
+            self._err(f"{type(e).__name__}: {e}", 500)
+
+    def do_POST(self):
+        u = urlparse(self.path)
+        try:
+            n = int(self.headers.get("Content-Length") or 0)
+            payload = json.loads(self.rfile.read(n) or b"{}")
+        except Exception:
+            return self._err("body 必须是 JSON")
+        try:
+            if u.path in ("/api/extract_process", "/api/extract_tools"):
+                qid = payload.get("query_id")
+                cids = payload.get("case_ids") or []
+                if not qid or not cids:
+                    return self._err("缺 query_id / case_ids")
+                script = ("pipeline/procedure_extract.py" if u.path.endswith("process")
+                          else "pipeline/tool_extract.py")
+                cmd = [sys.executable, script, "--query-id", qid,
+                       "--case-ids", ",".join(cids)]
+                if payload.get("model"):
+                    cmd += ["--model", payload["model"]]
+                kind = "proc" if u.path.endswith("process") else "tool"
+                self._json({"task_id": _spawn_task(kind, cmd)})
+            elif u.path == "/api/run_search":
+                query = (payload.get("query") or "").strip()
+                if not query:
+                    return self._err("缺 query")
+                qid = payload.get("query_id") or _next_query_id()
+                cmd = [sys.executable, "pipeline/search_eval.py",
+                       "--query-id", qid, "--query", query]
+                if payload.get("synonyms"):
+                    cmd += ["--synonyms", payload["synonyms"]]
+                if payload.get("platforms"):
+                    cmd += ["--platforms", payload["platforms"]]
+                if payload.get("max_count"):
+                    cmd += ["--max-count", str(payload["max_count"])]
+                self._json({"task_id": _spawn_task("search", cmd), "query_id": qid})
+            else:
+                self._err("not found", 404)
+        except Exception as e:
+            self._err(f"{type(e).__name__}: {e}", 500)
+
+    def log_message(self, fmt, *a):
+        pass   # 静默访问日志
+
+
+if __name__ == "__main__":
+    print(f"🚀 mode_workflow server → http://0.0.0.0:{PORT}")
+    ThreadingHTTPServer(("0.0.0.0", PORT), Handler).serve_forever()
+```
+
+- [ ] **Step 2: 启动 + 逐端点验证**
+
+```bash
+cd /Users/max_liu/max_liu/company/Agent/examples/mode_workflow
+python server.py 8772 &
+sleep 1
+curl -s localhost:8772/api/queries | python3 -m json.tool | head -20
+curl -s localhost:8772/api/dashboard | python3 -c "
+import json,sys
+d = json.load(sys.stdin)
+print('采集帖子:', d['result']['post_count'])
+print('内容树:', d['result']['matrix_covered'], '/', d['result']['matrix_valid'])
+print('进度:', d['process_data'])
+"
+curl -s "localhost:8772/api/posts?query_id=q0000" | python3 -c "import json,sys; r=json.load(sys.stdin); print(len(r),'帖, 首帖:', r[0]['case_id'], r[0]['has_process'], r[0]['has_tools'])"
+curl -s "localhost:8772/api/process?case_id=不存在" ; echo
+kill %1
+```
+Expected: queries 返回 q0000-q0003;dashboard 的 post_count>0、matrix_valid==643;posts 返回该 query 帖子;不存在的 case 返回 `{"error": "无解构记录"}` 404。若 Task 4/5 冒烟跑过,对应 case 的 process/tools 端点也应返回数据。
+
+- [ ] **Step 3: 任务接口验证(不花钱,用 --help 不行,改用故意缺参验证错误路径 + 真实任务可选)**
+
+```bash
+curl -s -X POST localhost:8772/api/extract_tools -d '{}' ; echo
+```
+Expected: `{"error": "缺 query_id / case_ids"}`。(真实解构任务在 Task 9 端到端验收时从 UI 触发。)
+
+- [ ] **Step 4: Commit**
+
+```bash
+git add examples/mode_workflow/server.py
+git commit -m "feat(mode_workflow): server(API+任务管理+Dashboard聚合)"
+```
+
+---
+
+### Task 8: index.html — 三 tab 前端
+
+**Files:**
+- Create: `examples/mode_workflow/index.html`
+
+**实现方式:** 本任务**必须先invoke `frontend-design:frontend-design` 技能**再写代码——产出单文件 HTML(原生 JS,ECharts 5 CDN:`https://cdn.jsdelivr.net/npm/echarts@5/dist/echarts.min.js`),禁止引入构建链。API 契约见 Task 7 表格,字段名不得偏离。
+
+**视觉方向(延续用户截图的既有风格):** 白底卡片 + 深色(藏青/石板)表头多色分区表格;标签 pill(类型彩色底);需求区深蓝表头、输入区橙黄表头、输出区绿表头的分区配色;推断字段用「推」角标 + 高亮底色,hover 显示 inferred_reason。整体配中文界面。
+
+**页面结构与行为(验收依据):**
+
+1. **顶部导航**:Dashboard / Dataset / 聚类库 三 tab(hash 路由 `#dashboard` `#dataset` `#cluster`,刷新保位)。
+
+2. **Dashboard tab**(数据源 `GET /api/dashboard`):
+   - 第一行数字卡:采集帖子数量、解构帖子数量、工具数量、内容树覆盖(`covered/valid` + 百分比)。
+   - 第二行数字卡(过程数据):累计解构成本(`$total_cost`,副标题单条均值 `$avg_cost`)、总耗时(副标题平均耗时)、工序进度环(`done/total`)、工具进度环。
+   - 图表区:
+     - 内容树覆盖热力图:ECharts heatmap,x=50 types,y=27 actions,covered cell 高亮(数据来自 `matrix_cells`/`matrix_actions`/`matrix_types`)。
+     - 工序提及工具 Top10:横向条形图(`via_top10`)。
+     - 实质覆盖分布:条形图(`substance_top`,标题带总数 `substance_count`)。
+     - 形式覆盖分布:条形图(`form_top` + `form_count`)。
+     - 成本趋势:折线图(`cost_trend`)。
+   - 全部指标为 0/空时不报错,显示空态文案。
+
+3. **Dataset tab**:顶部「工序 / 工具」子切换 + 「新建搜索」按钮;三栏:
+   - 左栏 query 列表(`/api/queries`):query_text + 帖子数 + 解构进度(`process_done` 或 `tools_done` 随子模式切换);点击选中。
+   - 中栏帖子列表(`/api/posts?query_id=`):卡片含标题、平台徽标(xhs/gzh/zhihu/x)、overall_score、knowledge_type pill、已解构标记(has_process/has_tools);卡片上「解构」按钮 + 多选批量解构;点击卡片在右栏加载。
+   - 右栏解构结果:
+     - 工序模式:头部「大模型工序:已提取」+ 版本下拉(`/api/process_versions`,格式 `v_xxx (最新) · N工序`)+「重新生成」按钮;每个工序渲染:工序名 + 目的 + 类别 pill + 平台/作者;输入/返回声明区;步骤明细表(分区表头:需求[#/目的/作用/实质/形式] 深蓝、输入[类型/值/来源] 橙、实现[外部工具/动作/指令] 绿、输出[类型/值/去处] 深绿),`inferred:true` 的单元格高亮 +「推」角标,hover title 显示 inferred_reason。
+     - 工具模式:版本下拉 + 重新生成;每个工具一张卡:工具名、创作层级 pill、实质/形式作用域 pill 组、输入/输出、用法列表、案例(输入/输出/效果)、缺点列表、来源链接。
+     - 无解构记录(404)时显示空态 +「开始解构」按钮。
+   - 解构/搜索触发后:弹出任务面板,轮询 `/api/task_status`(2s 间隔),显示 log_tail,done 后自动刷新当前栏;failed 显示日志。
+   - 「新建搜索」:弹窗输 query/synonyms/platforms/max_count → POST `/api/run_search` → 任务面板,完成后左栏刷新出现新 query。
+
+4. **聚类库 tab**:居中空态占位(图标 + 「聚类库 · 敬请期待」)。
+
+**最小骨架(代码结构基线,frontend-design 在此骨架上完成完整实现):**
+
+```html
+<!DOCTYPE html>
+<html lang="zh">
+<head>
+<meta charset="utf-8">
+<title>mode_workflow</title>
+<script src="https://cdn.jsdelivr.net/npm/echarts@5/dist/echarts.min.js"></script>
+<style>/* frontend-design 产出 */</style>
+</head>
+<body>
+<nav id="tabs"><!-- Dashboard / Dataset / 聚类库 --></nav>
+<main id="view-dashboard" class="view"></main>
+<main id="view-dataset" class="view">
+  <aside id="query-list"></aside>
+  <section id="post-list"></section>
+  <section id="extract-panel"></section>
+</main>
+<main id="view-cluster" class="view"></main>
+<div id="task-panel" hidden></div>
+<script>
+const api = (p, opt) => fetch(p, opt).then(async r => {
+  if (!r.ok) throw Object.assign(new Error(), {status: r.status, body: await r.json().catch(() => ({}))});
+  return r.json();
+});
+const state = {tab: 'dashboard', mode: 'process', queryId: null, caseId: null, version: null};
+function route() { state.tab = (location.hash || '#dashboard').slice(1); render(); }
+window.addEventListener('hashchange', route);
+async function pollTask(taskId, onDone) {
+  const t = await api(`/api/task_status?task_id=${taskId}`);
+  renderTaskPanel(t);
+  if (t.status === 'running') setTimeout(() => pollTask(taskId, onDone), 2000);
+  else if (t.status === 'done') onDone();
+}
+// render() / renderDashboard() / renderDataset() / renderTaskPanel() … frontend-design 产出
+route();
+</script>
+</body>
+</html>
+```
+
+- [ ] **Step 1: invoke frontend-design 技能,按上述规格产出完整 index.html**
+
+- [ ] **Step 2: 浏览器验收**
+
+```bash
+cd /Users/max_liu/max_liu/company/Agent/examples/mode_workflow
+python server.py 8772 &
+```
+打开 `http://localhost:8772`,核对上面"页面结构与行为"逐条:三 tab 切换、Dashboard 全部卡片与 5 张图渲染、Dataset 三栏联动、版本下拉、空态、聚类库占位。若 Task 4/5 冒烟跑过,选对应帖子确认工序表格分区配色与「推」高亮、工具卡片完整。
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add examples/mode_workflow/index.html
+git commit -m "feat(mode_workflow): 三tab单页前端(Dashboard/Dataset/聚类库)"
+```
+
+---
+
+### Task 9: README + 端到端验收
+
+**Files:**
+- Create: `examples/mode_workflow/README.md`
+
+- [ ] **Step 1: 写 README**
+
+```markdown
+# mode_workflow · 搜索评估 + 工序/工具解构工作台
+
+MySQL 三表(search_data / mode_process / mode_tools)为唯一事实源的单页工作台:
+Dashboard(结果/过程指标可视化)、Dataset(query → 帖子 → 工序/工具解构)、聚类库(占位)。
+
+设计文档:`docs/superpowers/specs/2026-06-12-mode-workflow-design.md`
+
+## 启动
+
+```bash
+# 0. 前置:.env 配 MYSQL_* 与 OPEN_ROUTER_API_KEY;pip install -e .
+python db.py init             # 建三张表(幂等)
+python import_history.py      # (可选)导入 fixed_query_eval 历史搜索结果
+python server.py              # http://localhost:8772
+```
+
+## 结构
+
+| 文件 | 职责 |
+|---|---|
+| `db.py` | 三表 DDL + 全部读写(读 .env MYSQL_*) |
+| `server.py` | 页面 + API + 解构任务子进程管理(端口 8772) |
+| `index.html` | 单文件前端:Dashboard / Dataset / 聚类库 |
+| `pipeline/search_eval.py` | 任意 query 搜索+评估 → search_data |
+| `pipeline/procedure_extract.py` | 工序解构(LLM 直出)→ mode_process |
+| `pipeline/tool_extract.py` | 工具解构 → mode_tools |
+| `prompts/` | 工序/工具解构 system prompt(可单独迭代) |
+| `reference/judged_matrix.json` | 内容树(27 动作×50 类型),Dashboard 覆盖度用 |
+| `runs/` | 运行日志与调试副本(gitignore) |
+
+搜索/评估/转写引擎函数只读复用
+`examples/process_pipeline/script/search_eval/search_and_evaluate.py`,本目录不复制引擎代码。
+
+## 与旧 search_eval 的关系
+
+取代 `fixed_query_eval`(8770)+ `mode_procedure`(8771)两套服务的"搜索评估 + 大模型解构"
+部分;procedure-dsl 执行引擎、mode-dsl 模式提取、A/B/C 三形式对比未迁移(按设计裁剪)。
+```
+
+- [ ] **Step 2: 端到端验收(需 API key)**
+
+```bash
+cd /Users/max_liu/max_liu/company/Agent/examples/mode_workflow
+python server.py 8772 &
+```
+浏览器走完整链路:
+1. Dataset → 选 q0000 → 选 1 帖 → 触发工具解构 → 任务面板出日志 → 完成后右栏出工具卡。
+2. 同帖触发工序解构(模型选 `google/gemini-3.1-flash-lite` 控成本)→ 右栏出工序分区表格。
+3. 回 Dashboard → 解构帖子数量/工具数量/Top10/成本/进度环全部联动更新。
+4. 新建搜索(query 任意,platforms=xhs,max_count=3)→ 完成后左栏出现新 query。
+
+- [ ] **Step 3: 最终 Commit**
+
+```bash
+git add examples/mode_workflow/README.md
+git commit -m "docs(mode_workflow): README 与启动说明"
+```
+
+- [ ] **Step 4: 收尾检查**
+
+```bash
+git status examples/mode_workflow   # runs/ 不应出现在未跟踪列表(被 .gitignore 覆盖)
+```
+
+---
+
+## Self-Review 记录
+
+- **Spec 覆盖**:三表(Task 2)、历史导入(Task 3)、三条 pipeline(Task 4-6)、API+Dashboard 口径(Task 7)、三 tab 前端+截图复刻(Task 8)、聚类库占位(Task 8)、README(Task 9)——spec 全部章节有对应任务。spec 中 `prompts/eval_prompt.md` 一项经核实不需要(rubric 固化在引擎内),已在计划头部标注修正。
+- **占位符扫描**:无 TBD/TODO;Task 8 的 HTML 完整实现委托 frontend-design 技能,但 API 契约、页面行为、骨架代码、验收清单均已写死,不属于开放占位。
+- **类型/命名一致性**:`replace_process(payload)` 与 `fetch_process()` 的 procedures 字段对称;server 调 `db.fetch_*` 名称与 Task 2 定义一致;pipeline 脚本参数名与 server `_spawn_task` 构造的 cmd 一致(`--query-id/--case-ids/--model`);`_dashboard()` 返回字段与 Task 8 前端读取字段一致(`matrix_cells/via_top10/substance_top/cost_trend/process_progress` 等)。