For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (
- [ ]) syntax for tracking.
Goal: 在 examples/mode_workflow/ 建一套"搜索评估 + 工序/工具大模型解构"系统:MySQL 三张新表为唯一事实源,单 server(8772) + 单页 HTML(Dashboard / Dataset / 聚类库)。
Architecture: DB-first(区别于旧系统文件为主双写)。搜索/解构引擎函数从 examples/process_pipeline/script/ 只读 import(沿用 run_search.py 先例),mode_workflow 只持有编排脚本、prompts、db 层、server、前端。解构由 server 起子进程跑 pipeline 脚本,结果直接写库,前端轮询任务状态。
Tech Stack: Python(http.server + pymysql + asyncio)、MySQL(.env MYSQL_*)、OpenRouter LLM(经 call_llm_with_retry)、单文件 HTML + 原生 JS + ECharts(CDN)。
Spec: docs/superpowers/specs/2026-06-12-mode-workflow-design.md
前置条件(执行前确认):
.env 含 MYSQL_HOST/MYSQL_PORT/MYSQL_USER/MYSQL_PASSWORD/MYSQL_DATABASE(Task 2 起必需).env 含 OPEN_ROUTER_API_KEY(Task 4-6 的冒烟测试需要,会产生 ~$0.01-0.05 真实费用;没有 key 则跳过冒烟,只做代码审查)pip install -e . 已装(agent.llm.openrouter 等可 import)Spec 修正(已核实): 评估 rubric 固化在 llm_evaluate_sources 的 eval_prompt_template.md(见 search_and_evaluate.py:424 注释),不需要复制 eval_prompt.md 到 prompts/。prompts/ 只放工序、工具两个解构 prompt。
| 事实 | 位置 |
|---|---|
MySQL 连接模式:pymysql + DictCursor + autocommit,_enabled() 检查 MYSQL_HOST |
examples/process_pipeline/script/search_eval/fixed_query_eval/db.py:32-47 |
search_all(platforms, queries, max_count, max_concurrent, query_overrides=None) 返回 source 列表(case_id/platform/source_url/post/found_by_queries) |
search_and_evaluate.py:212 |
evaluate_posts(sources, requirement, llm_call, model, max_concurrent, include_images, max_images, image_mode, query) 返回 (sources, total_cost),评估挂在 source["llm_evaluation"] |
search_and_evaluate.py:403 |
transcribe_video_posts(sources, concurrency)、build_query_overrides(platforms, phrasings, llm, model)、_attach_image_refs(sources, max_images, concurrency, mode) |
search_and_evaluate.py |
build_eval_llm_call(choice) 返回 (llm_call, model_id);EVAL_MODELS/DEFAULT_EVAL_MODEL;_format_post_for_eval(source);_evaluate_one |
examples/process_pipeline/script/llm_evaluate_sources.py |
call_llm_with_retry(llm_call, messages, model, temperature, max_tokens, validate_fn, task_name) 返回 (data_dict, cost),只认 JSON 对象 |
examples/process_pipeline/script/llm_helper.py |
任意 OpenRouter 模型:from agent.llm.openrouter import create_openrouter_llm_call; llm_call = create_openrouter_llm_call(model=...) |
tool_extract.py:121-122 |
工具解构返回 {"tools":[{工具名称,实质作用域,形式作用域,创作层级,来源链接,输入,输出,用法,案例,缺点,最新更新时间}]} |
fixed_query_eval/prompts/tool_extract_system.md |
工序解构返回 {"source":{...},"procedures":[{id,name,purpose,category,declarations,type_registry,steps:[{id,kind,via,effect,action,substance,form,directive,intent,inputs,outputs}]}]} |
mode_procedure/mode-dsl/prompts/procedure_extract_system.md |
工序解构的图片走 base64(绕防盗链),_detect_image_mime/_fetch_data_url/_collect_images/_sanitize_workflow 可整段复制 |
mode_procedure/mode-dsl/procedure_model_extract.py:34-126 |
judged_matrix:{actions:[{name,l1,l2}×27], types:[{name,l1,l2}×50], matrix:[27行×50格,每格{tier,r,q}], stats:{valid:643}},有效节点 = tier≥1 |
search_eval/evaluation/judged_matrix.json |
历史搜索数据:fixed_query_eval/runs_full/q0000-q0003/form_A.json,顶层 {form,query,original_q,platforms,total,failed,results:[...]} |
同目录 |
| 仓库无 pytest 体系(CLAUDE.md),验证用独立命令/冒烟脚本 | CLAUDE.md |
路径深度:examples/mode_workflow/x.py 的 PROJECT_ROOT = Path(__file__).resolve().parents[2];examples/mode_workflow/pipeline/x.py 是 parents[3]。
Files:
examples/mode_workflow/.gitignoreexamples/mode_workflow/prompts/(拷贝 2 个 prompt)examples/mode_workflow/reference/judged_matrix.json(拷贝)Create: examples/mode_workflow/pipeline/、examples/mode_workflow/runs/ 目录
[ ] Step 1: 建目录并同步资产
cd /Users/max_liu/max_liu/company/Agent
SE=examples/process_pipeline/script/search_eval
mkdir -p examples/mode_workflow/{pipeline,prompts,reference,runs}
cp "$SE/mode_procedure/mode-dsl/prompts/procedure_extract_system.md" examples/mode_workflow/prompts/
cp "$SE/fixed_query_eval/prompts/tool_extract_system.md" examples/mode_workflow/prompts/
cp "$SE/evaluation/judged_matrix.json" examples/mode_workflow/reference/
examples/mode_workflow/.gitignore:
runs/
__pycache__/
*.pyc
ls examples/mode_workflow/prompts/ examples/mode_workflow/reference/
python3 -c "import json; d=json.load(open('examples/mode_workflow/reference/judged_matrix.json')); print(d['stats'])"
Expected: 两个 prompt 文件名 + judged_matrix.json;stats 行含 'valid': 643。
git add examples/mode_workflow
git commit -m "feat(mode_workflow): 脚手架与 prompt/内容树资产同步"
Files:
examples/mode_workflow/db.py设计要点:
fixed_query_eval/db.py(pymysql/DictCursor/autocommit),但本系统 DB 是主存储:写失败要 raise(由调用方决定怎么处理),不再静默吞掉。读侧保留防御。mode_process/mode_tools 写入语义:删掉 (case_id, version) 同版本旧行再插(同版本重跑幂等,跨版本保留历史)。cost_usd/duration_s 是每次解构调用的值,同一 (case_id, version) 的多行重复存同一个值;聚合时必须按 (case_id, version) 去重(见 Task 7)。
[ ] Step 1: 写 examples/mode_workflow/db.py(完整文件)
# -*- coding: utf-8 -*-
"""mode_workflow · MySQL 持久化(DB 为唯一事实源)
================================================================================
读 .env 的 MYSQL_* 连接 MySQL。三张表:
search_data —— 每行一个 (query, 帖子):搜索 + llm 评估结果
mode_process —— 每行一个解构出的工序(steps 等嵌套结构存 JSON 列)
mode_tools —— 每行一个解构出的工具
与旧 fixed_query_eval/db.py 的关键差异:本系统 DB 是主存储,写入失败直接 raise,
不做"失败不阻断"。读侧保留防御(返回空/None)。
用法:
python db.py init # 建表(幂等)
python db.py check # 打印三表行数
"""
import json
import os
import sys
from pathlib import Path
PROJECT_ROOT = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(PROJECT_ROOT))
from dotenv import load_dotenv
load_dotenv()
import pymysql
from pymysql.cursors import DictCursor
def _conn():
if not os.getenv("MYSQL_HOST"):
raise RuntimeError("缺 MYSQL_HOST:检查 .env 的 MYSQL_* 配置")
return pymysql.connect(
host=os.getenv("MYSQL_HOST"),
port=int(os.getenv("MYSQL_PORT", 3306)),
user=os.getenv("MYSQL_USER"),
password=os.getenv("MYSQL_PASSWORD"),
database=os.getenv("MYSQL_DATABASE"),
charset="utf8mb4", cursorclass=DictCursor,
autocommit=True, connect_timeout=10,
)
# ── DDL ──────────────────────────────────────────────────────────────────────
DDL_SEARCH = """
CREATE TABLE IF NOT EXISTS search_data (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
query_id VARCHAR(32) NOT NULL COMMENT 'q0000',
query_text VARCHAR(512) NULL,
case_id VARCHAR(128) NOT NULL COMMENT 'platform_channelContentId',
platform VARCHAR(32) NULL,
channel_content_id VARCHAR(128) NULL,
title VARCHAR(512) NULL,
url VARCHAR(1024) NULL,
content_type VARCHAR(32) NULL,
body LONGTEXT NULL,
images JSON NULL,
videos JSON NULL,
like_count INT NULL,
publish_time VARCHAR(64) NULL,
quality_score FLOAT NULL COMMENT 'post._quality_score',
quality_grade VARCHAR(8) NULL,
found_by JSON NULL COMMENT '命中的措辞数组',
knowledge_type JSON NULL COMMENT '["能力","工序","工具"] 子集',
overall_score FLOAT NULL COMMENT '(相关均值+质量均值)/2',
llm_evaluation JSON NULL COMMENT '评估全量 blob',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY uk_qid_case (query_id, case_id),
KEY idx_platform (platform)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='搜索+评估结果';
"""
DDL_PROCESS = """
CREATE TABLE IF NOT EXISTS mode_process (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
query_id VARCHAR(32) NOT NULL,
case_id VARCHAR(128) NOT NULL,
platform VARCHAR(32) NULL,
post_title VARCHAR(512) NULL,
source JSON NULL COMMENT '解构返回的 source 块',
procedure_id VARCHAR(16) NULL COMMENT 'p1,p2…',
name VARCHAR(255) NULL,
purpose TEXT NULL,
category VARCHAR(32) NULL COMMENT '产物创造/资产建设/自动化/分析/学习',
declarations JSON NULL,
type_registry JSON NULL,
steps JSON NULL COMMENT '步骤数组全量',
step_count INT NULL,
tools_used JSON NULL COMMENT '从 steps[].via 去重提取',
model VARCHAR(64) NULL,
version VARCHAR(16) NULL COMMENT 'v_MMDDHHMM,保留历史',
cost_usd DECIMAL(10,6) NULL COMMENT '本次解构调用成本(同版本各行相同,聚合需按 case+version 去重)',
duration_s FLOAT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
KEY idx_case_ver (case_id, version),
KEY idx_qid (query_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工序解构结果(每行一个工序)';
"""
DDL_TOOLS = """
CREATE TABLE IF NOT EXISTS mode_tools (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
query_id VARCHAR(32) NOT NULL,
case_id VARCHAR(128) NOT NULL,
platform VARCHAR(32) NULL,
post_title VARCHAR(512) NULL,
tool_name VARCHAR(255) NULL,
substance_scope JSON NULL COMMENT '实质作用域(数组)',
form_scope JSON NULL COMMENT '形式作用域(数组或null)',
creation_layer VARCHAR(32) NULL COMMENT '制作层/创作层',
source_link VARCHAR(1024) NULL,
input_desc TEXT NULL,
output_desc TEXT NULL,
usage_json JSON NULL,
cases_json JSON NULL,
defects_json JSON NULL,
updated_time VARCHAR(64) NULL COMMENT '工具最新更新时间',
model VARCHAR(64) NULL,
version VARCHAR(16) NULL,
cost_usd DECIMAL(10,6) NULL COMMENT '同 mode_process,聚合按 case+version 去重',
duration_s FLOAT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
KEY idx_case_ver (case_id, version),
KEY idx_qid (query_id),
KEY idx_tool_name (tool_name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工具解构结果(每行一个工具)';
"""
def init_tables():
conn = _conn()
try:
with conn.cursor() as cur:
cur.execute(DDL_SEARCH)
cur.execute(DDL_PROCESS)
cur.execute(DDL_TOOLS)
print("✅ 建表完成:search_data, mode_process, mode_tools")
finally:
conn.close()
# ── 工具函数 ──────────────────────────────────────────────────────────────────
def _loads(v, default=None):
"""pymysql 的 JSON 列可能返回字符串,统一解析。"""
if v is None:
return default
if isinstance(v, (list, dict)):
return v
try:
return json.loads(v)
except Exception:
return default
def _j(v):
"""写入 JSON 列:None 保持 NULL,其余 dumps。"""
return None if v is None else json.dumps(v, ensure_ascii=False)
def _collect_scores(node):
"""递归收集嵌套评估里所有数值「得分」。"""
out = []
if isinstance(node, dict):
for k, v in node.items():
if k == "得分" and isinstance(v, (int, float)):
out.append(float(v))
else:
out.extend(_collect_scores(v))
elif isinstance(node, list):
for v in node:
out.extend(_collect_scores(v))
return out
def overall_score(e):
"""综合分 = (相关性各项均值 + 质量各项均值) / 可得部分数。算不出返回 None。"""
parts = []
for key in ("相关性", "质量"):
scores = _collect_scores((e or {}).get(key))
if scores:
parts.append(sum(scores) / len(scores))
return round(sum(parts) / len(parts), 2) if parts else None
# ── search_data ──────────────────────────────────────────────────────────────
def upsert_search_posts(query_id, query_text, results):
"""一组搜索结果写入 search_data(按 (query_id, case_id) upsert)。返回写入条数。"""
if not results:
return 0
rows = []
for r in results:
post = r.get("post") or {}
e = r.get("llm_evaluation") or {}
rows.append((
query_id, query_text, r.get("case_id"), r.get("platform"),
r.get("channel_content_id"),
(post.get("title") or post.get("desc") or "")[:500],
r.get("source_url"), post.get("content_type"),
post.get("body_text") or post.get("desc") or "",
_j(post.get("images") or []), _j(post.get("videos") or []),
post.get("like_count"),
str(post.get("publish_time") or post.get("publish_timestamp") or "")[:64],
post.get("_quality_score"), post.get("_quality_grade"),
_j(r.get("found_by_queries") or []),
_j(e.get("知识类型") or []),
overall_score(e),
_j(e),
))
sql = """
INSERT INTO search_data
(query_id, query_text, case_id, platform, channel_content_id, title, url,
content_type, body, images, videos, like_count, publish_time,
quality_score, quality_grade, found_by, knowledge_type, overall_score, llm_evaluation)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON DUPLICATE KEY UPDATE
query_text=VALUES(query_text), platform=VALUES(platform),
channel_content_id=VALUES(channel_content_id), title=VALUES(title), url=VALUES(url),
content_type=VALUES(content_type), body=VALUES(body), images=VALUES(images),
videos=VALUES(videos), like_count=VALUES(like_count), publish_time=VALUES(publish_time),
quality_score=VALUES(quality_score), quality_grade=VALUES(quality_grade),
found_by=VALUES(found_by), knowledge_type=VALUES(knowledge_type),
overall_score=VALUES(overall_score), llm_evaluation=VALUES(llm_evaluation);
"""
conn = _conn()
try:
with conn.cursor() as cur:
cur.executemany(sql, rows)
return len(rows)
finally:
conn.close()
def fetch_queries():
"""query 列表 + 帖子数 + 解构进度。"""
conn = _conn()
try:
with conn.cursor() as cur:
cur.execute("""SELECT query_id, MAX(query_text) AS query_text,
COUNT(*) AS post_count
FROM search_data GROUP BY query_id ORDER BY query_id""")
queries = cur.fetchall()
cur.execute("SELECT query_id, COUNT(DISTINCT case_id) AS n FROM mode_process GROUP BY query_id")
np = {r["query_id"]: r["n"] for r in cur.fetchall()}
cur.execute("SELECT query_id, COUNT(DISTINCT case_id) AS n FROM mode_tools GROUP BY query_id")
nt = {r["query_id"]: r["n"] for r in cur.fetchall()}
finally:
conn.close()
for q in queries:
q["process_done"] = np.get(q["query_id"], 0)
q["tools_done"] = nt.get(q["query_id"], 0)
return queries
def fetch_posts(query_id):
"""某 query 下全部帖子(JSON 列已解析),带 has_process/has_tools 标记。"""
conn = _conn()
try:
with conn.cursor() as cur:
cur.execute("""SELECT * FROM search_data WHERE query_id=%s
ORDER BY overall_score DESC, id""", (query_id,))
rows = cur.fetchall()
cur.execute("SELECT DISTINCT case_id FROM mode_process WHERE query_id=%s", (query_id,))
hp = {r["case_id"] for r in cur.fetchall()}
cur.execute("SELECT DISTINCT case_id FROM mode_tools WHERE query_id=%s", (query_id,))
ht = {r["case_id"] for r in cur.fetchall()}
finally:
conn.close()
for r in rows:
for col in ("images", "videos", "found_by", "knowledge_type", "llm_evaluation"):
r[col] = _loads(r[col])
r["has_process"] = r["case_id"] in hp
r["has_tools"] = r["case_id"] in ht
r.pop("created_at", None); r.pop("updated_at", None)
return rows
def fetch_post(query_id, case_id):
"""单帖完整行(给 pipeline 脚本重建 source 用)。无则 None。"""
conn = _conn()
try:
with conn.cursor() as cur:
cur.execute("SELECT * FROM search_data WHERE query_id=%s AND case_id=%s",
(query_id, case_id))
row = cur.fetchone()
finally:
conn.close()
if not row:
return None
for col in ("images", "videos", "found_by", "knowledge_type", "llm_evaluation"):
row[col] = _loads(row[col])
return row
# ── mode_process ─────────────────────────────────────────────────────────────
def replace_process(query_id, case_id, platform, post_title, payload,
model, version, cost_usd, duration_s):
"""写入一帖某版本的工序解构结果(payload = {source, procedures})。
删 (case_id, version) 旧行再插,同版本重跑幂等、跨版本保留历史。返回工序条数。"""
source = payload.get("source")
procedures = payload.get("procedures") or []
conn = _conn()
try:
with conn.cursor() as cur:
cur.execute("DELETE FROM mode_process WHERE case_id=%s AND version=%s",
(case_id, version))
if procedures:
rows = []
for p in procedures:
steps = p.get("steps") or []
vias = []
for s in steps:
v = s.get("via")
if v and v not in vias:
vias.append(v)
rows.append((
query_id, case_id, platform, (post_title or "")[:500],
_j(source), p.get("id"), (p.get("name") or "")[:250],
p.get("purpose"), p.get("category"),
_j(p.get("declarations")), _j(p.get("type_registry")),
_j(steps), len(steps), _j(vias),
model, version, cost_usd, duration_s,
))
cur.executemany("""
INSERT INTO mode_process
(query_id, case_id, platform, post_title, source, procedure_id, name,
purpose, category, declarations, type_registry, steps, step_count,
tools_used, model, version, cost_usd, duration_s)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
""", rows)
return len(procedures)
finally:
conn.close()
def fetch_process_versions(case_id):
conn = _conn()
try:
with conn.cursor() as cur:
cur.execute("""SELECT version, COUNT(*) AS n, MAX(model) AS model
FROM mode_process WHERE case_id=%s
GROUP BY version ORDER BY version DESC""", (case_id,))
return cur.fetchall()
finally:
conn.close()
def fetch_process(case_id, version=None):
"""重建 {case_id, version, model, source, procedures:[...]}。version=None 取最新。"""
conn = _conn()
try:
with conn.cursor() as cur:
if version is None:
cur.execute("""SELECT version FROM mode_process WHERE case_id=%s
ORDER BY version DESC, id DESC LIMIT 1""", (case_id,))
row = cur.fetchone()
if not row:
return None
version = row["version"]
cur.execute("""SELECT * FROM mode_process WHERE case_id=%s AND version=%s
ORDER BY id""", (case_id, version))
rows = cur.fetchall()
finally:
conn.close()
if not rows:
return None
procedures = [{
"id": r["procedure_id"], "name": r["name"], "purpose": r["purpose"],
"category": r["category"], "declarations": _loads(r["declarations"]),
"type_registry": _loads(r["type_registry"]), "steps": _loads(r["steps"], []),
"tools_used": _loads(r["tools_used"], []),
} for r in rows]
return {"case_id": case_id, "version": version, "platform": rows[0]["platform"],
"title": rows[0]["post_title"], "model": rows[0]["model"],
"cost_usd": float(rows[0]["cost_usd"]) if rows[0]["cost_usd"] is not None else None,
"duration_s": rows[0]["duration_s"],
"source": _loads(rows[0]["source"]), "procedures": procedures}
# ── mode_tools ───────────────────────────────────────────────────────────────
def replace_tools(query_id, case_id, platform, post_title, tools,
model, version, cost_usd, duration_s):
"""写入一帖某版本的工具解构结果。语义同 replace_process。返回工具条数。"""
conn = _conn()
try:
with conn.cursor() as cur:
cur.execute("DELETE FROM mode_tools WHERE case_id=%s AND version=%s",
(case_id, version))
if tools:
rows = [(
query_id, case_id, platform, (post_title or "")[:500],
(t.get("工具名称") or "")[:250],
_j(t.get("实质作用域")), _j(t.get("形式作用域")),
t.get("创作层级"), t.get("来源链接"), t.get("输入"), t.get("输出"),
_j(t.get("用法")), _j(t.get("案例")), _j(t.get("缺点")),
t.get("最新更新时间"), model, version, cost_usd, duration_s,
) for t in tools]
cur.executemany("""
INSERT INTO mode_tools
(query_id, case_id, platform, post_title, tool_name, substance_scope,
form_scope, creation_layer, source_link, input_desc, output_desc,
usage_json, cases_json, defects_json, updated_time, model, version,
cost_usd, duration_s)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
""", rows)
return len(tools)
finally:
conn.close()
def fetch_tools_versions(case_id):
conn = _conn()
try:
with conn.cursor() as cur:
cur.execute("""SELECT version, COUNT(*) AS n, MAX(model) AS model
FROM mode_tools WHERE case_id=%s
GROUP BY version ORDER BY version DESC""", (case_id,))
return cur.fetchall()
finally:
conn.close()
def fetch_tools(case_id, version=None):
"""重建 {case_id, version, model, tool_count, tools:[...]}。version=None 取最新。"""
conn = _conn()
try:
with conn.cursor() as cur:
if version is None:
cur.execute("""SELECT version FROM mode_tools WHERE case_id=%s
ORDER BY version DESC, id DESC LIMIT 1""", (case_id,))
row = cur.fetchone()
if not row:
return None
version = row["version"]
cur.execute("""SELECT * FROM mode_tools WHERE case_id=%s AND version=%s
ORDER BY id""", (case_id, version))
rows = cur.fetchall()
finally:
conn.close()
if not rows:
return None
tools = [{
"工具名称": r["tool_name"], "实质作用域": _loads(r["substance_scope"]),
"形式作用域": _loads(r["form_scope"]), "创作层级": r["creation_layer"],
"来源链接": r["source_link"], "输入": r["input_desc"], "输出": r["output_desc"],
"用法": _loads(r["usage_json"]), "案例": _loads(r["cases_json"]),
"缺点": _loads(r["defects_json"]), "最新更新时间": r["updated_time"],
} for r in rows]
return {"case_id": case_id, "version": version, "platform": rows[0]["platform"],
"title": rows[0]["post_title"], "model": rows[0]["model"],
"cost_usd": float(rows[0]["cost_usd"]) if rows[0]["cost_usd"] is not None else None,
"duration_s": rows[0]["duration_s"],
"tool_count": len(tools), "tools": tools}
# ── Dashboard 原始行(指标计算在 server.py)─────────────────────────────────────
def fetch_dashboard_rows():
"""拉 Dashboard 计算所需的轻量行。数据量级:百~千行,Python 聚合足够。"""
conn = _conn()
try:
with conn.cursor() as cur:
cur.execute("SELECT query_id, case_id, knowledge_type FROM search_data")
posts = cur.fetchall()
cur.execute("""SELECT case_id, version, steps, tools_used, cost_usd,
duration_s, created_at FROM mode_process""")
procs = cur.fetchall()
cur.execute("""SELECT case_id, version, tool_name, substance_scope,
form_scope, cost_usd, duration_s, created_at
FROM mode_tools""")
tools = cur.fetchall()
finally:
conn.close()
for p in posts:
p["knowledge_type"] = _loads(p["knowledge_type"], [])
for r in procs:
r["steps"] = _loads(r["steps"], [])
r["tools_used"] = _loads(r["tools_used"], [])
r["cost_usd"] = float(r["cost_usd"]) if r["cost_usd"] is not None else None
r["created_at"] = str(r["created_at"]) if r["created_at"] else None
for r in tools:
r["substance_scope"] = _loads(r["substance_scope"], [])
r["form_scope"] = _loads(r["form_scope"], [])
r["cost_usd"] = float(r["cost_usd"]) if r["cost_usd"] is not None else None
r["created_at"] = str(r["created_at"]) if r["created_at"] else None
return posts, procs, tools
def check():
conn = _conn()
try:
with conn.cursor() as cur:
for t in ("search_data", "mode_process", "mode_tools"):
cur.execute(f"SELECT COUNT(*) AS n FROM {t}")
print(f"{t}: {cur.fetchone()['n']} 行")
finally:
conn.close()
if __name__ == "__main__":
cmd = sys.argv[1] if len(sys.argv) > 1 else ""
if cmd == "init":
init_tables()
elif cmd == "check":
check()
else:
print("用法:\n python db.py init # 建表\n python db.py check # 三表行数")
cd /Users/max_liu/max_liu/company/Agent/examples/mode_workflow
python db.py init && python db.py check
Expected: ✅ 建表完成:search_data, mode_process, mode_tools,三表各 0 行。失败提示缺 MYSQL_HOST 则确认 .env。
python3 -c "
import db
e = {'相关性': {'和内容制作知识相关': {'得分': 8}, '和 query 相关': {'得分': 6}},
'质量': {'固定维度': {'热度性': {'得分': 4}}}}
s = db.overall_score(e)
assert s == round((7.0 + 4.0) / 2, 2), s
assert db.overall_score({}) is None
print('overall_score OK:', s)
"
Expected: overall_score OK: 5.5
git add examples/mode_workflow/db.py
git commit -m "feat(mode_workflow): 三表 DDL 与 MySQL 读写层"
Files:
Create: examples/mode_workflow/import_history.py
[ ] Step 1: 写完整文件
# -*- coding: utf-8 -*-
"""一次性导入:fixed_query_eval/runs_full/*/form_A.json → search_data 表。
幂等(upsert),可反复执行。
用法:
python import_history.py
python import_history.py --runs-dir /path/to/runs_full
"""
import argparse
import json
import sys
from pathlib import Path
HERE = Path(__file__).resolve().parent
sys.path.insert(0, str(HERE))
import db
DEFAULT_RUNS = (HERE.parent / "process_pipeline" / "script" / "search_eval"
/ "fixed_query_eval" / "runs_full")
def main():
p = argparse.ArgumentParser(description="历史搜索结果导入 search_data")
p.add_argument("--runs-dir", default=str(DEFAULT_RUNS))
args = p.parse_args()
runs = Path(args.runs_dir)
files = sorted(runs.glob("q*/form_A.json"))
if not files:
print(f"❌ {runs} 下没有 q*/form_A.json"); return 1
total = 0
for f in files:
data = json.loads(f.read_text(encoding="utf-8"))
qid = f.parent.name
results = data.get("results", [])
n = db.upsert_search_posts(qid, data.get("query") or data.get("original_q"), results)
print(f" {qid}: 文件 {len(results)} 条 → 入库 {n} 条")
total += n
print(f"✅ 共导入 {total} 条 → search_data")
return 0
if __name__ == "__main__":
raise SystemExit(main())
cd /Users/max_liu/max_liu/company/Agent/examples/mode_workflow
python import_history.py
python db.py check
python3 -c "
import json, glob
n = sum(len(json.load(open(f))['results'])
for f in glob.glob('../process_pipeline/script/search_eval/fixed_query_eval/runs_full/q*/form_A.json'))
print('JSON 总条数:', n)
"
Expected: 每个 q 打一行;search_data 行数 == JSON 总条数(q0000-q0003)。再跑一次 python import_history.py,行数不变(幂等)。
git add examples/mode_workflow/import_history.py
git commit -m "feat(mode_workflow): 历史搜索结果导入脚本"
Files:
examples/mode_workflow/pipeline/tool_extract.py与旧版差异: 帖子源从 search_data 表读(不再读 form_A.json);结果直接写 mode_tools(不再写本地 json 为主),runs/tools/ 只留调试副本;记录 cost/duration。
# -*- coding: utf-8 -*-
"""工具解构 · search_data 帖子 → 结构化工具条目 → mode_tools 表
================================================================================
- 帖子源:search_data 表(--query-id + --case-ids 定位)
- 模型默认 google/gemini-3.1-flash-lite,可 --model 传任意 OpenRouter id
- 多模态:复用 search_and_evaluate._attach_image_refs(URL 直传)
- 写库:db.replace_tools(同版本幂等,跨版本保留);runs/tools/ 留调试副本
用法(一般由 server.py 起子进程调):
python pipeline/tool_extract.py --query-id q0000 --case-ids xhs_abc,gzh_def
python pipeline/tool_extract.py --query-id q0000 --case-ids xhs_abc --model anthropic/claude-sonnet-4-6
"""
import argparse
import asyncio
import json
import sys
import time
from datetime import datetime
from pathlib import Path
PROJECT_ROOT = Path(__file__).resolve().parents[3] # …/Agent
sys.path.insert(0, str(PROJECT_ROOT))
from dotenv import load_dotenv
load_dotenv()
from examples.process_pipeline.script.search_eval.search_and_evaluate import _attach_image_refs
from examples.process_pipeline.script.llm_evaluate_sources import _format_post_for_eval, build_eval_llm_call
from examples.process_pipeline.script.llm_helper import call_llm_with_retry
HERE = Path(__file__).resolve().parent # pipeline/
MW = HERE.parent # mode_workflow/
sys.path.insert(0, str(MW))
import db
TOOL_SYSTEM = (MW / "prompts" / "tool_extract_system.md").read_text(encoding="utf-8")
DEFAULT_MODEL_CHOICE = "gemini-flash-lite"
MAX_IMAGES = 6
def _row_to_source(row):
"""search_data 行 → 引擎函数认的 source dict。"""
return {
"case_id": row["case_id"], "platform": row["platform"],
"channel_content_id": row["channel_content_id"],
"source_url": row["url"],
"post": {
"title": row["title"], "body_text": row["body"],
"images": row["images"] or [], "like_count": row["like_count"],
"publish_timestamp": row["publish_time"], "link": row["url"],
},
}
def _validate_tools(data):
if not isinstance(data, dict) or "tools" not in data:
return '缺少顶层 "tools" 字段'
if not isinstance(data["tools"], list):
return '"tools" 必须是数组'
return None
async def extract_one(source, llm_call, model):
"""对一条 source 抽工具,返回 (tools_list, cost)。失败返回 ([], cost)。"""
user_text = "【内容】\n" + _format_post_for_eval(source)
image_urls = source.get("_image_data_urls") or None
if image_urls:
user_content = [{"type": "text", "text": user_text}]
for u in image_urls:
user_content.append({"type": "image_url", "image_url": {"url": u}})
messages = [{"role": "system", "content": TOOL_SYSTEM},
{"role": "user", "content": user_content}]
else:
messages = [{"role": "system", "content": TOOL_SYSTEM},
{"role": "user", "content": user_text}]
data, cost = await call_llm_with_retry(
llm_call=llm_call, messages=messages, model=model,
temperature=0.1, max_tokens=4000,
validate_fn=_validate_tools,
task_name=f"ToolExtract[{source.get('case_id', '?')}]",
)
if not data:
return [], cost
return data.get("tools", []), cost
async def run(args):
qid = args.query_id
case_ids = [c.strip() for c in args.case_ids.split(",") if c.strip()]
sources = []
for cid in case_ids:
row = db.fetch_post(qid, cid)
if row is None:
print(f"⚠️ {qid}/{cid} 不在 search_data,跳过")
continue
sources.append(_row_to_source(row))
if not sources:
print("❌ 没有可解构的帖子"); return 1
if args.model and "/" in args.model:
from agent.llm.openrouter import create_openrouter_llm_call
llm_call, model_id = create_openrouter_llm_call(model=args.model), args.model
else:
llm_call, model_id = build_eval_llm_call(args.model or DEFAULT_MODEL_CHOICE)
version = args.version or ("v_" + datetime.now().strftime("%m%d%H%M"))
print(f"🔧 工具解构 {len(sources)} 帖 · 模型 {model_id} · 版本 {version}")
await _attach_image_refs(sources, MAX_IMAGES, max(2, args.max_concurrent * 2), "url")
sem = asyncio.Semaphore(args.max_concurrent)
out_dir = MW / "runs" / "tools"
out_dir.mkdir(parents=True, exist_ok=True)
async def _work(s):
t0 = time.monotonic()
async with sem:
tools, cost = await extract_one(s, llm_call, model_id)
dur = round(time.monotonic() - t0, 1)
n = db.replace_tools(qid, s["case_id"], s.get("platform"),
(s.get("post") or {}).get("title", ""),
tools, model_id, version, cost, dur)
(out_dir / f"{s['case_id']}_{version}.json").write_text(json.dumps({
"case_id": s["case_id"], "version": version, "model": model_id,
"cost_usd": cost, "duration_s": dur, "tools": tools,
}, ensure_ascii=False, indent=2), encoding="utf-8")
print(f" ✅ {s['case_id']} → {n} 个工具 · ${cost:.4f} · {dur}s")
return cost
costs = await asyncio.gather(*[_work(s) for s in sources])
print(f"\n📊 完成 {len(sources)} 帖 · 总成本 ${sum(costs):.4f}")
return 0
def main():
p = argparse.ArgumentParser(description="工具解构:search_data 帖子 → mode_tools")
p.add_argument("--query-id", required=True)
p.add_argument("--case-ids", required=True, help="逗号分隔 case_id 列表")
p.add_argument("--model", default=None, help="默认 gemini-flash-lite,可传 OpenRouter id")
p.add_argument("--max-concurrent", type=int, default=3)
p.add_argument("--version", default=None, help="默认自动 v_月日时分")
args = p.parse_args()
raise SystemExit(asyncio.run(run(args)))
if __name__ == "__main__":
main()
先从库里挑一个真实 case_id:
cd /Users/max_liu/max_liu/company/Agent/examples/mode_workflow
CID=$(python3 -c "import db; print(db.fetch_posts('q0000')[0]['case_id'])")
python pipeline/tool_extract.py --query-id q0000 --case-ids "$CID"
python3 -c "import db; r=db.fetch_tools('$CID'); print(r['version'], r['tool_count'], r['cost_usd'], r['duration_s'])"
Expected: 解构日志含 ✅ <case_id> → N 个工具;fetch_tools 打出版本号、工具数 ≥0、cost/duration 非 None。
git add examples/mode_workflow/pipeline/tool_extract.py
git commit -m "feat(mode_workflow): 工具解构 pipeline(读库→LLM→mode_tools)"
Files:
examples/mode_workflow/pipeline/procedure_extract.py与旧版差异: 帖子源从库读;不渲染 HTML(前端直接渲染 DB 数据);写 mode_process;支持一次多帖。_detect_image_mime/_fetch_data_url/_collect_images/_sanitize_workflow 从 procedure_model_extract.py:34-126 原样复制(已验证可用,不重新发明)。
# -*- coding: utf-8 -*-
"""工序解构 · search_data 帖子 → workflow JSON → mode_process 表
================================================================================
单次大模型直出(无 agent / 无 validate 循环),prompt 见 prompts/procedure_extract_system.md。
配图下载转 base64(绕防盗链)随文本一起发。结果按工序拆行写 mode_process。
用法(一般由 server.py 起子进程调):
python pipeline/procedure_extract.py --query-id q0000 --case-ids xhs_abc
python pipeline/procedure_extract.py --query-id q0000 --case-ids xhs_abc --model google/gemini-3.1-flash-lite
"""
import argparse
import asyncio
import base64
import json
import sys
import time
from datetime import datetime
from pathlib import Path
PROJECT_ROOT = Path(__file__).resolve().parents[3] # …/Agent
sys.path.insert(0, str(PROJECT_ROOT))
from dotenv import load_dotenv
load_dotenv()
from examples.process_pipeline.script.llm_helper import call_llm_with_retry
HERE = Path(__file__).resolve().parent
MW = HERE.parent
sys.path.insert(0, str(MW))
import db
PROMPT_FILE = MW / "prompts" / "procedure_extract_system.md"
DEFAULT_MODEL = "anthropic/claude-sonnet-4-6"
MAX_IMAGES = 8
# ── 以下 4 个助手原样取自 mode_procedure/mode-dsl/procedure_model_extract.py ──
def _detect_image_mime(data: bytes):
if not data or len(data) < 12:
return None
if data[:3] == b"\xff\xd8\xff":
return "image/jpeg"
if data[:8] == b"\x89PNG\r\n\x1a\n":
return "image/png"
if data[:4] == b"RIFF" and data[8:12] == b"WEBP":
return "image/webp"
if data[:6] in (b"GIF87a", b"GIF89a"):
return "image/gif"
return None
async def _fetch_data_url(url, sem):
from agent.tools.builtin.file.image_cdn import _download_image
async with sem:
try:
data = await _download_image(url)
except Exception:
return None
mime = _detect_image_mime(data)
if mime is None:
return None
return f"data:{mime};base64,{base64.b64encode(data).decode()}"
async def _collect_images(urls, max_images, concurrency):
urls = [u for u in urls if isinstance(u, str) and u][:max_images]
if not urls:
return []
sem = asyncio.Semaphore(concurrency)
results = await asyncio.gather(*[_fetch_data_url(u, sem) for u in urls])
return [d for d in results if d]
def _validate_wf(data):
if not isinstance(data, dict):
return "顶层必须是 JSON 对象"
if "procedures" not in data:
return '缺少 "procedures" 字段'
if not isinstance(data["procedures"], list):
return '"procedures" 必须是数组'
return None
def _sanitize_workflow(data):
dropped = {"procedures": 0, "steps": 0, "io": 0}
procs = data.get("procedures")
if not isinstance(procs, list):
return data, dropped
clean_procs = []
for p in procs:
if not isinstance(p, dict):
dropped["procedures"] += 1
continue
steps = p.get("steps")
if isinstance(steps, list):
kept = []
for s in steps:
if not isinstance(s, dict):
dropped["steps"] += 1
continue
for io in ("inputs", "outputs"):
if isinstance(s.get(io), list):
before = len(s[io])
s[io] = [x for x in s[io] if isinstance(x, dict)]
dropped["io"] += before - len(s[io])
kept.append(s)
p["steps"] = kept
if not isinstance(p.get("declarations"), dict):
p.pop("declarations", None)
if not isinstance(p.get("type_registry"), dict):
p.pop("type_registry", None)
clean_procs.append(p)
data["procedures"] = clean_procs
return data, dropped
# ── 助手复制结束 ──────────────────────────────────────────────────────────────
async def extract_one(row, system, llm_call, model, args):
"""单帖工序解构 → 写 mode_process。返回 cost。"""
cid = row["case_id"]
t0 = time.monotonic()
post_text = (f"【标题】{row['title'] or ''}\n【来源】{row['url'] or ''}\n"
f"【正文】\n{row['body'] or ''}")
data_urls = [] if args.no_images else await _collect_images(
row["images"] or [], args.max_images, args.max_concurrent)
print(f"🖼️ {cid} 配图 {len(data_urls)}/{len(row['images'] or [])} 张")
if data_urls:
user_content = [{"type": "text", "text": post_text}]
for u in data_urls:
user_content.append({"type": "image_url", "image_url": {"url": u}})
messages = [{"role": "system", "content": system},
{"role": "user", "content": user_content}]
else:
messages = [{"role": "system", "content": system},
{"role": "user", "content": post_text}]
data, cost = await call_llm_with_retry(
llm_call=llm_call, messages=messages, model=model,
temperature=0.2, max_tokens=args.max_tokens,
validate_fn=_validate_wf, task_name=f"ProcExtract[{cid}]",
)
if not data:
print(f"❌ {cid} 解构失败(重试耗尽)")
return cost
data, dropped = _sanitize_workflow(data)
if any(dropped.values()):
print(f"🧹 {cid} 清洗:丢弃 procedure {dropped['procedures']} / "
f"step {dropped['steps']} / io {dropped['io']}")
dur = round(time.monotonic() - t0, 1)
n = db.replace_process(args.query_id, cid, row["platform"], row["title"],
data, model, args.version, cost, dur)
out_dir = MW / "runs" / "procedures"
out_dir.mkdir(parents=True, exist_ok=True)
(out_dir / f"{cid}_{args.version}.json").write_text(
json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
print(f" ✅ {cid} → {n} 个工序 · ${cost:.4f} · {dur}s")
return cost
async def run(args):
case_ids = [c.strip() for c in args.case_ids.split(",") if c.strip()]
rows = []
for cid in case_ids:
row = db.fetch_post(args.query_id, cid)
if row is None:
print(f"⚠️ {args.query_id}/{cid} 不在 search_data,跳过")
continue
rows.append(row)
if not rows:
print("❌ 没有可解构的帖子"); return 1
system = PROMPT_FILE.read_text(encoding="utf-8")
from agent.llm.openrouter import create_openrouter_llm_call
llm_call = create_openrouter_llm_call(model=args.model)
args.version = args.version or ("v_" + datetime.now().strftime("%m%d%H%M"))
print(f"🤖 工序解构 {len(rows)} 帖 · 模型 {args.model} · 版本 {args.version}")
costs = []
for row in rows: # 工序解构 token 重,串行跑,避免 OpenRouter 限流
costs.append(await extract_one(row, system, llm_call, args.model, args))
print(f"\n📊 完成 {len(rows)} 帖 · 总成本 ${sum(costs):.4f}")
return 0
def main():
p = argparse.ArgumentParser(description="工序解构:search_data 帖子 → mode_process")
p.add_argument("--query-id", required=True)
p.add_argument("--case-ids", required=True, help="逗号分隔 case_id 列表")
p.add_argument("--model", default=DEFAULT_MODEL)
p.add_argument("--version", default=None, help="默认自动 v_月日时分")
p.add_argument("--max-images", type=int, default=MAX_IMAGES)
p.add_argument("--max-concurrent", type=int, default=4)
p.add_argument("--max-tokens", type=int, default=8000)
p.add_argument("--no-images", action="store_true")
args = p.parse_args()
raise SystemExit(asyncio.run(run(args)))
if __name__ == "__main__":
main()
--model google/gemini-3.1-flash-lite)cd /Users/max_liu/max_liu/company/Agent/examples/mode_workflow
CID=$(python3 -c "import db; print(db.fetch_posts('q0000')[0]['case_id'])")
python pipeline/procedure_extract.py --query-id q0000 --case-ids "$CID" --model google/gemini-3.1-flash-lite
python3 -c "
import db
r = db.fetch_process('$CID')
print(r['version'], len(r['procedures']), '个工序')
p = r['procedures'][0]
print('steps:', len(p['steps']), 'tools_used:', p['tools_used'])
"
Expected: ✅ <cid> → N 个工序;fetch_process 返回 version、工序数 ≥1、steps 非空、tools_used 列出 via。
git add examples/mode_workflow/pipeline/procedure_extract.py
git commit -m "feat(mode_workflow): 工序解构 pipeline(读库→LLM→mode_process)"
Files:
examples/mode_workflow/pipeline/search_eval.py与 run_search.py 的差异: 不写死 4 组 query,接受任意 --query(可带 --synonyms 同义措辞);产出直接入库 search_data,runs/search/ 留调试副本。
# -*- coding: utf-8 -*-
"""搜索 + 评估 · 任意 query → 多渠道搜索去重 → LLM 逐帖评估 → search_data 表
================================================================================
引擎函数全部只读复用 search_and_evaluate.py(搜索/去重/转写/评估/英平台翻译)。
用法(一般由 server.py 起子进程调):
python pipeline/search_eval.py --query-id q0004 --query "AI 人像 图片 生成 怎么做"
python pipeline/search_eval.py --query-id q0005 --query "GPT image2 评测" \
--synonyms "GPT image2 测评,GPT image2 实测" --platforms xhs,gzh --max-count 10
"""
import argparse
import asyncio
import json
import sys
from pathlib import Path
PROJECT_ROOT = Path(__file__).resolve().parents[3] # …/Agent
sys.path.insert(0, str(PROJECT_ROOT))
from dotenv import load_dotenv
load_dotenv()
from examples.process_pipeline.script.search_eval.search_and_evaluate import (
search_all, evaluate_posts, transcribe_video_posts, build_query_overrides,
)
from examples.process_pipeline.script.llm_evaluate_sources import (
build_eval_llm_call, EVAL_MODELS, DEFAULT_EVAL_MODEL,
)
HERE = Path(__file__).resolve().parent
MW = HERE.parent
sys.path.insert(0, str(MW))
import db
async def run(args):
phrasings = [args.query] + [s.strip() for s in (args.synonyms or "").split(",") if s.strip()]
# 去重保序
seen, uniq = set(), []
for q in phrasings:
if q not in seen:
seen.add(q); uniq.append(q)
phrasings = uniq
platforms = [p.strip() for p in args.platforms.split(",") if p.strip()]
eval_llm, eval_model_id = build_eval_llm_call(args.eval_model)
print(f"▶ {args.query_id} query={args.query!r} 措辞={phrasings} 渠道={platforms}")
overrides = await build_query_overrides(platforms, phrasings, eval_llm, eval_model_id)
sources = await search_all(platforms, phrasings, args.max_count,
args.max_concurrent, query_overrides=overrides)
print(f"🔎 去重后 {len(sources)} 帖")
if not sources:
print("❌ 搜索无结果"); return 1
try:
from examples.process_pipeline.script.extract_sources import _convert_timestamps
_convert_timestamps(sources)
except Exception:
pass
if not args.no_transcribe:
n = await transcribe_video_posts(sources, concurrency=args.max_concurrent)
if n:
print(f"🎙️ 视频转写 {n} 条")
cost = 0.0
if not args.no_eval:
sources, cost = await evaluate_posts(
sources, "", eval_llm, eval_model_id, args.max_concurrent,
include_images=not args.no_images, max_images=args.max_images,
image_mode=args.image_mode, query=args.query,
)
for s in sources:
s.pop("_image_data_urls", None)
n = db.upsert_search_posts(args.query_id, args.query, sources)
print(f"🗄️ search_data 入库 {n} 行 · 评估成本 ${cost:.4f}")
out_dir = MW / "runs" / "search"
out_dir.mkdir(parents=True, exist_ok=True)
(out_dir / f"{args.query_id}.json").write_text(json.dumps({
"query_id": args.query_id, "query": args.query, "phrasings": phrasings,
"platforms": platforms, "total": len(sources), "results": sources,
}, ensure_ascii=False, indent=2), encoding="utf-8")
return 0
def main():
p = argparse.ArgumentParser(description="搜索+评估 → search_data")
p.add_argument("--query-id", required=True, help="如 q0004(server 自动分配)")
p.add_argument("--query", required=True, help="基准 query(评估锚点)")
p.add_argument("--synonyms", default="", help="逗号分隔的同义措辞(可选)")
p.add_argument("--platforms", default="xhs,gzh")
p.add_argument("--max-count", type=int, default=10)
p.add_argument("--eval-model", default=DEFAULT_EVAL_MODEL, choices=list(EVAL_MODELS))
p.add_argument("--max-concurrent", type=int, default=3)
p.add_argument("--max-images", type=int, default=4)
p.add_argument("--image-mode", choices=["url", "base64"], default="url")
p.add_argument("--no-transcribe", action="store_true")
p.add_argument("--no-eval", action="store_true")
p.add_argument("--no-images", action="store_true")
args = p.parse_args()
raise SystemExit(asyncio.run(run(args)))
if __name__ == "__main__":
main()
最小验证(不花钱):
cd /Users/max_liu/max_liu/company/Agent/examples/mode_workflow
python pipeline/search_eval.py --help
Expected: 正常打出 usage(证明 import 链 OK——这条会触发 agent.tools 注册链,import 错误在这暴露)。
完整冒烟(可选,~1-3 分钟、~$0.05):
python pipeline/search_eval.py --query-id q9999 --query "GPT image2 评测" --platforms xhs --max-count 3
python3 -c "import db; print(len(db.fetch_posts('q9999')), '帖')"
Expected: 入库 ≥1 行。验完可清理:python3 -c "import db; c=db._conn(); c.cursor().execute(\"DELETE FROM search_data WHERE query_id='q9999'\"); c.close()"
git add examples/mode_workflow/pipeline/search_eval.py
git commit -m "feat(mode_workflow): 搜索评估 pipeline(任意 query → search_data)"
Files:
examples/mode_workflow/server.pyAPI 契约(index.html 依赖,Task 8 不得偏离):
| Method+Path | 入参 | 返回 |
|---|---|---|
GET / |
- | index.html |
GET /api/dashboard |
- | 见下方 _dashboard() 返回结构 |
GET /api/queries |
- | [{query_id, query_text, post_count, process_done, tools_done}] |
GET /api/posts |
query_id |
[{...search_data 行, has_process, has_tools}] |
GET /api/process_versions |
case_id |
[{version, n, model}] |
GET /api/process |
case_id, version? |
{case_id, version, model, source, procedures:[...]} 或 404 |
GET /api/tools_versions |
case_id |
[{version, n, model}] |
GET /api/tools |
case_id, version? |
{case_id, version, model, tool_count, tools:[...]} 或 404 |
POST /api/extract_process |
{query_id, case_ids:[..], model?} |
{task_id} |
POST /api/extract_tools |
{query_id, case_ids:[..], model?} |
{task_id} |
POST /api/run_search |
{query, synonyms?, platforms?, max_count?} |
{task_id, query_id}(query_id 自动分配下一个 qNNNN) |
GET /api/task_status |
task_id |
{status: running\|done\|failed, log_tail} |
# -*- coding: utf-8 -*-
"""mode_workflow server · 页面 + API + 解构任务管理
================================================================================
单服务(默认 8772):
- GET / index.html
- GET /api/dashboard Dashboard 全部聚合指标(含内容树覆盖)
- GET /api/queries|posts|process|tools(+_versions) Dataset 数据
- POST /api/run_search|extract_process|extract_tools 起子进程跑 pipeline
- GET /api/task_status 轮询任务状态(读日志尾部)
用法:python server.py [port]
"""
import json
import subprocess
import sys
import threading
from collections import Counter
from datetime import datetime
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from urllib.parse import urlparse, parse_qs
try:
sys.stdout.reconfigure(encoding="utf-8")
except Exception:
pass
HERE = Path(__file__).resolve().parent
sys.path.insert(0, str(HERE))
import db
PORT = int(sys.argv[1]) if len(sys.argv) > 1 else 8772
MATRIX_FILE = HERE / "reference" / "judged_matrix.json"
LOG_DIR = HERE / "runs" / "logs"
# ── 任务管理:task_id → {proc, log, status} ──────────────────────────────────
TASKS = {}
_TASK_LOCK = threading.Lock()
def _spawn_task(kind, cmd):
LOG_DIR.mkdir(parents=True, exist_ok=True)
task_id = f"{kind}_{datetime.now().strftime('%m%d%H%M%S%f')}"
log_path = LOG_DIR / f"{task_id}.log"
f = open(log_path, "w", encoding="utf-8")
proc = subprocess.Popen(cmd, stdout=f, stderr=subprocess.STDOUT,
cwd=str(HERE), text=True)
with _TASK_LOCK:
TASKS[task_id] = {"proc": proc, "log": log_path, "status": "running"}
def _wait():
rc = proc.wait()
f.close()
with _TASK_LOCK:
TASKS[task_id]["status"] = "done" if rc == 0 else "failed"
threading.Thread(target=_wait, daemon=True).start()
return task_id
def _task_status(task_id):
with _TASK_LOCK:
t = TASKS.get(task_id)
if not t:
return None
tail = ""
try:
text = t["log"].read_text(encoding="utf-8", errors="replace")
tail = text[-3000:]
except Exception:
pass
return {"status": t["status"], "log_tail": tail}
def _next_query_id():
qs = [q["query_id"] for q in db.fetch_queries()]
nums = [int(q[1:]) for q in qs if q.startswith("q") and q[1:].isdigit()]
return f"q{(max(nums) + 1 if nums else 0):04d}"
# ── Dashboard 聚合 ────────────────────────────────────────────────────────────
def _split_values(v):
"""substance/form 字段:数组直接用;字符串按 、,/ 分割;None 丢弃。"""
out = []
items = v if isinstance(v, list) else [v]
for it in items:
if not it or not isinstance(it, str):
continue
for piece in it.replace(",", "、").replace("/", "、").split("、"):
piece = piece.strip()
if piece:
out.append(piece)
return out
def _dashboard():
posts, procs, tools = db.fetch_dashboard_rows()
# 最新版本行集(覆盖度/Top10 用最新版,成本/耗时按全部版本累计)
def latest(rows):
best = {}
for r in rows:
cid = r["case_id"]
if cid not in best or (r["version"] or "") > (best[cid][0] or ""):
best[cid] = (r["version"], [])
out = []
for r in rows:
if r["version"] == best[r["case_id"]][0]:
out.append(r)
return out
latest_procs = latest(procs)
latest_tools = latest(tools)
# 内容树覆盖:steps 的 (action 叶子 × 输入/输出 type) ∩ 有效节点(tier≥1)
jm = json.loads(MATRIX_FILE.read_text(encoding="utf-8"))
a_idx = {a["name"]: i for i, a in enumerate(jm["actions"])}
t_idx = {t["name"]: i for i, t in enumerate(jm["types"])}
valid = set()
for ai, row in enumerate(jm["matrix"]):
for ti, cell in enumerate(row):
if isinstance(cell, dict) and cell.get("tier", 0) >= 1:
valid.add((ai, ti))
covered = set()
via_counter = Counter()
substance_counter = Counter()
form_counter = Counter()
for r in latest_procs:
for s in r["steps"]:
leaf = (s.get("action") or "").split("/")[-1].strip()
types = []
for io in ("inputs", "outputs"):
for x in s.get(io) or []:
if isinstance(x, dict) and x.get("type"):
types.append(str(x["type"]).strip())
if leaf in a_idx:
for tp in types:
if tp in t_idx and (a_idx[leaf], t_idx[tp]) in valid:
covered.add((a_idx[leaf], t_idx[tp]))
via = (s.get("via") or "").strip()
if via:
via_counter[via] += 1
for v in _split_values(s.get("substance")):
substance_counter[v] += 1
for v in _split_values(s.get("form")):
form_counter[v] += 1
for r in latest_tools:
for v in _split_values(r["substance_scope"]):
substance_counter[v] += 1
for v in _split_values(r["form_scope"]):
form_counter[v] += 1
# 成本/耗时:同一 (case_id, version) 只计一次(各行重复存同一次调用的值)
def cost_groups(rows):
g = {}
for r in rows:
key = (r["case_id"], r["version"])
if key not in g and r["cost_usd"] is not None:
g[key] = (r["cost_usd"], r["duration_s"] or 0.0, r["created_at"])
return list(g.values())
runs = cost_groups(procs) + cost_groups(tools)
total_cost = round(sum(c for c, _, _ in runs), 4)
total_dur = round(sum(d for _, d, _ in runs), 1)
# 按日成本趋势
daily = Counter()
for c, _, ts in runs:
if ts:
daily[ts[:10]] += c
cost_trend = [{"date": d, "cost": round(v, 4)} for d, v in sorted(daily.items())]
# 进度:分母 = knowledge_type 含对应类型的帖子(distinct case)
proc_targets = {p["case_id"] for p in posts if "工序" in (p["knowledge_type"] or [])}
tool_targets = {p["case_id"] for p in posts if "工具" in (p["knowledge_type"] or [])}
proc_done = {r["case_id"] for r in procs}
tool_done = {r["case_id"] for r in tools}
return {
"result": {
"matrix_covered": len(covered), "matrix_valid": len(valid),
"matrix_cells": sorted([ai, ti] for ai, ti in covered),
"matrix_actions": [a["name"] for a in jm["actions"]],
"matrix_types": [t["name"] for t in jm["types"]],
"substance_count": len(substance_counter),
"substance_top": substance_counter.most_common(15),
"form_count": len(form_counter),
"form_top": form_counter.most_common(15),
"post_count": len(posts),
"extracted_post_count": len(proc_done | tool_done),
"tool_count": len({r["tool_name"] for r in latest_tools if r["tool_name"]}),
"via_top10": via_counter.most_common(10),
},
"process_data": {
"run_count": len(runs),
"avg_cost": round(total_cost / len(runs), 4) if runs else 0,
"total_cost": total_cost,
"avg_duration": round(total_dur / len(runs), 1) if runs else 0,
"total_duration": total_dur,
"cost_trend": cost_trend,
"process_progress": {"done": len(proc_done), "total": len(proc_targets)},
"tools_progress": {"done": len(tool_done), "total": len(tool_targets)},
},
}
# ── HTTP handler ─────────────────────────────────────────────────────────────
class Handler(BaseHTTPRequestHandler):
def _json(self, data, code=200):
body = json.dumps(data, ensure_ascii=False, default=str).encode("utf-8")
self.send_response(code)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _err(self, msg, code=400):
self._json({"error": msg}, code)
def do_GET(self):
u = urlparse(self.path)
qs = {k: v[0] for k, v in parse_qs(u.query).items()}
try:
if u.path == "/" or u.path == "/index.html":
body = (HERE / "index.html").read_bytes()
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
elif u.path == "/api/dashboard":
self._json(_dashboard())
elif u.path == "/api/queries":
self._json(db.fetch_queries())
elif u.path == "/api/posts":
self._json(db.fetch_posts(qs.get("query_id", "")))
elif u.path == "/api/process_versions":
self._json(db.fetch_process_versions(qs.get("case_id", "")))
elif u.path == "/api/process":
r = db.fetch_process(qs.get("case_id", ""), qs.get("version"))
self._json(r) if r else self._err("无解构记录", 404)
elif u.path == "/api/tools_versions":
self._json(db.fetch_tools_versions(qs.get("case_id", "")))
elif u.path == "/api/tools":
r = db.fetch_tools(qs.get("case_id", ""), qs.get("version"))
self._json(r) if r else self._err("无解构记录", 404)
elif u.path == "/api/task_status":
r = _task_status(qs.get("task_id", ""))
self._json(r) if r else self._err("未知 task_id", 404)
else:
self._err("not found", 404)
except Exception as e:
self._err(f"{type(e).__name__}: {e}", 500)
def do_POST(self):
u = urlparse(self.path)
try:
n = int(self.headers.get("Content-Length") or 0)
payload = json.loads(self.rfile.read(n) or b"{}")
except Exception:
return self._err("body 必须是 JSON")
try:
if u.path in ("/api/extract_process", "/api/extract_tools"):
qid = payload.get("query_id")
cids = payload.get("case_ids") or []
if not qid or not cids:
return self._err("缺 query_id / case_ids")
script = ("pipeline/procedure_extract.py" if u.path.endswith("process")
else "pipeline/tool_extract.py")
cmd = [sys.executable, script, "--query-id", qid,
"--case-ids", ",".join(cids)]
if payload.get("model"):
cmd += ["--model", payload["model"]]
kind = "proc" if u.path.endswith("process") else "tool"
self._json({"task_id": _spawn_task(kind, cmd)})
elif u.path == "/api/run_search":
query = (payload.get("query") or "").strip()
if not query:
return self._err("缺 query")
qid = payload.get("query_id") or _next_query_id()
cmd = [sys.executable, "pipeline/search_eval.py",
"--query-id", qid, "--query", query]
if payload.get("synonyms"):
cmd += ["--synonyms", payload["synonyms"]]
if payload.get("platforms"):
cmd += ["--platforms", payload["platforms"]]
if payload.get("max_count"):
cmd += ["--max-count", str(payload["max_count"])]
self._json({"task_id": _spawn_task("search", cmd), "query_id": qid})
else:
self._err("not found", 404)
except Exception as e:
self._err(f"{type(e).__name__}: {e}", 500)
def log_message(self, fmt, *a):
pass # 静默访问日志
if __name__ == "__main__":
print(f"🚀 mode_workflow server → http://0.0.0.0:{PORT}")
ThreadingHTTPServer(("0.0.0.0", PORT), Handler).serve_forever()
cd /Users/max_liu/max_liu/company/Agent/examples/mode_workflow
python server.py 8772 &
sleep 1
curl -s localhost:8772/api/queries | python3 -m json.tool | head -20
curl -s localhost:8772/api/dashboard | python3 -c "
import json,sys
d = json.load(sys.stdin)
print('采集帖子:', d['result']['post_count'])
print('内容树:', d['result']['matrix_covered'], '/', d['result']['matrix_valid'])
print('进度:', d['process_data'])
"
curl -s "localhost:8772/api/posts?query_id=q0000" | python3 -c "import json,sys; r=json.load(sys.stdin); print(len(r),'帖, 首帖:', r[0]['case_id'], r[0]['has_process'], r[0]['has_tools'])"
curl -s "localhost:8772/api/process?case_id=不存在" ; echo
kill %1
Expected: queries 返回 q0000-q0003;dashboard 的 post_count>0、matrix_valid==643;posts 返回该 query 帖子;不存在的 case 返回 {"error": "无解构记录"} 404。若 Task 4/5 冒烟跑过,对应 case 的 process/tools 端点也应返回数据。
curl -s -X POST localhost:8772/api/extract_tools -d '{}' ; echo
Expected: {"error": "缺 query_id / case_ids"}。(真实解构任务在 Task 9 端到端验收时从 UI 触发。)
git add examples/mode_workflow/server.py
git commit -m "feat(mode_workflow): server(API+任务管理+Dashboard聚合)"
Files:
examples/mode_workflow/index.html实现方式: 本任务必须先invoke frontend-design:frontend-design 技能再写代码——产出单文件 HTML(原生 JS,ECharts 5 CDN:https://cdn.jsdelivr.net/npm/echarts@5/dist/echarts.min.js),禁止引入构建链。API 契约见 Task 7 表格,字段名不得偏离。
视觉方向(延续用户截图的既有风格): 白底卡片 + 深色(藏青/石板)表头多色分区表格;标签 pill(类型彩色底);需求区深蓝表头、输入区橙黄表头、输出区绿表头的分区配色;推断字段用「推」角标 + 高亮底色,hover 显示 inferred_reason。整体配中文界面。
页面结构与行为(验收依据):
顶部导航:Dashboard / Dataset / 聚类库 三 tab(hash 路由 #dashboard #dataset #cluster,刷新保位)。
Dashboard tab(数据源 GET /api/dashboard):
covered/valid + 百分比)。$total_cost,副标题单条均值 $avg_cost)、总耗时(副标题平均耗时)、工序进度环(done/total)、工具进度环。matrix_cells/matrix_actions/matrix_types)。via_top10)。substance_top,标题带总数 substance_count)。form_top + form_count)。cost_trend)。Dataset tab:顶部「工序 / 工具」子切换 + 「新建搜索」按钮;三栏:
/api/queries):query_text + 帖子数 + 解构进度(process_done 或 tools_done 随子模式切换);点击选中。/api/posts?query_id=):卡片含标题、平台徽标(xhs/gzh/zhihu/x)、overall_score、knowledge_type pill、已解构标记(has_process/has_tools);卡片上「解构」按钮 + 多选批量解构;点击卡片在右栏加载。/api/process_versions,格式 v_xxx (最新) · N工序)+「重新生成」按钮;每个工序渲染:工序名 + 目的 + 类别 pill + 平台/作者;输入/返回声明区;步骤明细表(分区表头:需求[#/目的/作用/实质/形式] 深蓝、输入[类型/值/来源] 橙、实现[外部工具/动作/指令] 绿、输出[类型/值/去处] 深绿),inferred:true 的单元格高亮 +「推」角标,hover title 显示 inferred_reason。/api/task_status(2s 间隔),显示 log_tail,done 后自动刷新当前栏;failed 显示日志。/api/run_search → 任务面板,完成后左栏刷新出现新 query。聚类库 tab:居中空态占位(图标 + 「聚类库 · 敬请期待」)。
最小骨架(代码结构基线,frontend-design 在此骨架上完成完整实现):
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="utf-8">
<title>mode_workflow</title>
<script src="https://cdn.jsdelivr.net/npm/echarts@5/dist/echarts.min.js"></script>
<style>/* frontend-design 产出 */</style>
</head>
<body>
<nav id="tabs"><!-- Dashboard / Dataset / 聚类库 --></nav>
<main id="view-dashboard" class="view"></main>
<main id="view-dataset" class="view">
<aside id="query-list"></aside>
<section id="post-list"></section>
<section id="extract-panel"></section>
</main>
<main id="view-cluster" class="view"></main>
<div id="task-panel" hidden></div>
<script>
const api = (p, opt) => fetch(p, opt).then(async r => {
if (!r.ok) throw Object.assign(new Error(), {status: r.status, body: await r.json().catch(() => ({}))});
return r.json();
});
const state = {tab: 'dashboard', mode: 'process', queryId: null, caseId: null, version: null};
function route() { state.tab = (location.hash || '#dashboard').slice(1); render(); }
window.addEventListener('hashchange', route);
async function pollTask(taskId, onDone) {
const t = await api(`/api/task_status?task_id=${taskId}`);
renderTaskPanel(t);
if (t.status === 'running') setTimeout(() => pollTask(taskId, onDone), 2000);
else if (t.status === 'done') onDone();
}
// render() / renderDashboard() / renderDataset() / renderTaskPanel() … frontend-design 产出
route();
</script>
</body>
</html>
[ ] Step 1: invoke frontend-design 技能,按上述规格产出完整 index.html
[ ] Step 2: 浏览器验收
cd /Users/max_liu/max_liu/company/Agent/examples/mode_workflow
python server.py 8772 &
打开 http://localhost:8772,核对上面"页面结构与行为"逐条:三 tab 切换、Dashboard 全部卡片与 5 张图渲染、Dataset 三栏联动、版本下拉、空态、聚类库占位。若 Task 4/5 冒烟跑过,选对应帖子确认工序表格分区配色与「推」高亮、工具卡片完整。
git add examples/mode_workflow/index.html
git commit -m "feat(mode_workflow): 三tab单页前端(Dashboard/Dataset/聚类库)"
Files:
Create: examples/mode_workflow/README.md
[ ] Step 1: 写 README
# mode_workflow · 搜索评估 + 工序/工具解构工作台
MySQL 三表(search_data / mode_process / mode_tools)为唯一事实源的单页工作台:
Dashboard(结果/过程指标可视化)、Dataset(query → 帖子 → 工序/工具解构)、聚类库(占位)。
设计文档:`docs/superpowers/specs/2026-06-12-mode-workflow-design.md`
## 启动
```bash
# 0. 前置:.env 配 MYSQL_* 与 OPEN_ROUTER_API_KEY;pip install -e .
python db.py init # 建三张表(幂等)
python import_history.py # (可选)导入 fixed_query_eval 历史搜索结果
python server.py # http://localhost:8772
| 文件 | 职责 |
|---|---|
db.py |
三表 DDL + 全部读写(读 .env MYSQL_*) |
server.py |
页面 + API + 解构任务子进程管理(端口 8772) |
index.html |
单文件前端:Dashboard / Dataset / 聚类库 |
pipeline/search_eval.py |
任意 query 搜索+评估 → search_data |
pipeline/procedure_extract.py |
工序解构(LLM 直出)→ mode_process |
pipeline/tool_extract.py |
工具解构 → mode_tools |
prompts/ |
工序/工具解构 system prompt(可单独迭代) |
reference/judged_matrix.json |
内容树(27 动作×50 类型),Dashboard 覆盖度用 |
runs/ |
运行日志与调试副本(gitignore) |
搜索/评估/转写引擎函数只读复用
examples/process_pipeline/script/search_eval/search_and_evaluate.py,本目录不复制引擎代码。
取代 fixed_query_eval(8770)+ mode_procedure(8771)两套服务的"搜索评估 + 大模型解构"
部分;procedure-dsl 执行引擎、mode-dsl 模式提取、A/B/C 三形式对比未迁移(按设计裁剪)。
- [ ] **Step 2: 端到端验收(需 API key)**
```bash
cd /Users/max_liu/max_liu/company/Agent/examples/mode_workflow
python server.py 8772 &
浏览器走完整链路:
google/gemini-3.1-flash-lite 控成本)→ 右栏出工序分区表格。git add examples/mode_workflow/README.md
git commit -m "docs(mode_workflow): README 与启动说明"
git status examples/mode_workflow # runs/ 不应出现在未跟踪列表(被 .gitignore 覆盖)
prompts/eval_prompt.md 一项经核实不需要(rubric 固化在引擎内),已在计划头部标注修正。replace_process(payload) 与 fetch_process() 的 procedures 字段对称;server 调 db.fetch_* 名称与 Task 2 定义一致;pipeline 脚本参数名与 server _spawn_task 构造的 cmd 一致(--query-id/--case-ids/--model);_dashboard() 返回字段与 Task 8 前端读取字段一致(matrix_cells/via_top10/substance_top/cost_trend/process_progress 等)。