import_process_knowledge.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. """
  2. 把数据库里「已采纳」的工序解构(mode_process)批量导入到知识导入接口。
  3. 与 Downloads/import/how_process_knowledge/main.py 的区别只在数据来源:
  4. 参考实现 —— 扫 data/*/_model/workflow.json,读本地 JSON 文件。
  5. 本脚本 —— 从 MySQL 取数:db.fetch_adopted_process_cases() 拿采纳 case_id,
  6. 再 db.fetch_process(case_id) 重建 {source, procedures}。
  7. 两边的数据结构一致(source 块 + procedures[].steps[]),故 payload 组装逻辑完全复用。
  8. 字段映射(同参考实现):
  9. source.id ← case_id(DB 主键,无需从 JSON 兜底目录名)
  10. source.source_type ← 固定 "post"
  11. source.title/author ← source.title / source.author
  12. 每个 procedure → 一条知识:
  13. title ← procedure.name;为空回退 "来源标题 — 工序N"
  14. content ← 整个 procedure 对象的 JSON 串
  15. dim_attributes ← 固定 ["how工序"] dim_creations ← 固定 ["制作"]
  16. scopes ← 各步骤 substance/form 去重
  17. custom_ext ← 各步骤 effect→作用 action→动作 via→工具 去重
  18. 采纳口径:db.is_adopted_rel(相关性<4 / 发布超两年 / 综合分<6 任一命中即不采纳)。
  19. 用法:
  20. python import_process_knowledge.py # 真实导入(采纳工序全量)
  21. python import_process_knowledge.py --dry-run # 只组装+打印,不调接口
  22. python import_process_knowledge.py --dry-run --verbose # 打印完整 payload JSON
  23. python import_process_knowledge.py --query-id q0001 # 只传某搜索任务下的采纳 case
  24. python import_process_knowledge.py --limit 5 # 只处理前 5 个 case(调试)
  25. python import_process_knowledge.py --api-url http://... # 指定后端地址
  26. python import_process_knowledge.py --delay 200 # 每次调用间隔 200ms
  27. """
  28. import argparse
  29. import json
  30. import logging
  31. import sys
  32. import time
  33. import requests
  34. import db
# ── Configuration (aligned with the reference implementation) ────────────────
# NOTE(review): the default endpoint is a plain-HTTP address on a fixed IP;
# confirm this is intended outside of internal networks. It can be overridden
# with the --api-url command-line option.
DEFAULT_API_URL = "http://47.236.83.130:8001"
# Path appended to the API root when posting one knowledge record.
INGEST_ENDPOINT = "/api/v1/knowledge/ingest"
# Fixed dimension tags attached to every imported knowledge record.
DIM_ATTRIBUTES = ["how工序"]
DIM_CREATIONS = ["制作"]
# Keys used for custom_ext entries derived from each step's
# effect / action / via fields.
EXT_KEY_EFFECT = "作用"
EXT_KEY_ACTION = "动作"
EXT_KEY_TOOL = "工具"
# ── Logging ──────────────────────────────────────────────────────────────────
# Configures the root logger at import time: INFO level, time-only timestamps.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger(__name__)
  50. # ── 数据来源:从 DB 取采纳工序(替代参考实现的本地文件扫描)──────────────────────
  51. def iter_cases_from_db(query_id=None, limit=None):
  52. """产出 (case_id, source_data, procedures)。
  53. 先取采纳 case_id 列表,再逐个 fetch_process 重建解构详情(取最新版本)。
  54. fetch_process 返回 None(无解构行)或 procedures 为空的 case 自动跳过。
  55. """
  56. case_ids = db.fetch_adopted_process_cases(query_id)
  57. if limit:
  58. case_ids = case_ids[:limit]
  59. for case_id in case_ids:
  60. payload = db.fetch_process(case_id) # 最新版本
  61. if not payload:
  62. continue
  63. yield case_id, (payload.get("source") or {}), (payload.get("procedures") or [])
  64. # ── 作用域提取(原样复用参考实现)──────────────────────────────────────────────
  65. def _split_values(raw):
  66. """按顿号分割,括号内的顿号不作为分隔符,结果去重保序。
  67. "高保真线框图、UI设计稿" → ["高保真线框图", "UI设计稿"]
  68. "修改后的照片(发型、服装)、二次元服装" → ["修改后的照片(发型、服装)", "二次元服装"]
  69. """
  70. parts, current, depth = [], [], 0
  71. for ch in raw:
  72. if ch in ("(", "("):
  73. depth += 1
  74. current.append(ch)
  75. elif ch in (")", ")"):
  76. depth -= 1
  77. current.append(ch)
  78. elif ch == "、" and depth == 0:
  79. part = "".join(current).strip()
  80. if part:
  81. parts.append(part)
  82. current = []
  83. else:
  84. current.append(ch)
  85. part = "".join(current).strip()
  86. if part:
  87. parts.append(part)
  88. seen, result = set(), []
  89. for p in parts:
  90. if p not in seen:
  91. seen.add(p)
  92. result.append(p)
  93. return result
  94. def build_scopes(procedure):
  95. """从所有步骤收集 substance / form,各自去重后返回 scope 列表。"""
  96. seen_sub, seen_form, scopes = set(), set(), []
  97. for step in (procedure.get("steps") or []):
  98. for sub in _split_values((step.get("substance") or "").strip()):
  99. if sub not in seen_sub:
  100. scopes.append({"scope_type": "substance", "value": sub})
  101. seen_sub.add(sub)
  102. for form in _split_values((step.get("form") or "").strip()):
  103. if form not in seen_form:
  104. scopes.append({"scope_type": "form", "value": form})
  105. seen_form.add(form)
  106. return scopes
  107. def build_custom_ext(procedure):
  108. """从所有步骤提取 effect/action/via,值按顿号分割,同 key 相同值去重。"""
  109. ext = []
  110. seen = {}
  111. def add(key, raw):
  112. if not raw:
  113. return
  114. for value in _split_values(raw):
  115. seen.setdefault(key, set())
  116. if value not in seen[key]:
  117. seen[key].add(value)
  118. ext.append({"key": key, "type": "str", "value": value})
  119. for step in (procedure.get("steps") or []):
  120. add(EXT_KEY_EFFECT, (step.get("effect") or "").strip())
  121. add(EXT_KEY_ACTION, (step.get("action") or "").strip())
  122. via = (step.get("via") or "").strip()
  123. if via and via != "human":
  124. add(EXT_KEY_TOOL, via)
  125. return ext
  126. # ── 单条 payload 组装(复用参考实现;source_id 直接用 case_id)─────────────────────
  127. def build_payload(source_id, source_data, procedure, proc_index):
  128. source_title = (source_data.get("title") or "").strip()
  129. source_author = (source_data.get("author") or "").strip() or None
  130. proc_name = (procedure.get("name") or "").strip()
  131. if proc_name:
  132. knowledge_title = proc_name
  133. elif source_title:
  134. knowledge_title = f"{source_title} — 工序{proc_index}"
  135. else:
  136. knowledge_title = f"工序{proc_index}"
  137. content = json.dumps(procedure, ensure_ascii=False)
  138. source_metadata = {
  139. "platform": source_data.get("platform") or "",
  140. "date": source_data.get("date") or "",
  141. "url": source_data.get("url") or None,
  142. "excerpt": (source_data.get("excerpt") or "")[:500],
  143. "procedure_id": procedure.get("id") or "",
  144. "procedure_name": proc_name,
  145. }
  146. payload = {
  147. "source": {
  148. "id": source_id,
  149. "source_type": "post",
  150. "title": source_title or None,
  151. "author": source_author,
  152. "source_metadata": source_metadata,
  153. },
  154. "title": knowledge_title[:512],
  155. "content": content,
  156. "dim_attributes": DIM_ATTRIBUTES,
  157. "dim_creations": DIM_CREATIONS,
  158. }
  159. scopes = build_scopes(procedure)
  160. if scopes:
  161. payload["scopes"] = scopes
  162. custom_ext = build_custom_ext(procedure)
  163. if custom_ext:
  164. payload["custom_ext"] = custom_ext
  165. return payload
  166. # ── 单条写入(原样复用参考实现)────────────────────────────────────────────────
  167. def ingest_one(api_url, payload, dry_run):
  168. """调用导入接口写入一条知识,返回 (success, info_message)。"""
  169. if dry_run:
  170. return True, "(dry-run, skipped)"
  171. url = api_url.rstrip("/") + INGEST_ENDPOINT
  172. try:
  173. resp = requests.post(url, json=payload, timeout=30)
  174. if resp.status_code == 201:
  175. kid = resp.json().get("knowledge_id", "?")
  176. return True, f"knowledge_id={kid}"
  177. try:
  178. detail = resp.json().get("detail", resp.text[:300])
  179. except Exception:
  180. detail = resp.text[:300]
  181. return False, f"HTTP {resp.status_code}: {detail}"
  182. except requests.Timeout:
  183. return False, "超时(30s)"
  184. except requests.RequestException as exc:
  185. return False, str(exc)
  186. # ── 主循环 ────────────────────────────────────────────────────────────────────
  187. def run(api_url, dry_run, verbose, delay_ms, query_id, limit):
  188. cases = list(iter_cases_from_db(query_id, limit))
  189. if not cases:
  190. scope = f"(query_id={query_id})" if query_id else ""
  191. logger.error("DB 中未发现任何「已采纳且有工序解构」的 case %s", scope)
  192. sys.exit(1)
  193. mode_tag = " [DRY-RUN]" if dry_run else ""
  194. logger.info("发现 %d 个采纳 case。目标接口:%s%s", len(cases), api_url, mode_tag)
  195. ok_count = fail_count = skip_count = 0
  196. for case_id, source_data, procedures in cases:
  197. logger.info("── %s", case_id)
  198. if not procedures:
  199. logger.warning(" 无 procedures,跳过")
  200. skip_count += 1
  201. continue
  202. logger.info(" source_id=%-45s procedures=%d", case_id, len(procedures))
  203. for idx, procedure in enumerate(procedures, 1):
  204. payload = build_payload(case_id, source_data, procedure, idx)
  205. title = payload["title"]
  206. n_scopes = len(payload.get("scopes", []))
  207. n_ext = len(payload.get("custom_ext", []))
  208. n_steps = len(procedure.get("steps") or [])
  209. if dry_run and verbose:
  210. print(f"\n{'=' * 60}")
  211. print(f"[{case_id}] 工序 {idx}/{len(procedures)}")
  212. print(json.dumps(payload, ensure_ascii=False, indent=2))
  213. ok, msg = ingest_one(api_url, payload, dry_run)
  214. status_icon = "✓" if ok else "✗"
  215. level = logging.INFO if ok else logging.WARNING
  216. logger.log(
  217. level,
  218. " %s [%d/%d] title=%r steps=%d scopes=%d ext=%d %s",
  219. status_icon, idx, len(procedures),
  220. title[:40], n_steps, n_scopes, n_ext, msg,
  221. )
  222. if ok:
  223. ok_count += 1
  224. else:
  225. fail_count += 1
  226. if delay_ms > 0 and not dry_run:
  227. time.sleep(delay_ms / 1000)
  228. logger.info(
  229. "完成。成功=%d 失败=%d 跳过=%d 合计导入=%d",
  230. ok_count, fail_count, skip_count, ok_count,
  231. )
  232. if fail_count:
  233. sys.exit(1)
  234. # ── CLI ───────────────────────────────────────────────────────────────────────
  235. def main():
  236. parser = argparse.ArgumentParser(
  237. description="把 DB 中已采纳的工序解构(mode_process)批量导入知识接口",
  238. formatter_class=argparse.RawDescriptionHelpFormatter,
  239. )
  240. parser.add_argument("--api-url", default=DEFAULT_API_URL, metavar="URL",
  241. help=f"后端 API 根地址(默认:{DEFAULT_API_URL})")
  242. parser.add_argument("--dry-run", action="store_true",
  243. help="仅从 DB 取数并组装 payload,不实际调用接口")
  244. parser.add_argument("--verbose", "-v", action="store_true",
  245. help="dry-run 时打印完整 payload JSON")
  246. parser.add_argument("--delay", type=int, default=100, metavar="MS",
  247. help="两次 API 调用之间的间隔毫秒数(默认:100)")
  248. parser.add_argument("--query-id", default=None, metavar="QID",
  249. help="只导入该搜索任务(query_id)下的采纳 case")
  250. parser.add_argument("--limit", type=int, default=None, metavar="N",
  251. help="只处理前 N 个 case(调试用)")
  252. args = parser.parse_args()
  253. run(
  254. api_url=args.api_url,
  255. dry_run=args.dry_run,
  256. verbose=args.verbose,
  257. delay_ms=args.delay,
  258. query_id=args.query_id,
  259. limit=args.limit,
  260. )
  261. if __name__ == "__main__":
  262. main()