| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339 |
- """
- 把数据库里「已采纳」的工序解构(mode_process)批量导入到知识导入接口。
- 与 Downloads/import/how_process_knowledge/main.py 的区别只在数据来源:
- 参考实现 —— 扫 data/*/_model/workflow.json,读本地 JSON 文件。
- 本脚本 —— 从 MySQL 取数:db.fetch_adopted_process_cases() 拿采纳 case_id,
- 再 db.fetch_process(case_id) 重建 {source, procedures}。
- 两边的数据结构一致(source 块 + procedures[].steps[]),故 payload 组装逻辑完全复用。
- 字段映射(同参考实现):
- source.id ← case_id(DB 主键,无需从 JSON 兜底目录名)
- source.source_type ← 固定 "post"
- source.title/author ← source.title / source.author
- 每个 procedure → 一条知识:
- title ← procedure.name;为空回退 "来源标题 — 工序N"
- content ← 整个 procedure 对象的 JSON 串
- dim_attributes ← 固定 ["how工序"] dim_creations ← 固定 ["制作"]
- scopes ← 各步骤 substance/form 去重
- custom_ext ← 各步骤 effect→作用 action→动作 via→工具 去重
- 采纳口径:db.is_adopted_rel(相关性<4 / 发布超两年 / 综合分<6 任一命中即不采纳)。
- 用法:
- python import_process_knowledge.py # 真实导入(采纳工序全量)
- python import_process_knowledge.py --dry-run # 只组装+打印,不调接口
- python import_process_knowledge.py --dry-run --verbose # 打印完整 payload JSON
- python import_process_knowledge.py --query-id q0001 # 只传某搜索任务下的采纳 case
- python import_process_knowledge.py --limit 5 # 只处理前 5 个 case(调试)
- python import_process_knowledge.py --api-url http://... # 指定后端地址
- python import_process_knowledge.py --delay 200 # 每次调用间隔 200ms
- """
- import argparse
- import json
- import logging
- import sys
- import time
- import requests
- import db
- # ── 配置(对齐参考实现)────────────────────────────────────────────────────────
# Knowledge-ingest backend: default root address and the ingest route appended to it.
DEFAULT_API_URL = "http://47.236.83.130:8001"
INGEST_ENDPOINT = "/api/v1/knowledge/ingest"

# Fixed dimension tags attached to every payload built by this script.
DIM_ATTRIBUTES = ["how工序"]
DIM_CREATIONS = ["制作"]

# custom_ext keys under which each step's effect / action / via values are filed.
EXT_KEY_EFFECT, EXT_KEY_ACTION, EXT_KEY_TOOL = "作用", "动作", "工具"
- # ── 日志 ──────────────────────────────────────────────────────────────────────
# Root logging setup: INFO and above, time-of-day timestamps only.
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
_LOG_DATEFMT = "%H:%M:%S"
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, datefmt=_LOG_DATEFMT)

# Module-level logger used by every function in this script.
logger = logging.getLogger(__name__)
- # ── 数据来源:从 DB 取采纳工序(替代参考实现的本地文件扫描)──────────────────────
def iter_cases_from_db(query_id=None, limit=None):
    """Yield ``(case_id, source_data, procedures, version)`` for adopted cases.

    The adopted case ids come from ``db.fetch_adopted_process_cases(query_id)``.
    A truthy ``limit`` keeps only the first ``limit`` ids (the cut is applied
    to the id list, before any case is loaded). Each remaining id is resolved
    with ``db.fetch_process``; ids for which that call returns a falsy value
    are skipped.

    A missing or falsy ``source`` is yielded as ``{}`` and missing or falsy
    ``procedures`` as ``[]`` -- such cases are still yielded, so the caller
    decides what to do with a case that has no procedures. ``version`` is
    passed through unchanged so the caller can use it for upload
    de-duplication.
    """
    adopted_ids = db.fetch_adopted_process_cases(query_id)
    selected_ids = adopted_ids[:limit] if limit else adopted_ids
    for case_id in selected_ids:
        # Presumably the latest stored version of the case (per the original
        # author's note) -- confirm against db.fetch_process.
        detail = db.fetch_process(case_id)
        if detail:
            source_data = detail.get("source") or {}
            procedures = detail.get("procedures") or []
            yield case_id, source_data, procedures, detail.get("version")
- # ── 作用域提取(原样复用参考实现)──────────────────────────────────────────────
- def _split_values(raw):
- """按顿号分割,括号内的顿号不作为分隔符,结果去重保序。
- "高保真线框图、UI设计稿" → ["高保真线框图", "UI设计稿"]
- "修改后的照片(发型、服装)、二次元服装" → ["修改后的照片(发型、服装)", "二次元服装"]
- """
- parts, current, depth = [], [], 0
- for ch in raw:
- if ch in ("(", "("):
- depth += 1
- current.append(ch)
- elif ch in (")", ")"):
- depth -= 1
- current.append(ch)
- elif ch == "、" and depth == 0:
- part = "".join(current).strip()
- if part:
- parts.append(part)
- current = []
- else:
- current.append(ch)
- part = "".join(current).strip()
- if part:
- parts.append(part)
- seen, result = set(), []
- for p in parts:
- if p not in seen:
- seen.add(p)
- result.append(p)
- return result
def build_scopes(procedure):
    """Collect the distinct ``substance`` and ``form`` values of a procedure.

    Every step's ``substance`` and ``form`` text is stripped and split with
    ``_split_values``. Each value is emitted once per scope type, as
    ``{"scope_type": <"substance"|"form">, "value": <text>}``, in the order
    it is first encountered (per step: substances first, then forms).

    Returns:
        The list of scope dicts; empty when the procedure has no steps or
        no values.
    """
    scopes = []
    seen_by_type = {"substance": set(), "form": set()}
    for step in procedure.get("steps") or []:
        for scope_type in ("substance", "form"):
            known = seen_by_type[scope_type]
            for value in _split_values((step.get(scope_type) or "").strip()):
                if value in known:
                    continue
                known.add(value)
                scopes.append({"scope_type": scope_type, "value": value})
    return scopes
def build_custom_ext(procedure):
    """Build the ``custom_ext`` entries from a procedure's steps.

    For each step, the stripped ``effect``, ``action`` and ``via`` fields are
    split with ``_split_values`` and filed under ``EXT_KEY_EFFECT``,
    ``EXT_KEY_ACTION`` and ``EXT_KEY_TOOL`` respectively. A ``via`` equal to
    ``"human"`` is not recorded as a tool. A value is emitted only once per
    key, in first-seen order.

    Returns:
        A list of ``{"key": ..., "type": "str", "value": ...}`` dicts.
    """
    entries = []
    emitted = set()  # (key, value) pairs already present in `entries`
    for step in procedure.get("steps") or []:
        effect = (step.get("effect") or "").strip()
        action = (step.get("action") or "").strip()
        tool = (step.get("via") or "").strip()
        if tool == "human":
            tool = ""
        fields = (
            (EXT_KEY_EFFECT, effect),
            (EXT_KEY_ACTION, action),
            (EXT_KEY_TOOL, tool),
        )
        for key, raw in fields:
            if not raw:
                continue
            for value in _split_values(raw):
                if (key, value) in emitted:
                    continue
                emitted.add((key, value))
                entries.append({"key": key, "type": "str", "value": value})
    return entries
- # ── 单条 payload 组装(复用参考实现;source_id 直接用 case_id)─────────────────────
def build_payload(source_id, source_data, procedure, proc_index):
    """Assemble the ingest request body for one procedure.

    Args:
        source_id: Identifier placed in ``source.id``.
        source_data: Dict describing the source; ``title``, ``author``,
            ``platform``, ``date``, ``url`` and ``excerpt`` are read from it.
        procedure: The procedure dict; it is serialized whole as ``content``.
        proc_index: Position of the procedure, used only in the fallback title.

    Returns:
        The payload dict. The knowledge title is the procedure name, or
        "<source title> — 工序N", or "工序N" when both are blank, truncated to
        512 characters. ``scopes`` and ``custom_ext`` are included only when
        they are non-empty.
    """
    title_of_source = (source_data.get("title") or "").strip()
    author_of_source = (source_data.get("author") or "").strip() or None
    proc_name = (procedure.get("name") or "").strip()

    if proc_name:
        knowledge_title = proc_name
    else:
        prefix = f"{title_of_source} — " if title_of_source else ""
        knowledge_title = f"{prefix}工序{proc_index}"

    payload = {
        "source": {
            "id": source_id,
            "source_type": "post",
            "title": title_of_source or None,
            "author": author_of_source,
            "source_metadata": {
                "platform": source_data.get("platform") or "",
                "date": source_data.get("date") or "",
                "url": source_data.get("url") or None,
                # Excerpt is capped at 500 characters.
                "excerpt": (source_data.get("excerpt") or "")[:500],
                "procedure_id": procedure.get("id") or "",
                "procedure_name": proc_name,
            },
        },
        "title": knowledge_title[:512],
        "content": json.dumps(procedure, ensure_ascii=False),
        "dim_attributes": DIM_ATTRIBUTES,
        "dim_creations": DIM_CREATIONS,
    }

    optional_sections = (
        ("scopes", build_scopes(procedure)),
        ("custom_ext", build_custom_ext(procedure)),
    )
    for key, items in optional_sections:
        if items:
            payload[key] = items
    return payload
- # ── 单条写入(原样复用参考实现)────────────────────────────────────────────────
def ingest_one(api_url, payload, dry_run):
    """POST one knowledge payload to the ingest endpoint.

    Args:
        api_url: Backend root address; a trailing "/" is tolerated.
        payload: JSON-serializable request body.
        dry_run: When true, no request is sent and success is reported.

    Returns:
        ``(success, info_message, knowledge_id)``. ``knowledge_id`` is
        ``None`` on failure and in dry-run mode. On HTTP 201 it is the
        ``knowledge_id`` field of the response, or ``"?"`` when the response
        body does not provide one.
    """
    if dry_run:
        return True, "(dry-run, skipped)", None

    url = api_url.rstrip("/") + INGEST_ENDPOINT
    try:
        resp = requests.post(url, json=payload, timeout=30)
    except requests.Timeout:
        return False, "超时(30s)", None
    except requests.RequestException as exc:
        return False, str(exc), None

    # Parse the body defensively. requests' JSON decode errors are ValueError
    # subclasses (and, in recent versions, RequestException subclasses too),
    # so parsing inside the transport try-block would misreport a created
    # record as a failed request.
    try:
        body = resp.json()
    except ValueError:
        body = None
    fields = body if isinstance(body, dict) else {}

    if resp.status_code == 201:
        # The server created the record; an unreadable body must not turn
        # that into a failure, or the caller would upload it again later.
        kid = fields.get("knowledge_id", "?")
        return True, f"knowledge_id={kid}", kid

    detail = fields.get("detail", resp.text[:300])
    return False, f"HTTP {resp.status_code}: {detail}", None
- # ── 主循环 ────────────────────────────────────────────────────────────────────
def run(api_url, dry_run, verbose, delay_ms, query_id, limit, force):
    """Import every procedure of every adopted case and log a summary.

    Args:
        api_url: Backend root address passed to ``ingest_one``.
        dry_run: Build payloads without calling the API or writing the ledger.
        verbose: With ``dry_run``, print each full payload as JSON.
        delay_ms: Pause after each real API call, in milliseconds.
        query_id: Restrict to adopted cases of this search task (or ``None``).
        limit: Only process the first ``limit`` cases (or ``None``).
        force: Ignore the de-duplication ledger and import everything again.

    Exits the process with status 1 when no case is found or when at least
    one procedure failed to import.
    """
    cases = list(iter_cases_from_db(query_id, limit))
    if not cases:
        scope = f"(query_id={query_id})" if query_id else ""
        logger.error("DB 中未发现任何「已采纳且有工序解构」的 case %s", scope)
        sys.exit(1)

    mode_tag = " [DRY-RUN]" if dry_run else ""
    force_tag = " [FORCE]" if force else ""
    logger.info("发现 %d 个采纳 case。目标接口:%s%s%s", len(cases), api_url, mode_tag, force_tag)

    ok_count = fail_count = skip_count = dup_count = 0
    for case_id, source_data, procedures, version in cases:
        logger.info("── %s", case_id)
        if not procedures:
            logger.warning(" 无 procedures,跳过")
            skip_count += 1
            continue

        # De-duplication ledger: procedure index -> version already imported
        # for this case. Left empty under --force so nothing is skipped.
        ingested = {} if force else db.fetch_ingested_map(case_id)
        total = len(procedures)
        logger.info(" source_id=%-45s procedures=%d version=%s", case_id, total, version)

        for idx, procedure in enumerate(procedures, 1):
            # Skip only when the ledger really has an entry for this index
            # with the same version. A bare `.get(idx) == version` would also
            # match a missing entry whenever the case has no version
            # (None == None) and silently skip a never-imported procedure.
            already_ingested = idx in ingested and ingested[idx] == version
            if not force and already_ingested:
                logger.info(" ♻️ [%d/%d] 已导入(版本 %s),跳过", idx, total, version)
                dup_count += 1
                continue

            payload = build_payload(case_id, source_data, procedure, idx)
            title = payload["title"]
            n_scopes = len(payload.get("scopes", []))
            n_ext = len(payload.get("custom_ext", []))
            n_steps = len(procedure.get("steps") or [])

            if dry_run and verbose:
                print(f"\n{'=' * 60}")
                print(f"[{case_id}] 工序 {idx}/{total}")
                print(json.dumps(payload, ensure_ascii=False, indent=2))

            ok, msg, kid = ingest_one(api_url, payload, dry_run)
            status_icon = "✓" if ok else "✗"
            level = logging.INFO if ok else logging.WARNING
            logger.log(
                level,
                " %s [%d/%d] title=%r steps=%d scopes=%d ext=%d %s",
                status_icon, idx, total,
                title[:40], n_steps, n_scopes, n_ext, msg,
            )

            if ok:
                ok_count += 1
                # Record in the ledger only after a real import; a dry run
                # must not pollute the de-duplication state.
                if not dry_run:
                    try:
                        db.mark_ingested(case_id, idx, version, kid, api_url)
                    except Exception as exc:
                        # Deliberately best-effort: the import itself
                        # succeeded, so a ledger failure is only a warning.
                        logger.warning(" ⚠️ 台账写入失败(不影响本次导入):%s", exc)
            else:
                fail_count += 1

            if delay_ms > 0 and not dry_run:
                time.sleep(delay_ms / 1000)

    logger.info(
        "完成。成功=%d 失败=%d 无工序跳过=%d 已传过跳过=%d 合计导入=%d",
        ok_count, fail_count, skip_count, dup_count, ok_count,
    )
    if fail_count:
        sys.exit(1)
- # ── CLI ───────────────────────────────────────────────────────────────────────
def main():
    """Parse the command-line options and hand them to ``run``."""
    cli = argparse.ArgumentParser(
        description="把 DB 中已采纳的工序解构(mode_process)批量导入知识接口",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    cli.add_argument(
        "--api-url",
        default=DEFAULT_API_URL,
        metavar="URL",
        help=f"后端 API 根地址(默认:{DEFAULT_API_URL})",
    )
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="仅从 DB 取数并组装 payload,不实际调用接口",
    )
    cli.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="dry-run 时打印完整 payload JSON",
    )
    cli.add_argument(
        "--delay",
        type=int,
        default=100,
        metavar="MS",
        help="两次 API 调用之间的间隔毫秒数(默认:100)",
    )
    cli.add_argument(
        "--query-id",
        default=None,
        metavar="QID",
        help="只导入该搜索任务(query_id)下的采纳 case",
    )
    cli.add_argument(
        "--limit",
        type=int,
        default=None,
        metavar="N",
        help="只处理前 N 个 case(调试用)",
    )
    cli.add_argument(
        "--force",
        action="store_true",
        help="忽略去重台账,强制重导(换 prompt/模型、需覆盖时用)",
    )
    opts = cli.parse_args()

    # Arguments are passed positionally in run()'s declared parameter order.
    run(
        opts.api_url,
        opts.dry_run,
        opts.verbose,
        opts.delay,
        opts.query_id,
        opts.limit,
        opts.force,
    )


# Run the importer only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|