Просмотр исходного кода

feat(mode_workflow): 新增工序知识批量导入工具及PooledDB依赖

- 在db.py中新增fetch_adopted_process_cases函数,获取已采纳且带有工序解构的case列表
- 新增import_process_knowledge.py脚本,实现从数据库批量导入已采纳工序到知识接口
- 添加PooledDB依赖到requirements.txt
- 更新examples/mode_workflow/README.md补充该工具的使用文档
刘文武 1 день назад
Родитель
Commit
b553e17b6a

+ 5 - 1
DockerCompose项目启动说明.md

@@ -2,4 +2,8 @@ cd ~/workspace/workspace/agent_create_server/Agent/
 
 git pull
 
-docker compose -p agent_create_server up -d --build
+docker compose -p agent_create_server up -d --build
+
+docker ps
+
+docker logs -fn 500 <容器名或ID>

+ 18 - 0
examples/mode_workflow/README.md

@@ -24,6 +24,7 @@ python server.py              # http://localhost:8772
 | `pipeline/search_eval.py` | 任意 query 搜索+评估 → search_process / search_tools(按解构方向分表) |
 | `pipeline/procedure_extract.py` | 工序解构(LLM 直出)→ mode_process |
 | `pipeline/tool_extract.py` | 工具解构 → mode_tools |
+| `import_process_knowledge.py` | 已采纳工序(mode_process 最新版)→ 知识导入接口;**读 DB 非本地文件**,采纳口径同 Dashboard(`db.is_adopted_rel`) |
 | `prompts/` | 工序/工具解构 system prompt(可单独迭代) |
 | `reference/judged_matrix.json` | 内容树(27 动作×50 类型),Dashboard 覆盖度用 |
 | `runs/` | 运行日志与调试副本(gitignore):search_process / search_tools / mode_process / mode_tools / logs |
@@ -49,6 +50,23 @@ query 搜到时只真实解构一次——`pipeline/{procedure,tool}_extract.py`
 行补齐关联(`cost=0`),不再付费重跑。换 prompt/模型要对比时传 `--force`(API `force:true`)
 跳过去重。`runs/backfill_links.py` 是事后扫尾工具,复用同一 `link_process`。
 
+## 知识导入(工序上传)
+
+把 DB 中**已采纳**的工序解构导入到知识接口(`/api/v1/knowledge/ingest`)。数据来源是
+`mode_process`(每 case 取最新版本),不读本地文件——`db.fetch_adopted_process_cases()`
+把工序解构与 `search_process` 评估 JOIN,用 `is_adopted_rel`(相关性<4 / 超两年 / 综合分<6
+任一即不采纳)过滤;每个 procedure 组装一条知识,字段映射与
+`Downloads/import/how_process_knowledge/main.py` 一致(steps→scopes/custom_ext)。
+
+```bash
+python import_process_knowledge.py --dry-run            # 只取数+组装 payload,不调接口(先验证)
+python import_process_knowledge.py --dry-run -v         # 同上,打印完整 payload JSON
+python import_process_knowledge.py --query-id q0001     # 只传某搜索任务下的采纳 case
+python import_process_knowledge.py --limit 5            # 只处理前 5 个 case(调试)
+python import_process_knowledge.py                      # 真实导入(去掉 --dry-run)
+# 其它:--api-url <根地址>(默认 47.236.83.130:8001)  --delay <毫秒>(调用间隔,默认 100)
+```
+
 ## 与旧 search_eval 的关系
 
 取代 `fixed_query_eval`(8770)+ `mode_procedure`(8771)两套服务的"搜索评估 + 大模型解构"

+ 28 - 0
examples/mode_workflow/db.py

@@ -698,6 +698,34 @@ _REL_SQL = ("JSON_UNQUOTE(COALESCE("
             "JSON_EXTRACT(llm_evaluation,'$.\"相关性\".\"和内容制作知识相关\"')))")
 
 
+def fetch_adopted_process_cases(query_id=None):
+    """Return the case_ids that are adopted and have a procedure extraction.
+
+    Evaluations are read from ``search_process`` and extractions from
+    ``mode_process``; the query inner-joins the two so only cases present in
+    both tables are considered.  The relevance score is selected as a scalar
+    through ``_REL_SQL`` instead of transferring the whole ``llm_evaluation``
+    blob.  Adoption itself is decided in Python by ``is_adopted_rel`` (per the
+    original note, the same criterion the Dashboard uses).
+
+    Args:
+        query_id: When truthy, restrict the rows to that search task.
+
+    Returns:
+        A sorted list of distinct adopted case_ids.
+    """
+    select_sql = "".join((
+        "SELECT DISTINCT s.case_id, s.overall_score, s.publish_time, ",
+        _REL_SQL,
+        " AS rel ",
+        "FROM search_process s ",
+        "JOIN (SELECT DISTINCT case_id FROM mode_process) m ON s.case_id = m.case_id",
+    ))
+    if query_id:
+        select_sql += " WHERE s.query_id=%s"
+        args = (query_id,)
+    else:
+        args = ()
+
+    connection = _conn()
+    try:
+        with connection.cursor() as cursor:
+            cursor.execute(select_sql, args)
+            fetched = cursor.fetchall()
+    finally:
+        connection.close()
+
+    # A case can appear in several rows; the set collapses them before sorting.
+    adopted = {
+        row["case_id"]
+        for row in fetched
+        if is_adopted_rel(row["overall_score"], row["rel"], row["publish_time"])
+    }
+    return sorted(adopted)
+
+
 def fetch_dashboard_rows():
     """拉 Dashboard 计算所需的轻量行。数据量级:百~千行,Python 聚合足够。
     优化:① 不传 llm_evaluation 整块,SQL 只取采纳判定要的相关性得分;

+ 319 - 0
examples/mode_workflow/import_process_knowledge.py

@@ -0,0 +1,319 @@
+"""
+把数据库里「已采纳」的工序解构(mode_process)批量导入到知识导入接口。
+
+与 Downloads/import/how_process_knowledge/main.py 的区别只在数据来源:
+  参考实现 —— 扫 data/*/_model/workflow.json,读本地 JSON 文件。
+  本脚本   —— 从 MySQL 取数:db.fetch_adopted_process_cases() 拿采纳 case_id,
+              再 db.fetch_process(case_id) 重建 {source, procedures}。
+两边的数据结构一致(source 块 + procedures[].steps[]),故 payload 组装逻辑完全复用。
+
+字段映射(同参考实现):
+  source.id          ← case_id(DB 主键,无需从 JSON 兜底目录名)
+  source.source_type ← 固定 "post"
+  source.title/author ← source.title / source.author
+  每个 procedure → 一条知识:
+    title          ← procedure.name;为空回退 "来源标题 — 工序N"
+    content        ← 整个 procedure 对象的 JSON 串
+    dim_attributes ← 固定 ["how工序"]   dim_creations ← 固定 ["制作"]
+    scopes         ← 各步骤 substance/form 去重
+    custom_ext     ← 各步骤 effect→作用 action→动作 via→工具 去重
+
+采纳口径:db.is_adopted_rel(相关性<4 / 发布超两年 / 综合分<6 任一命中即不采纳)。
+
+用法:
+    python import_process_knowledge.py                      # 真实导入(采纳工序全量)
+    python import_process_knowledge.py --dry-run            # 只组装+打印,不调接口
+    python import_process_knowledge.py --dry-run --verbose  # 打印完整 payload JSON
+    python import_process_knowledge.py --query-id q0001     # 只传某搜索任务下的采纳 case
+    python import_process_knowledge.py --limit 5            # 只处理前 5 个 case(调试)
+    python import_process_knowledge.py --api-url http://... # 指定后端地址
+    python import_process_knowledge.py --delay 200          # 每次调用间隔 200ms
+"""
+
+import argparse
+import json
+import logging
+import sys
+import time
+
+import requests
+
+import db
+
+# ── Configuration (aligned with the reference implementation) ────────────────
+
+# Default backend root URL; main() exposes it as the --api-url default.
+DEFAULT_API_URL = "http://47.236.83.130:8001"
+# Path of the ingest endpoint; ingest_one() appends it to the API root URL.
+INGEST_ENDPOINT = "/api/v1/knowledge/ingest"
+
+# Fixed dimension tags attached to every knowledge payload by build_payload().
+DIM_ATTRIBUTES = ["how工序"]
+DIM_CREATIONS = ["制作"]
+
+# custom_ext keys that build_custom_ext() uses for the step fields
+# effect / action / via respectively.
+EXT_KEY_EFFECT = "作用"
+EXT_KEY_ACTION = "动作"
+EXT_KEY_TOOL = "工具"
+
+# ── Logging ──────────────────────────────────────────────────────────────────
+
+# Configured at import time: INFO level, "HH:MM:SS [LEVEL] message" lines.
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s [%(levelname)s] %(message)s",
+    datefmt="%H:%M:%S",
+)
+logger = logging.getLogger(__name__)
+
+
+# ── 数据来源:从 DB 取采纳工序(替代参考实现的本地文件扫描)──────────────────────
+
+def iter_cases_from_db(query_id=None, limit=None):
+    """Yield ``(case_id, source_data, procedures)`` for adopted cases in the DB.
+
+    The adopted case_ids come from ``db.fetch_adopted_process_cases``; each one
+    is then loaded with ``db.fetch_process`` (the original note says this
+    returns the latest version).
+
+    Cases for which ``fetch_process`` returns a falsy value are skipped.  Cases
+    whose payload has no procedures are still yielded, with an empty list, so
+    the caller can count them as skipped.
+
+    Args:
+        query_id: Optional search-task id forwarded to the DB query.
+        limit: Maximum number of case_ids to load; ``None`` means no limit.
+            The limit is applied before loading, so fewer tuples may be
+            yielded when some cases have no extraction payload.
+
+    Yields:
+        ``(case_id, source_dict, procedures_list)`` tuples.
+    """
+    case_ids = db.fetch_adopted_process_cases(query_id)
+    if limit is not None:
+        # Compare against None so that limit=0 means "no cases" instead of
+        # "no limit"; clamp negatives, which would otherwise slice from the end.
+        case_ids = case_ids[:max(limit, 0)]
+    for case_id in case_ids:
+        payload = db.fetch_process(case_id)
+        if not payload:
+            continue
+        yield case_id, (payload.get("source") or {}), (payload.get("procedures") or [])
+
+
+# ── 作用域提取(原样复用参考实现)──────────────────────────────────────────────
+
+def _split_values(raw):
+    """Split *raw* on the enumeration comma "、", ignoring commas in parentheses.
+
+    Both half-width and full-width parentheses open/close a group.  Parts are
+    stripped, empty parts are dropped, and duplicates are removed while the
+    first-seen order is kept.
+
+      "高保真线框图、UI设计稿"          → ["高保真线框图", "UI设计稿"]
+      "修改后的照片(发型、服装)、二次元服装" → ["修改后的照片(发型、服装)", "二次元服装"]
+
+    Args:
+        raw: The text to split; a falsy value (``""`` or ``None``) yields ``[]``.
+
+    Returns:
+        The list of distinct, non-empty parts in order of first appearance.
+    """
+    if not raw:
+        return []
+
+    openers = ("(", "\uff08")   # half-width and full-width opening parenthesis
+    closers = (")", "\uff09")   # half-width and full-width closing parenthesis
+
+    parts, current, depth = [], [], 0
+    for ch in raw:
+        if ch == "、" and depth == 0:
+            parts.append("".join(current).strip())
+            current = []
+            continue
+        if ch in openers:
+            depth += 1
+        elif ch in closers:
+            # Never go below zero: a stray closing parenthesis must not stop
+            # later top-level separators from splitting.
+            depth = max(depth - 1, 0)
+        current.append(ch)
+    parts.append("".join(current).strip())
+
+    # dict.fromkeys removes duplicates while preserving insertion order.
+    return list(dict.fromkeys(part for part in parts if part))
+
+
+def build_scopes(procedure):
+    """Collect the ``substance`` and ``form`` values of every step as scopes.
+
+    Each step field is stripped and split with ``_split_values``.  A value is
+    emitted once per scope type, in order of first appearance; within a step,
+    substance values come before form values.
+
+    Returns:
+        A list of ``{"scope_type": ..., "value": ...}`` dicts.
+    """
+    # One "already emitted" set per scope type; dict order fixes the
+    # substance-before-form order inside each step.
+    seen_by_type = {"substance": set(), "form": set()}
+    collected = []
+    for step in procedure.get("steps") or []:
+        for scope_type, seen in seen_by_type.items():
+            raw = (step.get(scope_type) or "").strip()
+            for value in _split_values(raw):
+                if value in seen:
+                    continue
+                seen.add(value)
+                collected.append({"scope_type": scope_type, "value": value})
+    return collected
+
+
+def build_custom_ext(procedure):
+    """Build the ``custom_ext`` entries from the steps' effect/action/via fields.
+
+    Each field is stripped and split with ``_split_values``; a value is emitted
+    once per key, in order of first appearance.  A ``via`` that is exactly
+    ``"human"`` is not recorded as a tool.
+
+    Returns:
+        A list of ``{"key": ..., "type": "str", "value": ...}`` dicts.
+    """
+    field_to_key = (
+        ("effect", EXT_KEY_EFFECT),
+        ("action", EXT_KEY_ACTION),
+        ("via", EXT_KEY_TOOL),
+    )
+    emitted = {key: set() for _, key in field_to_key}
+    entries = []
+    for step in procedure.get("steps") or []:
+        for field, key in field_to_key:
+            raw = (step.get(field) or "").strip()
+            if not raw or (field == "via" and raw == "human"):
+                continue
+            for value in _split_values(raw):
+                if value in emitted[key]:
+                    continue
+                emitted[key].add(value)
+                entries.append({"key": key, "type": "str", "value": value})
+    return entries
+
+
+# ── 单条 payload 组装(复用参考实现;source_id 直接用 case_id)─────────────────────
+
+def build_payload(source_id, source_data, procedure, proc_index):
+    """Assemble the ingest payload for one procedure of one source.
+
+    Args:
+        source_id: Identifier stored as ``source.id``.
+        source_data: Dict describing the source (title, author, platform,
+            date, url, excerpt are read).
+        procedure: The procedure dict; it is serialized whole as ``content``.
+        proc_index: Number of the procedure, used only in the fallback title.
+
+    Returns:
+        The payload dict.  ``scopes`` and ``custom_ext`` are included only
+        when they are non-empty.
+    """
+    src_title = (source_data.get("title") or "").strip()
+    src_author = (source_data.get("author") or "").strip()
+    name = (procedure.get("name") or "").strip()
+
+    # Title fallback chain: procedure name, then "<source title> — 工序N",
+    # then plain "工序N".
+    title = name
+    if not title:
+        title = f"{src_title} — 工序{proc_index}" if src_title else f"工序{proc_index}"
+
+    content = json.dumps(procedure, ensure_ascii=False)
+
+    body = {
+        "source": {
+            "id": source_id,
+            "source_type": "post",
+            "title": src_title or None,
+            "author": src_author or None,
+            "source_metadata": {
+                "platform": source_data.get("platform") or "",
+                "date": source_data.get("date") or "",
+                "url": source_data.get("url") or None,
+                "excerpt": (source_data.get("excerpt") or "")[:500],
+                "procedure_id": procedure.get("id") or "",
+                "procedure_name": name,
+            },
+        },
+        "title": title[:512],
+        "content": content,
+        "dim_attributes": DIM_ATTRIBUTES,
+        "dim_creations": DIM_CREATIONS,
+    }
+
+    # Optional sections are attached only when they carry data.
+    for field, builder in (("scopes", build_scopes), ("custom_ext", build_custom_ext)):
+        items = builder(procedure)
+        if items:
+            body[field] = items
+    return body
+
+
+# ── 单条写入(原样复用参考实现)────────────────────────────────────────────────
+
+def ingest_one(api_url, payload, dry_run):
+    """POST one knowledge payload to the ingest endpoint.
+
+    Args:
+        api_url: Backend root URL; a trailing slash is tolerated.
+        payload: JSON-serializable payload dict.
+        dry_run: When true, nothing is sent and success is reported.
+
+    Returns:
+        ``(success, info_message)``.  Only HTTP 201 counts as success.
+        Transport errors and timeouts are reported as failures, not raised.
+    """
+    if dry_run:
+        return True, "(dry-run, skipped)"
+
+    url = api_url.rstrip("/") + INGEST_ENDPOINT
+    try:
+        resp = requests.post(url, json=payload, timeout=30)
+    except requests.Timeout:
+        return False, "超时(30s)"
+    except requests.RequestException as exc:
+        return False, str(exc)
+
+    # Parse the body separately from the transport call: a 201 whose body is
+    # not JSON (or not a JSON object) is still a successful write and must not
+    # be reported as a failure.
+    try:
+        body = resp.json()
+    except ValueError:
+        body = None
+    if not isinstance(body, dict):
+        body = {}
+
+    if resp.status_code == 201:
+        kid = body.get("knowledge_id", "?")
+        return True, f"knowledge_id={kid}"
+
+    detail = body.get("detail", resp.text[:300])
+    return False, f"HTTP {resp.status_code}: {detail}"
+
+
+# ── 主循环 ────────────────────────────────────────────────────────────────────
+
+def run(api_url, dry_run, verbose, delay_ms, query_id, limit):
+    """Load the adopted cases and ingest every procedure, logging each result.
+
+    Args:
+        api_url: Backend root URL passed to ``ingest_one``.
+        dry_run: Build payloads but do not call the API.
+        verbose: With ``dry_run``, also print each payload as JSON.
+        delay_ms: Pause after each real API call, in milliseconds.
+        query_id: Optional search-task filter for the DB query.
+        limit: Optional cap on the number of cases.
+
+    Exits the process with status 1 when no case is found or when at least
+    one ingest call failed.
+    """
+    loaded = list(iter_cases_from_db(query_id, limit))
+    if not loaded:
+        scope = ""
+        if query_id:
+            scope = f"(query_id={query_id})"
+        logger.error("DB 中未发现任何「已采纳且有工序解构」的 case %s", scope)
+        sys.exit(1)
+
+    logger.info(
+        "发现 %d 个采纳 case。目标接口:%s%s",
+        len(loaded), api_url, "  [DRY-RUN]" if dry_run else "",
+    )
+
+    succeeded = failed = skipped = 0
+    show_payload = dry_run and verbose
+
+    for case_id, source_data, procedures in loaded:
+        logger.info("── %s", case_id)
+        if not procedures:
+            logger.warning("  无 procedures,跳过")
+            skipped += 1
+            continue
+        total = len(procedures)
+        logger.info("  source_id=%-45s  procedures=%d", case_id, total)
+
+        for position, procedure in enumerate(procedures, start=1):
+            payload = build_payload(case_id, source_data, procedure, position)
+            scope_count = len(payload.get("scopes", []))
+            ext_count = len(payload.get("custom_ext", []))
+            step_count = len(procedure.get("steps") or [])
+
+            if show_payload:
+                print(f"\n{'=' * 60}")
+                print(f"[{case_id}] 工序 {position}/{total}")
+                print(json.dumps(payload, ensure_ascii=False, indent=2))
+
+            ok, msg = ingest_one(api_url, payload, dry_run)
+            if ok:
+                level, icon = logging.INFO, "✓"
+            else:
+                level, icon = logging.WARNING, "✗"
+            logger.log(
+                level,
+                "  %s [%d/%d] title=%r  steps=%d  scopes=%d  ext=%d  %s",
+                icon, position, total,
+                payload["title"][:40], step_count, scope_count, ext_count, msg,
+            )
+
+            if ok:
+                succeeded += 1
+            else:
+                failed += 1
+
+            # Throttle real calls only; a dry run never sleeps.
+            if delay_ms > 0 and not dry_run:
+                time.sleep(delay_ms / 1000)
+
+    logger.info(
+        "完成。成功=%d  失败=%d  跳过=%d  合计导入=%d",
+        succeeded, failed, skipped, succeeded,
+    )
+    if failed:
+        sys.exit(1)
+
+
+# ── CLI ───────────────────────────────────────────────────────────────────────
+
+def main():
+    """Parse the command line and start the import via ``run``.
+
+    ``--limit`` must be an integer >= 1 and ``--delay`` an integer >= 0;
+    other values are rejected by argparse with a usage error, so that
+    ``--limit 0`` cannot silently import everything and a negative limit
+    cannot silently drop cases from the end of the list.
+    """
+
+    def int_at_least(minimum):
+        """Return an argparse ``type`` callable accepting integers >= minimum."""
+        def parse(text):
+            try:
+                value = int(text)
+            except ValueError:
+                raise argparse.ArgumentTypeError(f"不是合法整数: {text!r}") from None
+            if value < minimum:
+                raise argparse.ArgumentTypeError(f"必须 >= {minimum},收到 {value}")
+            return value
+        return parse
+
+    parser = argparse.ArgumentParser(
+        description="把 DB 中已采纳的工序解构(mode_process)批量导入知识接口",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+    )
+    parser.add_argument("--api-url", default=DEFAULT_API_URL, metavar="URL",
+                        help=f"后端 API 根地址(默认:{DEFAULT_API_URL})")
+    parser.add_argument("--dry-run", action="store_true",
+                        help="仅从 DB 取数并组装 payload,不实际调用接口")
+    parser.add_argument("--verbose", "-v", action="store_true",
+                        help="dry-run 时打印完整 payload JSON")
+    parser.add_argument("--delay", type=int_at_least(0), default=100, metavar="MS",
+                        help="两次 API 调用之间的间隔毫秒数(默认:100)")
+    parser.add_argument("--query-id", default=None, metavar="QID",
+                        help="只导入该搜索任务(query_id)下的采纳 case")
+    parser.add_argument("--limit", type=int_at_least(1), default=None, metavar="N",
+                        help="只处理前 N 个 case(调试用)")
+
+    args = parser.parse_args()
+    run(
+        api_url=args.api_url,
+        dry_run=args.dry_run,
+        verbose=args.verbose,
+        delay_ms=args.delay,
+        query_id=args.query_id,
+        limit=args.limit,
+    )
+
+
+# Run the CLI only when executed as a script, not when imported.
+if __name__ == "__main__":
+    main()

+ 2 - 1
requirements.txt

@@ -19,4 +19,5 @@ lark-oapi==1.5.3
 
 
 pymysql==1.1.1
-cryptography==48.0.0
+cryptography==48.0.0
+PooledDB==1.3.0