import_process_knowledge.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. """
  2. 把数据库里「已采纳」的工序解构(mode_process)批量导入到知识导入接口。
  3. 与 Downloads/import/how_process_knowledge/main.py 的区别只在数据来源:
  4. 参考实现 —— 扫 data/*/_model/workflow.json,读本地 JSON 文件。
  5. 本脚本 —— 从 MySQL 取数:db.fetch_adopted_process_cases() 拿采纳 case_id,
  6. 再 db.fetch_process(case_id) 重建 {source, procedures}。
  7. 两边的数据结构一致(source 块 + procedures[].steps[]),故 payload 组装逻辑完全复用。
  8. 字段映射(同参考实现):
  9. source.id ← case_id(DB 主键,无需从 JSON 兜底目录名)
  10. source.source_type ← 固定 "post"
  11. source.title/author ← source.title / source.author
  12. 每个 procedure → 一条知识:
  13. title ← procedure.name;为空回退 "来源标题 — 工序N"
  14. content ← 整个 procedure 对象的 JSON 串
  15. dim_attributes ← 固定 ["how工序"] dim_creations ← 固定 ["制作"]
  16. scopes ← 各步骤 substance/form 去重
  17. custom_ext ← 各步骤 effect→作用 action→动作 via→工具 去重
  18. 采纳口径:db.is_adopted_rel(相关性<4 / 发布超两年 / 综合分<6 任一命中即不采纳)。
  19. 用法:
  20. python import_process_knowledge.py # 真实导入(采纳工序全量)
  21. python import_process_knowledge.py --dry-run # 只组装+打印,不调接口
  22. python import_process_knowledge.py --dry-run --verbose # 打印完整 payload JSON
  23. python import_process_knowledge.py --query-id q0001 # 只传某搜索任务下的采纳 case
  24. python import_process_knowledge.py --limit 5 # 只处理前 5 个 case(调试)
  25. python import_process_knowledge.py --api-url http://... # 指定后端地址
  26. python import_process_knowledge.py --delay 200 # 每次调用间隔 200ms
  27. """
  28. import argparse
  29. import json
  30. import logging
  31. import sys
  32. import time
  33. import requests
  34. import db
  35. # ── 配置(对齐参考实现)────────────────────────────────────────────────────────
  36. DEFAULT_API_URL = "http://47.236.83.130:8001"
  37. INGEST_ENDPOINT = "/api/v1/knowledge/ingest"
  38. DIM_ATTRIBUTES = ["how工序"]
  39. DIM_CREATIONS = ["制作"]
  40. EXT_KEY_EFFECT = "作用"
  41. EXT_KEY_ACTION = "动作"
  42. EXT_KEY_TOOL = "工具"
  43. # ── 日志 ──────────────────────────────────────────────────────────────────────
  44. logging.basicConfig(
  45. level=logging.INFO,
  46. format="%(asctime)s [%(levelname)s] %(message)s",
  47. datefmt="%H:%M:%S",
  48. )
  49. logger = logging.getLogger(__name__)
  50. # ── 数据来源:从 DB 取采纳工序(替代参考实现的本地文件扫描)──────────────────────
  51. def iter_cases_from_db(query_id=None, limit=None):
  52. """产出 (case_id, source_data, procedures, version)。
  53. 先取采纳 case_id 列表,再逐个 fetch_process 重建解构详情(取最新版本)。
  54. fetch_process 返回 None(无解构行)或 procedures 为空的 case 自动跳过。
  55. version 用于上传去重:同 case 同工序、版本未变即视为已传过,跳过。
  56. """
  57. case_ids = db.fetch_adopted_process_cases(query_id)
  58. if limit:
  59. case_ids = case_ids[:limit]
  60. for case_id in case_ids:
  61. payload = db.fetch_process(case_id) # 最新版本
  62. if not payload:
  63. continue
  64. yield (case_id, (payload.get("source") or {}),
  65. (payload.get("procedures") or []), payload.get("version"))
  66. # ── 作用域提取(原样复用参考实现)──────────────────────────────────────────────
  67. def _split_values(raw):
  68. """按顿号分割,括号内的顿号不作为分隔符,结果去重保序。
  69. "高保真线框图、UI设计稿" → ["高保真线框图", "UI设计稿"]
  70. "修改后的照片(发型、服装)、二次元服装" → ["修改后的照片(发型、服装)", "二次元服装"]
  71. """
  72. parts, current, depth = [], [], 0
  73. for ch in raw:
  74. if ch in ("(", "("):
  75. depth += 1
  76. current.append(ch)
  77. elif ch in (")", ")"):
  78. depth -= 1
  79. current.append(ch)
  80. elif ch == "、" and depth == 0:
  81. part = "".join(current).strip()
  82. if part:
  83. parts.append(part)
  84. current = []
  85. else:
  86. current.append(ch)
  87. part = "".join(current).strip()
  88. if part:
  89. parts.append(part)
  90. seen, result = set(), []
  91. for p in parts:
  92. if p not in seen:
  93. seen.add(p)
  94. result.append(p)
  95. return result
  96. def build_scopes(procedure):
  97. """从所有步骤收集 substance / form,各自去重后返回 scope 列表。"""
  98. seen_sub, seen_form, scopes = set(), set(), []
  99. for step in (procedure.get("steps") or []):
  100. for sub in _split_values((step.get("substance") or "").strip()):
  101. if sub not in seen_sub:
  102. scopes.append({"scope_type": "substance", "value": sub})
  103. seen_sub.add(sub)
  104. for form in _split_values((step.get("form") or "").strip()):
  105. if form not in seen_form:
  106. scopes.append({"scope_type": "form", "value": form})
  107. seen_form.add(form)
  108. return scopes
  109. def build_custom_ext(procedure):
  110. """从所有步骤提取 effect/action/via,值按顿号分割,同 key 相同值去重。"""
  111. ext = []
  112. seen = {}
  113. def add(key, raw):
  114. if not raw:
  115. return
  116. for value in _split_values(raw):
  117. seen.setdefault(key, set())
  118. if value not in seen[key]:
  119. seen[key].add(value)
  120. ext.append({"key": key, "type": "str", "value": value})
  121. for step in (procedure.get("steps") or []):
  122. add(EXT_KEY_EFFECT, (step.get("effect") or "").strip())
  123. add(EXT_KEY_ACTION, (step.get("action") or "").strip())
  124. via = (step.get("via") or "").strip()
  125. if via and via != "human":
  126. add(EXT_KEY_TOOL, via)
  127. return ext
  128. # ── 单条 payload 组装(复用参考实现;source_id 直接用 case_id)─────────────────────
  129. def build_payload(source_id, source_data, procedure, proc_index):
  130. source_title = (source_data.get("title") or "").strip()
  131. source_author = (source_data.get("author") or "").strip() or None
  132. proc_name = (procedure.get("name") or "").strip()
  133. if proc_name:
  134. knowledge_title = proc_name
  135. elif source_title:
  136. knowledge_title = f"{source_title} — 工序{proc_index}"
  137. else:
  138. knowledge_title = f"工序{proc_index}"
  139. content = json.dumps(procedure, ensure_ascii=False)
  140. source_metadata = {
  141. "platform": source_data.get("platform") or "",
  142. "date": source_data.get("date") or "",
  143. "url": source_data.get("url") or None,
  144. "excerpt": (source_data.get("excerpt") or "")[:500],
  145. "procedure_id": procedure.get("id") or "",
  146. "procedure_name": proc_name,
  147. }
  148. payload = {
  149. "source": {
  150. "id": source_id,
  151. "source_type": "post",
  152. "title": source_title or None,
  153. "author": source_author,
  154. "source_metadata": source_metadata,
  155. },
  156. "title": knowledge_title[:512],
  157. "content": content,
  158. "dim_attributes": DIM_ATTRIBUTES,
  159. "dim_creations": DIM_CREATIONS,
  160. }
  161. scopes = build_scopes(procedure)
  162. if scopes:
  163. payload["scopes"] = scopes
  164. custom_ext = build_custom_ext(procedure)
  165. if custom_ext:
  166. payload["custom_ext"] = custom_ext
  167. return payload
  168. # ── 单条写入(原样复用参考实现)────────────────────────────────────────────────
  169. def ingest_one(api_url, payload, dry_run):
  170. """调用导入接口写入一条知识,返回 (success, info_message, knowledge_id)。"""
  171. if dry_run:
  172. return True, "(dry-run, skipped)", None
  173. url = api_url.rstrip("/") + INGEST_ENDPOINT
  174. try:
  175. resp = requests.post(url, json=payload, timeout=30)
  176. if resp.status_code == 201:
  177. kid = resp.json().get("knowledge_id", "?")
  178. return True, f"knowledge_id={kid}", kid
  179. try:
  180. detail = resp.json().get("detail", resp.text[:300])
  181. except Exception:
  182. detail = resp.text[:300]
  183. return False, f"HTTP {resp.status_code}: {detail}", None
  184. except requests.Timeout:
  185. return False, "超时(30s)", None
  186. except requests.RequestException as exc:
  187. return False, str(exc), None
  188. # ── 主循环 ────────────────────────────────────────────────────────────────────
  189. def run(api_url, dry_run, verbose, delay_ms, query_id, limit, force):
  190. cases = list(iter_cases_from_db(query_id, limit))
  191. if not cases:
  192. scope = f"(query_id={query_id})" if query_id else ""
  193. logger.error("DB 中未发现任何「已采纳且有工序解构」的 case %s", scope)
  194. sys.exit(1)
  195. mode_tag = " [DRY-RUN]" if dry_run else ""
  196. force_tag = " [FORCE]" if force else ""
  197. logger.info("发现 %d 个采纳 case。目标接口:%s%s%s", len(cases), api_url, mode_tag, force_tag)
  198. ok_count = fail_count = skip_count = dup_count = 0
  199. for case_id, source_data, procedures, version in cases:
  200. logger.info("── %s", case_id)
  201. if not procedures:
  202. logger.warning(" 无 procedures,跳过")
  203. skip_count += 1
  204. continue
  205. # 去重台账:该 case 各工序已导入的版本。force 时清空(强制重导)。
  206. ingested = {} if force else db.fetch_ingested_map(case_id)
  207. logger.info(" source_id=%-45s procedures=%d version=%s", case_id, len(procedures), version)
  208. for idx, procedure in enumerate(procedures, 1):
  209. # 已导入且版本未变 → 跳过,杜绝重复上传(版本变了说明重解构过,应重导)。
  210. if not force and ingested.get(idx) == version:
  211. logger.info(" ♻️ [%d/%d] 已导入(版本 %s),跳过", idx, len(procedures), version)
  212. dup_count += 1
  213. continue
  214. payload = build_payload(case_id, source_data, procedure, idx)
  215. title = payload["title"]
  216. n_scopes = len(payload.get("scopes", []))
  217. n_ext = len(payload.get("custom_ext", []))
  218. n_steps = len(procedure.get("steps") or [])
  219. if dry_run and verbose:
  220. print(f"\n{'=' * 60}")
  221. print(f"[{case_id}] 工序 {idx}/{len(procedures)}")
  222. print(json.dumps(payload, ensure_ascii=False, indent=2))
  223. ok, msg, kid = ingest_one(api_url, payload, dry_run)
  224. status_icon = "✓" if ok else "✗"
  225. level = logging.INFO if ok else logging.WARNING
  226. logger.log(
  227. level,
  228. " %s [%d/%d] title=%r steps=%d scopes=%d ext=%d %s",
  229. status_icon, idx, len(procedures),
  230. title[:40], n_steps, n_scopes, n_ext, msg,
  231. )
  232. if ok:
  233. ok_count += 1
  234. # 仅真实导入成功才记台账(dry-run 不写,免污染去重状态)
  235. if not dry_run:
  236. try:
  237. db.mark_ingested(case_id, idx, version, kid, api_url)
  238. except Exception as exc:
  239. logger.warning(" ⚠️ 台账写入失败(不影响本次导入):%s", exc)
  240. else:
  241. fail_count += 1
  242. if delay_ms > 0 and not dry_run:
  243. time.sleep(delay_ms / 1000)
  244. logger.info(
  245. "完成。成功=%d 失败=%d 无工序跳过=%d 已传过跳过=%d 合计导入=%d",
  246. ok_count, fail_count, skip_count, dup_count, ok_count,
  247. )
  248. if fail_count:
  249. sys.exit(1)
  250. # ── CLI ───────────────────────────────────────────────────────────────────────
  251. def main():
  252. parser = argparse.ArgumentParser(
  253. description="把 DB 中已采纳的工序解构(mode_process)批量导入知识接口",
  254. formatter_class=argparse.RawDescriptionHelpFormatter,
  255. )
  256. parser.add_argument("--api-url", default=DEFAULT_API_URL, metavar="URL",
  257. help=f"后端 API 根地址(默认:{DEFAULT_API_URL})")
  258. parser.add_argument("--dry-run", action="store_true",
  259. help="仅从 DB 取数并组装 payload,不实际调用接口")
  260. parser.add_argument("--verbose", "-v", action="store_true",
  261. help="dry-run 时打印完整 payload JSON")
  262. parser.add_argument("--delay", type=int, default=100, metavar="MS",
  263. help="两次 API 调用之间的间隔毫秒数(默认:100)")
  264. parser.add_argument("--query-id", default=None, metavar="QID",
  265. help="只导入该搜索任务(query_id)下的采纳 case")
  266. parser.add_argument("--limit", type=int, default=None, metavar="N",
  267. help="只处理前 N 个 case(调试用)")
  268. parser.add_argument("--force", action="store_true",
  269. help="忽略去重台账,强制重导(换 prompt/模型、需覆盖时用)")
  270. args = parser.parse_args()
  271. run(
  272. api_url=args.api_url,
  273. dry_run=args.dry_run,
  274. verbose=args.verbose,
  275. delay_ms=args.delay,
  276. query_id=args.query_id,
  277. limit=args.limit,
  278. force=args.force,
  279. )
  280. if __name__ == "__main__":
  281. main()