Просмотр исходного кода

feat(eval, workflow): 添加可复现性与意图可控性评估维度,完善全链路逻辑

1.  新增可复现性、意图可控性两个评估维度,更新前端页面维度列表与评估prompt模板,定义详细评分规则与硬封顶逻辑
2.  为评估流水线添加模糊带升级功能:使用便宜模型初评,将边界分数的帖子交由强模型复核,兼顾成本与精度
3.  添加并发解构防重复认领逻辑,避免重复解构同一帖子造成额外LLM成本
4.  新增单条/批量帖子重评脚本,支持定向重评与升级复核,附带旧数据备份回滚能力
5.  完善知识导入脚本与数据库逻辑,新增导入台账防止重复上传,并将可复现性纳入采纳判定门槛
6.  优化评估去重逻辑,支持复用已有评估结果仅重算query相关分数,降低评估成本
7.  优化解构任务提示逻辑,对跳过的任务弹出友好的用户提示
刘文武 1 день назад
Родитель
Сommit
c88387837a

+ 141 - 0
examples/mode_workflow/_batch_reeval_q0000.py

@@ -0,0 +1,141 @@
+# -*- coding: utf-8 -*-
+"""批量重评 q0000 下当前【命中(is_adopted)】的帖子,用 flash-lite+sonnet 组合(模糊带升级),
+跑完定向替换 DB 的得分相关字段(overall_score / knowledge_type / llm_evaluation)。
+先备份旧值到 runs/search_process/q0000.score_backup.<ts>.json,可回滚。"""
+import asyncio, copy, json, sys
+from datetime import datetime
+from pathlib import Path
+
+PROJECT_ROOT = Path(__file__).resolve().parents[3]
+sys.path.insert(0, str(PROJECT_ROOT))
+from dotenv import load_dotenv
+load_dotenv()
+
+MW = Path(__file__).resolve().parent
+sys.path.insert(0, str(MW))
+import db
+from examples.process_pipeline.script.search_eval.search_and_evaluate import evaluate_posts
+from examples.process_pipeline.script.llm_evaluate_sources import (
+    _EVAL_PRODUCT_FIELDS, build_eval_llm_call,
+)
+
+QUERY_ID = "q0000"
+TABLE = "search_process"
+INIT_MODEL = "gemini-flash-lite"
+ESC_MODEL = "sonnet"
+BAND = (4.0, 6.0)
+
+
+def _load_db_rows():
+    """Fetch the score-related columns of every row stored for QUERY_ID in TABLE.
+
+    Returns whatever ``cursor.fetchall()`` yields; the connection is closed
+    whether or not the query succeeds.
+    """
+    sql = (f"SELECT case_id, overall_score, knowledge_type, publish_time, "
+           f"llm_evaluation FROM {TABLE} WHERE query_id=%s")
+    connection = db._conn()
+    try:
+        with connection.cursor() as cursor:
+            cursor.execute(sql, (QUERY_ID,))
+            fetched = cursor.fetchall()
+    finally:
+        connection.close()
+    return fetched
+
+
+def _update_scores(case_id, overall, knowledge_type, evaluation):
+    """Overwrite the score fields of the (QUERY_ID, case_id) row in TABLE.
+
+    ``knowledge_type`` (an empty list when falsy) and ``evaluation`` are passed
+    through ``db._j`` before being bound; ``updated_at`` is refreshed by the statement.
+    """
+    # NOTE(review): there is no explicit commit here — presumably db._conn()
+    # hands out an autocommit connection; confirm.
+    statement = (
+        f"UPDATE {TABLE} SET overall_score=%s, knowledge_type=%s, llm_evaluation=%s, "
+        f"updated_at=CURRENT_TIMESTAMP WHERE query_id=%s AND case_id=%s")
+    connection = db._conn()
+    try:
+        with connection.cursor() as cursor:
+            bound = (overall, db._j(knowledge_type or []), db._j(evaluation),
+                     QUERY_ID, case_id)
+            cursor.execute(statement, bound)
+    finally:
+        connection.close()
+
+
+async def main():
+    """Re-evaluate the currently adopted posts of QUERY_ID and replace their score fields.
+
+    Flow: read the DB rows, keep the ones ``db.is_adopted`` accepts, back up their
+    old score fields to a timestamped JSON file, rebuild the evaluation inputs from
+    the runs JSON, call ``evaluate_posts`` with INIT_MODEL as the first-pass model
+    and ESC_MODEL / BAND as the escalation settings, then update the DB row and the
+    runs JSON entry of every successfully evaluated post and print a comparison
+    report. Posts whose evaluation failed are left untouched.
+    """
+    rows = _load_db_rows()
+
+    def _ev(r):
+        # The stored evaluation may be a JSON string or an already-decoded object.
+        e = r["llm_evaluation"]
+        return json.loads(e) if isinstance(e, str) else (e or {})
+
+    adopted = [r for r in rows if db.is_adopted(r["overall_score"], _ev(r), r["publish_time"])]
+    adopted_ids = {r["case_id"] for r in adopted}
+    print(f"q0000 共 {len(rows)} 帖,当前命中 {len(adopted)} 帖 → 重评这些\n")
+
+    # Back up the old score fields before anything gets overwritten.
+    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
+    backup = [{"case_id": r["case_id"], "overall_score": r["overall_score"],
+               "knowledge_type": r["knowledge_type"], "publish_time": r["publish_time"],
+               "llm_evaluation": _ev(r)} for r in adopted]
+    bpath = MW / "runs" / TABLE / f"{QUERY_ID}.score_backup.{ts}.json"
+    bpath.write_text(json.dumps(backup, ensure_ascii=False, indent=2), encoding="utf-8")
+    print(f"💾 旧得分已备份 → {bpath.name}\n")
+
+    # Take the full posts from the runs JSON as the re-evaluation input.
+    data = json.loads((MW / "runs" / TABLE / f"{QUERY_ID}.json").read_text(encoding="utf-8"))
+    query = data.get("query", "")
+    by_id = {s["case_id"]: s for s in data.get("results", [])}
+    missing = [cid for cid in adopted_ids if cid not in by_id]
+    if missing:
+        print(f"⚠️ runs json 缺 {len(missing)} 条,将跳过: {missing}")
+    targets = []
+    for cid in adopted_ids:
+        if cid not in by_id:
+            continue
+        # Work on a copy so stripping the previous evaluation output does not
+        # touch the entry that is written back to the runs JSON later.
+        s = copy.deepcopy(by_id[cid])
+        for k in _EVAL_PRODUCT_FIELDS:
+            s.pop(k, None)
+        s.pop("_image_data_urls", None)
+        targets.append(s)
+
+    eval_llm, eval_model = build_eval_llm_call(INIT_MODEL)
+    esc_llm, esc_model = build_eval_llm_call(ESC_MODEL)
+    print(f"🧠 组合评估:{eval_model} 初评 → {esc_model} 复核(带 [{BAND[0]:g},{BAND[1]:g}])\n")
+    sources, cost = await evaluate_posts(
+        targets, "", eval_llm, eval_model, max_concurrent=4,
+        include_images=True, max_images=4, image_mode="url", query=query,
+        escalate_llm=esc_llm, escalate_model=esc_model, escalate_band=BAND)
+
+    # Lookup table for the old scores.
+    old_by_id = {r["case_id"]: r for r in adopted}
+    report = []
+    for s in sources:
+        cid = s["case_id"]
+        # .get(): a post that came back without any evaluation is treated as a
+        # failure by the guard below instead of raising KeyError.
+        ev = s.get("llm_evaluation")
+        if not isinstance(ev, dict) or ev.get("_error"):
+            print(f"   ⚠️ 评估失败,跳过更新: {cid}")
+            continue
+        kt = ev.get("知识类型") or []
+        ov = db.overall_score(ev)
+        pub = (s.get("post") or {}).get("publish_timestamp") or old_by_id[cid]["publish_time"]
+        new_adopt = db.is_adopted(ov, ev, pub)
+        _update_scores(cid, ov, kt, ev)            # targeted replacement in the DB
+        by_id[cid]["llm_evaluation"] = ev          # keep the runs JSON entry in sync
+        report.append({
+            "case_id": cid, "escalated": bool(s.get("_escalated")),
+            "old_overall": old_by_id[cid]["overall_score"], "new_overall": ov,
+            "repro": db._fixed_dim_score(ev, "可复现性"),
+            "intent": db._fixed_dim_score(ev, "意图可控性"),
+            "new_adopted": new_adopt,
+            # "or ''" also covers a title stored as None, which cannot be sliced.
+            "title": ((s.get("post") or {}).get("title") or "")[:22],
+        })
+
+    # Write the synced runs JSON back to disk.
+    (MW / "runs" / TABLE / f"{QUERY_ID}.json").write_text(
+        json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
+
+    # Report
+    print("\n" + "=" * 92)
+    print(f"{'case_id':26} {'升级':4} {'旧综':>5} {'新综':>5} {'复现':>4} {'意图':>4} {'命中':>5}  标题")
+    still = 0
+    # new_overall may be None (the formatting below already allows for that);
+    # fall back to 0 in the key so sorting never compares None with a number.
+    for r in sorted(report, key=lambda x: x["new_overall"] or 0):
+        still += int(r["new_adopted"])
+        print(f"{r['case_id'][:26]:26} {'★' if r['escalated'] else ' ':^4} "
+              f"{(r['old_overall'] or 0):5.2f} {(r['new_overall'] or 0):5.2f} "
+              f"{str(r['repro']):>4} {str(r['intent']):>4} "
+              f"{'是' if r['new_adopted'] else '否':>4}  {r['title']}")
+    esc_n = sum(r["escalated"] for r in report)
+    print("=" * 92)
+    print(f"重评 {len(report)} 帖 · 升级 sonnet {esc_n} 帖 · 命中 {len(adopted)}→{still} · "
+          f"总成本 ${cost:.4f}")
+    print(f"DB 已更新,旧值备份在 {bpath.name}")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())

+ 75 - 0
examples/mode_workflow/_reeval_one.py

@@ -0,0 +1,75 @@
+# -*- coding: utf-8 -*-
+"""一次性:用当前 eval_prompt_template.md 对单条已存帖子重评(复用生产评估链路 evaluate_posts)。
+支持 --escalate-model 演示 sonnet+flash-lite 组合(模糊带升级)。"""
+import argparse, asyncio, json, sys
+from pathlib import Path
+
+PROJECT_ROOT = Path(__file__).resolve().parents[3]   # …/Agent
+sys.path.insert(0, str(PROJECT_ROOT))
+from dotenv import load_dotenv
+load_dotenv()
+
+MW = Path(__file__).resolve().parent
+sys.path.insert(0, str(MW))
+import db
+
+from examples.process_pipeline.script.search_eval.search_and_evaluate import evaluate_posts
+from examples.process_pipeline.script.llm_evaluate_sources import (
+    _EVAL_PRODUCT_FIELDS, build_eval_llm_call, DEFAULT_EVAL_MODEL,
+)
+
+
+def _load(query_id):
+    """Read and parse runs/search_process/<query_id>.json under this script's directory."""
+    run_file = MW / "runs" / "search_process" / f"{query_id}.json"
+    raw_text = run_file.read_text(encoding="utf-8")
+    return json.loads(raw_text)
+
+
+async def main():
+    """Re-evaluate one stored post through ``evaluate_posts`` and print the outcome.
+
+    CLI options: --query-id / --case-id pick the post inside
+    runs/search_process/<query-id>.json; --query overrides the stored query text;
+    --model is the first-pass model; --escalate-model / --escalate-band are the
+    optional escalation settings handed to ``evaluate_posts``; --max-images caps
+    the number of images. This function itself writes nothing back — it only
+    prints the scores and the adoption verdict.
+
+    Raises:
+        SystemExit: when the case_id is not found in the runs JSON, or when the
+            evaluation produced no usable result.
+    """
+    ap = argparse.ArgumentParser()
+    ap.add_argument("--query-id", required=True)
+    ap.add_argument("--case-id", required=True)
+    ap.add_argument("--query", default="")
+    ap.add_argument("--model", default=DEFAULT_EVAL_MODEL)
+    ap.add_argument("--escalate-model", default="")
+    ap.add_argument("--escalate-band", type=float, nargs=2, default=[4.0, 6.0])
+    ap.add_argument("--max-images", type=int, default=4)
+    a = ap.parse_args()
+
+    data = _load(a.query_id)
+    query = a.query or data.get("query", "")
+    src = next((s for s in data.get("results", []) if s.get("case_id") == a.case_id), None)
+    if not src:
+        raise SystemExit(f"未找到 case_id={a.case_id}")
+    # Drop the previous evaluation output so the post is evaluated from scratch.
+    for k in _EVAL_PRODUCT_FIELDS:
+        src.pop(k, None)
+
+    llm_call, model_id = build_eval_llm_call(a.model)
+    esc_llm = esc_model = None
+    if a.escalate_model:
+        esc_llm, esc_model = build_eval_llm_call(a.escalate_model)
+    print(f"▶ 重评 {a.case_id}  初评={model_id}"
+          + (f"  升级={esc_model} 带[{a.escalate_band[0]:g},{a.escalate_band[1]:g}]" if esc_model else "")
+          + f"  query={query!r}\n")
+
+    sources, cost = await evaluate_posts(
+        [src], "", llm_call, model_id, max_concurrent=1,
+        include_images=True, max_images=a.max_images, image_mode="url", query=query,
+        escalate_llm=esc_llm, escalate_model=esc_model, escalate_band=tuple(a.escalate_band),
+    )
+    # A failed evaluation (nothing returned, a non-dict blob, or one flagged with
+    # "_error") must not be reported as if it were a real score.
+    ev = sources[0].get("llm_evaluation") if sources else None
+    if not isinstance(ev, dict) or ev.get("_error"):
+        raise SystemExit(f"评估失败,无有效评估结果: case_id={a.case_id}")
+    overall = db.overall_score(ev)
+    pub = (src.get("post") or {}).get("publish_timestamp", "")
+    adopted = db.is_adopted(overall, ev, pub)
+
+    print("\n" + "=" * 60)
+    print(f"最终评估模型 = {sources[0].get('_escalated') or model_id}")
+    print(f"综合分(overall_score) = {overall}")
+    print(f"  · 和内容制作知识相关 = {((ev.get('相关性') or {}).get('和内容制作知识相关') or {}).get('得分')}")
+    print(f"  · 可复现性          = {db._fixed_dim_score(ev, '可复现性')}   (门槛 <4 → 不采纳)")
+    print(f"  · 意图可控性        = {db._fixed_dim_score(ev, '意图可控性')}  (暂只采分)")
+    print(f"采纳判定(is_adopted)  = {adopted}")
+    print(f"总成本 ≈ ${cost:.4f}")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())

+ 108 - 8
examples/mode_workflow/db.py

@@ -177,6 +177,25 @@ CREATE TABLE IF NOT EXISTS mode_tools (
 """
 
 
+# Ledger of procedure knowledge already imported into the knowledge base; used by
+# import_process_knowledge.py to prevent duplicate uploads.
+# One knowledge item = one procedure (proc_index, 1-based) of one case. Each row records
+# the mode_process version at import time: a changed version (re-extraction) means the
+# content changed and should be re-imported; an unchanged version counts as already
+# uploaded and is skipped.
+# The ledger lives in the DB rather than in a local file so that switching machines or
+# endpoints does not cause duplicate writes to the knowledge base.
+DDL_INGEST_LOG = """
+CREATE TABLE IF NOT EXISTS knowledge_ingest_log (
+  id            BIGINT AUTO_INCREMENT PRIMARY KEY,
+  case_id       VARCHAR(128)  NOT NULL,
+  proc_index    INT           NOT NULL COMMENT '工序序号(1-based),对齐导入脚本枚举',
+  version       VARCHAR(32)   NULL COMMENT '导入时 mode_process 版本;变了应重导',
+  knowledge_id  VARCHAR(128)  NULL COMMENT '接口返回的 knowledge_id',
+  api_url       VARCHAR(255)  NULL,
+  ingested_at   TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+  updated_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+  UNIQUE KEY uk_case_proc (case_id, proc_index)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工序知识已导入台账(防重复上传)';
+"""
+
+
 def init_tables():
     conn = _conn()
     try:
@@ -185,11 +204,12 @@ def init_tables():
             cur.execute(_ddl_search("search_tools", "工具方向"))
             cur.execute(DDL_PROCESS)
             cur.execute(DDL_TOOLS)
+            cur.execute(DDL_INGEST_LOG)
             # 历史库迁移:version 由 VARCHAR(16) 放宽到 32,容纳 link_v_mopN_* 复制版本。
             # MODIFY 幂等(已是 32 则 MySQL 元数据无操作),建表后表必存在,可安全执行。
             for t in ("mode_process", "mode_tools"):
                 cur.execute(f"ALTER TABLE {t} MODIFY COLUMN version VARCHAR(32) NULL")
-        print("✅ 建表完成:search_process, search_tools, mode_process, mode_tools")
+        print("✅ 建表完成:search_process, search_tools, mode_process, mode_tools, knowledge_ingest_log")
     finally:
         conn.close()
 
@@ -269,9 +289,21 @@ def _recency_hard(date_str):
     return 1
 
 
+def _fixed_dim_score(evaluation, name):
+    """Return 质量.固定维度.<name> as a float, or None when missing or non-numeric.
+
+    The dimension may be stored as a bare value or as a dict carrying the value
+    under "得分". Callers treat a None result as "takes no part in the decision".
+    """
+    quality = (evaluation or {}).get("质量") or {}
+    fixed_dims = quality.get("固定维度") or {}
+    raw = fixed_dims.get(name)
+    if isinstance(raw, dict):
+        raw = raw.get("得分")
+    if raw is None:
+        return None
+    try:
+        return float(raw)
+    except (TypeError, ValueError):
+        return None
+
+
 def is_adopted(overall, evaluation, publish_time):
     """采纳/命中判定,口径对齐 mode_procedure 的 decision=="report":
-    制作相关性<4、发布超两年、综合分<6 —— 任一命中即不采纳;指标缺失不参与判定。"""
+    制作相关性<4、可复现性<4、发布超两年、综合分<6 —— 任一命中即不采纳;指标缺失不参与判定。
+    (意图可控性暂只采分不设门槛,留待阈值标定后再开。)"""
     rel = None
     v = ((evaluation or {}).get("相关性") or {}).get("和内容制作知识相关")
     if isinstance(v, dict):
@@ -282,6 +314,9 @@ def is_adopted(overall, evaluation, publish_time):
         rel = None
     if rel is not None and rel < 4:
         return False
+    repro = _fixed_dim_score(evaluation, "可复现性")
+    if repro is not None and repro < 4:
+        return False
     rh = _recency_hard(publish_time)
     if rh is not None and rh < 2:
         return False
@@ -290,8 +325,8 @@ def is_adopted(overall, evaluation, publish_time):
     return True
 
 
-def is_adopted_rel(overall, rel, publish_time):
-    """is_adopted 的轻量版:相关性得分(rel)已由 SQL JSON_EXTRACT 直接取出,
+def is_adopted_rel(overall, rel, publish_time, repro=None):
+    """is_adopted 的轻量版:相关性得分(rel)、可复现性(repro)已由 SQL JSON_EXTRACT 直接取出,
     无需传输/解析整块 llm_evaluation。判定口径与 is_adopted 完全一致。"""
     try:
         rel = float(rel) if rel is not None else None
@@ -299,6 +334,12 @@ def is_adopted_rel(overall, rel, publish_time):
         rel = None
     if rel is not None and rel < 4:
         return False
+    try:
+        repro = float(repro) if repro is not None else None
+    except (TypeError, ValueError):
+        repro = None
+    if repro is not None and repro < 4:
+        return False
     rh = _recency_hard(publish_time)
     if rh is not None and rh < 2:
         return False
@@ -697,6 +738,11 @@ _REL_SQL = ("JSON_UNQUOTE(COALESCE("
             "JSON_EXTRACT(llm_evaluation,'$.\"相关性\".\"和内容制作知识相关\".\"得分\"'),"
             "JSON_EXTRACT(llm_evaluation,'$.\"相关性\".\"和内容制作知识相关\"')))")
 
+# The reproducibility gate likewise needs its scalar extracted directly in SQL
+# (same lookup semantics as the _fixed_dim_score helper used by is_adopted):
+# prefer the nested "得分" value, otherwise fall back to the dimension itself.
+_REPRO_SQL = ("JSON_UNQUOTE(COALESCE("
+              "JSON_EXTRACT(llm_evaluation,'$.\"质量\".\"固定维度\".\"可复现性\".\"得分\"'),"
+              "JSON_EXTRACT(llm_evaluation,'$.\"质量\".\"固定维度\".\"可复现性\"')))")
+
 
 def fetch_adopted_process_cases(query_id=None):
     """返回「已采纳且有工序解构」的 case_id 列表(供知识上传脚本用)。
@@ -707,7 +753,7 @@ def fetch_adopted_process_cases(query_id=None):
     query_id 给定时只看该搜索任务下的 case。返回去重、按 case_id 排序的列表。
     """
     sql = (f"SELECT DISTINCT s.case_id, s.overall_score, s.publish_time, "
-           f"{_REL_SQL} AS rel "
+           f"{_REL_SQL} AS rel, {_REPRO_SQL} AS repro "
            "FROM search_process s "
            "JOIN (SELECT DISTINCT case_id FROM mode_process) m ON s.case_id = m.case_id")
     params = ()
@@ -722,10 +768,63 @@ def fetch_adopted_process_cases(query_id=None):
     finally:
         conn.close()
     cases = [r["case_id"] for r in rows
-             if is_adopted_rel(r["overall_score"], r["rel"], r["publish_time"])]
+             if is_adopted_rel(r["overall_score"], r["rel"], r["publish_time"], r["repro"])]
     return sorted(set(cases))
 
 
+# ── 评估去重:复用 query 无关分,只重算 query 相关分(search_eval.py 用)──────────
+
+def fetch_existing_eval(case_id, table="search_process"):
+    """Return the most recent usable evaluation blob stored for this case, under any query.
+
+    Used for evaluation de-duplication: when the same post was already evaluated
+    under another query, its query-independent scores can be reused and only the
+    query-specific relevance needs recomputing. The five most recently updated
+    non-NULL rows are inspected in order and the first blob that is a dict, has no
+    "_error" flag and carries a dict under "相关性" is returned; None if there is none.
+    """
+    table = _search_table(table)
+    connection = _conn()
+    try:
+        with connection.cursor() as cursor:
+            cursor.execute(f"""SELECT llm_evaluation FROM {table}
+                            WHERE case_id=%s AND llm_evaluation IS NOT NULL
+                            ORDER BY updated_at DESC, id DESC LIMIT 5""", (case_id,))
+            recent = cursor.fetchall()
+    finally:
+        connection.close()
+    # Decode lazily so rows after the first usable blob are never parsed.
+    decoded = (_loads(row["llm_evaluation"]) for row in recent)
+    return next(
+        (blob for blob in decoded
+         if isinstance(blob, dict) and not blob.get("_error")
+         and isinstance(blob.get("相关性"), dict)),
+        None)
+
+
+# ── 上传去重:知识库已导入台账(import_process_knowledge.py 用)────────────────
+
+def fetch_ingested_map(case_id):
+    """Return {proc_index: version} for the ledger rows of this case.
+
+    An empty dict means knowledge_ingest_log holds no row for the case.
+    """
+    query = "SELECT proc_index, version FROM knowledge_ingest_log WHERE case_id=%s"
+    versions = {}
+    connection = _conn()
+    try:
+        with connection.cursor() as cursor:
+            cursor.execute(query, (case_id,))
+            for row in cursor.fetchall():
+                versions[row["proc_index"]] = row["version"]
+    finally:
+        connection.close()
+    return versions
+
+
+def mark_ingested(case_id, proc_index, version, knowledge_id=None, api_url=None):
+    """Record one ledger row saying this procedure has been imported.
+
+    (case_id, proc_index) is the unique key: inserting the same pair again
+    updates version, knowledge_id and api_url on the existing row instead.
+    """
+    values = (case_id, proc_index, version, knowledge_id, api_url)
+    connection = _conn()
+    try:
+        with connection.cursor() as cursor:
+            cursor.execute("""INSERT INTO knowledge_ingest_log
+                             (case_id, proc_index, version, knowledge_id, api_url)
+                           VALUES (%s,%s,%s,%s,%s)
+                           ON DUPLICATE KEY UPDATE version=VALUES(version),
+                             knowledge_id=VALUES(knowledge_id), api_url=VALUES(api_url)""",
+                           values)
+    finally:
+        connection.close()
+
+
 def fetch_dashboard_rows():
     """拉 Dashboard 计算所需的轻量行。数据量级:百~千行,Python 聚合足够。
     优化:① 不传 llm_evaluation 整块,SQL 只取采纳判定要的相关性得分;
@@ -734,7 +833,8 @@ def fetch_dashboard_rows():
     try:
         with conn.cursor() as cur:
             # 进度分母走「采纳」口径;mode 标方向(工序帖来自 search_process)。
-            cols = f"query_id, case_id, platform, overall_score, publish_time, {_REL_SQL} AS rel"
+            cols = (f"query_id, case_id, platform, overall_score, publish_time, "
+                    f"{_REL_SQL} AS rel, {_REPRO_SQL} AS repro")
             cur.execute(f"SELECT {cols} FROM search_process")
             posts = cur.fetchall()
             for p in posts:
@@ -761,7 +861,7 @@ def fetch_dashboard_rows():
         conn.close()
     for p in posts:
         # 采纳判定:口径同帖子列表(is_adopted),作为「需解构」分母依据
-        p["adopted"] = is_adopted_rel(p["overall_score"], p["rel"], p["publish_time"])
+        p["adopted"] = is_adopted_rel(p["overall_score"], p["rel"], p["publish_time"], p["repro"])
     for r in procs:
         r["steps"] = _loads(r["steps"], [])
         r["cost_usd"] = float(r["cost_usd"]) if r["cost_usd"] is not None else None

+ 36 - 16
examples/mode_workflow/import_process_knowledge.py

@@ -65,10 +65,11 @@ logger = logging.getLogger(__name__)
 # ── 数据来源:从 DB 取采纳工序(替代参考实现的本地文件扫描)──────────────────────
 
 def iter_cases_from_db(query_id=None, limit=None):
-    """产出 (case_id, source_data, procedures)。
+    """产出 (case_id, source_data, procedures, version)。
 
     先取采纳 case_id 列表,再逐个 fetch_process 重建解构详情(取最新版本)。
     fetch_process 返回 None(无解构行)或 procedures 为空的 case 自动跳过。
+    version 用于上传去重:同 case 同工序、版本未变即视为已传过,跳过。
     """
     case_ids = db.fetch_adopted_process_cases(query_id)
     if limit:
@@ -77,7 +78,8 @@ def iter_cases_from_db(query_id=None, limit=None):
         payload = db.fetch_process(case_id)   # 最新版本
         if not payload:
             continue
-        yield case_id, (payload.get("source") or {}), (payload.get("procedures") or [])
+        yield (case_id, (payload.get("source") or {}),
+               (payload.get("procedures") or []), payload.get("version"))
 
 
 # ── 作用域提取(原样复用参考实现)──────────────────────────────────────────────
@@ -203,30 +205,30 @@ def build_payload(source_id, source_data, procedure, proc_index):
 # ── 单条写入(原样复用参考实现)────────────────────────────────────────────────
 
 def ingest_one(api_url, payload, dry_run):
-    """调用导入接口写入一条知识,返回 (success, info_message)。"""
+    """调用导入接口写入一条知识,返回 (success, info_message, knowledge_id)。"""
     if dry_run:
-        return True, "(dry-run, skipped)"
+        return True, "(dry-run, skipped)", None
 
     url = api_url.rstrip("/") + INGEST_ENDPOINT
     try:
         resp = requests.post(url, json=payload, timeout=30)
         if resp.status_code == 201:
             kid = resp.json().get("knowledge_id", "?")
-            return True, f"knowledge_id={kid}"
+            return True, f"knowledge_id={kid}", kid
         try:
             detail = resp.json().get("detail", resp.text[:300])
         except Exception:
             detail = resp.text[:300]
-        return False, f"HTTP {resp.status_code}: {detail}"
+        return False, f"HTTP {resp.status_code}: {detail}", None
     except requests.Timeout:
-        return False, "超时(30s)"
+        return False, "超时(30s)", None
     except requests.RequestException as exc:
-        return False, str(exc)
+        return False, str(exc), None
 
 
 # ── 主循环 ────────────────────────────────────────────────────────────────────
 
-def run(api_url, dry_run, verbose, delay_ms, query_id, limit):
+def run(api_url, dry_run, verbose, delay_ms, query_id, limit, force):
     cases = list(iter_cases_from_db(query_id, limit))
     if not cases:
         scope = f"(query_id={query_id})" if query_id else ""
@@ -234,19 +236,28 @@ def run(api_url, dry_run, verbose, delay_ms, query_id, limit):
         sys.exit(1)
 
     mode_tag = "  [DRY-RUN]" if dry_run else ""
-    logger.info("发现 %d 个采纳 case。目标接口:%s%s", len(cases), api_url, mode_tag)
+    force_tag = "  [FORCE]" if force else ""
+    logger.info("发现 %d 个采纳 case。目标接口:%s%s%s", len(cases), api_url, mode_tag, force_tag)
 
-    ok_count = fail_count = skip_count = 0
+    ok_count = fail_count = skip_count = dup_count = 0
 
-    for case_id, source_data, procedures in cases:
+    for case_id, source_data, procedures, version in cases:
         logger.info("── %s", case_id)
         if not procedures:
             logger.warning("  无 procedures,跳过")
             skip_count += 1
             continue
-        logger.info("  source_id=%-45s  procedures=%d", case_id, len(procedures))
+        # 去重台账:该 case 各工序已导入的版本。force 时清空(强制重导)。
+        ingested = {} if force else db.fetch_ingested_map(case_id)
+        logger.info("  source_id=%-45s  procedures=%d  version=%s", case_id, len(procedures), version)
 
         for idx, procedure in enumerate(procedures, 1):
+            # 已导入且版本未变 → 跳过,杜绝重复上传(版本变了说明重解构过,应重导)。
+            if not force and ingested.get(idx) == version:
+                logger.info("  ♻️ [%d/%d] 已导入(版本 %s),跳过", idx, len(procedures), version)
+                dup_count += 1
+                continue
+
             payload = build_payload(case_id, source_data, procedure, idx)
             title = payload["title"]
             n_scopes = len(payload.get("scopes", []))
@@ -258,7 +269,7 @@ def run(api_url, dry_run, verbose, delay_ms, query_id, limit):
                 print(f"[{case_id}] 工序 {idx}/{len(procedures)}")
                 print(json.dumps(payload, ensure_ascii=False, indent=2))
 
-            ok, msg = ingest_one(api_url, payload, dry_run)
+            ok, msg, kid = ingest_one(api_url, payload, dry_run)
             status_icon = "✓" if ok else "✗"
             level = logging.INFO if ok else logging.WARNING
             logger.log(
@@ -270,6 +281,12 @@ def run(api_url, dry_run, verbose, delay_ms, query_id, limit):
 
             if ok:
                 ok_count += 1
+                # 仅真实导入成功才记台账(dry-run 不写,免污染去重状态)
+                if not dry_run:
+                    try:
+                        db.mark_ingested(case_id, idx, version, kid, api_url)
+                    except Exception as exc:
+                        logger.warning("  ⚠️ 台账写入失败(不影响本次导入):%s", exc)
             else:
                 fail_count += 1
 
@@ -277,8 +294,8 @@ def run(api_url, dry_run, verbose, delay_ms, query_id, limit):
                 time.sleep(delay_ms / 1000)
 
     logger.info(
-        "完成。成功=%d  失败=%d  跳过=%d  合计导入=%d",
-        ok_count, fail_count, skip_count, ok_count,
+        "完成。成功=%d  失败=%d  无工序跳过=%d  已传过跳过=%d  合计导入=%d",
+        ok_count, fail_count, skip_count, dup_count, ok_count,
     )
     if fail_count:
         sys.exit(1)
@@ -303,6 +320,8 @@ def main():
                         help="只导入该搜索任务(query_id)下的采纳 case")
     parser.add_argument("--limit", type=int, default=None, metavar="N",
                         help="只处理前 N 个 case(调试用)")
+    parser.add_argument("--force", action="store_true",
+                        help="忽略去重台账,强制重导(换 prompt/模型、需覆盖时用)")
 
     args = parser.parse_args()
     run(
@@ -312,6 +331,7 @@ def main():
         delay_ms=args.delay,
         query_id=args.query_id,
         limit=args.limit,
+        force=args.force,
     )
 
 

+ 7 - 1
examples/mode_workflow/index.html

@@ -2764,7 +2764,7 @@
         const pick = (names) => names.filter((n) => flat[n]).map((n) => [n, flat[n]]);
 
         let html = "";
-        let p = pick(["时效性", "热度性", "评论反馈"]); // ① 固定维度(置顶)
+        let p = pick(["时效性", "热度性", "评论反馈", "可复现性", "意图可控性"]); // ① 固定维度(置顶)
         if (p.length) html += sub("固定维度") + rows(p);
         p = pick(["真实感", "真实感 (非AI)", "真实感(非AI)", "表现力"]); // ② 用例
         if (p.length) html += sub("用例") + rows(p);
@@ -3306,6 +3306,12 @@
             method: "POST",
             body: JSON.stringify({ query_id: state.queryId, case_ids: caseIds, model }),
           });
+          // 全部正在解构中(被认领跳过):没有 task_id,提示一下即可,别去轮询空任务
+          if (!r.task_id) {
+            toast(r.note || "所选帖子正在解构中,已跳过", "info");
+            return;
+          }
+          if ((r.skipped || []).length) toast(`${r.skipped.length} 帖正在解构中,已跳过`, "info");
           showTask(`${isProc ? "工序" : "工具"}解构 · ${caseIds.length} 帖`, r.task_id, async () => {
             state.selected.clear();
             caseIds.forEach(invalidateExtractCache); // 重新解构后数据已变,清这些帖的缓存

+ 86 - 7
examples/mode_workflow/pipeline/search_eval.py

@@ -11,6 +11,7 @@
 """
 import argparse
 import asyncio
+import copy
 import json
 import sys
 from pathlib import Path
@@ -25,8 +26,9 @@ from examples.process_pipeline.script.search_eval.search_and_evaluate import (
     search_all, evaluate_posts, transcribe_video_posts, build_query_overrides,
 )
 from examples.process_pipeline.script.llm_evaluate_sources import (
-    build_eval_llm_call, EVAL_MODELS, DEFAULT_EVAL_MODEL,
+    build_eval_llm_call, EVAL_MODELS, DEFAULT_EVAL_MODEL, _format_post_for_eval,
 )
+from examples.process_pipeline.script.llm_helper import call_llm_with_retry
 
 HERE = Path(__file__).resolve().parent
 MW = HERE.parent
@@ -34,6 +36,34 @@ sys.path.insert(0, str(MW))
 import db
 
 
+async def _rescore_query_relevance(source, query, llm_call, model, sem):
+    """Lightweight re-score for evaluation de-duplication: judge query relevance only.
+
+    Sends a text-only prompt (the formatted post plus the query) and expects a JSON
+    object with a 0-10 "得分" and a "理由". The call runs while holding ``sem``.
+    Returns the (data, cost) pair produced by ``call_llm_with_retry``.
+    """
+    def _validate(payload):
+        # Returns an error message for a rejected reply, None for an accepted one.
+        if not isinstance(payload, dict):
+            return "需 JSON 对象"
+        try:
+            score = float(payload.get("得分"))
+        except (TypeError, ValueError):
+            return "得分 缺失或非数字"
+        if 0 <= score <= 10:
+            return None
+        return "得分需在 0-10"
+
+    post_block = _format_post_for_eval(source)
+    system_prompt = ("你是内容评估器。只判断这篇帖子与给定【检索词】的相关程度——"
+                     "即「这帖是否回答/命中了这个检索词」。严格只输出一个 JSON 对象,"
+                     '形如 {"得分": <0到10的数字>, "理由": "<一句话>"},不要任何额外字段或解释。')
+    user_prompt = f"【检索词】{query}\n\n【帖子】\n{post_block}"
+    conversation = [
+        {"role": "system", "content": system_prompt},
+        {"role": "user", "content": user_prompt},
+    ]
+    label = f"QueryRel[{source.get('case_id', '?')}]"
+
+    async with sem:
+        result, spent = await call_llm_with_retry(
+            llm_call=llm_call, messages=conversation, model=model,
+            temperature=0.1, max_tokens=300, validate_fn=_validate,
+            task_name=label)
+    return result, spent
+
+
 async def run(args):
     phrasings = [args.query] + [s.strip() for s in (args.synonyms or "").split(",") if s.strip()]
     # 去重保序
@@ -65,17 +95,59 @@ async def run(args):
         if n:
             print(f"🎙️  视频转写 {n} 条")
 
+    table = "search_tools" if args.mode_type == "工具" else "search_process"
+
+    # ── 评估去重 ────────────────────────────────────────────────────────────────
+    # 评估的相关性含两子项:「和内容制作知识相关」(与 query 无关)与「和 query 相关」
+    # (query 专属)。同帖在别的相似 query 下评过时,质量/通用相关/时效等 query 无关分
+    # 可直接复用,只需用一次轻量纯文本调用重算「和 query 相关」,免去整套多模态评估,省钱。
+    # --force-eval 跳过去重,全部走完整评估。
     cost = 0.0
     if not args.no_eval:
-        sources, cost = await evaluate_posts(
-            sources, "", eval_llm, eval_model_id, args.max_concurrent,
-            include_images=not args.no_images, max_images=args.max_images,
-            image_mode=args.image_mode, query=args.query,
-        )
+        prior = {}
+        if not args.force_eval:
+            for s in sources:
+                e = db.fetch_existing_eval(s["case_id"], table)
+                if e:
+                    prior[s["case_id"]] = e
+        fresh = [s for s in sources if s["case_id"] not in prior]
+        reused = [s for s in sources if s["case_id"] in prior]
+        if reused:
+            print(f"♻️ 评估去重:{len(reused)} 帖已评过 → 复用通用分+重算 query 相关分;"
+                  f"{len(fresh)} 帖走完整评估")
+
+        if fresh:
+            esc_llm = esc_model = None
+            if args.escalate_model:
+                esc_llm, esc_model = build_eval_llm_call(args.escalate_model)
+                print(f"⬆️  启用模糊带升级:{eval_model_id} 初评 → {esc_model} "
+                      f"复核(带 [{args.escalate_band[0]:g},{args.escalate_band[1]:g}])")
+            _, c = await evaluate_posts(
+                fresh, "", eval_llm, eval_model_id, args.max_concurrent,
+                include_images=not args.no_images, max_images=args.max_images,
+                image_mode=args.image_mode, query=args.query,
+                escalate_llm=esc_llm, escalate_model=esc_model,
+                escalate_band=tuple(args.escalate_band),
+            )  # evaluate_posts 就地把 llm_evaluation 挂到各 source 上
+            cost += c
+
+        if reused:
+            sem = asyncio.Semaphore(args.max_concurrent)
+            rr = await asyncio.gather(*[
+                _rescore_query_relevance(s, args.query, eval_llm, eval_model_id, sem)
+                for s in reused])
+            for s, (qr, c) in zip(reused, rr):
+                cost += c
+                blob = copy.deepcopy(prior[s["case_id"]])
+                if qr is not None:   # 重算成功才覆盖,失败则沿用旧 query 相关分
+                    blob.setdefault("相关性", {})["和 query 相关"] = {
+                        "得分": qr.get("得分"), "理由": qr.get("理由", "")}
+                s["llm_evaluation"] = blob
+                qr_s = (blob.get("相关性", {}).get("和 query 相关") or {}).get("得分", "?")
+                print(f"   ♻️ [query={qr_s}] {s['case_id'][:24]}")
     for s in sources:
         s.pop("_image_data_urls", None)
 
-    table = "search_tools" if args.mode_type == "工具" else "search_process"
     n = db.upsert_search_posts(args.query_id, args.query, sources, table=table)
     print(f"🗄️  {table} 入库 {n} 行 · 方向 {args.mode_type or '工序'} · 评估成本 ${cost:.4f}")
 
@@ -98,12 +170,19 @@ def main():
     p.add_argument("--platforms", default="xhs,gzh")
     p.add_argument("--max-count", type=int, default=10)
     p.add_argument("--eval-model", default=DEFAULT_EVAL_MODEL, choices=list(EVAL_MODELS))
+    p.add_argument("--escalate-model", default="", choices=[""] + list(EVAL_MODELS),
+                   help="模糊带升级用的强模型(如 sonnet);留空=不升级。初评落在 --escalate-band "
+                        "的可复现性/意图可控性帖子交此模型复核")
+    p.add_argument("--escalate-band", type=float, nargs=2, default=[4.0, 6.0],
+                   metavar=("LO", "HI"), help="升级触发的闭区间,默认 4 6")
     p.add_argument("--max-concurrent", type=int, default=3)
     p.add_argument("--max-images", type=int, default=4)
     p.add_argument("--image-mode", choices=["url", "base64"], default="url")
     p.add_argument("--no-transcribe", action="store_true")
     p.add_argument("--no-eval", action="store_true")
     p.add_argument("--no-images", action="store_true")
+    p.add_argument("--force-eval", action="store_true",
+                   help="跳过评估去重,所有帖都走完整评估(换 prompt/模型对比时用)")
     args = p.parse_args()
     raise SystemExit(asyncio.run(run(args)))
 

+ 55 - 12
examples/mode_workflow/server.py

@@ -58,8 +58,36 @@ def _render_search_html():
 TASKS = {}
 _TASK_LOCK = threading.Lock()
 
-
-def _spawn_task(kind, cmd):
+# ── Extraction "claims": in-process guard against concurrent duplicate extraction ──
+# The extraction script has one long LLM call between "look up latest_real_version" and
+# "write to the DB", so two concurrent tasks for the same case would both see "not yet
+# extracted" and both pay for an LLM call. This set records the (mode, case_id) pairs
+# currently being extracted so that already-running cases are dropped before a task
+# starts. It is in-process only and never persisted: a claim is released when its task
+# ends (see _wait inside _spawn_task) and a server restart clears everything, so there
+# is no risk of a stale lock getting stuck the way a DB-level claim lock could.
+# mode separates process/tools: procedure extraction and tool extraction of the same
+# post do not affect each other and are claimed independently.
+_INFLIGHT = set()            # {(mode, case_id)}
+_INFLIGHT_LOCK = threading.Lock()
+
+
+def _claim_cases(mode, case_ids):
+    """Claim a batch of cases and return the ones that were actually acquired.
+
+    A case whose (mode, case_id) pair is already registered in _INFLIGHT is left
+    out of the result; every other case is registered and returned, in input order.
+    The whole batch is processed under _INFLIGHT_LOCK.
+    """
+    newly_claimed = []
+    with _INFLIGHT_LOCK:
+        for case_id in case_ids:
+            marker = (mode, case_id)
+            if marker in _INFLIGHT:
+                continue
+            _INFLIGHT.add(marker)
+            newly_claimed.append(case_id)
+    return newly_claimed
+
+
+def _release_cases(mode, case_ids):
+    """Remove the (mode, case_id) claims of these cases; unknown pairs are ignored."""
+    markers = {(mode, case_id) for case_id in case_ids}
+    with _INFLIGHT_LOCK:
+        _INFLIGHT.difference_update(markers)
+
+
+def _spawn_task(kind, cmd, release=None):
+    """起子进程跑 pipeline。release=(mode, case_ids):任务结束时释放这些 case 的认领。"""
     LOG_DIR.mkdir(parents=True, exist_ok=True)
     task_id = f"{kind}_{datetime.now().strftime('%m%d%H%M%S%f')}"
     log_path = LOG_DIR / f"{task_id}.log"
@@ -74,6 +102,8 @@ def _spawn_task(kind, cmd):
         f.close()
         with _TASK_LOCK:
             TASKS[task_id]["status"] = "done" if rc == 0 else "failed"
+        if release:
+            _release_cases(*release)   # 解构结束,释放认领,后续任务可再处理这些 case
         # 搜索/解构任务一结束,四表数据可能变,作废 Dashboard 缓存,下次重算
         _invalidate_dashboard()
 
@@ -449,16 +479,29 @@ class Handler(BaseHTTPRequestHandler):
                 cids = payload.get("case_ids") or []
                 if not qid or not cids:
                     return self._err("缺 query_id / case_ids")
-                script = ("pipeline/procedure_extract.py" if u.path.endswith("process")
-                          else "pipeline/tool_extract.py")
-                cmd = [sys.executable, script, "--query-id", qid,
-                       "--case-ids", ",".join(cids)]
-                if payload.get("model"):
-                    cmd += ["--model", payload["model"]]
-                if payload.get("force"):   # 默认按 case 全局去重;force 才强制重解构
-                    cmd += ["--force"]
-                kind = "proc" if u.path.endswith("process") else "tool"
-                self._json({"task_id": _spawn_task(kind, cmd)})
+                mode = "process" if u.path.endswith("process") else "tools"
+                # 认领:剔掉正在解构中的 case,防并发重复解构(白花 LLM 钱)
+                uniq = list(dict.fromkeys(cids))
+                claimed = _claim_cases(mode, uniq)
+                skipped = [c for c in uniq if c not in claimed]
+                if not claimed:
+                    return self._json({"task_id": None, "skipped": skipped,
+                                       "note": "所选帖子正在解构中,已跳过(防并发重复解构)"})
+                try:
+                    script = ("pipeline/procedure_extract.py" if mode == "process"
+                              else "pipeline/tool_extract.py")
+                    cmd = [sys.executable, script, "--query-id", qid,
+                           "--case-ids", ",".join(claimed)]
+                    if payload.get("model"):
+                        cmd += ["--model", payload["model"]]
+                    if payload.get("force"):   # 默认按 case 全局去重;force 才强制重解构
+                        cmd += ["--force"]
+                    kind = "proc" if mode == "process" else "tool"
+                    task_id = _spawn_task(kind, cmd, release=(mode, claimed))
+                except Exception:
+                    _release_cases(mode, claimed)   # 起进程失败也要释放认领,避免卡住
+                    raise
+                self._json({"task_id": task_id, "skipped": skipped})
             elif u.path == "/api/run_search":
                 query = (payload.get("query") or "").strip()
                 if not query:

+ 9 - 1
examples/process_pipeline/script/search_eval/eval_prompt_template.md

@@ -96,6 +96,14 @@
           "得分": "0-10。展示出的制作成品视觉表现力,能否发到社媒平台,无用例则为 0 分",
           "理由": "中文"
         }
+      },
+      "可复现性": {
+        "得分": "0-10。照着做能否稳定复现出相当效果:决定成品好坏的关键变量,是否由教程里教的可控手段(具体 prompt/参数/控制条件/可量化的判断标准)掌控。**硬封顶规则(优先于一切,无论其余步骤描述得多清晰、流程多完整):只要决定成品长相的关键创作动作(写 prompt / 选风格 / 控构图 / 挑模板)被甩给『自己看着改』『按需求调』,或外包给另一个黑盒(『发给豆包/DeepSeek 让它写』),或依赖教程没教你如何获得的不可控输入(现成的好模板/好底图、个人审美、靠抽卡运气)——可复现性 ≤4。** 注意区分:『把每一步点哪个按钮说清楚』是步骤清晰,不等于可复现;真正可复现要求决定效果的那个创作变量本身被教成可控的。",
+        "理由": "中文"
+      },
+      "意图可控性": {
+        "得分": "0-10。用户对成品的视觉效果有自己的预期。这条知识能否让用户朝自己想要的方向去调、可达的视觉范围有多宽。高分:讲清了哪些变量(prompt/参数/风格/构图/控制条件)如何影响成品长相,用户能据此把效果调向自己的目标、覆盖多种风格题材。**硬封顶规则(优先于一切):若成品被锁死在某个固定模板/单一风格上、用户只能复刻作者那一种结果、想换个风格/版式就无从下手(如『套这个模板就能出』但不教怎么改方向),或对效果的精确控制只能依赖模型本身发挥而非教程给出的可调手段——意图可控性 ≤4。** 无成品或纯展示无方法→低分。",
+        "理由": "中文"
       }
     },
 
@@ -120,7 +128,7 @@
           }
         },
         "泛化性": {
-          "得分": "0-10。流程是否可迁移到其他题材/风格/工具,还是只适用于特定单例",
+          "得分": "0-10。这套『方法/流程』本身能否搬去做别的题材或换别的工具,还是只适用于特定单例(只评方法可移植性;成品能不能按预期调由『意图可控性』负责,勿混)",
           "理由": "中文"
         }
       },

+ 43 - 0
examples/process_pipeline/script/search_eval/search_and_evaluate.py

@@ -410,6 +410,9 @@ async def evaluate_posts(
     max_images: int = 4,
     image_mode: str = "url",
     query: Optional[str] = None,
+    escalate_llm: Optional[Callable] = None,
+    escalate_model: Optional[str] = None,
+    escalate_band: Tuple[float, float] = (4.0, 6.0),
 ) -> Tuple[List[Dict[str, Any]], float]:
     """对每条 post 用 rubric 逐条评估,把 llm_evaluation 挂到 source 上。返回 (sources, total_cost)。
 
@@ -419,6 +422,11 @@ async def evaluate_posts(
       image_mode="url"   → 直接传图片 URL(最快,实测 xhs 可直连)
       image_mode="base64"→ 下载转 base64(最稳,绕防盗链)
     评估失败(重试耗尽)的条目标 error 标记并保留,不丢。
+
+    模糊带升级(escalate_llm 非空时启用,sonnet+flash-lite 组合):
+      初评(便宜模型)出分后,凡 可复现性 或 意图可控性 落在 escalate_band 闭区间内的帖子,
+      用 escalate_llm(强模型,如 sonnet)对同一条重评、结果覆盖初评,并标 _escalated。
+      命中的边界帖按强模型口径定分,其余仍走初评,兼顾成本与边界精度。
     """
     from examples.process_pipeline.script.llm_evaluate_sources import _evaluate_one
     # rubric 详解 / 输出 schema 已固化在 eval_prompt_template.md, 不再 load 外部 rubric 文件
@@ -456,6 +464,41 @@ async def evaluate_posts(
         title = (s.get("post", {}) or {}).get("title", "")[:30]
         print(f"   - [{rel_str}] {s['case_id'][:24]} {title}")
 
+    # ── 模糊带升级:flash-lite 初评落在 [lo,hi] 的边界帖交 sonnet 复核 ──
+    if escalate_llm is not None:
+        lo, hi = escalate_band
+
+        def _band_hit(ev):
+            """可复现性 / 意图可控性 任一落在 [lo,hi] 闭区间 → 需升级。"""
+            fixed = (((ev or {}).get("质量") or {}).get("固定维度") or {})
+            for name in ("可复现性", "意图可控性"):
+                v = fixed.get(name)
+                if isinstance(v, dict):
+                    v = v.get("得分")
+                try:
+                    v = float(v)
+                except (TypeError, ValueError):
+                    continue
+                if lo <= v <= hi:
+                    return True
+            return False
+
+        to_esc = [s for s in sources if _band_hit(s.get("llm_evaluation"))]
+        if to_esc:
+            print(f"⬆️  模糊带升级 {len(to_esc)}/{len(sources)} 条 "
+                  f"(可复现性/意图可控性∈[{lo:g},{hi:g}]) → {escalate_model}")
+            esc_results = await asyncio.gather(*[
+                _evaluate_one(s, requirement, escalate_llm, escalate_model, sem,
+                              image_urls=(s.get("_image_data_urls") if include_images else None),
+                              query=query)
+                for s in to_esc
+            ])
+            for s, (llm_eval, cost) in zip(to_esc, esc_results):
+                total_cost += cost
+                if llm_eval is not None:
+                    s["llm_evaluation"] = llm_eval
+                    s["_escalated"] = escalate_model
+
     print(f"   汇总: 评估成功 {len(sources)-failed} / 评估失败 {failed} / cost=${total_cost:.4f}")
     return sources, total_cost