Jelajahi Sumber

feat(mode_workflow): 新增可选--case-ids参数、修复仪表盘版本bug、更新文档与数据库架构

本次提交包含以下修改:
- 为所有工作流脚本添加可选`--case-ids`参数,不传时自动处理当前query下全部已采纳案例
- 修复仪表盘最新版本检测的排序问题,改用自增ID替代字符串版本比较,正确过滤link_复制版本
- 更新数据库架构,新增source列、seq列与唯一组合索引,创建独立的工具导入台账表
- 重构导入与提取脚本,优化事务处理与源数据提取逻辑
- 更新所有文档,补充新的使用示例与默认行为说明
刘文武 16 jam lalu
induk
melakukan
3bcdbafa0f

+ 1 - 0
examples/mode_workflow/README.md

@@ -61,6 +61,7 @@ query 搜到时只真实解构一次——`stages/{procedure,tool}_extract.py` 
 python stages/import_process_knowledge.py --dry-run            # 只取数+组装 payload,不调接口(先验证)
 python stages/import_process_knowledge.py --dry-run -v         # 同上,打印完整 payload JSON
 python stages/import_process_knowledge.py --query-id q0001     # 只传某搜索任务下的采纳 case
+python stages/import_process_knowledge.py --case-ids xhs_a,gzh_b # 只传指定 case(采纳集内精确补传/重传单帖)
 python stages/import_process_knowledge.py --limit 5            # 只处理前 5 个 case(调试)
 python stages/import_process_knowledge.py                      # 真实导入(去掉 --dry-run)
 # 其它:--api-url <根地址>(默认 47.236.83.130:8001)  --delay <毫秒>(调用间隔,默认 100)

+ 141 - 24
examples/mode_workflow/db.py

@@ -141,7 +141,9 @@ CREATE TABLE IF NOT EXISTS mode_process (
   version       VARCHAR(32)   NULL COMMENT 'v_MMDDHHMM,保留历史;link_* 为跨 query 复制(cost=0)',
   cost_usd      DECIMAL(10,6) NULL COMMENT '本次解构调用成本(同版本各行相同,聚合需按 case+version 去重)',
   duration_s    FLOAT         NULL,
+  seq           SMALLINT      NULL COMMENT '帖内序号(0-based);与 (query_id,case_id,version) 组唯一键防并发/重复写',
   created_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+  UNIQUE KEY uk_q_case_ver_seq (query_id, case_id, version, seq),
   KEY idx_case_ver (case_id, version),
   KEY idx_qid (query_id)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工序解构结果(每行一个工序)';
@@ -154,6 +156,7 @@ CREATE TABLE IF NOT EXISTS mode_tools (
   case_id       VARCHAR(128)  NOT NULL,
   platform      VARCHAR(32)   NULL,
   post_title    VARCHAR(512)  NULL,
+  source        JSON          NULL COMMENT '解构时帖子来源块(tool_extract._row_to_source 产出)',
   tool_name     VARCHAR(255)  NULL,
   substance_scope JSON        NULL COMMENT '实质作用域(数组)',
   form_scope    JSON          NULL COMMENT '形式作用域(数组或null)',
@@ -169,7 +172,9 @@ CREATE TABLE IF NOT EXISTS mode_tools (
   version       VARCHAR(32)   NULL COMMENT 'v_MMDDHHMM;link_* 为跨 query 复制(cost=0)',
   cost_usd      DECIMAL(10,6) NULL COMMENT '同 mode_process,聚合按 case+version 去重',
   duration_s    FLOAT         NULL,
+  seq           SMALLINT      NULL COMMENT '帖内序号(0-based);与 (query_id,case_id,version) 组唯一键防并发/重复写',
   created_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+  UNIQUE KEY uk_q_case_ver_seq (query_id, case_id, version, seq),
   KEY idx_case_ver (case_id, version),
   KEY idx_qid (query_id),
   KEY idx_tool_name (tool_name)
@@ -181,6 +186,8 @@ CREATE TABLE IF NOT EXISTS mode_tools (
 # 每条知识 = 某 case 的某个工序(proc_index 1-based)。记录导入时的 mode_process 版本:
 # 版本变了(重解构)说明内容已变,应重导;版本不变即视为「已传过」,跳过。
 # 选 DB 台账而非本地文件,是为了换机器/换链接后也不会重复写知识库。
+# 注:工具知识用独立的 tools_ingest_log,不与本表混用(case_id 是帖子物理身份,
+#     同帖可能既被工序解构又被工具解构,共表会在 (case_id, index) 上撞键)。
 DDL_INGEST_LOG = """
 CREATE TABLE IF NOT EXISTS knowledge_ingest_log (
   id            BIGINT AUTO_INCREMENT PRIMARY KEY,
@@ -195,6 +202,43 @@ CREATE TABLE IF NOT EXISTS knowledge_ingest_log (
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工序知识已导入台账(防重复上传)';
 """
 
+# 工具知识「已导入知识库」台账:语义同 knowledge_ingest_log,但针对工具方向独立成表
+# (stages/import_tools_knowledge.py 用)。每条知识 = 某 case 的某个工具(tool_index 1-based),
+# 版本记录导入时的 mode_tools 版本;变了(重解构)应重导,不变即「已传过」跳过。
+DDL_TOOLS_INGEST_LOG = """
+CREATE TABLE IF NOT EXISTS tools_ingest_log (
+  id            BIGINT AUTO_INCREMENT PRIMARY KEY,
+  case_id       VARCHAR(128)  NOT NULL,
+  tool_index    INT           NOT NULL COMMENT '工具序号(1-based),对齐导入脚本枚举',
+  version       VARCHAR(32)   NULL COMMENT '导入时 mode_tools 版本;变了应重导',
+  knowledge_id  VARCHAR(128)  NULL COMMENT '接口返回的 knowledge_id',
+  api_url       VARCHAR(255)  NULL,
+  ingested_at   TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+  updated_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+  UNIQUE KEY uk_case_tool (case_id, tool_index)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工具知识已导入台账(防重复上传)';
+"""
+
+
+def _ensure_column(cur, table, column, column_ddl):
+    """给已存在的表幂等补列:列已存在则跳过(MySQL ADD COLUMN 无 IF NOT EXISTS)。
+    column_ddl 为 ADD COLUMN 后的完整定义,如 \"source JSON NULL ... AFTER post_title\"。"""
+    cur.execute("""SELECT COUNT(*) AS n FROM information_schema.columns
+                   WHERE table_schema=DATABASE() AND table_name=%s AND column_name=%s""",
+                (table, column))
+    if cur.fetchone()["n"] == 0:
+        cur.execute(f"ALTER TABLE {table} ADD COLUMN {column_ddl}")
+
+
+def _ensure_unique_index(cur, table, index_name, cols):
+    """幂等加唯一索引:已存在则跳过(MySQL ADD INDEX 无 IF NOT EXISTS)。
+    cols 为列表达式,如 "query_id, case_id, version, seq"。加之前需保证无冲突数据。"""
+    cur.execute("""SELECT COUNT(*) AS n FROM information_schema.statistics
+                   WHERE table_schema=DATABASE() AND table_name=%s AND index_name=%s""",
+                (table, index_name))
+    if cur.fetchone()["n"] == 0:
+        cur.execute(f"ALTER TABLE {table} ADD UNIQUE KEY {index_name} ({cols})")
+
 
 def init_tables():
     conn = _conn()
@@ -205,11 +249,38 @@ def init_tables():
             cur.execute(DDL_PROCESS)
             cur.execute(DDL_TOOLS)
             cur.execute(DDL_INGEST_LOG)
+            cur.execute(DDL_TOOLS_INGEST_LOG)
             # 历史库迁移:version 由 VARCHAR(16) 放宽到 32,容纳 link_v_mopN_* 复制版本。
             # MODIFY 幂等(已是 32 则 MySQL 元数据无操作),建表后表必存在,可安全执行。
             for t in ("mode_process", "mode_tools"):
                 cur.execute(f"ALTER TABLE {t} MODIFY COLUMN version VARCHAR(32) NULL")
-        print("✅ 建表完成:search_process, search_tools, mode_process, mode_tools, knowledge_ingest_log")
+            # 历史库迁移:给老 mode_tools 补 source 列(MySQL 的 ADD COLUMN 无 IF NOT EXISTS,
+            # 故先查 information_schema 判存在,缺了才 ADD,幂等)。
+            _ensure_column(cur, "mode_tools", "source",
+                           "source JSON NULL COMMENT '解构时帖子来源块' AFTER post_title")
+            # 历史库迁移:加 seq(帖内序号)+ (query_id,case_id,version,seq) 唯一键,防并发/重复
+            # 写入产生重复行。顺序必须是 加列 → 回填 → 加唯一键。MySQL 5.7 无窗口函数,seq 在
+            # 应用层按 (query_id,case_id,version) 内 id 升序回填(现有数据该粒度已无重复)。
+            for t in ("mode_process", "mode_tools"):
+                _ensure_column(cur, t, "seq",
+                               "seq SMALLINT NULL COMMENT '帖内序号(0-based)' AFTER duration_s")
+            for t in ("mode_process", "mode_tools"):
+                cur.execute(f"""SELECT id, query_id, case_id, version FROM {t}
+                                WHERE seq IS NULL ORDER BY query_id, case_id, version, id""")
+                key, n, ups = None, 0, []
+                for r in cur.fetchall():
+                    k = (r["query_id"], r["case_id"], r["version"])
+                    if k != key:
+                        key, n = k, 0
+                    ups.append((n, r["id"])); n += 1
+                if ups:
+                    cur.executemany(f"UPDATE {t} SET seq=%s WHERE id=%s", ups)
+                    print(f"  ↳ {t}: 回填 seq {len(ups)} 行")
+            for t in ("mode_process", "mode_tools"):
+                _ensure_unique_index(cur, t, "uk_q_case_ver_seq",
+                                     "query_id, case_id, version, seq")
+        print("✅ 建表完成:search_process, search_tools, mode_process, mode_tools, "
+              "knowledge_ingest_log, tools_ingest_log")
     finally:
         conn.close()
 
@@ -513,12 +584,13 @@ def replace_process(query_id, case_id, platform, post_title, payload,
     procedures = payload.get("procedures") or []
     conn = _conn()
     try:
+        conn.begin()   # DELETE+INSERT 原子化:配合 uk_q_case_ver_seq,并发/重复写入不会留下重复行
         with conn.cursor() as cur:
             cur.execute("DELETE FROM mode_process WHERE case_id=%s AND version=%s",
                         (case_id, version))
             if procedures:
                 rows = []
-                for p in procedures:
+                for i, p in enumerate(procedures):
                     steps = p.get("steps") or []
                     vias = []
                     for s in steps:
@@ -531,16 +603,20 @@ def replace_process(query_id, case_id, platform, post_title, payload,
                         p.get("purpose"), p.get("category"),
                         _j(p.get("declarations")), _j(p.get("type_registry")),
                         _j(steps), len(steps), _j(vias),
-                        model, version, cost_usd, duration_s,
+                        model, version, cost_usd, duration_s, i,
                     ))
                 cur.executemany("""
                 INSERT INTO mode_process
                   (query_id, case_id, platform, post_title, source, procedure_id, name,
                    purpose, category, declarations, type_registry, steps, step_count,
-                   tools_used, model, version, cost_usd, duration_s)
-                VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
+                   tools_used, model, version, cost_usd, duration_s, seq)
+                VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
                 """, rows)
+        conn.commit()
         return len(procedures)
+    except Exception:
+        conn.rollback()
+        raise
     finally:
         conn.close()
 
@@ -551,7 +627,8 @@ def fetch_process_versions(case_id):
         with conn.cursor() as cur:
             cur.execute("""SELECT version, COUNT(*) AS n, MAX(model) AS model
                            FROM mode_process WHERE case_id=%s
-                           GROUP BY version ORDER BY version DESC""", (case_id,))
+                           GROUP BY version
+                           ORDER BY (LEFT(version,5)='link_') ASC, MAX(id) DESC""", (case_id,))
             return cur.fetchall()
     finally:
         conn.close()
@@ -564,7 +641,7 @@ def fetch_process(case_id, version=None):
         with conn.cursor() as cur:
             if version is None:
                 cur.execute("""SELECT version FROM mode_process WHERE case_id=%s
-                               ORDER BY version DESC, id DESC LIMIT 1""", (case_id,))
+                               ORDER BY (LEFT(version,5)='link_') ASC, id DESC LIMIT 1""", (case_id,))
                 row = cur.fetchone()
                 if not row:
                     return None
@@ -597,31 +674,38 @@ def _proc_payload(case_id, version, rows):
 # ── mode_tools ───────────────────────────────────────────────────────────────
 
 def replace_tools(query_id, case_id, platform, post_title, tools,
-                  model, version, cost_usd, duration_s):
-    """写入一帖某版本的工具解构结果。语义同 replace_process。返回工具条数。"""
+                  model, version, cost_usd, duration_s, source=None):
+    """写入一帖某版本的工具解构结果。语义同 replace_process。返回工具条数。
+    source:帖子来源块(同 mode_process,每行重复存),供知识上传脚本重建 source 用。"""
+    src = _j(source)
     conn = _conn()
     try:
+        conn.begin()   # DELETE+INSERT 原子化:配合 uk_q_case_ver_seq,并发/重复写入不会留下重复行
         with conn.cursor() as cur:
             cur.execute("DELETE FROM mode_tools WHERE case_id=%s AND version=%s",
                         (case_id, version))
             if tools:
                 rows = [(
-                    query_id, case_id, platform, (post_title or "")[:500],
+                    query_id, case_id, platform, (post_title or "")[:500], src,
                     (t.get("工具名称") or "")[:250],
                     _j(t.get("实质作用域")), _j(t.get("形式作用域")),
                     t.get("创作层级"), t.get("来源链接"), t.get("输入"), t.get("输出"),
                     _j(t.get("用法")), _j(t.get("案例")), _j(t.get("缺点")),
-                    t.get("最新更新时间"), model, version, cost_usd, duration_s,
-                ) for t in tools]
+                    t.get("最新更新时间"), model, version, cost_usd, duration_s, i,
+                ) for i, t in enumerate(tools)]
                 cur.executemany("""
                 INSERT INTO mode_tools
-                  (query_id, case_id, platform, post_title, tool_name, substance_scope,
+                  (query_id, case_id, platform, post_title, source, tool_name, substance_scope,
                    form_scope, creation_layer, source_link, input_desc, output_desc,
                    usage_json, cases_json, defects_json, updated_time, model, version,
-                   cost_usd, duration_s)
-                VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
+                   cost_usd, duration_s, seq)
+                VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
                 """, rows)
+        conn.commit()
         return len(tools)
+    except Exception:
+        conn.rollback()
+        raise
     finally:
         conn.close()
 
@@ -632,7 +716,8 @@ def fetch_tools_versions(case_id):
         with conn.cursor() as cur:
             cur.execute("""SELECT version, COUNT(*) AS n, MAX(model) AS model
                            FROM mode_tools WHERE case_id=%s
-                           GROUP BY version ORDER BY version DESC""", (case_id,))
+                           GROUP BY version
+                           ORDER BY (LEFT(version,5)='link_') ASC, MAX(id) DESC""", (case_id,))
             return cur.fetchall()
     finally:
         conn.close()
@@ -645,7 +730,7 @@ def fetch_tools(case_id, version=None):
         with conn.cursor() as cur:
             if version is None:
                 cur.execute("""SELECT version FROM mode_tools WHERE case_id=%s
-                               ORDER BY version DESC, id DESC LIMIT 1""", (case_id,))
+                               ORDER BY (LEFT(version,5)='link_') ASC, id DESC LIMIT 1""", (case_id,))
                 row = cur.fetchone()
                 if not row:
                     return None
@@ -673,6 +758,7 @@ def _tools_payload(case_id, version, rows):
             "title": rows[0]["post_title"], "model": rows[0]["model"],
             "cost_usd": float(rows[0]["cost_usd"]) if rows[0]["cost_usd"] is not None else None,
             "duration_s": rows[0]["duration_s"],
+            "source": _loads(rows[0].get("source")),
             "tool_count": len(tools), "tools": tools}
 
 
@@ -688,7 +774,8 @@ def fetch_extract(mode, case_id, version=None):
         with conn.cursor() as cur:
             cur.execute(f"""SELECT version, COUNT(*) AS n, MAX(model) AS model
                             FROM {mtable} WHERE case_id=%s
-                            GROUP BY version ORDER BY version DESC""", (case_id,))
+                            GROUP BY version
+                            ORDER BY (LEFT(version,5)='link_') ASC, MAX(id) DESC""", (case_id,))
             versions = cur.fetchall()
             # 详情:把"取最新版本"折进同一条 SQL,版本指定时直接用;省一次往返。
             target = version or (versions[0]["version"] if versions else None)
@@ -716,7 +803,7 @@ def latest_real_version(case_id, mode="process"):
         with conn.cursor() as cur:
             cur.execute(f"""SELECT version, query_id FROM {table}
                             WHERE case_id=%s AND LEFT(version,5) <> 'link_'
-                            ORDER BY version DESC, id DESC LIMIT 1""", (case_id,))
+                            ORDER BY id DESC LIMIT 1""", (case_id,))
             return cur.fetchone()
     finally:
         conn.close()
@@ -732,7 +819,7 @@ def link_process(query_id, case_id, mode="process"):
         with conn.cursor() as cur:
             cur.execute(f"""SELECT version FROM {table}
                             WHERE case_id=%s AND LEFT(version,5) <> 'link_'
-                            ORDER BY version DESC, id DESC LIMIT 1""", (case_id,))
+                            ORDER BY id DESC LIMIT 1""", (case_id,))
             r = cur.fetchone()
             if not r:
                 return 0
@@ -905,6 +992,34 @@ def mark_ingested(case_id, proc_index, version, knowledge_id=None, api_url=None)
         conn.close()
 
 
+def fetch_tools_ingested_map(case_id):
+    """返回 {tool_index: version} —— 该 case 各工具已导入知识库的版本。空表示没传过。
+    工具方向独立台账(tools_ingest_log),与工序的 knowledge_ingest_log 互不干扰。"""
+    conn = _conn()
+    try:
+        with conn.cursor() as cur:
+            cur.execute("SELECT tool_index, version FROM tools_ingest_log WHERE case_id=%s",
+                        (case_id,))
+            return {r["tool_index"]: r["version"] for r in cur.fetchall()}
+    finally:
+        conn.close()
+
+
+def mark_tools_ingested(case_id, tool_index, version, knowledge_id=None, api_url=None):
+    """记一条工具「已导入」台账(case_id+tool_index 唯一,重导同序号则更新版本/knowledge_id)。"""
+    conn = _conn()
+    try:
+        with conn.cursor() as cur:
+            cur.execute("""INSERT INTO tools_ingest_log
+                             (case_id, tool_index, version, knowledge_id, api_url)
+                           VALUES (%s,%s,%s,%s,%s)
+                           ON DUPLICATE KEY UPDATE version=VALUES(version),
+                             knowledge_id=VALUES(knowledge_id), api_url=VALUES(api_url)""",
+                        (case_id, tool_index, version, knowledge_id, api_url))
+    finally:
+        conn.close()
+
+
 def fetch_dashboard_rows():
     """拉 Dashboard 计算所需的轻量行。数据量级:百~千行,Python 聚合足够。
     优化:① 不传 llm_evaluation 整块,SQL 只取采纳判定要的相关性得分;
@@ -925,15 +1040,17 @@ def fetch_dashboard_rows():
                 p["mode"] = "tools"
             posts += st
             # 成本/耗时按全部版本计;steps 仅最新版需要 → 非最新版只回 NULL,省传输。
-            cur.execute("""SELECT p.case_id, p.version, p.cost_usd, p.duration_s, p.created_at,
+            cur.execute("""SELECT p.id, p.case_id, p.version, p.cost_usd, p.duration_s, p.created_at,
                                   CASE WHEN p.version = m.maxv THEN p.steps END AS steps
                            FROM mode_process p
-                           JOIN (SELECT case_id, MAX(version) AS maxv
-                                 FROM mode_process GROUP BY case_id) m
+                           JOIN (SELECT t.case_id, t.version AS maxv FROM mode_process t
+                                 JOIN (SELECT case_id, MAX(id) AS mid FROM mode_process
+                                       WHERE LEFT(version,5) <> 'link_' GROUP BY case_id) x
+                                   ON t.id = x.mid) m
                              ON p.case_id = m.case_id
                            ORDER BY p.id""")
             procs = cur.fetchall()
-            cur.execute("""SELECT case_id, version, tool_name, substance_scope,
+            cur.execute("""SELECT id, case_id, version, tool_name, substance_scope,
                                   form_scope, cost_usd, duration_s, created_at
                            FROM mode_tools""")
             tools = cur.fetchall()

+ 13 - 5
examples/mode_workflow/server.py

@@ -176,14 +176,22 @@ def _dashboard_cached():
 def _dashboard():
     posts, procs, tools = db.fetch_dashboard_rows()
 
-    # 最新版本行集(覆盖度/Top10 用最新版,成本/耗时按全部版本累计)
+    # 最新版本行集(覆盖度/Top10 用最新版,成本/耗时按全部版本累计)。
+    # 「最新」= 最近插入的真实解构(按自增 id),排除 link_ 复制版;与 db.py 的 maxv 同口径。
+    # 旧实现按 version 字符串比大小,被 v_top5_/v_rest_ 等前缀版本号污染(字典序高于 v_0617*)。
     def latest(rows):
-        best = {}
+        best = {}   # case_id -> (id, version) of newest real version
         for r in rows:
+            if (r["version"] or "").startswith("link_"):
+                continue
             cid = r["case_id"]
-            if cid not in best or (r["version"] or "") > (best[cid] or ""):
-                best[cid] = r["version"]
-        return [r for r in rows if r["version"] == best[r["case_id"]]]
+            if cid not in best or (r["id"] or 0) > best[cid][0]:
+                best[cid] = (r["id"] or 0, r["version"])
+        return [r for r in rows if cid_best(best, r)]
+
+    def cid_best(best, r):
+        b = best.get(r["case_id"])
+        return b is not None and r["version"] == b[1]
 
     latest_procs = latest(procs)
     latest_tools = latest(tools)

+ 26 - 7
examples/mode_workflow/stages/import_process_knowledge.py

@@ -25,6 +25,7 @@
     python stages/import_process_knowledge.py --dry-run            # 只组装+打印,不调接口
     python stages/import_process_knowledge.py --dry-run --verbose  # 打印完整 payload JSON
     python stages/import_process_knowledge.py --query-id q0001     # 只传某搜索任务下的采纳 case
+    python stages/import_process_knowledge.py --case-ids xhs_a,gzh_b # 只传指定 case(采纳集内精确补传)
     python stages/import_process_knowledge.py --limit 5            # 只处理前 5 个 case(调试)
     python stages/import_process_knowledge.py --api-url http://... # 指定后端地址
     python stages/import_process_knowledge.py --delay 200          # 每次调用间隔 200ms
@@ -67,17 +68,25 @@ logger = logging.getLogger(__name__)
 
 # ── 数据来源:从 DB 取采纳工序(替代参考实现的本地文件扫描)──────────────────────
 
-def iter_cases_from_db(query_id=None, limit=None):
+def iter_cases_from_db(query_id=None, limit=None, case_ids=None):
     """产出 (case_id, source_data, procedures, version)。
 
     先取采纳 case_id 列表,再逐个 fetch_process 重建解构详情(取最新版本)。
     fetch_process 返回 None(无解构行)或 procedures 为空的 case 自动跳过。
     version 用于上传去重:同 case 同工序、版本未变即视为已传过,跳过。
+    case_ids 不为空时,在采纳集内精确筛选(点名 case 不在采纳集则告警跳过,不绕过采纳口径)。
     """
-    case_ids = db.fetch_adopted_process_cases(query_id)
+    adopted = db.fetch_adopted_process_cases(query_id)
+    if case_ids:
+        adopted_set = set(adopted)
+        missing = [c for c in case_ids if c not in adopted_set]
+        if missing:
+            logger.warning("--case-ids 中 %d 个不在采纳集(跳过):%s",
+                           len(missing), ",".join(missing))
+        adopted = [c for c in case_ids if c in adopted_set]
     if limit:
-        case_ids = case_ids[:limit]
-    for case_id in case_ids:
+        adopted = adopted[:limit]
+    for case_id in adopted:
         payload = db.fetch_process(case_id)   # 最新版本
         if not payload:
             continue
@@ -231,10 +240,15 @@ def ingest_one(api_url, payload, dry_run):
 
 # ── 主循环 ────────────────────────────────────────────────────────────────────
 
-def run(api_url, dry_run, verbose, delay_ms, query_id, limit, force):
-    cases = list(iter_cases_from_db(query_id, limit))
+def run(api_url, dry_run, verbose, delay_ms, query_id, limit, force, case_ids=None):
+    cases = list(iter_cases_from_db(query_id, limit, case_ids))
     if not cases:
-        scope = f"(query_id={query_id})" if query_id else ""
+        parts = []
+        if query_id:
+            parts.append(f"query_id={query_id}")
+        if case_ids:
+            parts.append(f"case_ids={','.join(case_ids)}")
+        scope = f"({'; '.join(parts)})" if parts else ""
         logger.error("DB 中未发现任何「已采纳且有工序解构」的 case %s", scope)
         sys.exit(1)
 
@@ -321,12 +335,16 @@ def main():
                         help="两次 API 调用之间的间隔毫秒数(默认:100)")
     parser.add_argument("--query-id", default=None, metavar="QID",
                         help="只导入该搜索任务(query_id)下的采纳 case")
+    parser.add_argument("--case-ids", default=None, metavar="C1,C2",
+                        help="只导入指定 case_id(逗号分隔);仍受采纳口径约束,不在采纳集的跳过")
     parser.add_argument("--limit", type=int, default=None, metavar="N",
                         help="只处理前 N 个 case(调试用)")
     parser.add_argument("--force", action="store_true",
                         help="忽略去重台账,强制重导(换 prompt/模型、需覆盖时用)")
 
     args = parser.parse_args()
+    case_ids = ([c.strip() for c in args.case_ids.split(",") if c.strip()]
+                if args.case_ids else None)
     run(
         api_url=args.api_url,
         dry_run=args.dry_run,
@@ -335,6 +353,7 @@ def main():
         query_id=args.query_id,
         limit=args.limit,
         force=args.force,
+        case_ids=case_ids,
     )
 
 

+ 58 - 25
examples/mode_workflow/stages/import_tools_knowledge.py

@@ -10,7 +10,8 @@
 字段映射(同参考实现 tools_knowledge/main.py):
   source.id          ← case_id(DB 主键)
   source.source_type ← 固定 "post"
-  source.title       ← mode_tools.post_title
+  source.title       ← mode_tools.source 块里的帖子标题(老数据回退 post_title)
+  source_metadata    ← source 块的 platform/url/like_count/publish_timestamp/正文摘要
   每个 tool → 一条知识:
     title          ← tool.工具名称;为空回退 "来源标题 — 工具N"
     content        ← 整个 tool 对象的 JSON 串(原封不动传过去)
@@ -20,14 +21,15 @@
 
 采纳口径:db.is_adopted_rel(相关性<4 / 实现完整性<4 / 发布超两年 / 综合分<6 任一命中即不采纳)。
 
-去重台账:复用 knowledge_ingest_log(case_id, proc_index)。工具与工序共用此表,故工具
-index 统一加 TOOL_INDEX_BASE 偏移,避免同一 case 既有工序又有工具时键冲突
+去重台账:工具方向独立表 tools_ingest_log(case_id, tool_index),与工序
+knowledge_ingest_log 互不干扰(同帖可能既被工序解构又被工具解构,分表避免撞键)
 
 用法:
     python stages/import_tools_knowledge.py                      # 真实导入(采纳工具全量)
     python stages/import_tools_knowledge.py --dry-run            # 只组装+打印,不调接口
     python stages/import_tools_knowledge.py --dry-run --verbose  # 打印完整 payload JSON
     python stages/import_tools_knowledge.py --query-id q0001     # 只传某搜索任务下的采纳 case
+    python stages/import_tools_knowledge.py --case-ids xhs_a,gzh_b # 只传指定 case(采纳集内精确补传)
     python stages/import_tools_knowledge.py --limit 5            # 只处理前 5 个 case(调试)
     python stages/import_tools_knowledge.py --api-url http://... # 指定后端地址
     python stages/import_tools_knowledge.py --delay 200          # 每次调用间隔 200ms
@@ -56,9 +58,6 @@ DIM_CREATIONS = ["制作"]
 
 EXT_KEY_TOOL = "工具"
 
-# 工具知识在共享台账 knowledge_ingest_log 里的 index 偏移量,避免与工序(1-based)冲突。
-TOOL_INDEX_BASE = 100000
-
 # ── 日志 ──────────────────────────────────────────────────────────────────────
 
 logging.basicConfig(
@@ -71,26 +70,47 @@ logger = logging.getLogger(__name__)
 
 # ── 数据来源:从 DB 取采纳工具(替代参考实现的本地文件扫描)──────────────────────
 
-def iter_cases_from_db(query_id=None, limit=None):
+def _build_source_data(payload):
+    """从 fetch_tools 的 payload 提炼上传所需的来源字段。
+    优先用解构时存下的 source 块(tool_extract._row_to_source 产出);老数据无 source
+    时回退 mode_tools 自带的 post_title/platform(此时点赞/正文/时间取不到,留空)。
+    """
+    src = payload.get("source") or {}
+    post = src.get("post") or {}
+    return {
+        "title": (post.get("title") or payload.get("title") or "").strip() or None,
+        "platform": src.get("platform") or payload.get("platform") or "",
+        "url": src.get("source_url") or post.get("link") or None,
+        "like_count": post.get("like_count"),
+        "publish_timestamp": post.get("publish_timestamp") or "",
+        "excerpt": (post.get("body_text") or "")[:500],
+    }
+
+
+def iter_cases_from_db(query_id=None, limit=None, case_ids=None):
     """产出 (case_id, source_data, tools, version)。
 
     先取采纳 case_id 列表,再逐个 fetch_tools 重建解构详情(取最新版本)。
     fetch_tools 返回 None(无解构行)或 tools 为空的 case 自动跳过。
-    source_data 用 mode_tools 自带的 platform/post_title 拼装(mode_tools 不存 source 块)。
+    source_data 由 _build_source_data 从 mode_tools.source 块提炼(老数据回退 post_title)。
     version 用于上传去重:同 case 同工具、版本未变即视为已传过,跳过。
+    case_ids 不为空时,在采纳集内精确筛选(点名 case 不在采纳集则告警跳过,不绕过采纳口径)。
     """
-    case_ids = db.fetch_adopted_tools_cases(query_id)
+    adopted = db.fetch_adopted_tools_cases(query_id)
+    if case_ids:
+        adopted_set = set(adopted)
+        missing = [c for c in case_ids if c not in adopted_set]
+        if missing:
+            logger.warning("--case-ids 中 %d 个不在采纳集(跳过):%s",
+                           len(missing), ",".join(missing))
+        adopted = [c for c in case_ids if c in adopted_set]
     if limit:
-        case_ids = case_ids[:limit]
-    for case_id in case_ids:
+        adopted = adopted[:limit]
+    for case_id in adopted:
         payload = db.fetch_tools(case_id)   # 最新版本
         if not payload:
             continue
-        source_data = {
-            "title": payload.get("title"),
-            "platform": payload.get("platform"),
-        }
-        yield (case_id, source_data,
+        yield (case_id, _build_source_data(payload),
                (payload.get("tools") or []), payload.get("version"))
 
 
@@ -191,7 +211,11 @@ def build_payload(source_id, source_data, tool, tool_index):
 
     source_metadata = {
         "platform": source_data.get("platform") or "",
-        "url": tool.get("来源链接") or None,
+        # 链接优先取帖子来源,缺失时回退工具自身的来源链接(如 GitHub)。
+        "url": source_data.get("url") or tool.get("来源链接") or None,
+        "like_count": source_data.get("like_count"),
+        "publish_timestamp": source_data.get("publish_timestamp") or "",
+        "excerpt": source_data.get("excerpt") or "",
         "creation_layer": tool.get("创作层级") or "",
         "updated_time": tool.get("最新更新时间") or "",
     }
@@ -245,10 +269,15 @@ def ingest_one(api_url, payload, dry_run):
 
 # ── 主循环 ────────────────────────────────────────────────────────────────────
 
-def run(api_url, dry_run, verbose, delay_ms, query_id, limit, force):
-    cases = list(iter_cases_from_db(query_id, limit))
+def run(api_url, dry_run, verbose, delay_ms, query_id, limit, force, case_ids=None):
+    cases = list(iter_cases_from_db(query_id, limit, case_ids))
     if not cases:
-        scope = f"(query_id={query_id})" if query_id else ""
+        parts = []
+        if query_id:
+            parts.append(f"query_id={query_id}")
+        if case_ids:
+            parts.append(f"case_ids={','.join(case_ids)}")
+        scope = f"({'; '.join(parts)})" if parts else ""
         logger.error("DB 中未发现任何「已采纳且有工具解构」的 case %s", scope)
         sys.exit(1)
 
@@ -264,14 +293,13 @@ def run(api_url, dry_run, verbose, delay_ms, query_id, limit, force):
             logger.warning("  无 tools,跳过")
             skip_count += 1
             continue
-        # 去重台账:该 case 各工具已导入的版本。force 时清空(强制重导)。
-        ingested = {} if force else db.fetch_ingested_map(case_id)
+        # 去重台账:该 case 各工具已导入的版本(工具独立表)。force 时清空(强制重导)。
+        ingested = {} if force else db.fetch_tools_ingested_map(case_id)
         logger.info("  source_id=%-45s  tools=%d  version=%s", case_id, len(tools), version)
 
         for idx, tool in enumerate(tools, 1):
-            ledger_index = TOOL_INDEX_BASE + idx
             # 已导入且版本未变 → 跳过,杜绝重复上传(版本变了说明重解构过,应重导)。
-            if not force and ingested.get(ledger_index) == version:
+            if not force and ingested.get(idx) == version:
                 logger.info("  ♻️ [%d/%d] 已导入(版本 %s),跳过", idx, len(tools), version)
                 dup_count += 1
                 continue
@@ -301,7 +329,7 @@ def run(api_url, dry_run, verbose, delay_ms, query_id, limit, force):
                 # 仅真实导入成功才记台账(dry-run 不写,免污染去重状态)
                 if not dry_run:
                     try:
-                        db.mark_ingested(case_id, ledger_index, version, kid, api_url)
+                        db.mark_tools_ingested(case_id, idx, version, kid, api_url)
                     except Exception as exc:
                         logger.warning("  ⚠️ 台账写入失败(不影响本次导入):%s", exc)
             else:
@@ -335,12 +363,16 @@ def main():
                         help="两次 API 调用之间的间隔毫秒数(默认:100)")
     parser.add_argument("--query-id", default=None, metavar="QID",
                         help="只导入该搜索任务(query_id)下的采纳 case")
+    parser.add_argument("--case-ids", default=None, metavar="C1,C2",
+                        help="只导入指定 case_id(逗号分隔);仍受采纳口径约束,不在采纳集的跳过")
     parser.add_argument("--limit", type=int, default=None, metavar="N",
                         help="只处理前 N 个 case(调试用)")
     parser.add_argument("--force", action="store_true",
                         help="忽略去重台账,强制重导(换 prompt/模型、需覆盖时用)")
 
     args = parser.parse_args()
+    case_ids = ([c.strip() for c in args.case_ids.split(",") if c.strip()]
+                if args.case_ids else None)
     run(
         api_url=args.api_url,
         dry_run=args.dry_run,
@@ -349,6 +381,7 @@ def main():
         query_id=args.query_id,
         limit=args.limit,
         force=args.force,
+        case_ids=case_ids,
     )
 
 

+ 11 - 2
examples/mode_workflow/stages/procedure_extract.py

@@ -5,6 +5,7 @@
 配图下载转 base64(绕防盗链)随文本一起发。结果按工序拆行写 mode_process。
 
 用法(一般由 server.py 起子进程调):
+  python stages/procedure_extract.py --query-id q0000                          # 不传 case-ids = 解构该 query 全部已采纳帖
   python stages/procedure_extract.py --query-id q0000 --case-ids xhs_abc
   python stages/procedure_extract.py --query-id q0000 --case-ids xhs_abc --model google/gemini-3.1-flash-lite
 """
@@ -188,7 +189,14 @@ async def extract_one(row, system, llm_call, model, args):
 
 
 async def run(args):
-    case_ids = [c.strip() for c in args.case_ids.split(",") if c.strip()]
+    if args.case_ids:
+        case_ids = [c.strip() for c in args.case_ids.split(",") if c.strip()]
+    else:   # 不传 --case-ids → 解构该 query 下全部已采纳帖(口径同 db.fetch_posts.adopted)
+        case_ids = [p["case_id"] for p in db.fetch_posts(args.query_id, "process")
+                    if p.get("adopted")]
+        print(f"📋 未指定 --case-ids → query {args.query_id} 全部已采纳帖:{len(case_ids)} 个")
+    if not case_ids:
+        print("❌ 没有可解构的帖子(query 下无已采纳帖,或 query_id 不存在)"); return 1
 
     # 方案A:解构前先按 case 全局去重。已真实解构过的帖不再调 LLM(省钱),
     # 跨 query 的用 link_* 复制行补齐关联(cost=0)。--force 跳过去重强制重解构。
@@ -239,7 +247,8 @@ async def run(args):
 def main():
     p = argparse.ArgumentParser(description="工序解构:search_process 帖子 → mode_process")
     p.add_argument("--query-id", required=True)
-    p.add_argument("--case-ids", required=True, help="逗号分隔 case_id 列表")
+    p.add_argument("--case-ids", default=None,
+                   help="逗号分隔 case_id 列表;不传=该 query 下全部已采纳帖")
     p.add_argument("--model", default=DEFAULT_MODEL)
     p.add_argument("--version", default=None, help="默认自动 v_月日时分")
     p.add_argument("--max-images", type=int, default=MAX_IMAGES)

+ 14 - 3
examples/mode_workflow/stages/tool_extract.py

@@ -7,6 +7,7 @@
 - 写库:db.replace_tools(同版本幂等,跨版本保留);runs/mode_tools/ 留调试副本
 
 用法(一般由 server.py 起子进程调):
+  python stages/tool_extract.py --query-id q0000                              # 不传 case-ids = 解构该 query 全部已采纳帖
   python stages/tool_extract.py --query-id q0000 --case-ids xhs_abc,gzh_def
   python stages/tool_extract.py --query-id q0000 --case-ids xhs_abc --model anthropic/claude-sonnet-4-6
 """
@@ -86,7 +87,13 @@ async def extract_one(source, llm_call, model):
 
 async def run(args):
     qid = args.query_id
-    case_ids = [c.strip() for c in args.case_ids.split(",") if c.strip()]
+    if args.case_ids:
+        case_ids = [c.strip() for c in args.case_ids.split(",") if c.strip()]
+    else:   # 不传 --case-ids → 解构该 query 下全部已采纳帖(口径同 db.fetch_posts.adopted)
+        case_ids = [p["case_id"] for p in db.fetch_posts(qid, "tools") if p.get("adopted")]
+        print(f"📋 未指定 --case-ids → query {qid} 全部已采纳帖:{len(case_ids)} 个")
+    if not case_ids:
+        print("❌ 没有可解构的帖子(query 下无已采纳帖,或 query_id 不存在)"); return 1
 
     # 方案A:解构前按 case 全局去重(同 procedure_extract)。已解构的不再调 LLM,
     # 跨 query 的用 link_* 复制补齐关联。--force 强制重解构。
@@ -138,9 +145,12 @@ async def run(args):
         async with sem:
             tools, cost = await extract_one(s, llm_call, model_id)
         dur = round(time.monotonic() - t0, 1)
+        # 存来源块:剔除喂模型用的临时字段(_image_data_urls 等 base64,不入库)。
+        clean_source = {k: v for k, v in s.items() if not k.startswith("_")}
         n = db.replace_tools(qid, s["case_id"], s.get("platform"),
                              (s.get("post") or {}).get("title", ""),
-                             tools, model_id, version, cost, dur)
+                             tools, model_id, version, cost, dur,
+                             source=clean_source)
         (out_dir / f"{s['case_id']}_{version}.json").write_text(json.dumps({
             "case_id": s["case_id"], "version": version, "model": model_id,
             "cost_usd": cost, "duration_s": dur, "tools": tools,
@@ -156,7 +166,8 @@ async def run(args):
 def main():
     p = argparse.ArgumentParser(description="工具解构:search_tools 帖子 → mode_tools")
     p.add_argument("--query-id", required=True)
-    p.add_argument("--case-ids", required=True, help="逗号分隔 case_id 列表")
+    p.add_argument("--case-ids", default=None,
+                   help="逗号分隔 case_id 列表;不传=该 query 下全部已采纳帖")
     p.add_argument("--model", default=None, help="默认 gemini-flash-lite,可传 OpenRouter id")
     p.add_argument("--max-concurrent", type=int, default=3)
     p.add_argument("--version", default=None, help="默认自动 v_月日时分")

+ 18 - 15
examples/mode_workflow/流程执行手册.md

@@ -66,25 +66,30 @@ python3 stages/search_eval.py \
 
 ## 步骤 3:对「已采纳」的帖子做工序解构
 
-### 1) 取出该 query 下已采纳的 case_id
+### 推荐:整个 query 一条命令(不传 --case-ids = 自动解构该 query 全部已采纳帖)
 
 ```bash
-# 直接打印(逗号分隔),核对数量
-python3 -c "import db; cs=[p['case_id'] for p in db.fetch_posts('q0020','process') if p.get('adopted')]; print(len(cs),'个'); print(','.join(cs))"
+python3 stages/procedure_extract.py --query-id q0020
 ```
 
+脚本内部按 `db.fetch_posts('q0020','process')` 的 `adopted` 口径自动选帖(与下面「精确控制」手动取 CIDS 等价),
+已真实解构过的帖自动去重跳过(成本 $0)。**这是日常最常用方式**。
+
+工具方向同理(需先有 `search_tools` 数据):
+
 ```bash
-# 存进 shell 变量,供下一步直接用
-CIDS=$(python3 -c "import db; print(','.join(p['case_id'] for p in db.fetch_posts('q0020','process') if p.get('adopted')))")
-echo "$CIDS"
+python3 stages/tool_extract.py --query-id q0020
 ```
 
-### 2) 跑工序解构
+### 精确控制:只解构指定的几帖(分批 / 重试个别帖时用)
 
 ```bash
-python3 stages/procedure_extract.py \
-  --query-id q0020 \
-  --case-ids "$CIDS"
+# 1) 先取该 query 下已采纳的 case_id,核对数量
+python3 -c "import db; cs=[p['case_id'] for p in db.fetch_posts('q0020','process') if p.get('adopted')]; print(len(cs),'个'); print(','.join(cs))"
+
+# 2) 存进 shell 变量传给 --case-ids(也可手填某几个 case_id)
+CIDS=$(python3 -c "import db; print(','.join(p['case_id'] for p in db.fetch_posts('q0020','process') if p.get('adopted')))")
+python3 stages/procedure_extract.py --query-id q0020 --case-ids "$CIDS"
 ```
 
 要点:
@@ -94,8 +99,6 @@ python3 stages/procedure_extract.py \
 
 > **解构去重(默认开)**:某 case 若**已真实解构过**(任意 query),不会再调大模型 —— 同 query 直接跳过,跨 query 用 `link_*` 复制补齐关联(成本 $0)。要换 prompt/模型重解构才加 `--force`。
 
-> 工具方向同理,换脚本:`python3 stages/tool_extract.py --query-id q0020 --case-ids "$CIDS"`(需先有 `search_tools` 数据)。
-
 ---
 
 ## 步骤 4:上传知识库
@@ -121,6 +124,7 @@ python3 stages/import_process_knowledge.py --query-id q0020
   - 上传中途有失败(如超时)→ 失败的没记台账,**直接重跑命令**即可只补传失败的、跳过已成功的。
   - 重新解构过(版本变了)→ 重跑会自动重传更新那几条。
 - `--query-id` 不写 = 全量采纳 case(慎用,先 dry-run 看量)。
+- `--case-ids xhs_a,gzh_b` 只传指定 case(逗号分隔,采纳集内精确补传/重传单帖);仍受采纳口径约束。
 - `--limit N` 只处理前 N 个 case(调试)。
 - `--force` 忽略台账强制全量重传。
 - `--delay <毫秒>` 调每条间隔(默认 100);`--api-url <根地址>` 换后端。
@@ -138,9 +142,8 @@ QUERY="你的检索词 怎么做"
 python3 stages/search_eval.py --query-id "$QID" --query "$QUERY" \
   --mode-type 工序 --platforms xhs,gzh --max-count 20
 
-# 3) 取采纳 → 工序解构
-CIDS=$(python3 -c "import db; print(','.join(p['case_id'] for p in db.fetch_posts('$QID','process') if p.get('adopted')))")
-[ -n "$CIDS" ] && python3 stages/procedure_extract.py --query-id "$QID" --case-ids "$CIDS"
+# 3) 工序解构(不传 --case-ids = 自动取该 query 全部已采纳帖)
+python3 stages/procedure_extract.py --query-id "$QID"
 
 # 4) 上传(先 dry-run 再真传)
 python3 stages/import_process_knowledge.py --query-id "$QID" --dry-run