Просмотр исходного кода

feat(mode_workflow): 解构前按 case 全局去重,避免重复花钱

同一帖(case_id)被多个 query 搜到时只真实解构一次:procedure/tool_extract
在调 LLM 前查 db.latest_real_version,已解构的跨 query 用 db.link_process 复制
link_* 行补齐关联(cost=0),--force(API force:true)才强制重测。version 列
放宽到 VARCHAR(32) 容纳 link_v_mopN_* 版本(init 幂等 ALTER)。

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
刘文武 1 день назад
Родитель
Сommit
12183cfbe7

+ 6 - 0
examples/mode_workflow/README.md

@@ -43,6 +43,12 @@ Dashboard    → /api/dashboard 实时聚合四表(内容树覆盖按 steps 的
 `cost_usd` / `duration_s` 记录每次解构调用;同一 `(case_id, version)` 的多行重复存同一值,
 聚合统计时按该键去重(见 `server.py:_dashboard` 的 `cost_groups`)。
 
+**解构前按 case 全局去重(省钱):** `case_id` 是帖子物理身份,与 query 无关。同一帖被多个
+query 搜到时只真实解构一次——`pipeline/{procedure,tool}_extract.py` 在调 LLM 前先查
+`db.latest_real_version(case_id)`,已解构过的帖跨 query 用 `db.link_process` 复制 `link_*`
+行补齐关联(`cost=0`),不再付费重跑。换 prompt/模型要对比时传 `--force`(API `force:true`)
+跳过去重。`runs/backfill_links.py` 是事后扫尾工具,复用同一 `link_process`。
+
 ## 与旧 search_eval 的关系
 
 取代 `fixed_query_eval`(8770)+ `mode_procedure`(8771)两套服务的"搜索评估 + 大模型解构"

+ 72 - 2
examples/mode_workflow/db.py

@@ -48,6 +48,7 @@ def _conn():
 # ── DDL ──────────────────────────────────────────────────────────────────────
 
 SEARCH_TABLES = {"process": "search_process", "tools": "search_tools"}
+MODE_TABLES = {"process": "mode_process", "tools": "mode_tools"}
 
 
 def _search_table(mode_or_table):
@@ -58,6 +59,14 @@ def _search_table(mode_or_table):
     return t
 
 
+def _mode_table(mode_or_table):
+    """mode(process/tools)或表名 → 合法解构表名(白名单,防 SQL 注入)。"""
+    t = MODE_TABLES.get(mode_or_table, mode_or_table)
+    if t not in MODE_TABLES.values():
+        raise ValueError(f"未知解构表/模式: {mode_or_table!r}")
+    return t
+
+
 def _ddl_search(table, direction):
     return f"""
 CREATE TABLE IF NOT EXISTS {table} (
@@ -106,7 +115,7 @@ CREATE TABLE IF NOT EXISTS mode_process (
   step_count    INT           NULL,
   tools_used    JSON          NULL COMMENT '从 steps[].via 去重提取',
   model         VARCHAR(64)   NULL,
-  version       VARCHAR(16)   NULL COMMENT 'v_MMDDHHMM,保留历史',
+  version       VARCHAR(32)   NULL COMMENT 'v_MMDDHHMM,保留历史;link_* 为跨 query 复制(cost=0)',
   cost_usd      DECIMAL(10,6) NULL COMMENT '本次解构调用成本(同版本各行相同,聚合需按 case+version 去重)',
   duration_s    FLOAT         NULL,
   created_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
@@ -134,7 +143,7 @@ CREATE TABLE IF NOT EXISTS mode_tools (
   defects_json  JSON          NULL,
   updated_time  VARCHAR(64)   NULL COMMENT '工具最新更新时间',
   model         VARCHAR(64)   NULL,
-  version       VARCHAR(16)   NULL,
+  version       VARCHAR(32)   NULL COMMENT 'v_MMDDHHMM;link_* 为跨 query 复制(cost=0)',
   cost_usd      DECIMAL(10,6) NULL COMMENT '同 mode_process,聚合按 case+version 去重',
   duration_s    FLOAT         NULL,
   created_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
@@ -153,6 +162,10 @@ def init_tables():
             cur.execute(_ddl_search("search_tools", "工具方向"))
             cur.execute(DDL_PROCESS)
             cur.execute(DDL_TOOLS)
+            # 历史库迁移:version 由 VARCHAR(16) 放宽到 32,容纳 link_v_mopN_* 复制版本。
+            # MODIFY 幂等(已是 32 则 MySQL 元数据无操作),建表后表必存在,可安全执行。
+            for t in ("mode_process", "mode_tools"):
+                cur.execute(f"ALTER TABLE {t} MODIFY COLUMN version VARCHAR(32) NULL")
         print("✅ 建表完成:search_process, search_tools, mode_process, mode_tools")
     finally:
         conn.close()
@@ -541,6 +554,63 @@ def fetch_tools(case_id, version=None):
             "tool_count": len(tools), "tools": tools}
 
 
+# ── 跨 query 去重 / link 复制(方案A:解构前先去重,避免重复花钱)──────────────
+# case_id 是帖子物理身份(platform_channelContentId),与 query 无关。同一帖被多个
+# query 搜到时只需真实解构一次;其余 query 用 link_* 复制行补齐关联(cost=0)。
+
+def latest_real_version(case_id, mode="process"):
+    """该 case 是否已有「真实」解构(任意 query;link_* 是复制品,不算源)。
+    返回最新一行 {"version","query_id"} 或 None。给解构前去重判定用。"""
+    table = _mode_table(mode)
+    conn = _conn()
+    try:
+        with conn.cursor() as cur:
+            cur.execute(f"""SELECT version, query_id FROM {table}
+                            WHERE case_id=%s AND LEFT(version,5) <> 'link_'
+                            ORDER BY version DESC, id DESC LIMIT 1""", (case_id,))
+            return cur.fetchone()
+    finally:
+        conn.close()
+
+
+def link_process(query_id, case_id, mode="process"):
+    """把 case 在别处最新「真实」版本的解构行复制到目标 query
+    (version='link_'+源版本, cost_usd=0)。幂等(先删目标同版本)。
+    返回复制行数;该 case 从未真实解构过则返回 0(无源可复制)。"""
+    table = _mode_table(mode)
+    conn = _conn()
+    try:
+        with conn.cursor() as cur:
+            cur.execute(f"""SELECT version FROM {table}
+                            WHERE case_id=%s AND LEFT(version,5) <> 'link_'
+                            ORDER BY version DESC, id DESC LIMIT 1""", (case_id,))
+            r = cur.fetchone()
+            if not r:
+                return 0
+            srcver = r["version"]
+            newver = ("link_" + srcver)[:32]   # version 列 VARCHAR(32)
+            # 复制除自增 id / 时间戳外的全部列,改写 query_id / version / cost。
+            cur.execute(f"SHOW COLUMNS FROM {table}")
+            cols = [c["Field"] for c in cur.fetchall()
+                    if c["Field"] not in ("id", "created_at", "updated_at")]
+            cur.execute(f"SELECT {','.join(cols)} FROM {table} WHERE case_id=%s AND version=%s",
+                        (case_id, srcver))
+            rows = cur.fetchall()
+            cur.execute(f"DELETE FROM {table} WHERE query_id=%s AND case_id=%s AND version=%s",
+                        (query_id, case_id, newver))
+            for row in rows:
+                row = dict(row)
+                row["query_id"] = query_id
+                row["version"] = newver
+                row["cost_usd"] = 0
+                cur.execute(
+                    f"INSERT INTO {table} ({','.join(cols)}) VALUES ({','.join(['%s']*len(cols))})",
+                    [row[k] for k in cols])
+            return len(rows)
+    finally:
+        conn.close()
+
+
 # ── Dashboard 原始行(指标计算在 server.py)─────────────────────────────────────
 
 def fetch_dashboard_rows():

+ 27 - 2
examples/mode_workflow/pipeline/procedure_extract.py

@@ -189,14 +189,37 @@ async def extract_one(row, system, llm_call, model, args):
 
 async def run(args):
     case_ids = [c.strip() for c in args.case_ids.split(",") if c.strip()]
+
+    # 方案A:解构前先按 case 全局去重。已真实解构过的帖不再调 LLM(省钱),
+    # 跨 query 的用 link_* 复制行补齐关联(cost=0)。--force 跳过去重强制重解构。
+    linked = skipped = 0
+    todo = []
+    for cid in dict.fromkeys(case_ids):   # 顺手去掉同批重复 case
+        if not args.force:
+            ex = db.latest_real_version(cid, mode="process")
+            if ex:
+                if ex["query_id"] == args.query_id:
+                    print(f"♻️ {cid} 本 query 已解构(版本 {ex['version']}),跳过")
+                    skipped += 1
+                else:
+                    n = db.link_process(args.query_id, cid, mode="process")
+                    print(f"♻️ {cid} 已在 {ex['query_id']} 解构(版本 {ex['version']}),"
+                          f"link 补齐 {n} 行 · $0")
+                    linked += 1
+                continue
+        todo.append(cid)
+
     rows = []
-    for cid in case_ids:
+    for cid in todo:
         row = db.fetch_post(args.query_id, cid, table="search_process")
         if row is None:
             print(f"⚠️ {args.query_id}/{cid} 不在 search_process,跳过")
             continue
         rows.append(row)
     if not rows:
+        if linked or skipped:
+            print(f"✅ 无需 LLM 解构(link 补齐 {linked} 帖 · 已存在跳过 {skipped} 帖)")
+            return 0
         print("❌ 没有可解构的帖子"); return 1
 
     system = PROMPT_FILE.read_text(encoding="utf-8")
@@ -208,7 +231,7 @@ async def run(args):
     costs = []
     for row in rows:   # 工序解构 token 重,串行跑,避免 OpenRouter 限流
         costs.append(await extract_one(row, system, llm_call, args.model, args))
-    print(f"\n📊 完成 {len(rows)} 帖 · 总成本 ${sum(costs):.4f}")
+    print(f"\n📊 完成 {len(rows)} 帖 · link 补齐 {linked} 帖 · 总成本 ${sum(costs):.4f}")
     return 0
 
 
@@ -222,6 +245,8 @@ def main():
     p.add_argument("--max-concurrent", type=int, default=4)
     p.add_argument("--max-tokens", type=int, default=8000)
     p.add_argument("--no-images", action="store_true")
+    p.add_argument("--force", action="store_true",
+                   help="跳过去重,强制重解构(换 prompt/模型做对比时用)")
     args = p.parse_args()
     raise SystemExit(asyncio.run(run(args)))
 

+ 27 - 2
examples/mode_workflow/pipeline/tool_extract.py

@@ -87,14 +87,37 @@ async def extract_one(source, llm_call, model):
 async def run(args):
     qid = args.query_id
     case_ids = [c.strip() for c in args.case_ids.split(",") if c.strip()]
+
+    # 方案A:解构前按 case 全局去重(同 procedure_extract)。已解构的不再调 LLM,
+    # 跨 query 的用 link_* 复制补齐关联。--force 强制重解构。
+    linked = skipped = 0
+    todo = []
+    for cid in dict.fromkeys(case_ids):
+        if not args.force:
+            ex = db.latest_real_version(cid, mode="tools")
+            if ex:
+                if ex["query_id"] == qid:
+                    print(f"♻️ {cid} 本 query 已解构(版本 {ex['version']}),跳过")
+                    skipped += 1
+                else:
+                    n = db.link_process(qid, cid, mode="tools")
+                    print(f"♻️ {cid} 已在 {ex['query_id']} 解构(版本 {ex['version']}),"
+                          f"link 补齐 {n} 行 · $0")
+                    linked += 1
+                continue
+        todo.append(cid)
+
     sources = []
-    for cid in case_ids:
+    for cid in todo:
         row = db.fetch_post(qid, cid, table="search_tools")
         if row is None:
             print(f"⚠️ {qid}/{cid} 不在 search_tools,跳过")
             continue
         sources.append(_row_to_source(row))
     if not sources:
+        if linked or skipped:
+            print(f"✅ 无需 LLM 解构(link 补齐 {linked} 帖 · 已存在跳过 {skipped} 帖)")
+            return 0
         print("❌ 没有可解构的帖子"); return 1
 
     if args.model and "/" in args.model:
@@ -126,7 +149,7 @@ async def run(args):
         return cost
 
     costs = await asyncio.gather(*[_work(s) for s in sources])
-    print(f"\n📊 完成 {len(sources)} 帖 · 总成本 ${sum(costs):.4f}")
+    print(f"\n📊 完成 {len(sources)} 帖 · link 补齐 {linked} 帖 · 总成本 ${sum(costs):.4f}")
     return 0
 
 
@@ -137,6 +160,8 @@ def main():
     p.add_argument("--model", default=None, help="默认 gemini-flash-lite,可传 OpenRouter id")
     p.add_argument("--max-concurrent", type=int, default=3)
     p.add_argument("--version", default=None, help="默认自动 v_月日时分")
+    p.add_argument("--force", action="store_true",
+                   help="跳过去重,强制重解构(换 prompt/模型做对比时用)")
     args = p.parse_args()
     raise SystemExit(asyncio.run(run(args)))
 

+ 2 - 0
examples/mode_workflow/server.py

@@ -349,6 +349,8 @@ class Handler(BaseHTTPRequestHandler):
                        "--case-ids", ",".join(cids)]
                 if payload.get("model"):
                     cmd += ["--model", payload["model"]]
+                if payload.get("force"):   # 默认按 case 全局去重;force 才强制重解构
+                    cmd += ["--force"]
                 kind = "proc" if u.path.endswith("process") else "tool"
                 self._json({"task_id": _spawn_task(kind, cmd)})
             elif u.path == "/api/run_search":