Browse Source

chore(mode_workflow): 整理目录结构,归档一次性脚本

- pipeline/ 改名 stages/(搜索→评估→解构→上传四阶段),并入 import_process_knowledge.py;
  该脚本原为裸 import db,补 sys.path 修复;同步 server.py 子进程硬编码、README、
  流程执行手册、各脚本 docstring、db.py 注释的全部引用
- 一次性重评/对比脚本(_reeval_one/_batch_reeval_q0000/q0020/eval_compare)移至
  gitignored runs/ 归档(脱离追踪,路径锚点已改为 runs/ 约定)
- 移除已废弃的 import_history.py 及 README 启动步骤中的相关说明
- 取消追踪误提交的运行时文件 .server_8772.out,清理空日志残留

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
刘文武 1 day ago
parent
commit
31daefe53c

+ 0 - 0
examples/mode_workflow/.server_8772.out


+ 12 - 13
examples/mode_workflow/README.md

@@ -10,7 +10,6 @@ Dashboard(结果/过程指标可视化)、Dataset(query → 帖子 → 工序/
 ```bash
 ```bash
 # 0. 前置:.env 配 MYSQL_* 与 OPEN_ROUTER_API_KEY;pip install -e .
 # 0. 前置:.env 配 MYSQL_* 与 OPEN_ROUTER_API_KEY;pip install -e .
 python db.py init             # 建四张表(幂等);db.py clear 清空数据
 python db.py init             # 建四张表(幂等);db.py clear 清空数据
-python import_history.py      # (可选)导入 fixed_query_eval 历史搜索结果
 python server.py              # http://localhost:8772
 python server.py              # http://localhost:8772
 ```
 ```
 
 
@@ -21,10 +20,10 @@ python server.py              # http://localhost:8772
 | `db.py` | 四表 DDL + 全部读写(读 .env MYSQL_*);连接走 `PooledDB` 池(远程 RDS 每次握手 ~0.5s,池复用避免每请求重连) |
 | `db.py` | 四表 DDL + 全部读写(读 .env MYSQL_*);连接走 `PooledDB` 池(远程 RDS 每次握手 ~0.5s,池复用避免每请求重连) |
 | `server.py` | 页面 + API + 解构任务子进程管理(端口 8772);`/api/dashboard` 结果带缓存(任务完成时作废 + 60s 兜底 TTL),`/api/extract` 等带 ETag/304 |
 | `server.py` | 页面 + API + 解构任务子进程管理(端口 8772);`/api/dashboard` 结果带缓存(任务完成时作废 + 60s 兜底 TTL),`/api/extract` 等带 ETag/304 |
 | `index.html` | 单文件前端:Dashboard / Dataset / 聚类库 |
 | `index.html` | 单文件前端:Dashboard / Dataset / 聚类库 |
-| `pipeline/search_eval.py` | 任意 query 搜索+评估 → search_process / search_tools(按解构方向分表) |
-| `pipeline/procedure_extract.py` | 工序解构(LLM 直出)→ mode_process |
-| `pipeline/tool_extract.py` | 工具解构 → mode_tools |
-| `import_process_knowledge.py` | 已采纳工序(mode_process 最新版)→ 知识导入接口;**读 DB 非本地文件**,采纳口径同 Dashboard(`db.is_adopted_rel`) |
+| `stages/search_eval.py` | 任意 query 搜索+评估 → search_process / search_tools(按解构方向分表) |
+| `stages/procedure_extract.py` | 工序解构(LLM 直出)→ mode_process |
+| `stages/tool_extract.py` | 工具解构 → mode_tools |
+| `stages/import_process_knowledge.py` | 已采纳工序(mode_process 最新版)→ 知识导入接口;**读 DB 非本地文件**,采纳口径同 Dashboard(`db.is_adopted_rel`) |
 | `prompts/` | 工序/工具解构 system prompt(可单独迭代) |
 | `prompts/` | 工序/工具解构 system prompt(可单独迭代) |
 | `reference/judged_matrix.json` | 内容树(27 动作×50 类型),Dashboard 覆盖度用 |
 | `reference/judged_matrix.json` | 内容树(27 动作×50 类型),Dashboard 覆盖度用 |
 | `runs/` | 运行日志与调试副本(gitignore):search_process / search_tools / mode_process / mode_tools / logs |
 | `runs/` | 运行日志与调试副本(gitignore):search_process / search_tools / mode_process / mode_tools / logs |
@@ -35,8 +34,8 @@ python server.py              # http://localhost:8772
 ## 数据流
 ## 数据流
 
 
 ```
 ```
-新建搜索(UI) → server 子进程 pipeline/search_eval.py → search_process / search_tools(方向分表)
-选帖解构(UI) → server 子进程 pipeline/{procedure,tool}_extract.py → mode_process / mode_tools
+新建搜索(UI) → server 子进程 stages/search_eval.py → search_process / search_tools(方向分表)
+选帖解构(UI) → server 子进程 stages/{procedure,tool}_extract.py → mode_process / mode_tools
 Dashboard    → /api/dashboard 实时聚合四表(内容树覆盖按 steps 的 action×type 命中有效节点)
 Dashboard    → /api/dashboard 实时聚合四表(内容树覆盖按 steps 的 action×type 命中有效节点)
 ```
 ```
 
 
@@ -45,7 +44,7 @@ Dashboard    → /api/dashboard 实时聚合四表(内容树覆盖按 steps 的
 聚合统计时按该键去重(见 `server.py:_dashboard` 的 `cost_groups`)。
 聚合统计时按该键去重(见 `server.py:_dashboard` 的 `cost_groups`)。
 
 
 **解构前按 case 全局去重(省钱):** `case_id` 是帖子物理身份,与 query 无关。同一帖被多个
 **解构前按 case 全局去重(省钱):** `case_id` 是帖子物理身份,与 query 无关。同一帖被多个
-query 搜到时只真实解构一次——`pipeline/{procedure,tool}_extract.py` 在调 LLM 前先查
+query 搜到时只真实解构一次——`stages/{procedure,tool}_extract.py` 在调 LLM 前先查
 `db.latest_real_version(case_id)`,已解构过的帖跨 query 用 `db.link_process` 复制 `link_*`
 `db.latest_real_version(case_id)`,已解构过的帖跨 query 用 `db.link_process` 复制 `link_*`
 行补齐关联(`cost=0`),不再付费重跑。换 prompt/模型要对比时传 `--force`(API `force:true`)
 行补齐关联(`cost=0`),不再付费重跑。换 prompt/模型要对比时传 `--force`(API `force:true`)
 跳过去重。`runs/backfill_links.py` 是事后扫尾工具,复用同一 `link_process`。
 跳过去重。`runs/backfill_links.py` 是事后扫尾工具,复用同一 `link_process`。
@@ -59,11 +58,11 @@ query 搜到时只真实解构一次——`pipeline/{procedure,tool}_extract.py`
 `Downloads/import/how_process_knowledge/main.py` 一致(steps→scopes/custom_ext)。
 `Downloads/import/how_process_knowledge/main.py` 一致(steps→scopes/custom_ext)。
 
 
 ```bash
 ```bash
-python import_process_knowledge.py --dry-run            # 只取数+组装 payload,不调接口(先验证)
-python import_process_knowledge.py --dry-run -v         # 同上,打印完整 payload JSON
-python import_process_knowledge.py --query-id q0001     # 只传某搜索任务下的采纳 case
-python import_process_knowledge.py --limit 5            # 只处理前 5 个 case(调试)
-python import_process_knowledge.py                      # 真实导入(去掉 --dry-run)
+python stages/import_process_knowledge.py --dry-run            # 只取数+组装 payload,不调接口(先验证)
+python stages/import_process_knowledge.py --dry-run -v         # 同上,打印完整 payload JSON
+python stages/import_process_knowledge.py --query-id q0001     # 只传某搜索任务下的采纳 case
+python stages/import_process_knowledge.py --limit 5            # 只处理前 5 个 case(调试)
+python stages/import_process_knowledge.py                      # 真实导入(去掉 --dry-run)
 # 其它:--api-url <根地址>(默认 47.236.83.130:8001)  --delay <毫秒>(调用间隔,默认 100)
 # 其它:--api-url <根地址>(默认 47.236.83.130:8001)  --delay <毫秒>(调用间隔,默认 100)
 ```
 ```
 
 

+ 0 - 141
examples/mode_workflow/_batch_reeval_q0000.py

@@ -1,141 +0,0 @@
-# -*- coding: utf-8 -*-
-"""批量重评 q0000 下当前【命中(is_adopted)】的帖子,用 flash-lite+sonnet 组合(模糊带升级),
-跑完定向替换 DB 的得分相关字段(overall_score / knowledge_type / llm_evaluation)。
-先备份旧值到 runs/search_process/q0000.score_backup.<ts>.json,可回滚。"""
-import asyncio, copy, json, sys
-from datetime import datetime
-from pathlib import Path
-
-PROJECT_ROOT = Path(__file__).resolve().parents[3]
-sys.path.insert(0, str(PROJECT_ROOT))
-from dotenv import load_dotenv
-load_dotenv()
-
-MW = Path(__file__).resolve().parent
-sys.path.insert(0, str(MW))
-import db
-from examples.process_pipeline.script.search_eval.search_and_evaluate import evaluate_posts
-from examples.process_pipeline.script.llm_evaluate_sources import (
-    _EVAL_PRODUCT_FIELDS, build_eval_llm_call,
-)
-
-QUERY_ID = "q0000"
-TABLE = "search_process"
-INIT_MODEL = "gemini-flash-lite"
-ESC_MODEL = "sonnet"
-BAND = (4.0, 6.0)
-
-
-def _load_db_rows():
-    conn = db._conn()
-    try:
-        with conn.cursor() as c:
-            c.execute(f"SELECT case_id, overall_score, knowledge_type, publish_time, "
-                      f"llm_evaluation FROM {TABLE} WHERE query_id=%s", (QUERY_ID,))
-            return c.fetchall()
-    finally:
-        conn.close()
-
-
-def _update_scores(case_id, overall, knowledge_type, evaluation):
-    conn = db._conn()
-    try:
-        with conn.cursor() as c:
-            c.execute(
-                f"UPDATE {TABLE} SET overall_score=%s, knowledge_type=%s, llm_evaluation=%s, "
-                f"updated_at=CURRENT_TIMESTAMP WHERE query_id=%s AND case_id=%s",
-                (overall, db._j(knowledge_type or []), db._j(evaluation), QUERY_ID, case_id))
-    finally:
-        conn.close()
-
-
-async def main():
-    rows = _load_db_rows()
-    def _ev(r):
-        e = r["llm_evaluation"]
-        return json.loads(e) if isinstance(e, str) else (e or {})
-    adopted = [r for r in rows if db.is_adopted(r["overall_score"], _ev(r), r["publish_time"])]
-    adopted_ids = {r["case_id"] for r in adopted}
-    print(f"q0000 共 {len(rows)} 帖,当前命中 {len(adopted)} 帖 → 重评这些\n")
-
-    # 备份旧得分字段
-    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
-    backup = [{"case_id": r["case_id"], "overall_score": r["overall_score"],
-               "knowledge_type": r["knowledge_type"], "publish_time": r["publish_time"],
-               "llm_evaluation": _ev(r)} for r in adopted]
-    bpath = MW / "runs" / TABLE / f"{QUERY_ID}.score_backup.{ts}.json"
-    bpath.write_text(json.dumps(backup, ensure_ascii=False, indent=2), encoding="utf-8")
-    print(f"💾 旧得分已备份 → {bpath.name}\n")
-
-    # 从 runs json 取完整帖子(含配图)作为重评输入
-    data = json.loads((MW / "runs" / TABLE / f"{QUERY_ID}.json").read_text(encoding="utf-8"))
-    query = data.get("query", "")
-    by_id = {s["case_id"]: s for s in data.get("results", [])}
-    missing = [cid for cid in adopted_ids if cid not in by_id]
-    if missing:
-        print(f"⚠️ runs json 缺 {len(missing)} 条,将跳过: {missing}")
-    targets = []
-    for cid in adopted_ids:
-        if cid not in by_id:
-            continue
-        s = copy.deepcopy(by_id[cid])
-        for k in _EVAL_PRODUCT_FIELDS:
-            s.pop(k, None)
-        s.pop("_image_data_urls", None)
-        targets.append(s)
-
-    eval_llm, eval_model = build_eval_llm_call(INIT_MODEL)
-    esc_llm, esc_model = build_eval_llm_call(ESC_MODEL)
-    print(f"🧠 组合评估:{eval_model} 初评 → {esc_model} 复核(带 [{BAND[0]:g},{BAND[1]:g}])\n")
-    sources, cost = await evaluate_posts(
-        targets, "", eval_llm, eval_model, max_concurrent=4,
-        include_images=True, max_images=4, image_mode="url", query=query,
-        escalate_llm=esc_llm, escalate_model=esc_model, escalate_band=BAND)
-
-    # 旧分查表
-    old_by_id = {r["case_id"]: r for r in adopted}
-    report = []
-    for s in sources:
-        cid = s["case_id"]
-        ev = s["llm_evaluation"]
-        if not isinstance(ev, dict) or ev.get("_error"):
-            print(f"   ⚠️ 评估失败,跳过更新: {cid}")
-            continue
-        kt = ev.get("知识类型") or []
-        ov = db.overall_score(ev)
-        pub = (s.get("post") or {}).get("publish_timestamp") or old_by_id[cid]["publish_time"]
-        new_adopt = db.is_adopted(ov, ev, pub)
-        _update_scores(cid, ov, kt, ev)            # 定向替换 DB
-        by_id[cid]["llm_evaluation"] = ev          # 同步 runs json
-        report.append({
-            "case_id": cid, "escalated": bool(s.get("_escalated")),
-            "old_overall": old_by_id[cid]["overall_score"], "new_overall": ov,
-            "repro": db._fixed_dim_score(ev, "可复现性"),
-            "intent": db._fixed_dim_score(ev, "意图可控性"),
-            "new_adopted": new_adopt,
-            "title": (s.get("post") or {}).get("title", "")[:22],
-        })
-
-    # 同步 runs json
-    (MW / "runs" / TABLE / f"{QUERY_ID}.json").write_text(
-        json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
-
-    # 报告
-    print("\n" + "=" * 92)
-    print(f"{'case_id':26} {'升级':4} {'旧综':>5} {'新综':>5} {'复现':>4} {'意图':>4} {'命中':>5}  标题")
-    still = 0
-    for r in sorted(report, key=lambda x: x["new_overall"]):
-        still += int(r["new_adopted"])
-        print(f"{r['case_id'][:26]:26} {'★' if r['escalated'] else ' ':^4} "
-              f"{(r['old_overall'] or 0):5.2f} {(r['new_overall'] or 0):5.2f} "
-              f"{str(r['repro']):>4} {str(r['intent']):>4} "
-              f"{'是' if r['new_adopted'] else '否':>4}  {r['title']}")
-    esc_n = sum(r["escalated"] for r in report)
-    print("=" * 92)
-    print(f"重评 {len(report)} 帖 · 升级 sonnet {esc_n} 帖 · 命中 {len(adopted)}→{still} · "
-          f"总成本 ${cost:.4f}")
-    print(f"DB 已更新,旧值备份在 {bpath.name}")
-
-
-if __name__ == "__main__":
-    asyncio.run(main())

+ 0 - 141
examples/mode_workflow/_batch_reeval_q0020.py

@@ -1,141 +0,0 @@
-# -*- coding: utf-8 -*-
-"""批量重评 q0000 下当前【命中(is_adopted)】的帖子,用 flash-lite+sonnet 组合(模糊带升级),
-跑完定向替换 DB 的得分相关字段(overall_score / knowledge_type / llm_evaluation)。
-先备份旧值到 runs/search_process/q0000.score_backup.<ts>.json,可回滚。"""
-import asyncio, copy, json, sys
-from datetime import datetime
-from pathlib import Path
-
-PROJECT_ROOT = Path(__file__).resolve().parents[3]
-sys.path.insert(0, str(PROJECT_ROOT))
-from dotenv import load_dotenv
-load_dotenv()
-
-MW = Path(__file__).resolve().parent
-sys.path.insert(0, str(MW))
-import db
-from examples.process_pipeline.script.search_eval.search_and_evaluate import evaluate_posts
-from examples.process_pipeline.script.llm_evaluate_sources import (
-    _EVAL_PRODUCT_FIELDS, build_eval_llm_call,
-)
-
-QUERY_ID = "q0020"
-TABLE = "search_process"
-INIT_MODEL = "gemini-flash-lite"
-ESC_MODEL = "sonnet"
-BAND = (4.0, 6.0)
-
-
-def _load_db_rows():
-    conn = db._conn()
-    try:
-        with conn.cursor() as c:
-            c.execute(f"SELECT case_id, overall_score, knowledge_type, publish_time, "
-                      f"llm_evaluation FROM {TABLE} WHERE query_id=%s", (QUERY_ID,))
-            return c.fetchall()
-    finally:
-        conn.close()
-
-
-def _update_scores(case_id, overall, knowledge_type, evaluation):
-    conn = db._conn()
-    try:
-        with conn.cursor() as c:
-            c.execute(
-                f"UPDATE {TABLE} SET overall_score=%s, knowledge_type=%s, llm_evaluation=%s, "
-                f"updated_at=CURRENT_TIMESTAMP WHERE query_id=%s AND case_id=%s",
-                (overall, db._j(knowledge_type or []), db._j(evaluation), QUERY_ID, case_id))
-    finally:
-        conn.close()
-
-
-async def main():
-    rows = _load_db_rows()
-    def _ev(r):
-        e = r["llm_evaluation"]
-        return json.loads(e) if isinstance(e, str) else (e or {})
-    adopted = [r for r in rows if db.is_adopted(r["overall_score"], _ev(r), r["publish_time"])]
-    adopted_ids = {r["case_id"] for r in adopted}
-    print(f"q0000 共 {len(rows)} 帖,当前命中 {len(adopted)} 帖 → 重评这些\n")
-
-    # 备份旧得分字段
-    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
-    backup = [{"case_id": r["case_id"], "overall_score": r["overall_score"],
-               "knowledge_type": r["knowledge_type"], "publish_time": r["publish_time"],
-               "llm_evaluation": _ev(r)} for r in adopted]
-    bpath = MW / "runs" / TABLE / f"{QUERY_ID}.score_backup.{ts}.json"
-    bpath.write_text(json.dumps(backup, ensure_ascii=False, indent=2), encoding="utf-8")
-    print(f"💾 旧得分已备份 → {bpath.name}\n")
-
-    # 从 runs json 取完整帖子(含配图)作为重评输入
-    data = json.loads((MW / "runs" / TABLE / f"{QUERY_ID}.json").read_text(encoding="utf-8"))
-    query = data.get("query", "")
-    by_id = {s["case_id"]: s for s in data.get("results", [])}
-    missing = [cid for cid in adopted_ids if cid not in by_id]
-    if missing:
-        print(f"⚠️ runs json 缺 {len(missing)} 条,将跳过: {missing}")
-    targets = []
-    for cid in adopted_ids:
-        if cid not in by_id:
-            continue
-        s = copy.deepcopy(by_id[cid])
-        for k in _EVAL_PRODUCT_FIELDS:
-            s.pop(k, None)
-        s.pop("_image_data_urls", None)
-        targets.append(s)
-
-    eval_llm, eval_model = build_eval_llm_call(INIT_MODEL)
-    esc_llm, esc_model = build_eval_llm_call(ESC_MODEL)
-    print(f"🧠 组合评估:{eval_model} 初评 → {esc_model} 复核(带 [{BAND[0]:g},{BAND[1]:g}])\n")
-    sources, cost = await evaluate_posts(
-        targets, "", eval_llm, eval_model, max_concurrent=4,
-        include_images=True, max_images=4, image_mode="url", query=query,
-        escalate_llm=esc_llm, escalate_model=esc_model, escalate_band=BAND)
-
-    # 旧分查表
-    old_by_id = {r["case_id"]: r for r in adopted}
-    report = []
-    for s in sources:
-        cid = s["case_id"]
-        ev = s["llm_evaluation"]
-        if not isinstance(ev, dict) or ev.get("_error"):
-            print(f"   ⚠️ 评估失败,跳过更新: {cid}")
-            continue
-        kt = ev.get("知识类型") or []
-        ov = db.overall_score(ev)
-        pub = (s.get("post") or {}).get("publish_timestamp") or old_by_id[cid]["publish_time"]
-        new_adopt = db.is_adopted(ov, ev, pub)
-        _update_scores(cid, ov, kt, ev)            # 定向替换 DB
-        by_id[cid]["llm_evaluation"] = ev          # 同步 runs json
-        report.append({
-            "case_id": cid, "escalated": bool(s.get("_escalated")),
-            "old_overall": old_by_id[cid]["overall_score"], "new_overall": ov,
-            "repro": db._fixed_dim_score(ev, "可复现性"),
-            "intent": db._fixed_dim_score(ev, "意图可控性"),
-            "new_adopted": new_adopt,
-            "title": (s.get("post") or {}).get("title", "")[:22],
-        })
-
-    # 同步 runs json
-    (MW / "runs" / TABLE / f"{QUERY_ID}.json").write_text(
-        json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
-
-    # 报告
-    print("\n" + "=" * 92)
-    print(f"{'case_id':26} {'升级':4} {'旧综':>5} {'新综':>5} {'复现':>4} {'意图':>4} {'命中':>5}  标题")
-    still = 0
-    for r in sorted(report, key=lambda x: x["new_overall"]):
-        still += int(r["new_adopted"])
-        print(f"{r['case_id'][:26]:26} {'★' if r['escalated'] else ' ':^4} "
-              f"{(r['old_overall'] or 0):5.2f} {(r['new_overall'] or 0):5.2f} "
-              f"{str(r['repro']):>4} {str(r['intent']):>4} "
-              f"{'是' if r['new_adopted'] else '否':>4}  {r['title']}")
-    esc_n = sum(r["escalated"] for r in report)
-    print("=" * 92)
-    print(f"重评 {len(report)} 帖 · 升级 sonnet {esc_n} 帖 · 命中 {len(adopted)}→{still} · "
-          f"总成本 ${cost:.4f}")
-    print(f"DB 已更新,旧值备份在 {bpath.name}")
-
-
-if __name__ == "__main__":
-    asyncio.run(main())

+ 0 - 109
examples/mode_workflow/_reeval_one.py

@@ -1,109 +0,0 @@
-# -*- coding: utf-8 -*-
-"""一次性:用当前 eval_prompt_template.md 对单条已存帖子重评(复用生产评估链路 evaluate_posts)。
-支持 --escalate-model 演示 sonnet+flash-lite 组合(模糊带升级)。"""
-import argparse, asyncio, json, sys
-from datetime import datetime
-from pathlib import Path
-
-PROJECT_ROOT = Path(__file__).resolve().parents[3]   # …/Agent
-sys.path.insert(0, str(PROJECT_ROOT))
-from dotenv import load_dotenv
-load_dotenv()
-
-MW = Path(__file__).resolve().parent
-sys.path.insert(0, str(MW))
-import db
-
-from examples.process_pipeline.script.search_eval.search_and_evaluate import evaluate_posts
-from examples.process_pipeline.script.llm_evaluate_sources import (
-    _EVAL_PRODUCT_FIELDS, build_eval_llm_call, DEFAULT_EVAL_MODEL,
-)
-
-
-def _load(query_id):
-    return json.loads((MW / "runs" / "search_process" / f"{query_id}.json")
-                      .read_text(encoding="utf-8"))
-
-
-def _save(query_id, data):
-    (MW / "runs" / "search_process" / f"{query_id}.json").write_text(
-        json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
-
-
-async def main():
-    ap = argparse.ArgumentParser()
-    ap.add_argument("--query-id", required=True)
-    ap.add_argument("--case-id", required=True)
-    ap.add_argument("--query", default="")
-    ap.add_argument("--model", default=DEFAULT_EVAL_MODEL)
-    ap.add_argument("--escalate-model", default="")
-    ap.add_argument("--escalate-band", type=float, nargs=2, default=[4.0, 6.0])
-    ap.add_argument("--max-images", type=int, default=4)
-    ap.add_argument("--persist", action="store_true",
-                    help="把新评估写回 DB(overall_score/knowledge_type/llm_evaluation),落库前先备份旧值")
-    a = ap.parse_args()
-
-    data = _load(a.query_id)
-    query = a.query or data.get("query", "")
-    src = next((s for s in data.get("results", []) if s.get("case_id") == a.case_id), None)
-    if not src:
-        raise SystemExit(f"未找到 case_id={a.case_id}")
-    for k in _EVAL_PRODUCT_FIELDS:
-        src.pop(k, None)
-
-    llm_call, model_id = build_eval_llm_call(a.model)
-    esc_llm = esc_model = None
-    if a.escalate_model:
-        esc_llm, esc_model = build_eval_llm_call(a.escalate_model)
-    print(f"▶ 重评 {a.case_id}  初评={model_id}"
-          + (f"  升级={esc_model} 带[{a.escalate_band[0]:g},{a.escalate_band[1]:g}]" if esc_model else "")
-          + f"  query={query!r}\n")
-
-    sources, cost = await evaluate_posts(
-        [src], "", llm_call, model_id, max_concurrent=1,
-        include_images=True, max_images=a.max_images, image_mode="url", query=query,
-        escalate_llm=esc_llm, escalate_model=esc_model, escalate_band=tuple(a.escalate_band),
-    )
-    ev = sources[0]["llm_evaluation"]
-    overall = db.overall_score(ev)
-    pub = (src.get("post") or {}).get("publish_timestamp", "")
-    adopted = db.is_adopted(overall, ev, pub)
-
-    print("\n" + "=" * 60)
-    print(f"最终评估模型 = {sources[0].get('_escalated') or model_id}")
-    print(f"综合分(overall_score) = {overall}")
-    print(f"  · 和内容制作知识相关 = {((ev.get('相关性') or {}).get('和内容制作知识相关') or {}).get('得分')}")
-    print(f"  · 实现完整性/可复现门槛 = {db._repro_score(ev)}   (门槛 <4 → 不采纳)")
-    print(f"  · 意图可控性        = {db._fixed_dim_score(ev, '意图可控性')}  (暂只采分)")
-    print(f"采纳判定(is_adopted)  = {adopted}")
-    print(f"总成本 ≈ ${cost:.4f}")
-
-    if a.persist:
-        if not isinstance(ev, dict) or ev.get("_error"):
-            raise SystemExit("评估结果异常(_error),拒绝落库")
-        # 1) 备份旧 DB 行(overall_score/knowledge_type/llm_evaluation/publish_time)
-        old = next((p for p in db.fetch_posts(a.query_id, "process")
-                    if p["case_id"] == a.case_id), None)
-        if old is None:
-            raise SystemExit(f"DB 无此行,无法落库: query={a.query_id} case={a.case_id}")
-        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
-        bpath = (MW / "runs" / "search_process"
-                 / f"{a.query_id}.{a.case_id}.score_backup.{ts}.json")
-        bpath.write_text(json.dumps({
-            "query_id": a.query_id, "case_id": a.case_id,
-            "old_overall_score": old.get("overall_score"),
-            "old_knowledge_type": old.get("knowledge_type"),
-            "old_llm_evaluation": old.get("llm_evaluation"),
-            "old_adopted": old.get("adopted"),
-        }, ensure_ascii=False, indent=2), encoding="utf-8")
-        # 2) 写回 DB(派生列 overall_score/knowledge_type 由 update_post_eval 重算)
-        n = db.update_post_eval(a.query_id, a.case_id, ev, table="search_process")
-        # 3) 同步 runs json,保持后续重评输入一致
-        src["llm_evaluation"] = ev
-        _save(a.query_id, data)
-        print(f"\n💾 旧值已备份 → {bpath.name}")
-        print(f"✅ DB 已更新 {n} 行(overall={overall} 采纳={adopted})")
-
-
-if __name__ == "__main__":
-    asyncio.run(main())

+ 2 - 2
examples/mode_workflow/db.py

@@ -177,7 +177,7 @@ CREATE TABLE IF NOT EXISTS mode_tools (
 """
 """
 
 
 
 
-# 工序知识「已导入知识库」台账:防重复上传(import_process_knowledge.py 用)。
+# 工序知识「已导入知识库」台账:防重复上传(stages/import_process_knowledge.py 用)。
 # 每条知识 = 某 case 的某个工序(proc_index 1-based)。记录导入时的 mode_process 版本:
 # 每条知识 = 某 case 的某个工序(proc_index 1-based)。记录导入时的 mode_process 版本:
 # 版本变了(重解构)说明内容已变,应重导;版本不变即视为「已传过」,跳过。
 # 版本变了(重解构)说明内容已变,应重导;版本不变即视为「已传过」,跳过。
 # 选 DB 台账而非本地文件,是为了换机器/换链接后也不会重复写知识库。
 # 选 DB 台账而非本地文件,是为了换机器/换链接后也不会重复写知识库。
@@ -848,7 +848,7 @@ def update_post_eval(query_id, case_id, evaluation, table="search_process"):
         conn.close()
         conn.close()
 
 
 
 
-# ── 上传去重:知识库已导入台账(import_process_knowledge.py 用)────────────────
+# ── 上传去重:知识库已导入台账(stages/import_process_knowledge.py 用)────────────────
 
 
 def fetch_ingested_map(case_id):
 def fetch_ingested_map(case_id):
     """返回 {proc_index: version} —— 该 case 各工序已导入知识库的版本。空表示没传过。"""
     """返回 {proc_index: version} —— 该 case 各工序已导入知识库的版本。空表示没传过。"""

+ 0 - 113
examples/mode_workflow/eval_compare.py

@@ -1,113 +0,0 @@
-# -*- coding: utf-8 -*-
-"""一次性:用当前 eval_prompt_template.md(新 prompt)对单帖重评,与库里旧评估对比打分。
-用法: python eval_compare.py <query_id> <case_id>
-"""
-import argparse
-import asyncio
-import json
-import sys
-from pathlib import Path
-
-PROJECT_ROOT = Path(__file__).resolve().parents[2]   # …/Agent
-sys.path.insert(0, str(PROJECT_ROOT))
-from dotenv import load_dotenv
-load_dotenv()
-
-HERE = Path(__file__).resolve().parent
-sys.path.insert(0, str(HERE))
-import db
-
-from examples.process_pipeline.script.search_eval.search_and_evaluate import _attach_image_refs
-from examples.process_pipeline.script.llm_evaluate_sources import (
-    _evaluate_one, build_eval_llm_call, DEFAULT_EVAL_MODEL,
-)
-
-
-def _row_to_source(row):
-    return {
-        "case_id": row["case_id"], "platform": row["platform"],
-        "channel_content_id": row["channel_content_id"], "source_url": row["url"],
-        "post": {
-            "title": row["title"], "body_text": row["body"],
-            "images": row["images"] or [], "like_count": row["like_count"],
-            "publish_timestamp": row["publish_time"], "link": row["url"],
-        },
-    }
-
-
-def flatten_scores(blob, prefix=""):
-    """blob → {dotted_path: 得分}。只收叶子 {得分:...} 节点。"""
-    out = {}
-    if not isinstance(blob, dict):
-        return out
-    if "得分" in blob:
-        out[prefix.rstrip(".")] = blob.get("得分")
-        return out
-    for k, v in blob.items():
-        if isinstance(v, dict):
-            out.update(flatten_scores(v, f"{prefix}{k}."))
-    return out
-
-
-async def main():
-    ap = argparse.ArgumentParser()
-    ap.add_argument("query_id")
-    ap.add_argument("case_id")
-    ap.add_argument("--model", default=DEFAULT_EVAL_MODEL)
-    ap.add_argument("--max-images", type=int, default=4)
-    args = ap.parse_args()
-
-    row = db.fetch_post(args.query_id, args.case_id, table="search_process")
-    if not row:
-        print(f"❌ {args.query_id}/{args.case_id} 不在 search_process"); return 1
-    old_blob = row.get("llm_evaluation") or {}
-
-    src = _row_to_source(row)
-    await _attach_image_refs([src], args.max_images, 8, "url")
-    n_img = len(src.get("_image_data_urls") or [])
-    print(f"📄 {args.case_id} | {(row['title'] or '')[:40]} | 配图 {n_img} 张 | 模型 {args.model}")
-    print(f"🔍 检索词: {row['query_text']}\n")
-
-    eval_llm, model_id = build_eval_llm_call(args.model)
-    sem = asyncio.Semaphore(1)
-    new_blob, cost = await _evaluate_one(
-        src, "", eval_llm, model_id, sem,
-        image_urls=src.get("_image_data_urls"), query=row["query_text"])
-    if new_blob is None:
-        print("❌ 新评估失败(重试耗尽)"); return 1
-
-    old_f = flatten_scores(old_blob)
-    new_f = flatten_scores(new_blob)
-    keys = sorted(set(old_f) | set(new_f))
-    print(f"{'维度路径':<46} {'旧分':>6} {'新分':>6}   变化")
-    print("─" * 72)
-    for k in keys:
-        o, n = old_f.get(k), new_f.get(k)
-        mark = ""
-        try:
-            if o is not None and n is not None and float(o) != float(n):
-                mark = f"  {float(o):g}→{float(n):g}"
-        except (TypeError, ValueError):
-            pass
-        only = "" if (k in old_f and k in new_f) else ("  (旧无)" if k not in old_f else "  (新无)")
-        print(f"{k:<46} {str(o) if o is not None else '-':>6} {str(n) if n is not None else '-':>6}{mark}{only}")
-
-    print("─" * 72)
-    o_overall, n_overall = db.overall_score(old_blob), db.overall_score(new_blob)
-    o_adopt = db.is_adopted(o_overall, old_blob, row["publish_time"])
-    n_adopt = db.is_adopted(n_overall, new_blob, row["publish_time"])
-    print(f"{'overall_score':<46} {str(o_overall):>6} {str(n_overall):>6}")
-    print(f"{'知识类型':<46} {str(old_blob.get('知识类型')):>6} | {new_blob.get('知识类型')}")
-    print(f"{'是否采纳':<46} {str(o_adopt):>6} {str(n_adopt):>6}")
-    print(f"\n💲 本次重评成本 ${cost:.4f}")
-
-    # 落盘完整新 blob,便于细看理由
-    out = HERE / "runs" / f"eval_compare_{args.case_id}.json"
-    out.write_text(json.dumps({"old": old_blob, "new": new_blob}, ensure_ascii=False, indent=2),
-                   encoding="utf-8")
-    print(f"📝 完整新旧 blob(含理由): {out}")
-    return 0
-
-
-if __name__ == "__main__":
-    raise SystemExit(asyncio.run(main()))

+ 0 - 48
examples/mode_workflow/import_history.py

@@ -1,48 +0,0 @@
-# -*- coding: utf-8 -*-
-"""一次性导入:fixed_query_eval/runs_full/*/form_A.json → 搜索表。
-幂等(upsert),可反复执行。默认导入 search_process(工序方向)。
-
-用法:
-  python import_history.py
-  python import_history.py --runs-dir /path/to/runs_full --table search_tools
-"""
-import argparse
-import json
-import sys
-from pathlib import Path
-
-HERE = Path(__file__).resolve().parent
-sys.path.insert(0, str(HERE))
-import db
-
-DEFAULT_RUNS = (HERE.parent / "process_pipeline" / "script" / "search_eval"
-                / "fixed_query_eval" / "runs_full")
-
-
-def main():
-    p = argparse.ArgumentParser(description="历史搜索结果导入搜索表")
-    p.add_argument("--runs-dir", default=str(DEFAULT_RUNS))
-    p.add_argument("--table", default="search_process",
-                   choices=["search_process", "search_tools"])
-    args = p.parse_args()
-
-    runs = Path(args.runs_dir)
-    files = sorted(runs.glob("q*/form_A.json"))
-    if not files:
-        print(f"❌ {runs} 下没有 q*/form_A.json"); return 1
-
-    total = 0
-    for f in files:
-        data = json.loads(f.read_text(encoding="utf-8"))
-        qid = f.parent.name
-        results = data.get("results", [])
-        n = db.upsert_search_posts(qid, data.get("query") or data.get("original_q"),
-                                   results, table=args.table)
-        print(f"  {qid}: 文件 {len(results)} 条 → 入库 {n} 条")
-        total += n
-    print(f"✅ 共导入 {total} 条 → {args.table}")
-    return 0
-
-
-if __name__ == "__main__":
-    raise SystemExit(main())

+ 3 - 3
examples/mode_workflow/server.py

@@ -495,8 +495,8 @@ class Handler(BaseHTTPRequestHandler):
                     return self._json({"task_id": None, "skipped": skipped,
                     return self._json({"task_id": None, "skipped": skipped,
                                        "note": "所选帖子正在解构中,已跳过(防并发重复解构)"})
                                        "note": "所选帖子正在解构中,已跳过(防并发重复解构)"})
                 try:
                 try:
-                    script = ("pipeline/procedure_extract.py" if mode == "process"
-                              else "pipeline/tool_extract.py")
+                    script = ("stages/procedure_extract.py" if mode == "process"
+                              else "stages/tool_extract.py")
                     cmd = [sys.executable, script, "--query-id", qid,
                     cmd = [sys.executable, script, "--query-id", qid,
                            "--case-ids", ",".join(claimed)]
                            "--case-ids", ",".join(claimed)]
                     if payload.get("model"):
                     if payload.get("model"):
@@ -522,7 +522,7 @@ class Handler(BaseHTTPRequestHandler):
                 if not query:
                 if not query:
                     return self._err("缺 query")
                     return self._err("缺 query")
                 qid = payload.get("query_id") or _next_query_id()
                 qid = payload.get("query_id") or _next_query_id()
-                cmd = [sys.executable, "pipeline/search_eval.py",
+                cmd = [sys.executable, "stages/search_eval.py",
                        "--query-id", qid, "--query", query]
                        "--query-id", qid, "--query", query]
                 if payload.get("synonyms"):
                 if payload.get("synonyms"):
                     cmd += ["--synonyms", payload["synonyms"]]
                     cmd += ["--synonyms", payload["synonyms"]]

+ 10 - 7
examples/mode_workflow/import_process_knowledge.py → examples/mode_workflow/stages/import_process_knowledge.py

@@ -21,13 +21,13 @@
 采纳口径:db.is_adopted_rel(相关性<4 / 发布超两年 / 综合分<6 任一命中即不采纳)。
 采纳口径:db.is_adopted_rel(相关性<4 / 发布超两年 / 综合分<6 任一命中即不采纳)。
 
 
 用法:
 用法:
-    python import_process_knowledge.py                      # 真实导入(采纳工序全量)
-    python import_process_knowledge.py --dry-run            # 只组装+打印,不调接口
-    python import_process_knowledge.py --dry-run --verbose  # 打印完整 payload JSON
-    python import_process_knowledge.py --query-id q0001     # 只传某搜索任务下的采纳 case
-    python import_process_knowledge.py --limit 5            # 只处理前 5 个 case(调试)
-    python import_process_knowledge.py --api-url http://... # 指定后端地址
-    python import_process_knowledge.py --delay 200          # 每次调用间隔 200ms
+    python stages/import_process_knowledge.py                      # 真实导入(采纳工序全量)
+    python stages/import_process_knowledge.py --dry-run            # 只组装+打印,不调接口
+    python stages/import_process_knowledge.py --dry-run --verbose  # 打印完整 payload JSON
+    python stages/import_process_knowledge.py --query-id q0001     # 只传某搜索任务下的采纳 case
+    python stages/import_process_knowledge.py --limit 5            # 只处理前 5 个 case(调试)
+    python stages/import_process_knowledge.py --api-url http://... # 指定后端地址
+    python stages/import_process_knowledge.py --delay 200          # 每次调用间隔 200ms
 """
 """
 
 
 import argparse
 import argparse
@@ -38,6 +38,9 @@ import time
 
 
 import requests
 import requests
 
 
+# 本脚本归档在 stages/ 子目录,补 mode_workflow/ 到 sys.path 以裸 import db
+from pathlib import Path
+sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
 import db
 import db
 
 
 # ── 配置(对齐参考实现)────────────────────────────────────────────────────────
 # ── 配置(对齐参考实现)────────────────────────────────────────────────────────

+ 2 - 2
examples/mode_workflow/pipeline/procedure_extract.py → examples/mode_workflow/stages/procedure_extract.py

@@ -5,8 +5,8 @@
 配图下载转 base64(绕防盗链)随文本一起发。结果按工序拆行写 mode_process。
 配图下载转 base64(绕防盗链)随文本一起发。结果按工序拆行写 mode_process。
 
 
 用法(一般由 server.py 起子进程调):
 用法(一般由 server.py 起子进程调):
-  python pipeline/procedure_extract.py --query-id q0000 --case-ids xhs_abc
-  python pipeline/procedure_extract.py --query-id q0000 --case-ids xhs_abc --model google/gemini-3.1-flash-lite
+  python stages/procedure_extract.py --query-id q0000 --case-ids xhs_abc
+  python stages/procedure_extract.py --query-id q0000 --case-ids xhs_abc --model google/gemini-3.1-flash-lite
 """
 """
 import argparse
 import argparse
 import asyncio
 import asyncio

+ 2 - 2
examples/mode_workflow/pipeline/search_eval.py → examples/mode_workflow/stages/search_eval.py

@@ -5,8 +5,8 @@
 引擎函数全部只读复用 search_and_evaluate.py(搜索/去重/转写/评估/英平台翻译)。
 引擎函数全部只读复用 search_and_evaluate.py(搜索/去重/转写/评估/英平台翻译)。
 
 
 用法(一般由 server.py 起子进程调):
 用法(一般由 server.py 起子进程调):
-  python pipeline/search_eval.py --query-id q0004 --query "AI 人像 图片 生成 怎么做"
-  python pipeline/search_eval.py --query-id q0005 --query "GPT image2 评测" \
+  python stages/search_eval.py --query-id q0004 --query "AI 人像 图片 生成 怎么做"
+  python stages/search_eval.py --query-id q0005 --query "GPT image2 评测" \
       --synonyms "GPT image2 测评,GPT image2 实测" --platforms xhs,gzh --max-count 10
       --synonyms "GPT image2 测评,GPT image2 实测" --platforms xhs,gzh --max-count 10
 """
 """
 import argparse
 import argparse

+ 3 - 3
examples/mode_workflow/pipeline/tool_extract.py → examples/mode_workflow/stages/tool_extract.py

@@ -7,8 +7,8 @@
 - 写库:db.replace_tools(同版本幂等,跨版本保留);runs/mode_tools/ 留调试副本
 - 写库:db.replace_tools(同版本幂等,跨版本保留);runs/mode_tools/ 留调试副本
 
 
 用法(一般由 server.py 起子进程调):
 用法(一般由 server.py 起子进程调):
-  python pipeline/tool_extract.py --query-id q0000 --case-ids xhs_abc,gzh_def
-  python pipeline/tool_extract.py --query-id q0000 --case-ids xhs_abc --model anthropic/claude-sonnet-4-6
+  python stages/tool_extract.py --query-id q0000 --case-ids xhs_abc,gzh_def
+  python stages/tool_extract.py --query-id q0000 --case-ids xhs_abc --model anthropic/claude-sonnet-4-6
 """
 """
 import argparse
 import argparse
 import asyncio
 import asyncio
@@ -28,7 +28,7 @@ from examples.process_pipeline.script.search_eval.search_and_evaluate import _at
 from examples.process_pipeline.script.llm_evaluate_sources import _format_post_for_eval, build_eval_llm_call
 from examples.process_pipeline.script.llm_evaluate_sources import _format_post_for_eval, build_eval_llm_call
 from examples.process_pipeline.script.llm_helper import call_llm_with_retry
 from examples.process_pipeline.script.llm_helper import call_llm_with_retry
 
 
-HERE = Path(__file__).resolve().parent          # pipeline/
+HERE = Path(__file__).resolve().parent          # stages/
 MW = HERE.parent                                 # mode_workflow/
 MW = HERE.parent                                 # mode_workflow/
 sys.path.insert(0, str(MW))
 sys.path.insert(0, str(MW))
 import db
 import db

+ 10 - 10
examples/mode_workflow/流程执行手册.md

@@ -20,7 +20,7 @@ cd /Users/max_liu/max_liu/company/Agent/examples/mode_workflow
 约定:
 约定:
 - **渠道代码**:`xhs` = 小红书,`gzh` = 公众号(还有 `sph` 视频号、`douyin`、`zhihu` 等,以支持搜索的为准)。
 - **渠道代码**:`xhs` = 小红书,`gzh` = 公众号(还有 `sph` 视频号、`douyin`、`zhihu` 等,以支持搜索的为准)。
 - **方向**:`工序`(怎么做/流程)写 `search_process` / `mode_process`;`工具`(测评/工具)写 `search_tools` / `mode_tools`。本手册走**工序**。
 - **方向**:`工序`(怎么做/流程)写 `search_process` / `mode_process`;`工具`(测评/工具)写 `search_tools` / `mode_tools`。本手册走**工序**。
-- 脚本基本都带 `-h`,记不清参数时 `python3 pipeline/xxx.py -h`。
+- 脚本基本都带 `-h`,记不清参数时 `python3 stages/xxx.py -h`。
 
 
 ---
 ---
 
 
@@ -38,7 +38,7 @@ python3 -c "import server; print(server._next_query_id())"
 ### 2) 跑搜索 + 评估
 ### 2) 跑搜索 + 评估
 
 
 ```bash
 ```bash
-python3 pipeline/search_eval.py \
+python3 stages/search_eval.py \
   --query-id q0020 \
   --query-id q0020 \
   --query "人物 姿势 精准控制 怎么做" \
   --query "人物 姿势 精准控制 怎么做" \
   --mode-type 工序 \
   --mode-type 工序 \
@@ -82,7 +82,7 @@ echo "$CIDS"
 ### 2) 跑工序解构
 ### 2) 跑工序解构
 
 
 ```bash
 ```bash
-python3 pipeline/procedure_extract.py \
+python3 stages/procedure_extract.py \
   --query-id q0020 \
   --query-id q0020 \
   --case-ids "$CIDS"
   --case-ids "$CIDS"
 ```
 ```
@@ -94,7 +94,7 @@ python3 pipeline/procedure_extract.py \
 
 
 > **解构去重(默认开)**:某 case 若**已真实解构过**(任意 query),不会再调大模型 —— 同 query 直接跳过,跨 query 用 `link_*` 复制补齐关联(成本 $0)。要换 prompt/模型重解构才加 `--force`。
 > **解构去重(默认开)**:某 case 若**已真实解构过**(任意 query),不会再调大模型 —— 同 query 直接跳过,跨 query 用 `link_*` 复制补齐关联(成本 $0)。要换 prompt/模型重解构才加 `--force`。
 
 
-> 工具方向同理,换脚本:`python3 pipeline/tool_extract.py --query-id q0020 --case-ids "$CIDS"`(需先有 `search_tools` 数据)。
+> 工具方向同理,换脚本:`python3 stages/tool_extract.py --query-id q0020 --case-ids "$CIDS"`(需先有 `search_tools` 数据)。
 
 
 ---
 ---
 
 
@@ -106,14 +106,14 @@ python3 pipeline/procedure_extract.py \
 ### 1) 先 dry-run 看条数(不真传)
 ### 1) 先 dry-run 看条数(不真传)
 
 
 ```bash
 ```bash
-python3 import_process_knowledge.py --query-id q0020 --dry-run
+python3 stages/import_process_knowledge.py --query-id q0020 --dry-run
 # 看「发现 N 个采纳 case … 合计导入 M」,确认范围对
 # 看「发现 N 个采纳 case … 合计导入 M」,确认范围对
 ```
 ```
 
 
 ### 2) 真实上传
 ### 2) 真实上传
 
 
 ```bash
 ```bash
-python3 import_process_knowledge.py --query-id q0020
+python3 stages/import_process_knowledge.py --query-id q0020
 ```
 ```
 
 
 要点:
 要点:
@@ -135,16 +135,16 @@ QID=q0021                                   # 用步骤1的 _next_query_id() 拿
 QUERY="你的检索词 怎么做"
 QUERY="你的检索词 怎么做"
 
 
 # 2) 搜索+评估
 # 2) 搜索+评估
-python3 pipeline/search_eval.py --query-id "$QID" --query "$QUERY" \
+python3 stages/search_eval.py --query-id "$QID" --query "$QUERY" \
   --mode-type 工序 --platforms xhs,gzh --max-count 20
   --mode-type 工序 --platforms xhs,gzh --max-count 20
 
 
 # 3) 取采纳 → 工序解构
 # 3) 取采纳 → 工序解构
 CIDS=$(python3 -c "import db; print(','.join(p['case_id'] for p in db.fetch_posts('$QID','process') if p.get('adopted')))")
 CIDS=$(python3 -c "import db; print(','.join(p['case_id'] for p in db.fetch_posts('$QID','process') if p.get('adopted')))")
-[ -n "$CIDS" ] && python3 pipeline/procedure_extract.py --query-id "$QID" --case-ids "$CIDS"
+[ -n "$CIDS" ] && python3 stages/procedure_extract.py --query-id "$QID" --case-ids "$CIDS"
 
 
 # 4) 上传(先 dry-run 再真传)
 # 4) 上传(先 dry-run 再真传)
-python3 import_process_knowledge.py --query-id "$QID" --dry-run
-python3 import_process_knowledge.py --query-id "$QID"
+python3 stages/import_process_knowledge.py --query-id "$QID" --dry-run
+python3 stages/import_process_knowledge.py --query-id "$QID"
 ```
 ```
 
 
 > 每步都是**前台阻塞**跑,跑完再跑下一步,日志直接打在终端。
 > 每步都是**前台阻塞**跑,跑完再跑下一步,日志直接打在终端。