"""Bulk-import "adopted" process deconstructions (mode_process) from the
database into the knowledge ingest API.

The only difference from Downloads/import/how_process_knowledge/main.py is the
data source:
    reference implementation -- scans data/*/_model/workflow.json and reads
                                local JSON files.
    this script              -- reads from MySQL: db.fetch_adopted_process_cases()
                                returns the adopted case_ids, then
                                db.fetch_process(case_id) rebuilds
                                {source, procedures}.
Both sides share the same data structure (a source block plus
procedures[].steps[]), so the payload assembly logic is reused as-is.

Field mapping (same as the reference implementation):
    source.id            <- case_id (DB primary key; no need to fall back to a
                            directory name from the JSON)
    source.source_type   <- fixed "post"
    source.title/author  <- source.title / source.author
    each procedure -> one knowledge item:
        title            <- procedure.name; if empty, falls back to
                            "<source title> — 工序N" (or "工序N" when the
                            source has no title either)
        content          <- the whole procedure object serialized as JSON
        dim_attributes   <- fixed ["how工序"]
        dim_creations    <- fixed ["制作"]
        scopes           <- de-duplicated substance/form values of all steps
        custom_ext       <- de-duplicated step effect->作用, action->动作,
                            via->工具

Adoption criteria: db.is_adopted_rel (a case is NOT adopted if any of these
hits: relevance < 4 / published more than two years ago / overall score < 6).

Usage:
    python import_process_knowledge.py                        # real import (all adopted procedures)
    python import_process_knowledge.py --dry-run              # build + print only, no API calls
    python import_process_knowledge.py --dry-run --verbose    # also print the full payload JSON
    python import_process_knowledge.py --query-id q0001       # only adopted cases of one search task
    python import_process_knowledge.py --limit 5              # only the first 5 cases (debugging)
    python import_process_knowledge.py --api-url http://...   # choose the backend address
    python import_process_knowledge.py --delay 200            # wait 200 ms between calls
    python import_process_knowledge.py --force                # ignore the de-dup ledger, re-import
"""
import argparse
import json
import logging
import sys
import time

import requests

import db

# ── Configuration (aligned with the reference implementation) ────────────────
DEFAULT_API_URL = "http://47.236.83.130:8001"
INGEST_ENDPOINT = "/api/v1/knowledge/ingest"
DIM_ATTRIBUTES = ["how工序"]
DIM_CREATIONS = ["制作"]
EXT_KEY_EFFECT = "作用"
EXT_KEY_ACTION = "动作"
EXT_KEY_TOOL = "工具"

# Opening / closing parentheses (ASCII and full-width) recognised by
# _split_values; enumeration commas inside them are not treated as separators.
_OPEN_PARENS = "(（"
_CLOSE_PARENS = ")）"

# Sentinel meaning "this procedure has no ledger entry". It is distinct from
# any stored version, including None.
_NOT_INGESTED = object()

# ── Logging ──────────────────────────────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger(__name__)


# ── Data source: adopted procedures from the DB (replaces local file scan) ───
def iter_cases_from_db(query_id=None, limit=None):
    """Yield ``(case_id, source_data, procedures, version)`` per adopted case.

    First fetches the adopted case_id list, then calls ``db.fetch_process`` for
    each one to rebuild the deconstruction detail (latest version). Cases for
    which ``fetch_process`` returns a falsy value (no deconstruction row) are
    skipped here. Cases whose ``procedures`` list is empty are still yielded
    (as ``[]``) and are counted as skipped by :func:`run`.

    ``version`` drives upload de-duplication: a procedure of the same case
    whose version has not changed is treated as already uploaded.

    Args:
        query_id: Restrict to adopted cases of this search task; ``None``
            means all (passed straight to ``db.fetch_adopted_process_cases``).
        limit: If truthy, only the first ``limit`` case_ids are processed.
    """
    case_ids = db.fetch_adopted_process_cases(query_id)
    if limit:
        case_ids = case_ids[:limit]
    for case_id in case_ids:
        payload = db.fetch_process(case_id)  # latest version
        if not payload:
            continue
        yield (
            case_id,
            payload.get("source") or {},
            payload.get("procedures") or [],
            payload.get("version"),
        )


# ── Scope extraction (reused from the reference implementation) ──────────────
def _split_values(raw):
    """Split ``raw`` on the enumeration comma "、" and de-duplicate in order.

    Commas inside parentheses (ASCII or full-width) are not separators. Parts
    are stripped, empty parts are dropped, and duplicates keep their first
    position:

        "高保真线框图、UI设计稿" → ["高保真线框图", "UI设计稿"]
        "修改后的照片（发型、服装）、二次元服装" → ["修改后的照片（发型、服装）", "二次元服装"]

    An unmatched closing parenthesis never pushes the nesting depth below
    zero, so splitting keeps working for the rest of the string.
    """
    parts, current, depth = [], [], 0
    for ch in raw:
        if ch in _OPEN_PARENS:
            depth += 1
            current.append(ch)
        elif ch in _CLOSE_PARENS:
            depth = max(depth - 1, 0)
            current.append(ch)
        elif ch == "、" and depth == 0:
            part = "".join(current).strip()
            if part:
                parts.append(part)
            current = []
        else:
            current.append(ch)
    part = "".join(current).strip()
    if part:
        parts.append(part)
    # dicts preserve insertion order, so this de-duplicates while keeping order.
    return list(dict.fromkeys(parts))


def build_scopes(procedure):
    """Collect the steps' substance/form values as scope entries.

    Each step's ``substance`` and ``form`` strings are split with
    :func:`_split_values`. Values are de-duplicated per scope type and returned
    in first-seen order as ``{"scope_type": ..., "value": ...}`` dicts.
    """
    seen = {"substance": set(), "form": set()}
    scopes = []
    for step in procedure.get("steps") or []:
        for scope_type in ("substance", "form"):
            seen_for_type = seen[scope_type]
            for value in _split_values((step.get(scope_type) or "").strip()):
                if value not in seen_for_type:
                    scopes.append({"scope_type": scope_type, "value": value})
                    seen_for_type.add(value)
    return scopes


def build_custom_ext(procedure):
    """Extract the steps' effect/action/via values as custom-extension entries.

    Values are split with :func:`_split_values` and de-duplicated per key.
    Each entry is ``{"key": ..., "type": "str", "value": ...}``. A ``via``
    value of exactly ``"human"`` is not recorded as a tool.
    """
    ext = []
    seen = {}

    def add(key, raw):
        if not raw:
            return
        seen_for_key = seen.setdefault(key, set())
        for value in _split_values(raw):
            if value not in seen_for_key:
                seen_for_key.add(value)
                ext.append({"key": key, "type": "str", "value": value})

    for step in procedure.get("steps") or []:
        add(EXT_KEY_EFFECT, (step.get("effect") or "").strip())
        add(EXT_KEY_ACTION, (step.get("action") or "").strip())
        via = (step.get("via") or "").strip()
        # "human" is excluded (presumably it marks a manual step, not a tool).
        if via and via != "human":
            add(EXT_KEY_TOOL, via)
    return ext


# ── Single payload assembly (source_id is the case_id directly) ──────────────
def build_payload(source_id, source_data, procedure, proc_index):
    """Assemble the ingest-API payload for one procedure.

    Args:
        source_id: Used as ``source.id`` (the DB case_id).
        source_data: The case's ``source`` block (title, author, platform,
            date, url, excerpt are read).
        procedure: One entry of ``procedures``; serialized whole as ``content``.
        proc_index: 1-based procedure index, used in the fallback title.

    Returns:
        A JSON-serializable dict. ``scopes`` and ``custom_ext`` are present
        only when non-empty. The title is truncated to 512 characters and the
        excerpt to 500.
    """
    source_title = (source_data.get("title") or "").strip()
    source_author = (source_data.get("author") or "").strip() or None
    proc_name = (procedure.get("name") or "").strip()

    if proc_name:
        knowledge_title = proc_name
    elif source_title:
        knowledge_title = f"{source_title} — 工序{proc_index}"
    else:
        knowledge_title = f"工序{proc_index}"

    content = json.dumps(procedure, ensure_ascii=False)

    source_metadata = {
        "platform": source_data.get("platform") or "",
        "date": source_data.get("date") or "",
        "url": source_data.get("url") or None,
        "excerpt": (source_data.get("excerpt") or "")[:500],
        "procedure_id": procedure.get("id") or "",
        "procedure_name": proc_name,
    }

    payload = {
        "source": {
            "id": source_id,
            "source_type": "post",
            "title": source_title or None,
            "author": source_author,
            "source_metadata": source_metadata,
        },
        "title": knowledge_title[:512],
        "content": content,
        # Copies, so a payload never aliases the module-level constants.
        "dim_attributes": list(DIM_ATTRIBUTES),
        "dim_creations": list(DIM_CREATIONS),
    }

    scopes = build_scopes(procedure)
    if scopes:
        payload["scopes"] = scopes

    custom_ext = build_custom_ext(procedure)
    if custom_ext:
        payload["custom_ext"] = custom_ext

    return payload


# ── Single write ─────────────────────────────────────────────────────────────
def _response_json(resp):
    """Return the decoded JSON body of ``resp``, or ``None`` if it isn't JSON."""
    try:
        return resp.json()
    except ValueError:  # requests' JSONDecodeError subclasses ValueError
        return None


def ingest_one(api_url, payload, dry_run):
    """POST one knowledge payload to the ingest endpoint.

    Returns:
        ``(success, info_message, knowledge_id)``. In dry-run mode no request
        is made and ``(True, "(dry-run, skipped)", None)`` is returned. HTTP 201
        is success. If the 201 body lacks ``knowledge_id`` (or is not a JSON
        object), the id is reported as ``"?"``. Any other status, a timeout or
        a request error gives ``success=False`` and ``knowledge_id=None``.
    """
    if dry_run:
        return True, "(dry-run, skipped)", None

    url = api_url.rstrip("/") + INGEST_ENDPOINT
    try:
        resp = requests.post(url, json=payload, timeout=30)
    except requests.Timeout:
        return False, "超时(30s)", None
    except requests.RequestException as exc:
        return False, str(exc), None

    body = _response_json(resp)
    if resp.status_code == 201:
        # The server stored the item; an odd body must not turn it into a
        # "failure" (that would cause a duplicate upload on the next run).
        kid = body.get("knowledge_id", "?") if isinstance(body, dict) else "?"
        return True, f"knowledge_id={kid}", kid

    if isinstance(body, dict):
        detail = body.get("detail", resp.text[:300])
    else:
        detail = resp.text[:300]
    return False, f"HTTP {resp.status_code}: {detail}", None


# ── Main loop ────────────────────────────────────────────────────────────────
def run(api_url, dry_run, verbose, delay_ms, query_id, limit, force):
    """Import every procedure of every adopted case, then log a summary.

    Args:
        api_url: Backend API root URL.
        dry_run: Build payloads but neither call the API nor write the ledger.
        verbose: With ``dry_run``, also print each full payload as JSON.
        delay_ms: Pause after each real API call, in milliseconds (<= 0: none).
        query_id: Only import adopted cases of this search task (``None``: all).
        limit: Only process the first ``limit`` cases (falsy: all).
        force: Ignore the de-duplication ledger and re-import everything.

    Exits with status 1 when no case is found or when any import failed.
    """
    cases = list(iter_cases_from_db(query_id, limit))
    if not cases:
        scope = f"(query_id={query_id})" if query_id else ""
        logger.error("DB 中未发现任何「已采纳且有工序解构」的 case %s", scope)
        sys.exit(1)

    mode_tag = " [DRY-RUN]" if dry_run else ""
    force_tag = " [FORCE]" if force else ""
    logger.info("发现 %d 个采纳 case。目标接口:%s%s%s", len(cases), api_url, mode_tag, force_tag)

    ok_count = fail_count = skip_count = dup_count = 0

    for case_id, source_data, procedures, version in cases:
        logger.info("── %s", case_id)
        if not procedures:
            logger.warning(" 无 procedures,跳过")
            skip_count += 1
            continue

        # De-dup ledger: version already imported per procedure index of this
        # case. Under --force it is ignored so everything is re-imported.
        ingested = {} if force else (db.fetch_ingested_map(case_id) or {})
        logger.info(" source_id=%-45s procedures=%d version=%s", case_id, len(procedures), version)

        for idx, procedure in enumerate(procedures, 1):
            # Skip only when a ledger entry EXISTS and its version is unchanged.
            # A version change means the case was re-deconstructed and must be
            # re-imported. A plain ingested.get(idx) == version would treat a
            # missing entry as a match whenever version is None.
            if not force and ingested.get(idx, _NOT_INGESTED) == version:
                logger.info(" ♻️ [%d/%d] 已导入(版本 %s),跳过", idx, len(procedures), version)
                dup_count += 1
                continue

            payload = build_payload(case_id, source_data, procedure, idx)
            title = payload["title"]
            n_scopes = len(payload.get("scopes", []))
            n_ext = len(payload.get("custom_ext", []))
            n_steps = len(procedure.get("steps") or [])

            if dry_run and verbose:
                print(f"\n{'=' * 60}")
                print(f"[{case_id}] 工序 {idx}/{len(procedures)}")
                print(json.dumps(payload, ensure_ascii=False, indent=2))

            ok, msg, kid = ingest_one(api_url, payload, dry_run)
            status_icon = "✓" if ok else "✗"
            level = logging.INFO if ok else logging.WARNING
            logger.log(
                level,
                " %s [%d/%d] title=%r steps=%d scopes=%d ext=%d %s",
                status_icon, idx, len(procedures), title[:40],
                n_steps, n_scopes, n_ext, msg,
            )

            if ok:
                ok_count += 1
                # Only a real successful import updates the ledger; dry runs
                # must not pollute the de-dup state.
                if not dry_run:
                    try:
                        db.mark_ingested(case_id, idx, version, kid, api_url)
                    except Exception as exc:
                        # Best effort: the item is already stored server-side.
                        logger.warning(" ⚠️ 台账写入失败(不影响本次导入):%s", exc)
            else:
                fail_count += 1

            if delay_ms > 0 and not dry_run:
                time.sleep(delay_ms / 1000)

    logger.info(
        "完成。成功=%d 失败=%d 无工序跳过=%d 已传过跳过=%d 合计导入=%d",
        ok_count, fail_count, skip_count, dup_count, ok_count,
    )
    if fail_count:
        sys.exit(1)


# ── CLI ──────────────────────────────────────────────────────────────────────
def main():
    """Parse command-line arguments and hand them to :func:`run`."""
    parser = argparse.ArgumentParser(
        description="把 DB 中已采纳的工序解构(mode_process)批量导入知识接口",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--api-url", default=DEFAULT_API_URL, metavar="URL",
                        help=f"后端 API 根地址(默认:{DEFAULT_API_URL})")
    parser.add_argument("--dry-run", action="store_true",
                        help="仅从 DB 取数并组装 payload,不实际调用接口")
    parser.add_argument("--verbose", "-v", action="store_true",
                        help="dry-run 时打印完整 payload JSON")
    parser.add_argument("--delay", type=int, default=100, metavar="MS",
                        help="两次 API 调用之间的间隔毫秒数(默认:100)")
    parser.add_argument("--query-id", default=None, metavar="QID",
                        help="只导入该搜索任务(query_id)下的采纳 case")
    parser.add_argument("--limit", type=int, default=None, metavar="N",
                        help="只处理前 N 个 case(调试用)")
    parser.add_argument("--force", action="store_true",
                        help="忽略去重台账,强制重导(换 prompt/模型、需覆盖时用)")
    args = parser.parse_args()

    # A negative limit would slice off the LAST N cases instead of limiting.
    if args.limit is not None and args.limit < 0:
        parser.error("--limit 不能为负数")

    run(
        api_url=args.api_url,
        dry_run=args.dry_run,
        verbose=args.verbose,
        delay_ms=args.delay,
        query_id=args.query_id,
        limit=args.limit,
        force=args.force,
    )


if __name__ == "__main__":
    main()