Просмотр исходного кода

feat(mode_workflow): 添加工序归类全流程功能及配套资源

新增后端/api/categorize和/api/categorize_status接口,支持触发归类任务和查询已归类案例状态
新增数据库操作方法,实现归类后的工序数据读写与已归类案例校验
新增category_match.py脚本,完成工序分类匹配与结果回写的完整逻辑
更新前端页面,添加单帖/批量归类按钮,以及归类结果的标签展示
新增知识导入接口的官方文档与示例payload文件
刘文武 2 дня назад
Родитель
Коммит
13960efc23

+ 59 - 0
examples/mode_workflow/db.py

@@ -784,6 +784,65 @@ def update_process_steps_by_query(query_id, case_id, version, steps_in_order):
         conn.close()
         conn.close()
 
 
 
 
+def update_process_steps(case_id, version, steps_in_order):
+    """按工序顺序覆盖某 (case_id, version) 各行的 steps JSON 列(不限 query_id)。
+    与 fetch_process / fetch_extract 同口径(按 case 的某版本),保证归类回写的版本
+    与前端 /api/extract 展示的版本一致(否则 link_ 复制帖会写错版本、前端看不到)。
+    steps_in_order 须与 fetch_process(case_id, version).procedures 同序(按 id 升序)。
+    行数与工序数不符则报错回滚。返回更新行数。"""
+    conn = _conn()
+    try:
+        conn.begin()
+        with conn.cursor() as cur:
+            cur.execute("""SELECT id FROM mode_process WHERE case_id=%s AND version=%s
+                           ORDER BY id""", (case_id, version))
+            ids = [r["id"] for r in cur.fetchall()]
+            if len(ids) != len(steps_in_order):
+                raise ValueError(f"行数({len(ids)})与工序数({len(steps_in_order)})不一致")
+            n = 0
+            for row_id, steps in zip(ids, steps_in_order):
+                cur.execute("UPDATE mode_process SET steps=%s WHERE id=%s", (_j(steps), row_id))
+                n += cur.rowcount
+        conn.commit()
+        return n
+    except Exception:
+        conn.rollback()
+        raise
+    finally:
+        conn.close()
+
+
+def fetch_categorized_cases(case_ids, mode="process"):
+    """返回 case_ids 中「已归类」的子集:该 case 最新真实版(link_ 排后)的 steps 已含
+    substanceMatch 字段(归类跑过的工序行一定带此 key)。与归类回写/前端展示同口径。
+    供前端判断「是否已全部归类 → 提示重新归类」。仅工序方向有意义(mode_process)。"""
+    if not case_ids:
+        return set()
+    table = _mode_table(mode)
+    ph = ",".join(["%s"] * len(case_ids))
+    conn = _conn()
+    try:
+        with conn.cursor() as cur:
+            # 每 case 按「真实版优先、id 降序」排,取首行(最新真实版的代表工序行)的判断结果;
+            # steps LIKE 在库端算,只回传 0/1,不拉 steps 大字段。
+            cur.execute(f"""SELECT case_id, (steps LIKE %s) AS cat
+                            FROM {table} WHERE case_id IN ({ph})
+                            ORDER BY case_id, (LEFT(version,5)='link_') ASC, id DESC""",
+                        ['%substanceMatch%'] + list(case_ids))
+            rows = cur.fetchall()
+    finally:
+        conn.close()
+    seen, out = set(), set()
+    for r in rows:
+        cid = r["case_id"]
+        if cid in seen:        # 每 case 只看首行(最新真实版)
+            continue
+        seen.add(cid)
+        if r["cat"]:
+            out.add(cid)
+    return out
+
+
 def _proc_payload(case_id, version, rows):
 def _proc_payload(case_id, version, rows):
     """mode_process 行集 → {case_id, version, …, procedures:[...]}。无行返回 None。"""
     """mode_process 行集 → {case_id, version, …, procedures:[...]}。无行返回 None。"""
     if not rows:
     if not rows:

+ 78 - 0
examples/mode_workflow/docs/knowledge_ingest_api.md

@@ -0,0 +1,78 @@
+# 知识导入接口(knowledge ingest)
+
+> 来源:`stages/import_process_knowledge.py`(`build_payload` + `ingest_one`)
+> 用途:把 DB 中「已采纳」的工序解构(mode_process)逐条写入知识库。
+
+## 基本信息
+
+| 项 | 值 |
+|----|----|
+| Method | `POST` |
+| URL | `{api_url}/api/v1/knowledge/ingest` |
+| 默认 `api_url` | `http://47.236.83.130:8001` |
+| 完整默认地址 | `http://47.236.83.130:8001/api/v1/knowledge/ingest` |
+| Content-Type | `application/json` |
+| 超时 | 30s |
+| 成功状态码 | `201 Created` |
+
+**一条 procedure(工序)= 一次请求 = 一条知识。** 同一个 case 下有 N 个工序就调用 N 次。
+
+## 请求头
+
+```
+Content-Type: application/json
+```
+
+## 请求体字段
+
+| 字段 | 类型 | 必填 | 说明 |
+|------|------|------|------|
+| `source` | object | 是 | 来源帖子信息 |
+| `source.id` | string | 是 | 来源主键,直接取 DB 的 `case_id` |
+| `source.source_type` | string | 是 | 固定 `"post"` |
+| `source.title` | string \| null | 是 | 来源标题,空则 `null` |
+| `source.author` | string \| null | 是 | 来源作者,空则 `null` |
+| `source.source_metadata` | object | 是 | 来源附加元信息(见下) |
+| `source.source_metadata.platform` | string | 是 | 平台,空则 `""` |
+| `source.source_metadata.date` | string | 是 | 发布日期,空则 `""` |
+| `source.source_metadata.url` | string \| null | 是 | 原文链接,空则 `null` |
+| `source.source_metadata.excerpt` | string | 是 | 摘要,截断到 500 字符 |
+| `source.source_metadata.procedure_id` | string | 是 | 工序 id,空则 `""` |
+| `source.source_metadata.procedure_name` | string | 是 | 工序名,空则 `""` |
+| `title` | string | 是 | 知识标题,截断到 512 字符。取工序名;为空回退 `"来源标题 — 工序N"`,再空则 `"工序N"` |
+| `content` | string | 是 | 整个 procedure 对象序列化后的 JSON 字符串(`ensure_ascii=False`) |
+| `dim_attributes` | string[] | 是 | 固定 `["how工序"]` |
+| `dim_creations` | string[] | 是 | 固定 `["制作"]` |
+| `scopes` | object[] | 否 | 作用域;为空时**不带该字段** |
+| `scopes[].scope_type` | string | - | `"substance"` 或 `"form"` |
+| `scopes[].value` | string | - | 由各步骤 `substance`/`form` 按顿号拆分去重 |
+| `custom_ext` | object[] | 否 | 自定义扩展;为空时**不带该字段** |
+| `custom_ext[].key` | string | - | `"作用"`(effect)/ `"动作"`(action)/ `"工具"`(via,`human` 除外) |
+| `custom_ext[].type` | string | - | 固定 `"str"` |
+| `custom_ext[].value` | string | - | 对应步骤字段按顿号拆分去重 |
+
+> 注:`scopes` 与 `custom_ext` 仅在非空时才会出现在请求体里(脚本里条件添加)。
+
+## 成功响应(201)
+
+```json
+{
+  "knowledge_id": "<生成的知识ID>"
+}
+```
+
+## 失败响应
+
+| 情况 | 处理 |
+|------|------|
+| 非 201 | 读取响应 JSON 的 `detail` 字段(无则取 `text[:300]`)作为错误信息 |
+| 超时(30s) | 记为失败,信息 `"超时(30s)"` |
+| 网络异常 | 记为失败,信息为异常字符串 |
+
+## curl 示例
+
+```bash
+curl -X POST http://47.236.83.130:8001/api/v1/knowledge/ingest \
+  -H "Content-Type: application/json" \
+  -d @knowledge_ingest_payload.json
+```

+ 34 - 0
examples/mode_workflow/docs/knowledge_ingest_payload.json

@@ -0,0 +1,34 @@
+{
+  "source": {
+    "id": "xhs_a1b2c3",
+    "source_type": "post",
+    "title": "用 AI 生成二次元风格头像的完整流程",
+    "author": "设计师小王",
+    "source_metadata": {
+      "platform": "xiaohongshu",
+      "date": "2026-03-15",
+      "url": "https://www.xiaohongshu.com/explore/xhs_a1b2c3",
+      "excerpt": "本文介绍如何用 AI 工具把真人照片转成二次元风格头像,包含线稿、上色、细节调整三步……",
+      "procedure_id": "proc_001",
+      "procedure_name": "二次元头像生成"
+    }
+  },
+  "title": "二次元头像生成",
+  "content": "{\"id\": \"proc_001\", \"name\": \"二次元头像生成\", \"steps\": [{\"substance\": \"真人照片\", \"form\": \"二次元线稿\", \"effect\": \"风格转换\", \"action\": \"上传并生成\", \"via\": \"Midjourney\"}, {\"substance\": \"二次元线稿\", \"form\": \"上色稿\", \"effect\": \"着色\", \"action\": \"自动上色\", \"via\": \"Photoshop\"}]}",
+  "dim_attributes": ["how工序"],
+  "dim_creations": ["制作"],
+  "scopes": [
+    { "scope_type": "substance", "value": "真人照片" },
+    { "scope_type": "substance", "value": "二次元线稿" },
+    { "scope_type": "form", "value": "二次元线稿" },
+    { "scope_type": "form", "value": "上色稿" }
+  ],
+  "custom_ext": [
+    { "key": "作用", "type": "str", "value": "风格转换" },
+    { "key": "作用", "type": "str", "value": "着色" },
+    { "key": "动作", "type": "str", "value": "上传并生成" },
+    { "key": "动作", "type": "str", "value": "自动上色" },
+    { "key": "工具", "type": "str", "value": "Midjourney" },
+    { "key": "工具", "type": "str", "value": "Photoshop" }
+  ]
+}

+ 120 - 21
examples/mode_workflow/index.html

@@ -550,6 +550,22 @@
         color: var(--ink-faint);
         color: var(--ink-faint);
         font-weight: 500;
         font-weight: 500;
       }
       }
+      /* 帖子列头:标题一行、操作按钮另起一行(按钮不和「帖子 N/M」挤一行) */
+      .col-head.posts-head {
+        flex-direction: column;
+        align-items: stretch;
+        gap: 8px;
+      }
+      .posts-head .ph-title {
+        display: flex;
+        align-items: center;
+        gap: 8px;
+      }
+      .posts-head .ph-actions {
+        display: flex;
+        flex-wrap: wrap;
+        gap: 8px;
+      }
 
 
       .qlist {
       .qlist {
         flex: 1;
         flex: 1;
@@ -963,6 +979,25 @@
         font-size: 11.5px;
         font-size: 11.5px;
         word-break: break-all;
         word-break: break-all;
       }
       }
+      /* 归类命中 tag(实质/形式单元格内,原值下方)──绿色胶囊,与原文本区分 */
+      .steps .match-tags {
+        margin-top: 5px;
+        display: flex;
+        flex-wrap: wrap;
+        gap: 3px;
+      }
+      .steps .mtag {
+        display: inline-block;
+        padding: 1px 7px;
+        border-radius: 10px;
+        background: #e3f3e8;
+        color: #2e6b45;
+        border: 1px solid #bfe3cb;
+        font-size: 10.5px;
+        font-weight: 600;
+        line-height: 1.6;
+        white-space: nowrap;
+      }
       .inf {
       .inf {
         background: var(--infer) !important;
         background: var(--infer) !important;
         position: relative;
         position: relative;
@@ -2202,23 +2237,13 @@
           </div>
           </div>
         </div>
         </div>
         <div class="card">
         <div class="card">
-          <div class="col-head">
-            帖子
-            <span style="display: flex; gap: 8px; align-items: center">
-              <span
-                class="n"
-                id="p-count"
-              ></span>
-              <button
-                class="btn sm"
-                id="btn-batch"
-                disabled
-                hidden
-              >
-                批量解构
-              </button>
+          <div class="col-head posts-head">
+            <div class="ph-title">帖子 <span class="n" id="p-count"></span></div>
+            <div class="ph-actions">
+              <button class="btn sm" id="btn-batch" disabled hidden>批量解构</button>
               <button class="btn sm" id="btn-extract-adopted" hidden>解构全部已采纳</button>
               <button class="btn sm" id="btn-extract-adopted" hidden>解构全部已采纳</button>
-            </span>
+              <button class="btn sm" id="btn-cat-adopted" hidden>归类全部已采纳</button>
+            </div>
           </div>
           </div>
           <div
           <div
             class="plat-tabs"
             class="plat-tabs"
@@ -2972,14 +2997,44 @@
         const ea = $("#btn-extract-adopted");
         const ea = $("#btn-extract-adopted");
         ea.hidden = !adoptedN;
         ea.hidden = !adoptedN;
         ea.textContent = `解构全部已采纳(${adoptedN})`;
         ea.textContent = `解构全部已采纳(${adoptedN})`;
+        // 归类全部已采纳:仅工序方向 + 有「已采纳且已解构」的帖才显示(只有已解构能归类)
+        const catN = (state.posts || []).filter((p) => p.adopted && p.has_process).length;
+        const ca = $("#btn-cat-adopted");
+        ca.hidden = state.mode !== "process" || !catN;
+        ca.textContent = `归类全部已采纳(${catN})`;
       }
       }
       $("#btn-batch").onclick = () => state.selected.size && startExtract([...state.selected]);
       $("#btn-batch").onclick = () => state.selected.size && startExtract([...state.selected]);
       $("#btn-extract-adopted").onclick = async () => {
       $("#btn-extract-adopted").onclick = async () => {
-        const cids = (state.posts || []).filter((p) => p.adopted).map((p) => p.case_id);
+        const adopted = (state.posts || []).filter((p) => p.adopted);
+        const cids = adopted.map((p) => p.case_id);
         if (!cids.length) return toast("当前 query 下没有已采纳的帖子", "warn");
         if (!cids.length) return toast("当前 query 下没有已采纳的帖子", "warn");
         const dir = state.mode === "process" ? "工序" : "工具";
         const dir = state.mode === "process" ? "工序" : "工具";
-        if (!(await uiConfirm(`对该 query 下全部 ${cids.length} 个已采纳帖做${dir}解构?\n已解构过的会自动跳过/复用,不重复花钱。`))) return;
-        startExtract(cids);   // 复用:认领锁 + 解构去重 + showTask 轮询
+        const doneKey = state.mode === "process" ? "has_process" : "has_tools";
+        const undoneN = adopted.filter((p) => !p[doneKey]).length;
+        const allDone = undoneN === 0;   // 已采纳帖全部已解构
+        const msg = allDone
+          ? `这 ${cids.length} 个已采纳帖都已${dir}解构。是否重新解构?\n重新解构会消耗 LLM、生成新版本并覆盖当前展示。`
+          : `对该 query 下全部 ${cids.length} 个已采纳帖做${dir}解构(其中 ${undoneN} 个未解构)。\n已解构过的会自动跳过/复用,不重复花钱。`;
+        if (!(await uiConfirm(msg))) return;
+        // 全部已解构时,确认重做须 force(否则按 case 去重会全部跳过、白点一次)
+        startExtract(cids, allDone ? { force: true } : {});
+      };
+      $("#btn-cat-adopted").onclick = async () => {
+        // 只归类「已采纳且已解构」的帖(只有已解构才有 steps 可归类)
+        const cids = (state.posts || []).filter((p) => p.adopted && p.has_process).map((p) => p.case_id);
+        if (!cids.length) return toast("当前 query 下没有「已采纳且已解构」的帖子", "warn");
+        // 查这些帖已归类的数量,决定提示文案(已归类口径:steps 含 substanceMatch)
+        let catN = 0;
+        try {
+          const r = await api(`/api/categorize_status?mode=${state.mode}&case_ids=${encodeURIComponent(cids.join(","))}`);
+          catN = (r.categorized || []).length;
+        } catch (e) { /* 查不到归类状态不阻断,按未归类提示 */ }
+        const allCat = catN >= cids.length;   // 全部已归类
+        const msg = allCat
+          ? `这 ${cids.length} 个帖的工序都已归类。是否重新归类?\n重新归类会用最新分类结果覆盖原有实质/形式 tag。`
+          : `对该 query 下 ${cids.length} 个已采纳且已解构的帖做工序归类${catN ? `(其中 ${catN} 个已归类,将覆盖)` : ""}?\n将把命中的分类回写进各工序的实质/形式。`;
+        if (!(await uiConfirm(msg))) return;
+        startCategorize(cids);   // 归类即覆盖写,无需 force
       };
       };
 
 
       /* ════ 帖子详情弹层 ════ */
       /* ════ 帖子详情弹层 ════ */
@@ -3359,6 +3414,7 @@
     <span class="spacer"></span>
     <span class="spacer"></span>
     ${versions.length ? `<select id="ver-sel">${opts}</select>` : ""}
     ${versions.length ? `<select id="ver-sel">${opts}</select>` : ""}
     <button class="btn sm primary" onclick="openReextractDialog()">${missing ? "提取" : "♻ 重新生成"}</button>
     <button class="btn sm primary" onclick="openReextractDialog()">${missing ? "提取" : "♻ 重新生成"}</button>
+    ${isProc && !missing ? `<button class="btn sm" onclick="categorizeCurrent()" title="对本帖工序做分类归纳,把命中分类回写到实质/形式">🏷 归纳</button>` : ""}
     <button class="btn sm" onclick="showTaskPanel()" title="重新打开任务日志面板">📋 操作日志</button>`;
     <button class="btn sm" onclick="showTaskPanel()" title="重新打开任务日志面板">📋 操作日志</button>`;
         const vs = $("#ver-sel");
         const vs = $("#ver-sel");
         if (vs)
         if (vs)
@@ -3487,8 +3543,8 @@
               rows += `<td rowspan="${n}" class="sid">${esc(s.id || "")}</td>
               rows += `<td rowspan="${n}" class="sid">${esc(s.id || "")}</td>
           <td rowspan="${n}"><div class="intent-text">${renderIntent(s.intent || s.directive || "")}</div></td>
           <td rowspan="${n}"><div class="intent-text">${renderIntent(s.intent || s.directive || "")}</div></td>
           <td rowspan="${n}">${s.effect ? `<span class="pill navy">${esc(s.effect)}</span>` : ""}</td>
           <td rowspan="${n}">${s.effect ? `<span class="pill navy">${esc(s.effect)}</span>` : ""}</td>
-          <td rowspan="${n}">${esc(fmtSF(s.substance))}</td>
-          <td rowspan="${n}">${esc(fmtSF(s.form))}</td>`;
+          <td rowspan="${n}">${esc(fmtSF(s.substance))}${matchTag(s.substanceMatch)}</td>
+          <td rowspan="${n}">${esc(fmtSF(s.form))}${matchTag(s.formMatch)}</td>`;
             }
             }
             rows += ioCell(ins[i], "in");
             rows += ioCell(ins[i], "in");
             if (i === 0) {
             if (i === 0) {
@@ -3520,6 +3576,17 @@
       function fmtSF(v) {
       function fmtSF(v) {
         return v == null ? "" : Array.isArray(v) ? v.join("、") : v;
         return v == null ? "" : Array.isArray(v) ? v.join("、") : v;
       }
       }
+      /* 归类命中(substanceMatch/formMatch):多个用「、」拆,逐个出绿色 tag 放原值下方 */
+      function matchTag(v) {
+        if (v == null || v === "") return "";
+        const tags = String(v)
+          .split("、")
+          .map((x) => x.trim())
+          .filter(Boolean)
+          .map((x) => `<span class="mtag">${esc(x)}</span>`)
+          .join("");
+        return tags ? `<div class="match-tags">${tags}</div>` : "";
+      }
       function ioCell(x, kind) {
       function ioCell(x, kind) {
         const cls = kind === "in" ? "c-in" : "c-out";
         const cls = kind === "in" ? "c-in" : "c-out";
         if (!x) return `<td class="${cls}"></td><td class="${cls}"></td><td class="${cls}"></td>`;
         if (!x) return `<td class="${cls}"></td><td class="${cls}"></td><td class="${cls}"></td>`;
@@ -3708,12 +3775,44 @@
               renderPosts();
               renderPosts();
               await loadExtract();
               await loadExtract();
             }
             }
+            // 需求3:工序解构完成 → 自动归类(工具方向无 substance/form,不归类)
+            if (isProc) startCategorize(caseIds, { auto: true });
           });
           });
         } catch (e) {
         } catch (e) {
           toast("任务启动失败:" + (e.body?.error || e.status), "error");
           toast("任务启动失败:" + (e.body?.error || e.status), "error");
         }
         }
       }
       }
 
 
+      /* ════ 工序归类(category-match)════
+         起 /api/categorize 子进程任务,完成后清缓存并刷新当前帖解构,实质/形式单元格出 tag。
+         auto=true 为「工序解构完成后自动归类」,文案略不同。 */
+      async function startCategorize(caseIds, opts = {}) {
+        if (state.mode !== "process") return; // 仅工序方向有 substance/form 可归类
+        if (!state.queryId || !caseIds.length) return;
+        try {
+          const r = await api("/api/categorize", {
+            method: "POST",
+            body: JSON.stringify({ query_id: state.queryId, case_ids: caseIds }),
+          });
+          if (!r.task_id) return toast(r.note || "无可归类帖", "info");
+          showTask(`工序归类 · ${caseIds.length} 帖${opts.auto ? "(自动)" : ""}`, r.task_id, async () => {
+            caseIds.forEach(invalidateExtractCache); // 归类改了 steps,清缓存才能拿到新 match
+            // 当前正看的帖在本批里 → 重载解构,实质/形式立即出 tag
+            if (caseIds.includes(state.caseId)) {
+              state.version = null;
+              await loadExtract();
+            }
+            toast(opts.auto ? "解构完成,已自动归类" : "归类完成", "success");
+          });
+        } catch (e) {
+          toast("归类启动失败:" + (e.body?.error || e.status), "error");
+        }
+      }
+      function categorizeCurrent() {
+        if (!state.caseId) return toast("请先选择帖子", "warn");
+        startCategorize([state.caseId]);
+      }
+
       /* ════ 任务面板(✕ 只隐藏;「操作日志」按钮可随时唤回)════ */
       /* ════ 任务面板(✕ 只隐藏;「操作日志」按钮可随时唤回)════ */
       let pollTimer = null,
       let pollTimer = null,
         hasTask = false;
         hasTask = false;

+ 17 - 0
examples/mode_workflow/server.py

@@ -664,6 +664,11 @@ class Handler(BaseHTTPRequestHandler):
                             "page_size": page_size, "posts": posts})
                             "page_size": page_size, "posts": posts})
             elif u.path == "/api/search_progress":
             elif u.path == "/api/search_progress":
                 self._json(_search_progress(qs.get("mode", "process")))
                 self._json(_search_progress(qs.get("mode", "process")))
+            elif u.path == "/api/categorize_status":
+                # 给定 case_ids(逗号分隔),返回其中「已归类」的子集,供前端判断是否提示「重新归类」
+                cids = [c for c in qs.get("case_ids", "").split(",") if c.strip()]
+                done = db.fetch_categorized_cases(cids, qs.get("mode", "process")) if cids else set()
+                self._json({"categorized": sorted(done), "total": len(cids)})
             elif u.path == "/api/post":
             elif u.path == "/api/post":
                 # 单帖详情(正文/配图/评估全量):列表已瘦身,详情按需取;带 ETag/304
                 # 单帖详情(正文/配图/评估全量):列表已瘦身,详情按需取;带 ETag/304
                 r = db.fetch_post(qs.get("query_id", ""), qs.get("case_id", ""),
                 r = db.fetch_post(qs.get("query_id", ""), qs.get("case_id", ""),
@@ -768,6 +773,18 @@ class Handler(BaseHTTPRequestHandler):
                     _release_cases(mode, claimed)   # 起进程失败也要释放认领,避免卡住
                     _release_cases(mode, claimed)   # 起进程失败也要释放认领,避免卡住
                     raise
                     raise
                 self._json({"task_id": task_id, "skipped": skipped})
                 self._json({"task_id": task_id, "skipped": skipped})
+            elif u.path == "/api/categorize":
+                # 工序归类:对一个 query 下若干已解构 case 跑 category_match(--run 子进程),
+                # 把命中的分类回写进 mode_process 的 steps(substanceMatch/formMatch)。
+                # 仅工序方向有意义(工具方向无 steps.substance/form)。
+                qid = payload.get("query_id")
+                cids = payload.get("case_ids") or []
+                if not qid or not cids:
+                    return self._err("缺 query_id / case_ids")
+                uniq = list(dict.fromkeys(cids))
+                cmd = [sys.executable, "stages/category_match.py", "--run",
+                       "--query-id", qid, "--case-ids", ",".join(uniq)]
+                self._json({"task_id": _spawn_task("cat", cmd)})
             elif u.path == "/api/query_score":
             elif u.path == "/api/query_score":
                 sel = {
                 sel = {
                     "tool_type": payload.get("tool_type", ""),
                     "tool_type": payload.get("tool_type", ""),

+ 116 - 31
examples/mode_workflow/stages/category_match.py

@@ -31,6 +31,9 @@ formMatch;无命中为 None。
     python stages/category_match.py            # 默认 0.0.0.0:8780
     python stages/category_match.py            # 默认 0.0.0.0:8780
     python stages/category_match.py 8090        # 指定端口
     python stages/category_match.py 8090        # 指定端口
     CATEGORY_MATCH_API=http://host:8300 python stages/category_match.py
     CATEGORY_MATCH_API=http://host:8300 python stages/category_match.py
+
+可调环境变量(前缀均 CATEGORY_MATCH_):TOP_K / MIN_SCORE / TIMEOUT(默认 60s) /
+    CONCURRENCY(默认 8) / RETRIES(默认 3,仅超时/网络/5xx 重试,指数退避) / BACKOFF(默认 1s)。
 """
 """
 from __future__ import annotations
 from __future__ import annotations
 
 
@@ -54,8 +57,11 @@ MATCH_ENDPOINT = "/api/v1/category-match"
 TOP_K = int(os.environ.get("CATEGORY_MATCH_TOP_K", "10"))          # 后续再调
 TOP_K = int(os.environ.get("CATEGORY_MATCH_TOP_K", "10"))          # 后续再调
 MIN_SCORE = float(os.environ.get("CATEGORY_MATCH_MIN_SCORE", "0.8"))  # 后续再调
 MIN_SCORE = float(os.environ.get("CATEGORY_MATCH_MIN_SCORE", "0.8"))  # 后续再调
 RECORD = True
 RECORD = True
-API_TIMEOUT = float(os.environ.get("CATEGORY_MATCH_TIMEOUT", "30"))
+API_TIMEOUT = float(os.environ.get("CATEGORY_MATCH_TIMEOUT", "60"))   # 下游单帖可达 30s+,默认放宽到 60
 BATCH_CONCURRENCY = int(os.environ.get("CATEGORY_MATCH_CONCURRENCY", "8"))  # 批量并发上限
 BATCH_CONCURRENCY = int(os.environ.get("CATEGORY_MATCH_CONCURRENCY", "8"))  # 批量并发上限
+# 下游调用失败重试:仅对「可重试」错误(超时/网络/5xx),指数退避;业务错误(4xx)立即失败不重试
+MAX_RETRIES = int(os.environ.get("CATEGORY_MATCH_RETRIES", "3"))      # 额外重试次数(总尝试 = 1 + N)
+RETRY_BACKOFF = float(os.environ.get("CATEGORY_MATCH_BACKOFF", "1.0"))  # 退避基数(秒):第 k 次重试前睡 BACKOFF*2^(k-1)
 
 
 # 维度 → source_type(外部接口约定的中文标签)
 # 维度 → source_type(外部接口约定的中文标签)
 ST_SUBSTANCE = "实质"
 ST_SUBSTANCE = "实质"
@@ -174,9 +180,29 @@ def enrich_steps(procedures: List[dict], resp: dict) -> List[dict]:
     return procedures
     return procedures
 
 
 
 
-# ── 调外部接口 ────────────────────────────────────────────────────────────────────
+# ── 调外部接口(带重试)──────────────────────────────────────────────────────────────
+class _RetryExhausted(Exception):
+    """重试耗尽(或遇不可重试错误)时抛出,携带 last_exc 与实际尝试次数 attempts,
+    使失败结果也能报告 attempts(否则 except 分支拿不到次数)。"""
+    def __init__(self, last_exc: Exception, attempts: int):
+        self.last_exc = last_exc
+        self.attempts = attempts
+        super().__init__(str(last_exc))
+
+
+def _is_retryable(exc: Exception) -> bool:
+    """判定异常是否值得重试:超时/连接/读写等传输错误,或 5xx 服务端错误。
+    4xx(如 400 请求格式错)是确定性失败,重试无意义 → 不重试。"""
+    if isinstance(exc, httpx.HTTPStatusError):
+        return exc.response.status_code >= 500
+    return isinstance(exc, httpx.TransportError)   # 含 ReadTimeout/ConnectError 等
+
+
 async def _post_category_match(client: httpx.AsyncClient, post_id: str, knowledge_id: str,
 async def _post_category_match(client: httpx.AsyncClient, post_id: str, knowledge_id: str,
-                               items: List[dict]) -> dict:
+                               items: List[dict]) -> tuple:
+    """POST 到下游 category-match。可重试错误(超时/网络/5xx)按指数退避重试 MAX_RETRIES 次;
+    重试耗尽或遇不可重试错误时抛出最后一次异常(由 process_one 兜成 ok:False)。
+    返回 (resp_json, attempts):attempts=实际尝试次数(1=一次成功,>1=重试过)。"""
     body = {
     body = {
         "top_k": TOP_K,
         "top_k": TOP_K,
         "min_score": MIN_SCORE,
         "min_score": MIN_SCORE,
@@ -185,19 +211,31 @@ async def _post_category_match(client: httpx.AsyncClient, post_id: str, knowledg
         "knowledge_id": knowledge_id,
         "knowledge_id": knowledge_id,
         "items": items,
         "items": items,
     }
     }
-    r = await client.post(CATEGORY_MATCH_API + MATCH_ENDPOINT, json=body)
-    r.raise_for_status()
-    return r.json()
+    last_exc: Optional[Exception] = None
+    for attempt in range(MAX_RETRIES + 1):       # 第 0 次为首发,其后为重试
+        try:
+            r = await client.post(CATEGORY_MATCH_API + MATCH_ENDPOINT, json=body)
+            r.raise_for_status()
+            return r.json(), attempt + 1
+        except httpx.HTTPError as e:
+            last_exc = e
+            if attempt < MAX_RETRIES and _is_retryable(e):
+                await asyncio.sleep(RETRY_BACKOFF * (2 ** attempt))   # 1,2,4,… 秒
+                continue
+            raise _RetryExhausted(e, attempt + 1) from e   # 携带尝试次数,供失败结果报告
+    raise _RetryExhausted(last_exc, MAX_RETRIES + 1)       # 理论不可达
 
 
 
 
 # ── 单帖全流程(取数 → 调接口 → 回写 → 落库)─────────────────────────────────────────
 # ── 单帖全流程(取数 → 调接口 → 回写 → 落库)─────────────────────────────────────────
 async def process_one(client: httpx.AsyncClient, query_id: str, case_id: str,
 async def process_one(client: httpx.AsyncClient, query_id: str, case_id: str,
                       *, include_response: bool = False) -> dict:
                       *, include_response: bool = False) -> dict:
-    """对一帖 (query_id=post_id, case_id=knowledge_id) 跑完整流程。绝不抛异常,
-    错误以 {"ok": False, "error": ...} 返回,便于批量聚合。"""
+    """对一帖跑完整流程:query_id=post_id(给下游记录),case_id=knowledge_id。绝不抛异常,
+    错误以 {"ok": False, "error": ...} 返回,便于批量聚合。
+    取数/回写按 case 的「最新真实版」(fetch_process,与前端 /api/extract 同口径),
+    保证回写的版本即前端展示的版本——否则 link_ 复制帖会写错版本、前端看不到 tag。"""
     base = {"query_id": query_id, "case_id": case_id}
     base = {"query_id": query_id, "case_id": case_id}
     try:
     try:
-        payload = await asyncio.to_thread(db.fetch_process_by_query, query_id, case_id)
+        payload = await asyncio.to_thread(db.fetch_process, case_id)   # 最新真实版,对齐前端展示
         if not payload:
         if not payload:
             return {**base, "ok": False, "error": "无工序解构记录"}
             return {**base, "ok": False, "error": "无工序解构记录"}
         procedures = payload["procedures"]
         procedures = payload["procedures"]
@@ -208,28 +246,50 @@ async def process_one(client: httpx.AsyncClient, query_id: str, case_id: str,
             return {**base, "ok": True, "version": version, "items_sent": 0,
             return {**base, "ok": True, "version": version, "items_sent": 0,
                     "rows_updated": 0, "note": "无可匹配维度词,跳过接口调用"}
                     "rows_updated": 0, "note": "无可匹配维度词,跳过接口调用"}
 
 
-        resp = await _post_category_match(client, query_id, case_id, items)
+        resp, attempts = await _post_category_match(client, query_id, case_id, items)
         if not resp.get("success"):
         if not resp.get("success"):
-            return {**base, "ok": False, "version": version,
+            return {**base, "ok": False, "version": version, "attempts": attempts,
                     "error": "category-match 返回 success=false", "response": resp}
                     "error": "category-match 返回 success=false", "response": resp}
 
 
         enrich_steps(procedures, resp)
         enrich_steps(procedures, resp)
         rows_updated = await asyncio.to_thread(
         rows_updated = await asyncio.to_thread(
-            db.update_process_steps_by_query, query_id, case_id, version,
+            db.update_process_steps, case_id, version,
             [p.get("steps") or [] for p in procedures])
             [p.get("steps") or [] for p in procedures])
 
 
         out = {**base, "ok": True, "version": version, "items_sent": len(items),
         out = {**base, "ok": True, "version": version, "items_sent": len(items),
-               "rows_updated": rows_updated, "recorded": resp.get("recorded")}
+               "rows_updated": rows_updated, "recorded": resp.get("recorded"),
+               "attempts": attempts}
         if include_response:
         if include_response:
             out["response"] = resp
             out["response"] = resp
             out["procedures"] = procedures
             out["procedures"] = procedures
         return out
         return out
-    except httpx.HTTPError as e:
-        return {**base, "ok": False, "error": f"调用 category-match 失败: {type(e).__name__}: {e}"}
+    except _RetryExhausted as e:
+        return {**base, "ok": False, "attempts": e.attempts,
+                "error": f"调用 category-match 失败(尝试 {e.attempts} 次): "
+                         f"{type(e.last_exc).__name__}: {e.last_exc}"}
     except Exception as e:
     except Exception as e:
         return {**base, "ok": False, "error": f"{type(e).__name__}: {e}"}
         return {**base, "ok": False, "error": f"{type(e).__name__}: {e}"}
 
 
 
 
+async def gather_pairs(pairs, *, on_each=None, include_response=False) -> list:
+    """对一批 (query_id, case_id) 并发跑 process_one(受 BATCH_CONCURRENCY 限流,
+    共用一个 keep-alive 连接池)。on_each(index, result):每帖完成时回调(用于打印进度)。
+    返回结果列表(顺序同 pairs)。供 FastAPI batch 与 CLI --run 共用。"""
+    sem = asyncio.Semaphore(BATCH_CONCURRENCY)
+    async with httpx.AsyncClient(
+        timeout=API_TIMEOUT,
+        limits=httpx.Limits(max_connections=BATCH_CONCURRENCY,
+                            max_keepalive_connections=BATCH_CONCURRENCY),
+    ) as client:
+        async def _one(i, q, c):
+            async with sem:
+                r = await process_one(client, q, c, include_response=include_response)
+                if on_each:
+                    on_each(i, r)
+                return r
+        return await asyncio.gather(*[_one(i, q, c) for i, (q, c) in enumerate(pairs)])
+
+
 # ── FastAPI ──────────────────────────────────────────────────────────────────────
 # ── FastAPI ──────────────────────────────────────────────────────────────────────
 app = FastAPI(title="mode_workflow · category-match", version="1.0")
 app = FastAPI(title="mode_workflow · category-match", version="1.0")
 
 
@@ -260,26 +320,51 @@ async def category_match_batch(req: BatchReq):
     pairs = list(zip(req.query_id, req.case_id))
     pairs = list(zip(req.query_id, req.case_id))
     if not pairs:
     if not pairs:
         return {"total": 0, "ok": 0, "failed": 0, "results": []}
         return {"total": 0, "ok": 0, "failed": 0, "results": []}
+    results = await gather_pairs(pairs)
+    ok = sum(1 for r in results if r.get("ok"))
+    return {"total": len(results), "ok": ok, "failed": len(results) - ok, "results": results}
 
 
-    sem = asyncio.Semaphore(BATCH_CONCURRENCY)
-    # 整个批次共用一个连接池(keep-alive),并发受信号量约束,避免压垮下游接口
-    async with httpx.AsyncClient(
-        timeout=API_TIMEOUT,
-        limits=httpx.Limits(max_connections=BATCH_CONCURRENCY,
-                            max_keepalive_connections=BATCH_CONCURRENCY),
-    ) as client:
-        async def _one(q, c):
-            async with sem:
-                return await process_one(client, q, c, include_response=False)
-
-        results = await asyncio.gather(*[_one(q, c) for q, c in pairs])
 
 
+def _cli_run(query_id: str, case_ids: List[str]) -> int:
+    """CLI 归类:对一个 query 下的若干 case 跑归类,实时打印进度(供 server.py 起子进程、
+    前端轮询日志)。返回退出码:全成功=0,有失败=1(便于任务状态判定)。"""
+    pairs = [(query_id, c) for c in case_ids]
+    if not pairs:
+        print("无 case 可归类"); return 0
+    print(f"开始归类:query_id={query_id}  {len(pairs)} 帖  (下游 {CATEGORY_MATCH_API}{MATCH_ENDPOINT})", flush=True)
+    n = len(pairs)
+    cnt = {"i": 0}
+
+    def _progress(_idx, r):
+        cnt["i"] += 1
+        tag = "✓ OK  " if r.get("ok") else "✗ FAIL"
+        extra = (f"items={r.get('items_sent')} rows_updated={r.get('rows_updated')} attempts={r.get('attempts')}"
+                 if r.get("ok") else (r.get("error") or r.get("note") or ""))
+        print(f"[{cnt['i']}/{n}] {tag} {r.get('case_id')}  {extra}", flush=True)
+
+    results = asyncio.run(gather_pairs(pairs, on_each=_progress))
     ok = sum(1 for r in results if r.get("ok"))
     ok = sum(1 for r in results if r.get("ok"))
-    return {"total": len(results), "ok": ok, "failed": len(results) - ok, "results": results}
+    print(f"\n归类完成:{ok}/{n} 成功,{n - ok} 失败", flush=True)
+    return 0 if ok == n else 1
 
 
 
 
 if __name__ == "__main__":
 if __name__ == "__main__":
+    import argparse
+    ap = argparse.ArgumentParser(description="category-match:FastAPI 服务 或 CLI 归类(--run)")
+    ap.add_argument("port", nargs="?", type=int,
+                    default=int(os.environ.get("CATEGORY_MATCH_PORT", "8780")),
+                    help="服务端口(不带 --run 时生效)")
+    ap.add_argument("--run", action="store_true", help="CLI 归类模式:跑完即退出,不起服务")
+    ap.add_argument("--query-id", help="--run 用:post_id")
+    ap.add_argument("--case-ids", help="--run 用:逗号分隔的 case_id(knowledge_id)")
+    args = ap.parse_args()
+
+    if args.run:
+        cids = [c.strip() for c in (args.case_ids or "").split(",") if c.strip()]
+        if not args.query_id or not cids:
+            print("--run 需提供 --query-id 与 --case-ids"); sys.exit(2)
+        sys.exit(_cli_run(args.query_id, cids))
+
     import uvicorn
     import uvicorn
-    port = int(sys.argv[1]) if len(sys.argv) > 1 else int(os.environ.get("CATEGORY_MATCH_PORT", "8780"))
-    print(f"🚀 category-match 服务 → http://0.0.0.0:{port}  (下游 {CATEGORY_MATCH_API}{MATCH_ENDPOINT})")
-    uvicorn.run(app, host="0.0.0.0", port=port)
+    print(f"🚀 category-match 服务 → http://0.0.0.0:{args.port}  (下游 {CATEGORY_MATCH_API}{MATCH_ENDPOINT})")
+    uvicorn.run(app, host="0.0.0.0", port=args.port)