
mode_workflow Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

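Goal: Build a "search evaluation + LLM-based procedure/tool extraction" system under examples/mode_workflow/: three new MySQL tables are the single source of truth, served by one server (port 8772) and one single-page HTML front end (Dashboard / Dataset / Cluster Library).

Architecture: DB-first (unlike the old system, which was file-first with dual writes). The search and extraction engine functions are imported read-only from examples/process_pipeline/script/ (following the precedent set by run_search.py); mode_workflow owns only the orchestration scripts, prompts, db layer, server, and front end. For extraction, the server spawns a subprocess that runs the pipeline script, the results are written straight to the database, and the front end polls the task status.

Tech Stack: Python (http.server + pymysql + asyncio), MySQL (.env MYSQL_*), OpenRouter LLM (via call_llm_with_retry), single-file HTML + vanilla JS + ECharts (CDN).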

Spec: docs/superpowers/specs/2026-06-12-mode-workflow-design.md

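Prerequisites (confirm before executing):

  • .env contains MYSQL_HOST/MYSQL_PORT/MYSQL_USER/MYSQL_PASSWORD/MYSQL_DATABASE (required from Task 2 onward)
  • .env contains OPEN_ROUTER_API_KEY (needed for the smoke tests in Tasks 4-6, which incur roughly $0.01-0.05 of real cost; without a key, skip the smoke tests and do code review only)
  • pip install -e . has been run (so agent.llm.openrouter etc. are importable)

Spec correction (verified): the evaluation rubric is baked into the eval_prompt_template.md used by llm_evaluate_sources (see the comment at search_and_evaluate.py:424), so eval_prompt.md does not need to be copied into prompts/. prompts/ holds only the two extraction prompts, one for procedures and one for tools.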


Existing code facts (required reading for implementers; all verified)

| Fact | Location |
| --- | --- |
| MySQL connection pattern: pymysql + DictCursor + autocommit; _enabled() checks MYSQL_HOST | examples/process_pipeline/script/search_eval/fixed_query_eval/db.py:32-47 |
| search_all(platforms, queries, max_count, max_concurrent, query_overrides=None) returns a list of sources (case_id/platform/source_url/post/found_by_queries) | search_and_evaluate.py:212 |
| evaluate_posts(sources, requirement, llm_call, model, max_concurrent, include_images, max_images, image_mode, query) returns (sources, total_cost); the evaluation is attached at source["llm_evaluation"] | search_and_evaluate.py:403 |
| transcribe_video_posts(sources, concurrency); build_query_overrides(platforms, phrasings, llm, model); _attach_image_refs(sources, max_images, concurrency, mode) | search_and_evaluate.py |
| build_eval_llm_call(choice) returns (llm_call, model_id); EVAL_MODELS/DEFAULT_EVAL_MODEL; _format_post_for_eval(source); _evaluate_one | examples/process_pipeline/script/llm_evaluate_sources.py |
| call_llm_with_retry(llm_call, messages, model, temperature, max_tokens, validate_fn, task_name) returns (data_dict, cost) and accepts only JSON objects | examples/process_pipeline/script/llm_helper.py |
| Any OpenRouter model: from agent.llm.openrouter import create_openrouter_llm_call; llm_call = create_openrouter_llm_call(model=...) | tool_extract.py:121-122 |
| Tool extraction returns {"tools":[{工具名称,实质作用域,形式作用域,创作层级,来源链接,输入,输出,用法,案例,缺点,最新更新时间}]} | fixed_query_eval/prompts/tool_extract_system.md |
| Procedure extraction returns {"source":{...},"procedures":[{id,name,purpose,category,declarations,type_registry,steps:[{id,kind,via,effect,action,substance,form,directive,intent,inputs,outputs}]}]} | mode_procedure/mode-dsl/prompts/procedure_extract_system.md |
| Images for procedure extraction go through base64 (to bypass hotlink protection); _detect_image_mime/_fetch_data_url/_collect_images/_sanitize_workflow can be copied wholesale | mode_procedure/mode-dsl/procedure_model_extract.py:34-126 |
| judged_matrix: {actions:[{name,l1,l2}×27], types:[{name,l1,l2}×50], matrix:[27 rows × 50 cells, each cell {tier,r,q}], stats:{valid:643}}; a valid node is one with tier≥1 | search_eval/evaluation/judged_matrix.json |
| Historical search data: fixed_query_eval/runs_full/q0000-q0003/form_A.json, top level {form,query,original_q,platforms,total,failed,results:[...]} | same directory |
| The repo has no pytest setup (CLAUDE.md); verification uses standalone commands/smoke scripts | CLAUDE.md |

Path depth: for examples/mode_workflow/x.py, use PROJECT_ROOT = Path(__file__).resolve().parents[2]; for examples/mode_workflow/pipeline/x.py, use parents[3].
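The path-depth rule can be checked without touching the filesystem. The sketch below is illustrative only: the /repo/Agent prefix and the helper name project_root are assumptions made for the example, not names from the codebase.

```python
from pathlib import PurePosixPath


def project_root(script_path: str, depth: int) -> PurePosixPath:
    """Return the ancestor `depth` levels above the script's own directory."""
    # parents[0] is the directory containing the script, parents[1] its parent, and so on.
    return PurePosixPath(script_path).parents[depth]


# Hypothetical checkout location, used only for illustration.
top_level = project_root("/repo/Agent/examples/mode_workflow/x.py", 2)
nested = project_root("/repo/Agent/examples/mode_workflow/pipeline/x.py", 3)
print(top_level, nested)
```

Both calls resolve to the same repository root, which is why scripts one directory deeper need an index one higher.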


Task 1: Scaffolding and asset sync

Files:

  • Create: examples/mode_workflow/.gitignore
  • Create: examples/mode_workflow/prompts/ (copy the 2 prompts)
  • Create: examples/mode_workflow/reference/judged_matrix.json (copy)
  • Create: the examples/mode_workflow/pipeline/ and examples/mode_workflow/runs/ directories

  • [ ] Step 1: Create the directories and sync the assets

cd /Users/max_liu/max_liu/company/Agent
SE=examples/process_pipeline/script/search_eval
mkdir -p examples/mode_workflow/{pipeline,prompts,reference,runs}
cp "$SE/mode_procedure/mode-dsl/prompts/procedure_extract_system.md" examples/mode_workflow/prompts/
cp "$SE/fixed_query_eval/prompts/tool_extract_system.md" examples/mode_workflow/prompts/
cp "$SE/evaluation/judged_matrix.json" examples/mode_workflow/reference/
  • [ ] Step 2: Write .gitignore

examples/mode_workflow/.gitignore:

runs/
__pycache__/
*.pyc
  • [ ] Step 3: Verify
ls examples/mode_workflow/prompts/ examples/mode_workflow/reference/
python3 -c "import json; d=json.load(open('examples/mode_workflow/reference/judged_matrix.json')); print(d['stats'])"

Expected: the two prompt file names plus judged_matrix.json; the stats line contains 'valid': 643
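For a stronger check than reading the stats block, the valid-node count can be recomputed from the matrix itself. This is a minimal sketch under the assumption, taken from the facts table, that matrix is a list of rows whose cells are dicts carrying a tier key and that a node is valid when tier≥1; count_valid is a hypothetical helper, and the tiny matrix is made-up sample data rather than real content.

```python
def count_valid(matrix):
    """Count cells whose tier is at least 1; a missing or null tier counts as 0."""
    return sum(
        1
        for row in matrix
        for cell in row
        if (cell.get("tier") or 0) >= 1
    )


# Made-up 2x3 sample; the real file is described as 27 rows x 50 cells.
sample_matrix = [
    [{"tier": 0}, {"tier": 1}, {"tier": 2}],
    [{"tier": None}, {}, {"tier": 3}],
]
valid = count_valid(sample_matrix)
print("valid nodes:", valid)
```

Run against the copied judged_matrix.json, the result would be expected to match stats["valid"] if the file is internally consistent.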

  • [ ] Step 4: Commit
git add examples/mode_workflow
git commit -m "feat(mode_workflow): 脚手架与 prompt/内容树资产同步"

Task 2: db.py (three-table DDL + read/write layer)

Files:

  • Create: examples/mode_workflow/db.py

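Design notes:

  • The style follows fixed_query_eval/db.py (pymysql/DictCursor/autocommit), but in this system the DB is the primary store: a failed write must raise (the caller decides how to handle it) instead of being silently swallowed. The read side stays defensive.
  • Write semantics for mode_process/mode_tools: delete the existing rows for the same (case_id, version), then insert (re-running the same version is idempotent; different versions keep history).
  • cost_usd/duration_s are per-extraction-call values: every row of the same (case_id, version) stores the same value, so aggregation must deduplicate by (case_id, version) (see Task 7).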
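The deduplication requirement in the last note can be sketched as follows. This is an illustration only: total_cost is a hypothetical helper and the rows are invented; the actual aggregation is deferred to Task 7.

```python
def total_cost(rows):
    """Sum cost_usd once per (case_id, version), ignoring rows without a cost."""
    per_call = {}
    for r in rows:
        if r.get("cost_usd") is None:
            continue
        # All rows of one extraction call repeat the same cost, so keep the first.
        per_call.setdefault((r["case_id"], r["version"]), r["cost_usd"])
    return round(sum(per_call.values()), 6)


# Invented rows: one call produced two procedures, another call produced one.
rows = [
    {"case_id": "xhs_abc", "version": "v_06121000", "cost_usd": 0.01},
    {"case_id": "xhs_abc", "version": "v_06121000", "cost_usd": 0.01},
    {"case_id": "gzh_def", "version": "v_06121000", "cost_usd": 0.02},
]
naive = round(sum(r["cost_usd"] for r in rows), 6)
deduped = total_cost(rows)
print(naive, deduped)
```

A plain SUM(cost_usd) over the table would double-count the first call, which is the error the note warns about.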

  • [ ] Step 1: Write examples/mode_workflow/db.py (complete file)

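# -*- coding: utf-8 -*-
"""mode_workflow · MySQL persistence (the DB is the single source of truth)
================================================================================
Connects to MySQL using the MYSQL_* settings in .env. Three tables:
  search_data  : one row per (query, post), holding search + LLM evaluation results
  mode_process : one row per extracted procedure (nested structures such as steps live in JSON columns)
  mode_tools   : one row per extracted tool

Key difference from the old fixed_query_eval/db.py: here the DB is the primary store,
so write failures raise immediately instead of being tolerated.
The read side stays defensive (returns empty/None).

Usage:
  python db.py init    # create the tables (idempotent)
  python db.py check   # print row counts for the three tables
"""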
import json
import os
import sys
from pathlib import Path

PROJECT_ROOT = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(PROJECT_ROOT))

from dotenv import load_dotenv
load_dotenv()

import pymysql
from pymysql.cursors import DictCursor


def _conn():
    if not os.getenv("MYSQL_HOST"):
        raise RuntimeError("缺 MYSQL_HOST:检查 .env 的 MYSQL_* 配置")
    return pymysql.connect(
        host=os.getenv("MYSQL_HOST"),
        port=int(os.getenv("MYSQL_PORT", 3306)),
        user=os.getenv("MYSQL_USER"),
        password=os.getenv("MYSQL_PASSWORD"),
        database=os.getenv("MYSQL_DATABASE"),
        charset="utf8mb4", cursorclass=DictCursor,
        autocommit=True, connect_timeout=10,
    )


# ── DDL ──────────────────────────────────────────────────────────────────────

DDL_SEARCH = """
CREATE TABLE IF NOT EXISTS search_data (
  id            BIGINT AUTO_INCREMENT PRIMARY KEY,
  query_id      VARCHAR(32)   NOT NULL COMMENT 'q0000',
  query_text    VARCHAR(512)  NULL,
  case_id       VARCHAR(128)  NOT NULL COMMENT 'platform_channelContentId',
  platform      VARCHAR(32)   NULL,
  channel_content_id VARCHAR(128) NULL,
  title         VARCHAR(512)  NULL,
  url           VARCHAR(1024) NULL,
  content_type  VARCHAR(32)   NULL,
  body          LONGTEXT      NULL,
  images        JSON          NULL,
  videos        JSON          NULL,
  like_count    INT           NULL,
  publish_time  VARCHAR(64)   NULL,
  quality_score FLOAT         NULL COMMENT 'post._quality_score',
  quality_grade VARCHAR(8)    NULL,
  found_by      JSON          NULL COMMENT '命中的措辞数组',
  knowledge_type JSON         NULL COMMENT '["能力","工序","工具"] 子集',
  overall_score FLOAT         NULL COMMENT '(相关均值+质量均值)/2',
  llm_evaluation JSON         NULL COMMENT '评估全量 blob',
  created_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  updated_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  UNIQUE KEY uk_qid_case (query_id, case_id),
  KEY idx_platform (platform)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='搜索+评估结果';
"""

DDL_PROCESS = """
CREATE TABLE IF NOT EXISTS mode_process (
  id            BIGINT AUTO_INCREMENT PRIMARY KEY,
  query_id      VARCHAR(32)   NOT NULL,
  case_id       VARCHAR(128)  NOT NULL,
  platform      VARCHAR(32)   NULL,
  post_title    VARCHAR(512)  NULL,
  source        JSON          NULL COMMENT '解构返回的 source 块',
  procedure_id  VARCHAR(16)   NULL COMMENT 'p1,p2…',
  name          VARCHAR(255)  NULL,
  purpose       TEXT          NULL,
  category      VARCHAR(32)   NULL COMMENT '产物创造/资产建设/自动化/分析/学习',
  declarations  JSON          NULL,
  type_registry JSON          NULL,
  steps         JSON          NULL COMMENT '步骤数组全量',
  step_count    INT           NULL,
  tools_used    JSON          NULL COMMENT '从 steps[].via 去重提取',
  model         VARCHAR(64)   NULL,
  version       VARCHAR(16)   NULL COMMENT 'v_MMDDHHMM,保留历史',
  cost_usd      DECIMAL(10,6) NULL COMMENT '本次解构调用成本(同版本各行相同,聚合需按 case+version 去重)',
  duration_s    FLOAT         NULL,
  created_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  KEY idx_case_ver (case_id, version),
  KEY idx_qid (query_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工序解构结果(每行一个工序)';
"""

DDL_TOOLS = """
CREATE TABLE IF NOT EXISTS mode_tools (
  id            BIGINT AUTO_INCREMENT PRIMARY KEY,
  query_id      VARCHAR(32)   NOT NULL,
  case_id       VARCHAR(128)  NOT NULL,
  platform      VARCHAR(32)   NULL,
  post_title    VARCHAR(512)  NULL,
  tool_name     VARCHAR(255)  NULL,
  substance_scope JSON        NULL COMMENT '实质作用域(数组)',
  form_scope    JSON          NULL COMMENT '形式作用域(数组或null)',
  creation_layer VARCHAR(32)  NULL COMMENT '制作层/创作层',
  source_link   VARCHAR(1024) NULL,
  input_desc    TEXT          NULL,
  output_desc   TEXT          NULL,
  usage_json    JSON          NULL,
  cases_json    JSON          NULL,
  defects_json  JSON          NULL,
  updated_time  VARCHAR(64)   NULL COMMENT '工具最新更新时间',
  model         VARCHAR(64)   NULL,
  version       VARCHAR(16)   NULL,
  cost_usd      DECIMAL(10,6) NULL COMMENT '同 mode_process,聚合按 case+version 去重',
  duration_s    FLOAT         NULL,
  created_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  KEY idx_case_ver (case_id, version),
  KEY idx_qid (query_id),
  KEY idx_tool_name (tool_name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工具解构结果(每行一个工具)';
"""


def init_tables():
    conn = _conn()
    try:
        with conn.cursor() as cur:
            cur.execute(DDL_SEARCH)
            cur.execute(DDL_PROCESS)
            cur.execute(DDL_TOOLS)
        print("✅ 建表完成:search_data, mode_process, mode_tools")
    finally:
        conn.close()


# ── 工具函数 ──────────────────────────────────────────────────────────────────

def _loads(v, default=None):
    """pymysql 的 JSON 列可能返回字符串,统一解析。"""
    if v is None:
        return default
    if isinstance(v, (list, dict)):
        return v
    try:
        return json.loads(v)
    except Exception:
        return default


def _j(v):
    """写入 JSON 列:None 保持 NULL,其余 dumps。"""
    return None if v is None else json.dumps(v, ensure_ascii=False)


def _collect_scores(node):
    """递归收集嵌套评估里所有数值「得分」。"""
    out = []
    if isinstance(node, dict):
        for k, v in node.items():
            if k == "得分" and isinstance(v, (int, float)):
                out.append(float(v))
            else:
                out.extend(_collect_scores(v))
    elif isinstance(node, list):
        for v in node:
            out.extend(_collect_scores(v))
    return out


def overall_score(e):
    """综合分 = (相关性各项均值 + 质量各项均值) / 可得部分数。算不出返回 None。"""
    parts = []
    for key in ("相关性", "质量"):
        scores = _collect_scores((e or {}).get(key))
        if scores:
            parts.append(sum(scores) / len(scores))
    return round(sum(parts) / len(parts), 2) if parts else None


# ── search_data ──────────────────────────────────────────────────────────────

def upsert_search_posts(query_id, query_text, results):
    """一组搜索结果写入 search_data(按 (query_id, case_id) upsert)。返回写入条数。"""
    if not results:
        return 0
    rows = []
    for r in results:
        post = r.get("post") or {}
        e = r.get("llm_evaluation") or {}
        rows.append((
            query_id, query_text, r.get("case_id"), r.get("platform"),
            r.get("channel_content_id"),
            (post.get("title") or post.get("desc") or "")[:500],
            r.get("source_url"), post.get("content_type"),
            post.get("body_text") or post.get("desc") or "",
            _j(post.get("images") or []), _j(post.get("videos") or []),
            post.get("like_count"),
            str(post.get("publish_time") or post.get("publish_timestamp") or "")[:64],
            post.get("_quality_score"), post.get("_quality_grade"),
            _j(r.get("found_by_queries") or []),
            _j(e.get("知识类型") or []),
            overall_score(e),
            _j(e),
        ))
    sql = """
    INSERT INTO search_data
      (query_id, query_text, case_id, platform, channel_content_id, title, url,
       content_type, body, images, videos, like_count, publish_time,
       quality_score, quality_grade, found_by, knowledge_type, overall_score, llm_evaluation)
    VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
    ON DUPLICATE KEY UPDATE
      query_text=VALUES(query_text), platform=VALUES(platform),
      channel_content_id=VALUES(channel_content_id), title=VALUES(title), url=VALUES(url),
      content_type=VALUES(content_type), body=VALUES(body), images=VALUES(images),
      videos=VALUES(videos), like_count=VALUES(like_count), publish_time=VALUES(publish_time),
      quality_score=VALUES(quality_score), quality_grade=VALUES(quality_grade),
      found_by=VALUES(found_by), knowledge_type=VALUES(knowledge_type),
      overall_score=VALUES(overall_score), llm_evaluation=VALUES(llm_evaluation);
    """
    conn = _conn()
    try:
        with conn.cursor() as cur:
            cur.executemany(sql, rows)
        return len(rows)
    finally:
        conn.close()


def fetch_queries():
    """query 列表 + 帖子数 + 解构进度。"""
    conn = _conn()
    try:
        with conn.cursor() as cur:
            cur.execute("""SELECT query_id, MAX(query_text) AS query_text,
                                  COUNT(*) AS post_count
                           FROM search_data GROUP BY query_id ORDER BY query_id""")
            queries = cur.fetchall()
            cur.execute("SELECT query_id, COUNT(DISTINCT case_id) AS n FROM mode_process GROUP BY query_id")
            np = {r["query_id"]: r["n"] for r in cur.fetchall()}
            cur.execute("SELECT query_id, COUNT(DISTINCT case_id) AS n FROM mode_tools GROUP BY query_id")
            nt = {r["query_id"]: r["n"] for r in cur.fetchall()}
    finally:
        conn.close()
    for q in queries:
        q["process_done"] = np.get(q["query_id"], 0)
        q["tools_done"] = nt.get(q["query_id"], 0)
    return queries


def fetch_posts(query_id):
    """某 query 下全部帖子(JSON 列已解析),带 has_process/has_tools 标记。"""
    conn = _conn()
    try:
        with conn.cursor() as cur:
            cur.execute("""SELECT * FROM search_data WHERE query_id=%s
                           ORDER BY overall_score DESC, id""", (query_id,))
            rows = cur.fetchall()
            cur.execute("SELECT DISTINCT case_id FROM mode_process WHERE query_id=%s", (query_id,))
            hp = {r["case_id"] for r in cur.fetchall()}
            cur.execute("SELECT DISTINCT case_id FROM mode_tools WHERE query_id=%s", (query_id,))
            ht = {r["case_id"] for r in cur.fetchall()}
    finally:
        conn.close()
    for r in rows:
        for col in ("images", "videos", "found_by", "knowledge_type", "llm_evaluation"):
            r[col] = _loads(r[col])
        r["has_process"] = r["case_id"] in hp
        r["has_tools"] = r["case_id"] in ht
        r.pop("created_at", None); r.pop("updated_at", None)
    return rows


def fetch_post(query_id, case_id):
    """单帖完整行(给 pipeline 脚本重建 source 用)。无则 None。"""
    conn = _conn()
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT * FROM search_data WHERE query_id=%s AND case_id=%s",
                        (query_id, case_id))
            row = cur.fetchone()
    finally:
        conn.close()
    if not row:
        return None
    for col in ("images", "videos", "found_by", "knowledge_type", "llm_evaluation"):
        row[col] = _loads(row[col])
    return row


# ── mode_process ─────────────────────────────────────────────────────────────

def replace_process(query_id, case_id, platform, post_title, payload,
                    model, version, cost_usd, duration_s):
    """写入一帖某版本的工序解构结果(payload = {source, procedures})。
    删 (case_id, version) 旧行再插,同版本重跑幂等、跨版本保留历史。返回工序条数。"""
    source = payload.get("source")
    procedures = payload.get("procedures") or []
    conn = _conn()
    try:
        with conn.cursor() as cur:
            cur.execute("DELETE FROM mode_process WHERE case_id=%s AND version=%s",
                        (case_id, version))
            if procedures:
                rows = []
                for p in procedures:
                    steps = p.get("steps") or []
                    vias = []
                    for s in steps:
                        v = s.get("via")
                        if v and v not in vias:
                            vias.append(v)
                    rows.append((
                        query_id, case_id, platform, (post_title or "")[:500],
                        _j(source), p.get("id"), (p.get("name") or "")[:250],
                        p.get("purpose"), p.get("category"),
                        _j(p.get("declarations")), _j(p.get("type_registry")),
                        _j(steps), len(steps), _j(vias),
                        model, version, cost_usd, duration_s,
                    ))
                cur.executemany("""
                INSERT INTO mode_process
                  (query_id, case_id, platform, post_title, source, procedure_id, name,
                   purpose, category, declarations, type_registry, steps, step_count,
                   tools_used, model, version, cost_usd, duration_s)
                VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
                """, rows)
        return len(procedures)
    finally:
        conn.close()


def fetch_process_versions(case_id):
    conn = _conn()
    try:
        with conn.cursor() as cur:
            cur.execute("""SELECT version, COUNT(*) AS n, MAX(model) AS model
                           FROM mode_process WHERE case_id=%s
                           GROUP BY version ORDER BY version DESC""", (case_id,))
            return cur.fetchall()
    finally:
        conn.close()


def fetch_process(case_id, version=None):
    """重建 {case_id, version, model, source, procedures:[...]}。version=None 取最新。"""
    conn = _conn()
    try:
        with conn.cursor() as cur:
            if version is None:
                cur.execute("""SELECT version FROM mode_process WHERE case_id=%s
                               ORDER BY version DESC, id DESC LIMIT 1""", (case_id,))
                row = cur.fetchone()
                if not row:
                    return None
                version = row["version"]
            cur.execute("""SELECT * FROM mode_process WHERE case_id=%s AND version=%s
                           ORDER BY id""", (case_id, version))
            rows = cur.fetchall()
    finally:
        conn.close()
    if not rows:
        return None
    procedures = [{
        "id": r["procedure_id"], "name": r["name"], "purpose": r["purpose"],
        "category": r["category"], "declarations": _loads(r["declarations"]),
        "type_registry": _loads(r["type_registry"]), "steps": _loads(r["steps"], []),
        "tools_used": _loads(r["tools_used"], []),
    } for r in rows]
    return {"case_id": case_id, "version": version, "platform": rows[0]["platform"],
            "title": rows[0]["post_title"], "model": rows[0]["model"],
            "cost_usd": float(rows[0]["cost_usd"]) if rows[0]["cost_usd"] is not None else None,
            "duration_s": rows[0]["duration_s"],
            "source": _loads(rows[0]["source"]), "procedures": procedures}


# ── mode_tools ───────────────────────────────────────────────────────────────

def replace_tools(query_id, case_id, platform, post_title, tools,
                  model, version, cost_usd, duration_s):
    """写入一帖某版本的工具解构结果。语义同 replace_process。返回工具条数。"""
    conn = _conn()
    try:
        with conn.cursor() as cur:
            cur.execute("DELETE FROM mode_tools WHERE case_id=%s AND version=%s",
                        (case_id, version))
            if tools:
                rows = [(
                    query_id, case_id, platform, (post_title or "")[:500],
                    (t.get("工具名称") or "")[:250],
                    _j(t.get("实质作用域")), _j(t.get("形式作用域")),
                    t.get("创作层级"), t.get("来源链接"), t.get("输入"), t.get("输出"),
                    _j(t.get("用法")), _j(t.get("案例")), _j(t.get("缺点")),
                    t.get("最新更新时间"), model, version, cost_usd, duration_s,
                ) for t in tools]
                cur.executemany("""
                INSERT INTO mode_tools
                  (query_id, case_id, platform, post_title, tool_name, substance_scope,
                   form_scope, creation_layer, source_link, input_desc, output_desc,
                   usage_json, cases_json, defects_json, updated_time, model, version,
                   cost_usd, duration_s)
                VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
                """, rows)
        return len(tools)
    finally:
        conn.close()


def fetch_tools_versions(case_id):
    conn = _conn()
    try:
        with conn.cursor() as cur:
            cur.execute("""SELECT version, COUNT(*) AS n, MAX(model) AS model
                           FROM mode_tools WHERE case_id=%s
                           GROUP BY version ORDER BY version DESC""", (case_id,))
            return cur.fetchall()
    finally:
        conn.close()


def fetch_tools(case_id, version=None):
    """重建 {case_id, version, model, tool_count, tools:[...]}。version=None 取最新。"""
    conn = _conn()
    try:
        with conn.cursor() as cur:
            if version is None:
                cur.execute("""SELECT version FROM mode_tools WHERE case_id=%s
                               ORDER BY version DESC, id DESC LIMIT 1""", (case_id,))
                row = cur.fetchone()
                if not row:
                    return None
                version = row["version"]
            cur.execute("""SELECT * FROM mode_tools WHERE case_id=%s AND version=%s
                           ORDER BY id""", (case_id, version))
            rows = cur.fetchall()
    finally:
        conn.close()
    if not rows:
        return None
    tools = [{
        "工具名称": r["tool_name"], "实质作用域": _loads(r["substance_scope"]),
        "形式作用域": _loads(r["form_scope"]), "创作层级": r["creation_layer"],
        "来源链接": r["source_link"], "输入": r["input_desc"], "输出": r["output_desc"],
        "用法": _loads(r["usage_json"]), "案例": _loads(r["cases_json"]),
        "缺点": _loads(r["defects_json"]), "最新更新时间": r["updated_time"],
    } for r in rows]
    return {"case_id": case_id, "version": version, "platform": rows[0]["platform"],
            "title": rows[0]["post_title"], "model": rows[0]["model"],
            "cost_usd": float(rows[0]["cost_usd"]) if rows[0]["cost_usd"] is not None else None,
            "duration_s": rows[0]["duration_s"],
            "tool_count": len(tools), "tools": tools}


# ── Dashboard 原始行(指标计算在 server.py)─────────────────────────────────────

def fetch_dashboard_rows():
    """拉 Dashboard 计算所需的轻量行。数据量级:百~千行,Python 聚合足够。"""
    conn = _conn()
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT query_id, case_id, knowledge_type FROM search_data")
            posts = cur.fetchall()
            cur.execute("""SELECT case_id, version, steps, tools_used, cost_usd,
                                  duration_s, created_at FROM mode_process""")
            procs = cur.fetchall()
            cur.execute("""SELECT case_id, version, tool_name, substance_scope,
                                  form_scope, cost_usd, duration_s, created_at
                           FROM mode_tools""")
            tools = cur.fetchall()
    finally:
        conn.close()
    for p in posts:
        p["knowledge_type"] = _loads(p["knowledge_type"], [])
    for r in procs:
        r["steps"] = _loads(r["steps"], [])
        r["tools_used"] = _loads(r["tools_used"], [])
        r["cost_usd"] = float(r["cost_usd"]) if r["cost_usd"] is not None else None
        r["created_at"] = str(r["created_at"]) if r["created_at"] else None
    for r in tools:
        r["substance_scope"] = _loads(r["substance_scope"], [])
        r["form_scope"] = _loads(r["form_scope"], [])
        r["cost_usd"] = float(r["cost_usd"]) if r["cost_usd"] is not None else None
        r["created_at"] = str(r["created_at"]) if r["created_at"] else None
    return posts, procs, tools


def check():
    conn = _conn()
    try:
        with conn.cursor() as cur:
            for t in ("search_data", "mode_process", "mode_tools"):
                cur.execute(f"SELECT COUNT(*) AS n FROM {t}")
                print(f"{t}: {cur.fetchone()['n']} 行")
    finally:
        conn.close()


if __name__ == "__main__":
    cmd = sys.argv[1] if len(sys.argv) > 1 else ""
    if cmd == "init":
        init_tables()
    elif cmd == "check":
        check()
    else:
        print("用法:\n  python db.py init    # 建表\n  python db.py check   # 三表行数")
  • [ ] Step 2: Create the tables and verify
cd /Users/max_liu/max_liu/company/Agent/examples/mode_workflow
python db.py init && python db.py check

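Expected: prints ✅ 建表完成:search_data, mode_process, mode_tools, followed by 0 rows (0 行) for each of the three tables. If it fails complaining that MYSQL_HOST is missing, check .env.

  • [ ] Step 3: Unit self-check (overall_score)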
python3 -c "
import db
e = {'相关性': {'和内容制作知识相关': {'得分': 8}, '和 query 相关': {'得分': 6}},
     '质量': {'固定维度': {'热度性': {'得分': 4}}}}
s = db.overall_score(e)
assert s == round((7.0 + 4.0) / 2, 2), s
assert db.overall_score({}) is None
print('overall_score OK:', s)
"

Expected: overall_score OK: 5.5

  • [ ] Step 4: Commit
git add examples/mode_workflow/db.py
git commit -m "feat(mode_workflow): 三表 DDL 与 MySQL 读写层"
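
The delete-then-insert write semantics of replace_process and replace_tools can be exercised without a MySQL instance. The sketch below uses the standard-library sqlite3 module and a three-column stand-in table, both of which are assumptions for illustration. It also wraps the two statements in one transaction, which is a deliberate departure from the autocommit connection in db.py rather than a description of it.

```python
import sqlite3


def replace_rows(conn, case_id, version, tool_names):
    """Delete rows of the same (case_id, version), then insert the new ones."""
    with conn:  # single transaction: commit on success, roll back on error
        conn.execute(
            "DELETE FROM mode_tools WHERE case_id=? AND version=?",
            (case_id, version),
        )
        conn.executemany(
            "INSERT INTO mode_tools (case_id, version, tool_name) VALUES (?,?,?)",
            [(case_id, version, name) for name in tool_names],
        )
    return len(tool_names)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE mode_tools (case_id TEXT, version TEXT, tool_name TEXT)")

replace_rows(conn, "xhs_abc", "v_06121000", ["A", "B"])
replace_rows(conn, "xhs_abc", "v_06121000", ["A", "B"])  # same version re-run
replace_rows(conn, "xhs_abc", "v_06121130", ["C"])       # new version

counts = dict(conn.execute(
    "SELECT version, COUNT(*) FROM mode_tools GROUP BY version"
))
print(counts)
```

Re-running a version leaves its row count unchanged, while a new version adds rows next to the old ones, matching the "idempotent within a version, history across versions" rule.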

Task 3: import_history.py (load historical search results into the database)

Files:

  • Create: examples/mode_workflow/import_history.py

  • [ ] Step 1: Write the complete file

# -*- coding: utf-8 -*-
"""One-off import: fixed_query_eval/runs_full/*/form_A.json → search_data table.
Idempotent (upsert), safe to run repeatedly.

Usage:
  python import_history.py
  python import_history.py --runs-dir /path/to/runs_full
"""
import argparse
import json
import sys
from pathlib import Path

HERE = Path(__file__).resolve().parent
sys.path.insert(0, str(HERE))
import db

DEFAULT_RUNS = (HERE.parent / "process_pipeline" / "script" / "search_eval"
                / "fixed_query_eval" / "runs_full")


def main():
    p = argparse.ArgumentParser(description="历史搜索结果导入 search_data")
    p.add_argument("--runs-dir", default=str(DEFAULT_RUNS))
    args = p.parse_args()

    runs = Path(args.runs_dir)
    files = sorted(runs.glob("q*/form_A.json"))
    if not files:
        print(f"❌ {runs} 下没有 q*/form_A.json"); return 1

    total = 0
    for f in files:
        data = json.loads(f.read_text(encoding="utf-8"))
        qid = f.parent.name
        results = data.get("results", [])
        n = db.upsert_search_posts(qid, data.get("query") or data.get("original_q"), results)
        print(f"  {qid}: 文件 {len(results)} 条 → 入库 {n} 条")
        total += n
    print(f"✅ 共导入 {total} 条 → search_data")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
  • [ ] Step 2: Run it and cross-check the row counts
cd /Users/max_liu/max_liu/company/Agent/examples/mode_workflow
python import_history.py
python db.py check
python3 -c "
import json, glob
n = sum(len(json.load(open(f))['results'])
        for f in glob.glob('../process_pipeline/script/search_eval/fixed_query_eval/runs_full/q*/form_A.json'))
print('JSON 总条数:', n)
"

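Expected: one line printed per q; the search_data row count equals the total number of JSON entries (q0000-q0003). Run python import_history.py a second time and the row count stays the same (idempotent).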
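
One caveat when comparing the two numbers: because search_data is keyed on (query_id, case_id), a case_id that appears twice within one form_A.json collapses into a single row, so the table count can be lower than the raw JSON total. Whether the historical files contain such duplicates is not stated here; the sketch below only shows how the expected row count could be derived. expected_row_count is a hypothetical helper and the data is invented.

```python
def expected_row_count(runs):
    """Count distinct (query_id, case_id) pairs across all runs.

    `runs` maps a query id such as "q0000" to its list of result dicts.
    """
    keys = set()
    for query_id, results in runs.items():
        for r in results:
            keys.add((query_id, r.get("case_id")))
    return len(keys)


# Invented sample: "a" is repeated inside q0000 and also appears under q0001.
runs = {
    "q0000": [{"case_id": "a"}, {"case_id": "a"}, {"case_id": "b"}],
    "q0001": [{"case_id": "a"}],
}
raw_total = sum(len(v) for v in runs.values())
unique_rows = expected_row_count(runs)
print(raw_total, unique_rows)
```

The same post found under two different queries still yields two rows, since query_id is part of the key.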

  • [ ] Step 3: Commit
git add examples/mode_workflow/import_history.py
git commit -m "feat(mode_workflow): 历史搜索结果导入脚本"

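Task 4: pipeline/tool_extract.py (tool extraction: read DB → LLM → write DB)

Files:

  • Create: examples/mode_workflow/pipeline/tool_extract.py

Differences from the old version: posts are read from the search_data table (no longer from form_A.json); results are written directly to mode_tools (local JSON is no longer the primary output), with runs/tools/ keeping only debug copies; cost and duration are recorded.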
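
The script in this task tags each run with a version of the form v_MMDDHHMM when --version is not given. A minimal sketch of that tag and two of its properties follows; make_version is a hypothetical helper name, and the timestamps are arbitrary examples.

```python
from datetime import datetime


def make_version(now: datetime) -> str:
    """Build a version tag: 'v_' + month, day, hour, minute, all zero-padded."""
    return "v_" + now.strftime("%m%d%H%M")


earlier = make_version(datetime(2026, 6, 12, 9, 5))
later = make_version(datetime(2026, 6, 12, 14, 30))
print(earlier, later)

# Zero-padded fields make string order agree with time order within one year,
# which is what ORDER BY version DESC relies on when picking the latest version.
assert earlier < later
# The tag is always 10 characters, so it fits the VARCHAR(16) version column.
assert len(earlier) == 10
```

Two caveats follow from the format itself: runs started within the same minute share a tag (and therefore overwrite each other for the same case_id), and because the tag carries no year, string ordering stops matching time ordering across a year boundary.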

  • [ ] Step 1: Write the complete file
# -*- coding: utf-8 -*-
"""Tool extraction · search_data posts → structured tool entries → mode_tools table
================================================================================
- Post source: the search_data table (located by --query-id + --case-ids)
- Default model is google/gemini-3.1-flash-lite; --model accepts any OpenRouter id
- Multimodal: reuses search_and_evaluate._attach_image_refs (image URLs passed through directly)
- DB write: db.replace_tools (idempotent within a version, history kept across versions); runs/tools/ keeps debug copies

Usage (normally launched as a subprocess by server.py):
  python pipeline/tool_extract.py --query-id q0000 --case-ids xhs_abc,gzh_def
  python pipeline/tool_extract.py --query-id q0000 --case-ids xhs_abc --model anthropic/claude-sonnet-4-6
"""
import argparse
import asyncio
import json
import sys
import time
from datetime import datetime
from pathlib import Path

PROJECT_ROOT = Path(__file__).resolve().parents[3]   # …/Agent
sys.path.insert(0, str(PROJECT_ROOT))

from dotenv import load_dotenv
load_dotenv()

from examples.process_pipeline.script.search_eval.search_and_evaluate import _attach_image_refs
from examples.process_pipeline.script.llm_evaluate_sources import _format_post_for_eval, build_eval_llm_call
from examples.process_pipeline.script.llm_helper import call_llm_with_retry

HERE = Path(__file__).resolve().parent          # pipeline/
MW = HERE.parent                                 # mode_workflow/
sys.path.insert(0, str(MW))
import db

TOOL_SYSTEM = (MW / "prompts" / "tool_extract_system.md").read_text(encoding="utf-8")
DEFAULT_MODEL_CHOICE = "gemini-flash-lite"
MAX_IMAGES = 6


def _row_to_source(row):
    """search_data 行 → 引擎函数认的 source dict。"""
    return {
        "case_id": row["case_id"], "platform": row["platform"],
        "channel_content_id": row["channel_content_id"],
        "source_url": row["url"],
        "post": {
            "title": row["title"], "body_text": row["body"],
            "images": row["images"] or [], "like_count": row["like_count"],
            "publish_timestamp": row["publish_time"], "link": row["url"],
        },
    }


def _validate_tools(data):
    if not isinstance(data, dict) or "tools" not in data:
        return '缺少顶层 "tools" 字段'
    if not isinstance(data["tools"], list):
        return '"tools" 必须是数组'
    return None


async def extract_one(source, llm_call, model):
    """对一条 source 抽工具,返回 (tools_list, cost)。失败返回 ([], cost)。"""
    user_text = "【内容】\n" + _format_post_for_eval(source)
    image_urls = source.get("_image_data_urls") or None
    if image_urls:
        user_content = [{"type": "text", "text": user_text}]
        for u in image_urls:
            user_content.append({"type": "image_url", "image_url": {"url": u}})
        messages = [{"role": "system", "content": TOOL_SYSTEM},
                    {"role": "user", "content": user_content}]
    else:
        messages = [{"role": "system", "content": TOOL_SYSTEM},
                    {"role": "user", "content": user_text}]
    data, cost = await call_llm_with_retry(
        llm_call=llm_call, messages=messages, model=model,
        temperature=0.1, max_tokens=4000,
        validate_fn=_validate_tools,
        task_name=f"ToolExtract[{source.get('case_id', '?')}]",
    )
    if not data:
        return [], cost
    return data.get("tools", []), cost


async def run(args):
    qid = args.query_id
    case_ids = [c.strip() for c in args.case_ids.split(",") if c.strip()]
    sources = []
    for cid in case_ids:
        row = db.fetch_post(qid, cid)
        if row is None:
            print(f"⚠️ {qid}/{cid} 不在 search_data,跳过")
            continue
        sources.append(_row_to_source(row))
    if not sources:
        print("❌ 没有可解构的帖子"); return 1

    if args.model and "/" in args.model:
        from agent.llm.openrouter import create_openrouter_llm_call
        llm_call, model_id = create_openrouter_llm_call(model=args.model), args.model
    else:
        llm_call, model_id = build_eval_llm_call(args.model or DEFAULT_MODEL_CHOICE)
    version = args.version or ("v_" + datetime.now().strftime("%m%d%H%M"))
    print(f"🔧 工具解构 {len(sources)} 帖 · 模型 {model_id} · 版本 {version}")

    await _attach_image_refs(sources, MAX_IMAGES, max(2, args.max_concurrent * 2), "url")
    sem = asyncio.Semaphore(args.max_concurrent)
    out_dir = MW / "runs" / "tools"
    out_dir.mkdir(parents=True, exist_ok=True)

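    async def _work(s):
        async with sem:
            # Start timing after the semaphore is acquired so duration_s
            # measures the LLM call only, not time spent queued behind other posts.
            t0 = time.monotonic()
            tools, cost = await extract_one(s, llm_call, model_id)
            dur = round(time.monotonic() - t0, 1)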
        n = db.replace_tools(qid, s["case_id"], s.get("platform"),
                             (s.get("post") or {}).get("title", ""),
                             tools, model_id, version, cost, dur)
        (out_dir / f"{s['case_id']}_{version}.json").write_text(json.dumps({
            "case_id": s["case_id"], "version": version, "model": model_id,
            "cost_usd": cost, "duration_s": dur, "tools": tools,
        }, ensure_ascii=False, indent=2), encoding="utf-8")
        print(f"   ✅ {s['case_id']} → {n} 个工具 · ${cost:.4f} · {dur}s")
        return cost

    costs = await asyncio.gather(*[_work(s) for s in sources])
    print(f"\n📊 完成 {len(sources)} 帖 · 总成本 ${sum(costs):.4f}")
    return 0


def main():
    p = argparse.ArgumentParser(description="工具解构:search_data 帖子 → mode_tools")
    p.add_argument("--query-id", required=True)
    p.add_argument("--case-ids", required=True, help="逗号分隔 case_id 列表")
    p.add_argument("--model", default=None, help="默认 gemini-flash-lite,可传 OpenRouter id")
    p.add_argument("--max-concurrent", type=int, default=3)
    p.add_argument("--version", default=None, help="默认自动 v_月日时分")
    args = p.parse_args()
    raise SystemExit(asyncio.run(run(args)))


if __name__ == "__main__":
    main()
  • Step 2: Smoke test (requires OPEN_ROUTER_API_KEY; skip this step if no key is available)

First pick a real case_id from the database:

cd /Users/max_liu/max_liu/company/Agent/examples/mode_workflow
CID=$(python3 -c "import db; print(db.fetch_posts('q0000')[0]['case_id'])")
python pipeline/tool_extract.py --query-id q0000 --case-ids "$CID"
python3 -c "import db; r=db.fetch_tools('$CID'); print(r['version'], r['tool_count'], r['cost_usd'], r['duration_s'])"

Expected: the extraction log contains ✅ <case_id> → N 个工具; fetch_tools prints the version, a tool count ≥ 0, and non-None cost/duration values.
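
When no API key is available, the validator contract can still be checked offline. The sketch below mirrors the two checks visible at the end of `_validate_tools` (messages translated to English; the name `validate_tools` and the sample payloads are hypothetical). It assumes that `call_llm_with_retry` treats a `None` return as "valid" and any string as a reason to retry; that retry behavior is an assumption, since the helper's source is not part of this plan.

```python
def validate_tools(data):
    """Return None when the payload is acceptable, else a short error string."""
    if not isinstance(data, dict) or "tools" not in data:
        return 'missing top-level "tools" field'
    if not isinstance(data["tools"], list):
        return '"tools" must be an array'
    return None


# Hypothetical payloads, as a model might return them after JSON parsing.
samples = [
    {"tools": [{"name": "example-tool"}]},  # well-formed
    {"tools": []},                          # well-formed: a post may mention no tools
    {"tools": "example-tool"},              # wrong type for "tools"
    ["example-tool"],                       # top level is not an object
]
results = [validate_tools(s) for s in samples]
for sample, err in zip(samples, results):
    print("ok" if err is None else f"rejected: {err}", sample)
```

An empty `tools` list passes validation, which matches the "tool count ≥ 0" expectation above.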

  • Step 3: Commit
git add examples/mode_workflow/pipeline/tool_extract.py
git commit -m "feat(mode_workflow): 工具解构 pipeline(读库→LLM→mode_tools)"

Task 5: pipeline/procedure_extract.py (procedure extraction)

Files:

  • Create: examples/mode_workflow/pipeline/procedure_extract.py

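Differences from the old version: posts are read from the database; no HTML is rendered (the frontend renders DB data directly); results are written to mode_process; multiple posts per run are supported. _detect_image_mime / _fetch_data_url / _collect_images / _sanitize_workflow are copied verbatim from procedure_model_extract.py:34-126 (already verified to work, so they are not reinvented).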

  • Step 1: Write the complete file
# -*- coding: utf-8 -*-
"""工序解构 · search_data 帖子 → workflow JSON → mode_process 表
================================================================================
单次大模型直出(无 agent / 无 validate 循环),prompt 见 prompts/procedure_extract_system.md。
配图下载转 base64(绕防盗链)随文本一起发。结果按工序拆行写 mode_process。

用法(一般由 server.py 起子进程调):
  python pipeline/procedure_extract.py --query-id q0000 --case-ids xhs_abc
  python pipeline/procedure_extract.py --query-id q0000 --case-ids xhs_abc --model google/gemini-3.1-flash-lite
"""
import argparse
import asyncio
import base64
import json
import sys
import time
from datetime import datetime
from pathlib import Path

PROJECT_ROOT = Path(__file__).resolve().parents[3]   # …/Agent
sys.path.insert(0, str(PROJECT_ROOT))

from dotenv import load_dotenv
load_dotenv()

from examples.process_pipeline.script.llm_helper import call_llm_with_retry

HERE = Path(__file__).resolve().parent
MW = HERE.parent
sys.path.insert(0, str(MW))
import db

PROMPT_FILE = MW / "prompts" / "procedure_extract_system.md"
DEFAULT_MODEL = "anthropic/claude-sonnet-4-6"
MAX_IMAGES = 8


# ── 以下 4 个助手原样取自 mode_procedure/mode-dsl/procedure_model_extract.py ──

def _detect_image_mime(data: bytes):
    if not data or len(data) < 12:
        return None
    if data[:3] == b"\xff\xd8\xff":
        return "image/jpeg"
    if data[:8] == b"\x89PNG\r\n\x1a\n":
        return "image/png"
    if data[:4] == b"RIFF" and data[8:12] == b"WEBP":
        return "image/webp"
    if data[:6] in (b"GIF87a", b"GIF89a"):
        return "image/gif"
    return None


async def _fetch_data_url(url, sem):
    from agent.tools.builtin.file.image_cdn import _download_image
    async with sem:
        try:
            data = await _download_image(url)
        except Exception:
            return None
    mime = _detect_image_mime(data)
    if mime is None:
        return None
    return f"data:{mime};base64,{base64.b64encode(data).decode()}"


async def _collect_images(urls, max_images, concurrency):
    urls = [u for u in urls if isinstance(u, str) and u][:max_images]
    if not urls:
        return []
    sem = asyncio.Semaphore(concurrency)
    results = await asyncio.gather(*[_fetch_data_url(u, sem) for u in urls])
    return [d for d in results if d]


def _validate_wf(data):
    if not isinstance(data, dict):
        return "顶层必须是 JSON 对象"
    if "procedures" not in data:
        return '缺少 "procedures" 字段'
    if not isinstance(data["procedures"], list):
        return '"procedures" 必须是数组'
    return None


def _sanitize_workflow(data):
    dropped = {"procedures": 0, "steps": 0, "io": 0}
    procs = data.get("procedures")
    if not isinstance(procs, list):
        return data, dropped
    clean_procs = []
    for p in procs:
        if not isinstance(p, dict):
            dropped["procedures"] += 1
            continue
        steps = p.get("steps")
        if isinstance(steps, list):
            kept = []
            for s in steps:
                if not isinstance(s, dict):
                    dropped["steps"] += 1
                    continue
                for io in ("inputs", "outputs"):
                    if isinstance(s.get(io), list):
                        before = len(s[io])
                        s[io] = [x for x in s[io] if isinstance(x, dict)]
                        dropped["io"] += before - len(s[io])
                kept.append(s)
            p["steps"] = kept
        if not isinstance(p.get("declarations"), dict):
            p.pop("declarations", None)
        if not isinstance(p.get("type_registry"), dict):
            p.pop("type_registry", None)
        clean_procs.append(p)
    data["procedures"] = clean_procs
    return data, dropped

# ── 助手复制结束 ──────────────────────────────────────────────────────────────


async def extract_one(row, system, llm_call, model, args):
    """单帖工序解构 → 写 mode_process。返回 cost。"""
    cid = row["case_id"]
    t0 = time.monotonic()
    post_text = (f"【标题】{row['title'] or ''}\n【来源】{row['url'] or ''}\n"
                 f"【正文】\n{row['body'] or ''}")
    data_urls = [] if args.no_images else await _collect_images(
        row["images"] or [], args.max_images, args.max_concurrent)
    print(f"🖼️  {cid} 配图 {len(data_urls)}/{len(row['images'] or [])} 张")

    if data_urls:
        user_content = [{"type": "text", "text": post_text}]
        for u in data_urls:
            user_content.append({"type": "image_url", "image_url": {"url": u}})
        messages = [{"role": "system", "content": system},
                    {"role": "user", "content": user_content}]
    else:
        messages = [{"role": "system", "content": system},
                    {"role": "user", "content": post_text}]

    data, cost = await call_llm_with_retry(
        llm_call=llm_call, messages=messages, model=model,
        temperature=0.2, max_tokens=args.max_tokens,
        validate_fn=_validate_wf, task_name=f"ProcExtract[{cid}]",
    )
    if not data:
        print(f"❌ {cid} 解构失败(重试耗尽)")
        return cost

    data, dropped = _sanitize_workflow(data)
    if any(dropped.values()):
        print(f"🧹 {cid} 清洗:丢弃 procedure {dropped['procedures']} / "
              f"step {dropped['steps']} / io {dropped['io']}")

    dur = round(time.monotonic() - t0, 1)
    n = db.replace_process(args.query_id, cid, row["platform"], row["title"],
                           data, model, args.version, cost, dur)
    out_dir = MW / "runs" / "procedures"
    out_dir.mkdir(parents=True, exist_ok=True)
    (out_dir / f"{cid}_{args.version}.json").write_text(
        json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"   ✅ {cid} → {n} 个工序 · ${cost:.4f} · {dur}s")
    return cost


async def run(args):
    case_ids = [c.strip() for c in args.case_ids.split(",") if c.strip()]
    rows = []
    for cid in case_ids:
        row = db.fetch_post(args.query_id, cid)
        if row is None:
            print(f"⚠️ {args.query_id}/{cid} 不在 search_data,跳过")
            continue
        rows.append(row)
    if not rows:
        print("❌ 没有可解构的帖子"); return 1

    system = PROMPT_FILE.read_text(encoding="utf-8")
    from agent.llm.openrouter import create_openrouter_llm_call
    llm_call = create_openrouter_llm_call(model=args.model)
    args.version = args.version or ("v_" + datetime.now().strftime("%m%d%H%M"))
    print(f"🤖 工序解构 {len(rows)} 帖 · 模型 {args.model} · 版本 {args.version}")

    costs = []
    for row in rows:   # 工序解构 token 重,串行跑,避免 OpenRouter 限流
        costs.append(await extract_one(row, system, llm_call, args.model, args))
    print(f"\n📊 完成 {len(rows)} 帖 · 总成本 ${sum(costs):.4f}")
    return 0


def main():
    p = argparse.ArgumentParser(description="工序解构:search_data 帖子 → mode_process")
    p.add_argument("--query-id", required=True)
    p.add_argument("--case-ids", required=True, help="逗号分隔 case_id 列表")
    p.add_argument("--model", default=DEFAULT_MODEL)
    p.add_argument("--version", default=None, help="默认自动 v_月日时分")
    p.add_argument("--max-images", type=int, default=MAX_IMAGES)
    p.add_argument("--max-concurrent", type=int, default=4)
    p.add_argument("--max-tokens", type=int, default=8000)
    p.add_argument("--no-images", action="store_true")
    args = p.parse_args()
    raise SystemExit(asyncio.run(run(args)))


if __name__ == "__main__":
    main()
  • Step 2: Smoke test (requires an API key; procedure extraction defaults to claude-sonnet at roughly $0.03-0.1 per post; add --model google/gemini-3.1-flash-lite to cut cost)
cd /Users/max_liu/max_liu/company/Agent/examples/mode_workflow
CID=$(python3 -c "import db; print(db.fetch_posts('q0000')[0]['case_id'])")
python pipeline/procedure_extract.py --query-id q0000 --case-ids "$CID" --model google/gemini-3.1-flash-lite
python3 -c "
import db
r = db.fetch_process('$CID')
print(r['version'], len(r['procedures']), '个工序')
p = r['procedures'][0]
print('steps:', len(p['steps']), 'tools_used:', p['tools_used'])
"

Expected: the log shows ✅ <cid> → N 个工序; fetch_process returns a version, at least one procedure, non-empty steps, and tools_used listing the via values.
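
The `tools_used` field checked above is produced inside `db.replace_process`, which is not shown in this section. As a rough illustration only, one plausible derivation is to collect the distinct non-empty `via` values of a procedure's steps in first-seen order. The function name and the sample steps below are hypothetical, and the real db layer may differ.

```python
def tools_used(steps):
    """Collect distinct non-empty `via` values from steps, in first-seen order."""
    seen = {}
    for step in steps:
        if not isinstance(step, dict):
            continue  # ignore malformed steps
        via = (step.get("via") or "").strip()
        if via:
            seen.setdefault(via, None)  # dicts preserve insertion order
    return list(seen)


# Hypothetical steps, shaped like the step dicts that _sanitize_workflow keeps.
steps = [
    {"action": "generate", "via": "Midjourney"},
    {"action": "retouch", "via": " Photoshop "},
    {"action": "review", "via": None},
    {"action": "upscale", "via": "Midjourney"},
]
print(tools_used(steps))
```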

  • Step 3: Commit
git add examples/mode_workflow/pipeline/procedure_extract.py
git commit -m "feat(mode_workflow): 工序解构 pipeline(读库→LLM→mode_process)"

Task 6: pipeline/search_eval.py (search + evaluation into the database)

Files:

  • Create: examples/mode_workflow/pipeline/search_eval.py

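Differences from run_search.py: the four query groups are no longer hard-coded; any --query is accepted (optionally with --synonyms for alternative phrasings); output goes straight into search_data, and runs/search/ keeps a debug copy.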
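
The way --query and --synonyms combine into one phrasing list can be sketched in isolation. This is a minimal illustration, not the script itself: the helper name `build_phrasings` is hypothetical, and it uses `dict.fromkeys` for order-preserving de-duplication in place of an explicit loop.

```python
def build_phrasings(query, synonyms=""):
    """Base query first, then comma-separated synonyms, de-duplicated in order."""
    candidates = [query] + [s.strip() for s in (synonyms or "").split(",") if s.strip()]
    # dict.fromkeys keeps the first occurrence of each phrasing and preserves order.
    return list(dict.fromkeys(candidates))


phrasings = build_phrasings("GPT image2 评测", "GPT image2 测评, GPT image2 评测,")
print(phrasings)
```

The base query always stays first, which matters because it also serves as the evaluation anchor.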

  • Step 1: Write the complete file
# -*- coding: utf-8 -*-
"""搜索 + 评估 · 任意 query → 多渠道搜索去重 → LLM 逐帖评估 → search_data 表
================================================================================
引擎函数全部只读复用 search_and_evaluate.py(搜索/去重/转写/评估/英平台翻译)。

用法(一般由 server.py 起子进程调):
  python pipeline/search_eval.py --query-id q0004 --query "AI 人像 图片 生成 怎么做"
  python pipeline/search_eval.py --query-id q0005 --query "GPT image2 评测" \
      --synonyms "GPT image2 测评,GPT image2 实测" --platforms xhs,gzh --max-count 10
"""
import argparse
import asyncio
import json
import sys
from pathlib import Path

PROJECT_ROOT = Path(__file__).resolve().parents[3]   # …/Agent
sys.path.insert(0, str(PROJECT_ROOT))

from dotenv import load_dotenv
load_dotenv()

from examples.process_pipeline.script.search_eval.search_and_evaluate import (
    search_all, evaluate_posts, transcribe_video_posts, build_query_overrides,
)
from examples.process_pipeline.script.llm_evaluate_sources import (
    build_eval_llm_call, EVAL_MODELS, DEFAULT_EVAL_MODEL,
)

HERE = Path(__file__).resolve().parent
MW = HERE.parent
sys.path.insert(0, str(MW))
import db


async def run(args):
    phrasings = [args.query] + [s.strip() for s in (args.synonyms or "").split(",") if s.strip()]
    # 去重保序
    seen, uniq = set(), []
    for q in phrasings:
        if q not in seen:
            seen.add(q); uniq.append(q)
    phrasings = uniq
    platforms = [p.strip() for p in args.platforms.split(",") if p.strip()]

    eval_llm, eval_model_id = build_eval_llm_call(args.eval_model)
    print(f"▶ {args.query_id}  query={args.query!r}  措辞={phrasings}  渠道={platforms}")

    overrides = await build_query_overrides(platforms, phrasings, eval_llm, eval_model_id)
    sources = await search_all(platforms, phrasings, args.max_count,
                               args.max_concurrent, query_overrides=overrides)
    print(f"🔎 去重后 {len(sources)} 帖")
    if not sources:
        print("❌ 搜索无结果"); return 1

    try:
        from examples.process_pipeline.script.extract_sources import _convert_timestamps
        _convert_timestamps(sources)
    except Exception:
        pass

    if not args.no_transcribe:
        n = await transcribe_video_posts(sources, concurrency=args.max_concurrent)
        if n:
            print(f"🎙️  视频转写 {n} 条")

    cost = 0.0
    if not args.no_eval:
        sources, cost = await evaluate_posts(
            sources, "", eval_llm, eval_model_id, args.max_concurrent,
            include_images=not args.no_images, max_images=args.max_images,
            image_mode=args.image_mode, query=args.query,
        )
    for s in sources:
        s.pop("_image_data_urls", None)

    n = db.upsert_search_posts(args.query_id, args.query, sources)
    print(f"🗄️  search_data 入库 {n} 行 · 评估成本 ${cost:.4f}")

    out_dir = MW / "runs" / "search"
    out_dir.mkdir(parents=True, exist_ok=True)
    (out_dir / f"{args.query_id}.json").write_text(json.dumps({
        "query_id": args.query_id, "query": args.query, "phrasings": phrasings,
        "platforms": platforms, "total": len(sources), "results": sources,
    }, ensure_ascii=False, indent=2), encoding="utf-8")
    return 0


def main():
    p = argparse.ArgumentParser(description="搜索+评估 → search_data")
    p.add_argument("--query-id", required=True, help="如 q0004(server 自动分配)")
    p.add_argument("--query", required=True, help="基准 query(评估锚点)")
    p.add_argument("--synonyms", default="", help="逗号分隔的同义措辞(可选)")
    p.add_argument("--platforms", default="xhs,gzh")
    p.add_argument("--max-count", type=int, default=10)
    p.add_argument("--eval-model", default=DEFAULT_EVAL_MODEL, choices=list(EVAL_MODELS))
    p.add_argument("--max-concurrent", type=int, default=3)
    p.add_argument("--max-images", type=int, default=4)
    p.add_argument("--image-mode", choices=["url", "base64"], default="url")
    p.add_argument("--no-transcribe", action="store_true")
    p.add_argument("--no-eval", action="store_true")
    p.add_argument("--no-images", action="store_true")
    args = p.parse_args()
    raise SystemExit(asyncio.run(run(args)))


if __name__ == "__main__":
    main()
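  • Step 2: Smoke test (requires an API key and working search channels; cost and runtime are both high, so verifying only imports and argument parsing is acceptable)

Minimal check (free):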

cd /Users/max_liu/max_liu/company/Agent/examples/mode_workflow
python pipeline/search_eval.py --help

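Expected: usage is printed normally. This proves the import chain works: the command triggers the agent.tools registration chain, so import errors surface here.

Full smoke test (optional, ~1-3 minutes, ~$0.05):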

python pipeline/search_eval.py --query-id q9999 --query "GPT image2 评测" --platforms xhs --max-count 3
python3 -c "import db; print(len(db.fetch_posts('q9999')), '帖')"

Expected: at least 1 row is inserted. Clean up afterwards with: python3 -c "import db; c=db._conn(); c.cursor().execute(\"DELETE FROM search_data WHERE query_id='q9999'\"); c.close()"

  • Step 3: Commit
git add examples/mode_workflow/pipeline/search_eval.py
git commit -m "feat(mode_workflow): 搜索评估 pipeline(任意 query → search_data)"

Task 7: server.py (API + task management + Dashboard aggregation)

Files:

  • Create: examples/mode_workflow/server.py

API contract (index.html depends on it; Task 8 must not deviate):

| Method + Path | Parameters | Returns |
|---|---|---|
| `GET /` | - | index.html |
| `GET /api/dashboard` | - | see the return structure of `_dashboard()` below |
| `GET /api/queries` | - | `[{query_id, query_text, post_count, process_done, tools_done}]` |
| `GET /api/posts` | `query_id` | `[{...search_data row, has_process, has_tools}]` |
| `GET /api/process_versions` | `case_id` | `[{version, n, model}]` |
| `GET /api/process` | `case_id`, `version?` | `{case_id, version, model, source, procedures:[...]}` or 404 |
| `GET /api/tools_versions` | `case_id` | `[{version, n, model}]` |
| `GET /api/tools` | `case_id`, `version?` | `{case_id, version, model, tool_count, tools:[...]}` or 404 |
| `POST /api/extract_process` | `{query_id, case_ids:[..], model?}` | `{task_id}` |
| `POST /api/extract_tools` | `{query_id, case_ids:[..], model?}` | `{task_id}` |
| `POST /api/run_search` | `{query, synonyms?, platforms?, max_count?}` | `{task_id, query_id}` (query_id is auto-assigned as the next qNNNN) |
| `GET /api/task_status` | `task_id` | `{status: running\|done\|failed, log_tail}` |
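
The task endpoints in this contract are asynchronous: a POST returns a `task_id`, and the caller polls `/api/task_status` until the status leaves `running`. A minimal client-side sketch of that loop follows, assuming the server runs on localhost:8772. The names `BASE`, `http_get_json`, and `wait_for_task`, plus the timeout parameter, are illustrative and not part of the plan. The fetch function is injectable so the loop can be exercised without a live server.

```python
import json
import time
import urllib.parse
import urllib.request

BASE = "http://localhost:8772"  # assumption: server on the default port


def http_get_json(path):
    """GET a JSON endpoint; urllib raises HTTPError on 404 (unknown task_id)."""
    with urllib.request.urlopen(BASE + path) as resp:
        return json.load(resp)


def wait_for_task(task_id, get_json=http_get_json, interval=2.0,
                  timeout=600.0, sleep=time.sleep):
    """Poll /api/task_status until the task is no longer 'running'."""
    deadline = time.monotonic() + timeout
    path = "/api/task_status?" + urllib.parse.urlencode({"task_id": task_id})
    while True:
        status = get_json(path)
        if status["status"] != "running":
            return status  # 'done' or 'failed', with the final log_tail
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task {task_id} still running after {timeout}s")
        sleep(interval)
```

Against a running server, `wait_for_task(task_id)` with the id returned by one of the POST endpoints would block until the subprocess exits and then return the final status object.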
  • Step 1: Write the complete file
# -*- coding: utf-8 -*-
"""mode_workflow server · 页面 + API + 解构任务管理
================================================================================
单服务(默认 8772):
  - GET  /                    index.html
  - GET  /api/dashboard       Dashboard 全部聚合指标(含内容树覆盖)
  - GET  /api/queries|posts|process|tools(+_versions)   Dataset 数据
  - POST /api/run_search|extract_process|extract_tools  起子进程跑 pipeline
  - GET  /api/task_status     轮询任务状态(读日志尾部)

用法:python server.py [port]
"""
import json
import subprocess
import sys
import threading
from collections import Counter
from datetime import datetime
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from urllib.parse import urlparse, parse_qs

try:
    sys.stdout.reconfigure(encoding="utf-8")
except Exception:
    pass

HERE = Path(__file__).resolve().parent
sys.path.insert(0, str(HERE))
import db

PORT = int(sys.argv[1]) if len(sys.argv) > 1 else 8772
MATRIX_FILE = HERE / "reference" / "judged_matrix.json"
LOG_DIR = HERE / "runs" / "logs"

# ── 任务管理:task_id → {proc, log, status} ──────────────────────────────────
TASKS = {}
_TASK_LOCK = threading.Lock()


def _spawn_task(kind, cmd):
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    task_id = f"{kind}_{datetime.now().strftime('%m%d%H%M%S%f')}"
    log_path = LOG_DIR / f"{task_id}.log"
    f = open(log_path, "w", encoding="utf-8")
    proc = subprocess.Popen(cmd, stdout=f, stderr=subprocess.STDOUT,
                            cwd=str(HERE), text=True)
    with _TASK_LOCK:
        TASKS[task_id] = {"proc": proc, "log": log_path, "status": "running"}

    def _wait():
        rc = proc.wait()
        f.close()
        with _TASK_LOCK:
            TASKS[task_id]["status"] = "done" if rc == 0 else "failed"

    threading.Thread(target=_wait, daemon=True).start()
    return task_id


def _task_status(task_id):
    with _TASK_LOCK:
        t = TASKS.get(task_id)
    if not t:
        return None
    tail = ""
    try:
        text = t["log"].read_text(encoding="utf-8", errors="replace")
        tail = text[-3000:]
    except Exception:
        pass
    return {"status": t["status"], "log_tail": tail}


def _next_query_id():
    qs = [q["query_id"] for q in db.fetch_queries()]
    nums = [int(q[1:]) for q in qs if q.startswith("q") and q[1:].isdigit()]
    return f"q{(max(nums) + 1 if nums else 0):04d}"


# ── Dashboard 聚合 ────────────────────────────────────────────────────────────

def _split_values(v):
    """substance/form 字段:数组直接用;字符串按 、,/ 分割;None 丢弃。"""
    out = []
    items = v if isinstance(v, list) else [v]
    for it in items:
        if not it or not isinstance(it, str):
            continue
        for piece in it.replace(",", "、").replace("/", "、").split("、"):
            piece = piece.strip()
            if piece:
                out.append(piece)
    return out


def _dashboard():
    posts, procs, tools = db.fetch_dashboard_rows()

    # 最新版本行集(覆盖度/Top10 用最新版,成本/耗时按全部版本累计)
    def latest(rows):
        best = {}
        for r in rows:
            cid = r["case_id"]
            if cid not in best or (r["version"] or "") > (best[cid][0] or ""):
                best[cid] = (r["version"], [])
        out = []
        for r in rows:
            if r["version"] == best[r["case_id"]][0]:
                out.append(r)
        return out

    latest_procs = latest(procs)
    latest_tools = latest(tools)

    # 内容树覆盖:steps 的 (action 叶子 × 输入/输出 type) ∩ 有效节点(tier≥1)
    jm = json.loads(MATRIX_FILE.read_text(encoding="utf-8"))
    a_idx = {a["name"]: i for i, a in enumerate(jm["actions"])}
    t_idx = {t["name"]: i for i, t in enumerate(jm["types"])}
    valid = set()
    for ai, row in enumerate(jm["matrix"]):
        for ti, cell in enumerate(row):
            if isinstance(cell, dict) and cell.get("tier", 0) >= 1:
                valid.add((ai, ti))
    covered = set()
    via_counter = Counter()
    substance_counter = Counter()
    form_counter = Counter()
    for r in latest_procs:
        for s in r["steps"]:
            leaf = (s.get("action") or "").split("/")[-1].strip()
            types = []
            for io in ("inputs", "outputs"):
                for x in s.get(io) or []:
                    if isinstance(x, dict) and x.get("type"):
                        types.append(str(x["type"]).strip())
            if leaf in a_idx:
                for tp in types:
                    if tp in t_idx and (a_idx[leaf], t_idx[tp]) in valid:
                        covered.add((a_idx[leaf], t_idx[tp]))
            via = (s.get("via") or "").strip()
            if via:
                via_counter[via] += 1
            for v in _split_values(s.get("substance")):
                substance_counter[v] += 1
            for v in _split_values(s.get("form")):
                form_counter[v] += 1
    for r in latest_tools:
        for v in _split_values(r["substance_scope"]):
            substance_counter[v] += 1
        for v in _split_values(r["form_scope"]):
            form_counter[v] += 1

    # 成本/耗时:同一 (case_id, version) 只计一次(各行重复存同一次调用的值)
    def cost_groups(rows):
        g = {}
        for r in rows:
            key = (r["case_id"], r["version"])
            if key not in g and r["cost_usd"] is not None:
                g[key] = (r["cost_usd"], r["duration_s"] or 0.0, r["created_at"])
        return list(g.values())

    runs = cost_groups(procs) + cost_groups(tools)
    total_cost = round(sum(c for c, _, _ in runs), 4)
    total_dur = round(sum(d for _, d, _ in runs), 1)
    # 按日成本趋势
    daily = Counter()
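    for c, _, ts in runs:
        if ts:
            # created_at may arrive as a datetime (pymysql) or a string;
            # str() makes the date slice work for both.
            daily[str(ts)[:10]] += c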
    cost_trend = [{"date": d, "cost": round(v, 4)} for d, v in sorted(daily.items())]

    # 进度:分母 = knowledge_type 含对应类型的帖子(distinct case)
    proc_targets = {p["case_id"] for p in posts if "工序" in (p["knowledge_type"] or [])}
    tool_targets = {p["case_id"] for p in posts if "工具" in (p["knowledge_type"] or [])}
    proc_done = {r["case_id"] for r in procs}
    tool_done = {r["case_id"] for r in tools}

    return {
        "result": {
            "matrix_covered": len(covered), "matrix_valid": len(valid),
            "matrix_cells": sorted([ai, ti] for ai, ti in covered),
            "matrix_actions": [a["name"] for a in jm["actions"]],
            "matrix_types": [t["name"] for t in jm["types"]],
            "substance_count": len(substance_counter),
            "substance_top": substance_counter.most_common(15),
            "form_count": len(form_counter),
            "form_top": form_counter.most_common(15),
            "post_count": len(posts),
            "extracted_post_count": len(proc_done | tool_done),
            "tool_count": len({r["tool_name"] for r in latest_tools if r["tool_name"]}),
            "via_top10": via_counter.most_common(10),
        },
        "process_data": {
            "run_count": len(runs),
            "avg_cost": round(total_cost / len(runs), 4) if runs else 0,
            "total_cost": total_cost,
            "avg_duration": round(total_dur / len(runs), 1) if runs else 0,
            "total_duration": total_dur,
            "cost_trend": cost_trend,
            "process_progress": {"done": len(proc_done), "total": len(proc_targets)},
            "tools_progress": {"done": len(tool_done), "total": len(tool_targets)},
        },
    }


# ── HTTP handler ─────────────────────────────────────────────────────────────

class Handler(BaseHTTPRequestHandler):

    def _json(self, data, code=200):
        body = json.dumps(data, ensure_ascii=False, default=str).encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _err(self, msg, code=400):
        self._json({"error": msg}, code)

    def do_GET(self):
        u = urlparse(self.path)
        qs = {k: v[0] for k, v in parse_qs(u.query).items()}
        try:
            if u.path == "/" or u.path == "/index.html":
                body = (HERE / "index.html").read_bytes()
                self.send_response(200)
                self.send_header("Content-Type", "text/html; charset=utf-8")
                self.send_header("Content-Length", str(len(body)))
                self.end_headers()
                self.wfile.write(body)
            elif u.path == "/api/dashboard":
                self._json(_dashboard())
            elif u.path == "/api/queries":
                self._json(db.fetch_queries())
            elif u.path == "/api/posts":
                self._json(db.fetch_posts(qs.get("query_id", "")))
            elif u.path == "/api/process_versions":
                self._json(db.fetch_process_versions(qs.get("case_id", "")))
            elif u.path == "/api/process":
                r = db.fetch_process(qs.get("case_id", ""), qs.get("version"))
                self._json(r) if r else self._err("无解构记录", 404)
            elif u.path == "/api/tools_versions":
                self._json(db.fetch_tools_versions(qs.get("case_id", "")))
            elif u.path == "/api/tools":
                r = db.fetch_tools(qs.get("case_id", ""), qs.get("version"))
                self._json(r) if r else self._err("无解构记录", 404)
            elif u.path == "/api/task_status":
                r = _task_status(qs.get("task_id", ""))
                self._json(r) if r else self._err("未知 task_id", 404)
            else:
                self._err("not found", 404)
        except Exception as e:
            self._err(f"{type(e).__name__}: {e}", 500)

    def do_POST(self):
        u = urlparse(self.path)
        try:
            n = int(self.headers.get("Content-Length") or 0)
            payload = json.loads(self.rfile.read(n) or b"{}")
        except Exception:
            return self._err("body 必须是 JSON")
        try:
            if u.path in ("/api/extract_process", "/api/extract_tools"):
                qid = payload.get("query_id")
                cids = payload.get("case_ids") or []
                if not qid or not cids:
                    return self._err("缺 query_id / case_ids")
                script = ("pipeline/procedure_extract.py" if u.path.endswith("process")
                          else "pipeline/tool_extract.py")
                cmd = [sys.executable, script, "--query-id", qid,
                       "--case-ids", ",".join(cids)]
                if payload.get("model"):
                    cmd += ["--model", payload["model"]]
                kind = "proc" if u.path.endswith("process") else "tool"
                self._json({"task_id": _spawn_task(kind, cmd)})
            elif u.path == "/api/run_search":
                query = (payload.get("query") or "").strip()
                if not query:
                    return self._err("缺 query")
                qid = payload.get("query_id") or _next_query_id()
                cmd = [sys.executable, "pipeline/search_eval.py",
                       "--query-id", qid, "--query", query]
                if payload.get("synonyms"):
                    cmd += ["--synonyms", payload["synonyms"]]
                if payload.get("platforms"):
                    cmd += ["--platforms", payload["platforms"]]
                if payload.get("max_count"):
                    cmd += ["--max-count", str(payload["max_count"])]
                self._json({"task_id": _spawn_task("search", cmd), "query_id": qid})
            else:
                self._err("not found", 404)
        except Exception as e:
            self._err(f"{type(e).__name__}: {e}", 500)

    def log_message(self, fmt, *a):
        pass   # 静默访问日志


if __name__ == "__main__":
    print(f"🚀 mode_workflow server → http://0.0.0.0:{PORT}")
    ThreadingHTTPServer(("0.0.0.0", PORT), Handler).serve_forever()
  • Step 2: Start the server and verify each endpoint
cd /Users/max_liu/max_liu/company/Agent/examples/mode_workflow
python server.py 8772 &
sleep 1
curl -s localhost:8772/api/queries | python3 -m json.tool | head -20
curl -s localhost:8772/api/dashboard | python3 -c "
import json,sys
d = json.load(sys.stdin)
print('采集帖子:', d['result']['post_count'])
print('内容树:', d['result']['matrix_covered'], '/', d['result']['matrix_valid'])
print('进度:', d['process_data'])
"
curl -s "localhost:8772/api/posts?query_id=q0000" | python3 -c "import json,sys; r=json.load(sys.stdin); print(len(r),'帖, 首帖:', r[0]['case_id'], r[0]['has_process'], r[0]['has_tools'])"
curl -s "localhost:8772/api/process?case_id=不存在" ; echo
kill %1

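Expected: queries returns q0000-q0003; dashboard has post_count > 0 and matrix_valid == 643; posts returns the posts of that query; a nonexistent case returns {"error": "无解构记录"} with status 404. If the Task 4/5 smoke tests were run, the process/tools endpoints for those cases should also return data.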
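
The Dashboard figures checked above use the latest version of each case, and the server picks "latest" by plain string comparison of version labels. The sketch below illustrates why that works for the default `v_%m%d%H%M` labels and where it stops working. The helper names are hypothetical, and the dates are made up for illustration.

```python
from datetime import datetime


def auto_version(now):
    """Default version label, using the same format string as the pipeline scripts."""
    return "v_" + now.strftime("%m%d%H%M")


def latest_version(versions):
    """Pick the greatest label by string comparison; None is treated as ''."""
    return max((v or "" for v in versions), default=None)


june_morning = auto_version(datetime(2026, 6, 12, 9, 5))
june_afternoon = auto_version(datetime(2026, 6, 12, 15, 30))
print(latest_version([june_morning, june_afternoon]))

# Caveat: the label carries no year, so a January run sorts before a December one.
december = auto_version(datetime(2026, 12, 31, 23, 59))
january = auto_version(datetime(2027, 1, 1, 0, 0))
print(latest_version([december, january]))
```

Fixed-width, zero-padded fields make string order match time order within one year. Custom `--version` labels or runs that span a year boundary do not get that guarantee, so ordering by `created_at` would be a sturdier choice if that ever matters.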

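  • Step 3: Verify the task endpoints (free; --help does not apply here, so exercise the error path with deliberately missing parameters; a real task is optional). Step 2 ended with kill %1, so restart the server with python server.py 8772 & before running the command below.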
curl -s -X POST localhost:8772/api/extract_tools -d '{}' ; echo

Expected: {"error": "缺 query_id / case_ids"}. (Real extraction tasks are triggered from the UI during the Task 9 end-to-end acceptance.)

  • Step 4: Commit
git add examples/mode_workflow/server.py
git commit -m "feat(mode_workflow): server(API+任务管理+Dashboard聚合)"

Task 8: index.html (three-tab frontend)

Files:

  • Create: examples/mode_workflow/index.html

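Implementation approach: this task must invoke the frontend-design:frontend-design skill before any code is written. The output is a single-file HTML page (vanilla JS, ECharts 5 from CDN: https://cdn.jsdelivr.net/npm/echarts@5/dist/echarts.min.js); introducing a build chain is forbidden. The API contract is the Task 7 table, and field names must not deviate.

Visual direction (continuing the established style in the user's screenshots): white cards with multi-color sectioned tables under dark (navy/slate) headers; tag pills (colored background by type); section colors of a dark-blue header for the requirement area, orange-yellow for the input area, and green for the output area; inferred fields get a 「推」 badge plus a highlighted background, with inferred_reason shown on hover. The interface as a whole is in Chinese.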

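Page structure and behavior (acceptance criteria):

  1. Top navigation: three tabs, Dashboard / Dataset / 聚类库 (cluster library), with hash routing (#dashboard #dataset #cluster) so a refresh keeps the current tab.

  2. Dashboard tab (data source: GET /api/dashboard):

    • First row of number cards: posts collected, posts extracted, tool count, content-tree coverage (covered/valid plus percentage).
    • Second row of number cards (process data): cumulative extraction cost ($total_cost, subtitle: per-run average $avg_cost), total duration (subtitle: average duration), procedure progress ring (done/total), tool progress ring.
    • Charts:
      • Content-tree coverage heatmap: ECharts heatmap, x = 50 types, y = 27 actions, covered cells highlighted (data from matrix_cells / matrix_actions / matrix_types).
      • Top 10 tools mentioned in procedures: horizontal bar chart (via_top10).
      • Substance coverage distribution: bar chart (substance_top; the title shows the total substance_count).
      • Form coverage distribution: bar chart (form_top + form_count).
      • Cost trend: line chart (cost_trend).
    • When every metric is 0 or empty, nothing errors; an empty-state message is shown instead.
  3. Dataset tab: a 「工序 / 工具」 (procedure / tool) sub-switch and a 「新建搜索」 (new search) button at the top, then three columns:

    • Left column, query list (/api/queries): query_text + post count + extraction progress (process_done or tools_done, depending on the sub-mode); click to select.
    • Middle column, post list (/api/posts?query_id=): each card shows the title, platform badge (xhs/gzh/zhihu/x), overall_score, knowledge_type pills, and an extracted marker (has_process/has_tools); each card has a 「解构」 (extract) button, and multi-select enables batch extraction; clicking a card loads it in the right column.
    • Right column, extraction result:
      • Procedure mode: header 「大模型工序:已提取」 + version dropdown (/api/process_versions, formatted as v_xxx (最新) · N工序) + 「重新生成」 (regenerate) button. Each procedure renders its name, purpose, category pill, and platform/author; an input/return declarations area; and a step detail table with sectioned headers: requirement [#/目的/作用/实质/形式] in dark blue, input [类型/值/来源] in orange, implementation [外部工具/动作/指令] in green, output [类型/值/去处] in dark green. Cells with inferred:true are highlighted and carry a 「推」 badge, and the hover title shows inferred_reason.
      • Tool mode: version dropdown + regenerate; one card per tool: tool name, creation-level pill, substance/form scope pill groups, inputs/outputs, usage list, cases (input/output/effect), drawbacks list, source link.
      • When there is no extraction record (404), show an empty state plus a 「开始解构」 (start extraction) button.
    • After an extraction or search is triggered: a task panel pops up and polls /api/task_status (2s interval), showing log_tail; on done it refreshes the current column automatically; on failed it shows the log.
    • 「新建搜索」: a dialog collects query/synonyms/platforms/max_count → POST /api/run_search → task panel; when the task finishes, the left column refreshes and shows the new query.
  4. 聚类库 tab: a centered empty-state placeholder (icon + 「聚类库 · 敬请期待」).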
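
Because the Dashboard cards and charts read fixed field names from GET /api/dashboard, a small shape check can catch contract drift before the browser does. The sketch below lists the keys that `_dashboard()` returns in Task 7 and reports any that are missing; it also shows the zero-safe coverage percentage the first card row needs. The function names and the sample payload are hypothetical.

```python
RESULT_KEYS = {
    "matrix_covered", "matrix_valid", "matrix_cells", "matrix_actions",
    "matrix_types", "substance_count", "substance_top", "form_count",
    "form_top", "post_count", "extracted_post_count", "tool_count", "via_top10",
}
PROCESS_KEYS = {
    "run_count", "avg_cost", "total_cost", "avg_duration", "total_duration",
    "cost_trend", "process_progress", "tools_progress",
}


def missing_dashboard_keys(payload):
    """Return dotted names of expected fields absent from the payload."""
    missing = []
    for section, keys in (("result", RESULT_KEYS), ("process_data", PROCESS_KEYS)):
        got = payload.get(section) or {}
        missing += [f"{section}.{k}" for k in sorted(keys - set(got))]
    return missing


def coverage_percent(result):
    """covered / valid as a percentage; 0.0 when nothing is valid (empty state)."""
    valid = result.get("matrix_valid") or 0
    return round(100 * result.get("matrix_covered", 0) / valid, 1) if valid else 0.0


# Hypothetical partial payload: only two fields present.
sample = {"result": {"matrix_covered": 100, "matrix_valid": 643}, "process_data": {}}
print(len(missing_dashboard_keys(sample)), "fields missing")
print(coverage_percent(sample["result"]), "% covered")
```

In practice the payload would come from `json.load` on the live endpoint; an empty list from `missing_dashboard_keys` means every field the page reads is present.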

Minimal skeleton (code-structure baseline; frontend-design completes the full implementation on top of it):

<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="utf-8">
<title>mode_workflow</title>
<script src="https://cdn.jsdelivr.net/npm/echarts@5/dist/echarts.min.js"></script>
<style>/* frontend-design 产出 */</style>
</head>
<body>
<nav id="tabs"><!-- Dashboard / Dataset / 聚类库 (cluster library) --></nav>
<main id="view-dashboard" class="view"></main>
<main id="view-dataset" class="view">
  <aside id="query-list"></aside>
  <section id="post-list"></section>
  <section id="extract-panel"></section>
</main>
<main id="view-cluster" class="view"></main>
<div id="task-panel" hidden></div>
<script>
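// Fetch JSON; on a non-2xx response, throw an Error carrying the status and parsed body.
const api = (p, opt) => fetch(p, opt).then(async r => {
  if (!r.ok) throw Object.assign(new Error(`HTTP ${r.status}`), {status: r.status, body: await r.json().catch(() => ({}))});
  return r.json();
});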
const state = {tab: 'dashboard', mode: 'process', queryId: null, caseId: null, version: null};
function route() { state.tab = (location.hash || '#dashboard').slice(1); render(); }
window.addEventListener('hashchange', route);
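// Poll a task every 2 s while it is 'running'. The panel is re-rendered on every tick,
// so a 'failed' task stops polling with its log_tail still on screen.
async function pollTask(taskId, onDone) {
  const t = await api(`/api/task_status?task_id=${encodeURIComponent(taskId)}`);
  renderTaskPanel(t);
  if (t.status === 'running') setTimeout(() => pollTask(taskId, onDone), 2000);
  else if (t.status === 'done') onDone();
}
// render() / renderDashboard() / renderDataset() / renderTaskPanel() … produced by frontend-design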
route();
</script>
</body>
</html>
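The skeleton's `route()` copies whatever follows `#` into `state.tab`, and `pollTask` branches on the task status inline. One possible refinement, shown here as a hedged sketch rather than part of the required skeleton, is to pull both decisions into small pure helpers so they can be checked without a browser. The names `TABS`, `tabFromHash`, and `pollAction` are hypothetical; the tab names and the `running` / `done` / `failed` statuses are the ones used in this plan.

```javascript
// Sketch only: hypothetical helpers, not part of the agreed skeleton.
const TABS = ['dashboard', 'dataset', 'cluster'];

// Map a location.hash value to a known tab; unknown or empty hashes fall back
// to the dashboard so a stale or mistyped URL still renders something.
function tabFromHash(hash) {
  const name = (hash || '').replace(/^#/, '');
  return TABS.includes(name) ? name : 'dashboard';
}

// Decide what the task panel should do after one /api/task_status response.
function pollAction(task) {
  if (task.status === 'running') return 'poll';    // schedule another tick in 2 s
  if (task.status === 'done') return 'refresh';    // reload the current column
  return 'show-log';                               // 'failed' or anything unexpected
}
```

With these in place, `route()` could set `state.tab = tabFromHash(location.hash)` before calling `render()`. Treating every non-running, non-done status as "show the log" is a design choice that keeps the panel informative even if the server later adds a status the page does not know about.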
  • [ ] Step 1: Invoke the frontend-design skill and produce the complete index.html per the spec above

  • [ ] Step 2: Browser acceptance

cd /Users/max_liu/max_liu/company/Agent/examples/mode_workflow
python server.py 8772 &

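Open http://localhost:8772 and check every item under "Page structure and behavior" above: switching among the three tabs, all Dashboard cards and the 5 charts rendering, the three Dataset columns staying in sync, the version dropdown, the empty states, and the 聚类库 placeholder. If the Task 4/5 smoke tests were run, select the corresponding post and confirm the sectioned color scheme and 「推」 highlight in the procedure table, and that the tool cards are complete.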
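When checking the 「推」 highlight, it helps to know what markup to expect for an inferred cell. The following is a minimal sketch under stated assumptions: each step field is assumed to arrive as `{value, inferred, inferred_reason}`, and the helper names `escapeHtml` and `cellHtml` as well as the class names `inferred` and `badge` are hypothetical. The actual markup is up to the frontend-design output.

```javascript
// Sketch only. ASSUMPTION: a field looks like {value, inferred, inferred_reason}.
function escapeHtml(s) {
  const map = {'&': '&amp;', '<': '&lt;', '>': '&gt;', '"': '&quot;', "'": '&#39;'};
  return String(s).replace(/[&<>"']/g, c => map[c]);
}

// Render one table cell; inferred cells get a highlight class, the 「推」 badge,
// and inferred_reason as the hover title. All text is escaped before insertion.
function cellHtml(field) {
  const value = escapeHtml(field.value ?? '');
  if (!field.inferred) return `<td>${value}</td>`;
  const reason = escapeHtml(field.inferred_reason ?? '');
  return `<td class="inferred" title="${reason}">${value}<sup class="badge">推</sup></td>`;
}
```

Escaping matters here because post content and LLM output are untrusted text that ends up inside both element content and an attribute value.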

  • [ ] Step 3: Commit
git add examples/mode_workflow/index.html
git commit -m "feat(mode_workflow): three-tab single-page frontend (Dashboard/Dataset/cluster library)"

Task 9: README + end-to-end acceptance

Files:

  • Create: examples/mode_workflow/README.md

  • [ ] Step 1: Write the README

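# mode_workflow · Search evaluation + procedure/tool extraction workbench

A single-page workbench whose sole source of truth is three MySQL tables (search_data / mode_process / mode_tools):
Dashboard (visualized result/process metrics), Dataset (query → post → procedure/tool extraction), and 聚类库 (cluster library, placeholder).

Design doc: `docs/superpowers/specs/2026-06-12-mode-workflow-design.md`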

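## Getting started

```bash
# 0. Prerequisites: set MYSQL_* and OPEN_ROUTER_API_KEY in .env; pip install -e .
python db.py init             # create the three tables (idempotent)
python import_history.py      # (optional) import historical fixed_query_eval search results
python server.py              # http://localhost:8772
```

## Structure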

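| File | Responsibility |
|---|---|
| `db.py` | DDL for the three tables plus all reads and writes (reads .env MYSQL_*) |
| `server.py` | Page + API + extraction-task subprocess management (port 8772) |
| `index.html` | Single-file frontend: Dashboard / Dataset / 聚类库 |
| `pipeline/search_eval.py` | Search + evaluation for any query → search_data |
| `pipeline/procedure_extract.py` | Procedure extraction (direct LLM output) → mode_process |
| `pipeline/tool_extract.py` | Tool extraction → mode_tools |
| `prompts/` | System prompts for procedure/tool extraction (can be iterated independently) |
| `reference/judged_matrix.json` | Content tree (27 actions × 50 types), used for Dashboard coverage |
| `runs/` | Run logs and debug copies (gitignored) |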

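The search/evaluation/transcription engine functions are reused read-only from `examples/process_pipeline/script/search_eval/search_and_evaluate.py`; this directory does not copy engine code.

## Relationship to the legacy search_eval

Replaces the "search evaluation + LLM extraction" part of the two services fixed_query_eval (8770) and mode_procedure (8771); the procedure-dsl execution engine, mode-dsl pattern extraction, and the A/B/C three-form comparison are not migrated (trimmed by design).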


  • [ ] Step 2: End-to-end acceptance (requires an API key)

```bash
cd /Users/max_liu/max_liu/company/Agent/examples/mode_workflow
python server.py 8772 &
```

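Walk the full flow in the browser:

  1. Dataset → select q0000 → select 1 post → trigger tool extraction → the task panel shows logs → on completion the right column shows the tool cards.
  2. Trigger procedure extraction on the same post (choose the model google/gemini-3.1-flash-lite to keep cost down) → the right column shows the sectioned procedure table.
  3. Return to Dashboard → posts extracted, tool count, Top 10, cost, and progress rings all update accordingly.
  4. New search (any query, platforms=xhs, max_count=3) → on completion the new query appears in the left column.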
  • [ ] Step 3: Final commit
git add examples/mode_workflow/README.md
git commit -m "docs(mode_workflow): README and startup instructions"
  • [ ] Step 4: Final check
git status examples/mode_workflow   # runs/ should not appear in the untracked list (covered by .gitignore)

Self-Review Notes

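  • Spec coverage: three tables (Task 2), history import (Task 3), three pipelines (Task 4-6), API + Dashboard metric definitions (Task 7), three-tab frontend + screenshot replication (Task 8), 聚类库 placeholder (Task 8), README (Task 9). Every spec section has a corresponding task. The spec item prompts/eval_prompt.md was verified to be unnecessary (the rubric is baked into the engine), and the correction is noted at the top of the plan.
  • Placeholder scan: no TBD/TODO. The full HTML implementation in Task 8 is delegated to the frontend-design skill, but the API contract, page behavior, skeleton code, and acceptance checklist are all fixed, so it does not count as an open placeholder.
  • Type/naming consistency: replace_process(payload) and fetch_process() are symmetric on the procedures field; the db.fetch_* names called by the server match the Task 2 definitions; the pipeline script argument names match the cmd built by the server's _spawn_task (--query-id/--case-ids/--model); the fields returned by _dashboard() match the fields read by the Task 8 frontend (matrix_cells/via_top10/substance_top/cost_trend/process_progress, etc.).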
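The field-name consistency claimed in the last item could also be checked mechanically during acceptance. The snippet below is a rough sketch, not part of the plan's deliverables: `DASHBOARD_FIELDS` lists only the dashboard fields that this plan names explicitly (the full set is defined by the Task 7 contract), and `missingDashboardFields` is a hypothetical helper.

```javascript
// Sketch only. The list is partial: it covers just the fields named in this plan.
const DASHBOARD_FIELDS = [
  'matrix_cells', 'matrix_actions', 'matrix_types',
  'via_top10', 'substance_top', 'substance_count',
  'form_top', 'form_count', 'cost_trend', 'process_progress',
];

// Return the expected field names that are absent from a /api/dashboard payload.
function missingDashboardFields(payload) {
  const p = payload || {};
  return DASHBOARD_FIELDS.filter(name => !(name in p));
}
```

Pasting this into the browser console and running it on the parsed response of GET /api/dashboard gives a quick signal: an empty array means every listed field is present, even when its value is 0 or an empty list.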