dedup_db_records.py 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803
  1. """
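dedup_db_records.py
Semantic deduplication of KnowHub records: tools / case resources / atomic capabilities / strategies.

Usage:
  --dry-run    Analyze, write dedup_plan.json and print the planned merges; the database is not modified.
  --execute    Load dedup_plan.json and apply it (the LLM is not called again).
  --caps-only  Analyze capabilities only; strategy dedup is skipped.
  (no flags)   Analyze, write dedup_plan.json, then apply the plan immediately.

Dedup passes:
  Pass 0 - Tool: case-insensitive exact name match, then LLM alias detection (currently skipped by run_analysis / execute_plan).
  Pass 1 - Case: exact source_url match, then difflib title similarity > 80% (currently skipped by run_analysis / execute_plan).
  Pass 2 - Capability: LLM global clustering on names, then LLM detailed merging of each candidate cluster.
  Pass 3 - Strategy: Jaccard similarity of capability_id sets >= 45% (default threshold), then LLM workflow refinement.
  (Strategy dedup must run after capability dedup because it depends on capability IDs.)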
  13. """
  14. import os, sys, json, asyncio, argparse, difflib
  15. from pathlib import Path
  16. repo_root = str(Path(__file__).parent.parent.parent.parent)
  17. if repo_root not in sys.path:
  18. sys.path.insert(0, repo_root)
  19. from dotenv import load_dotenv
  20. load_dotenv()
  21. from knowhub.knowhub_db.pg_requirement_store import PostgreSQLRequirementStore
  22. from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
  23. from knowhub.knowhub_db.pg_strategy_store import PostgreSQLStrategyStore
  24. from knowhub.knowhub_db.pg_resource_store import PostgreSQLResourceStore
  25. from knowhub.knowhub_db.pg_tool_store import PostgreSQLToolStore
  26. from agent.llm.openrouter import openrouter_llm_call
  27. PLAN_FILE = Path(__file__).parent / "dedup_plan.json"
  28. # ─── helpers ─────────────────────────────────────────────────────────────────
  29. def sim(a: str, b: str) -> float:
  30. return difflib.SequenceMatcher(None, a.lower(), b.lower()).ratio()
  31. def apply_merges(store, entity_table: str, junction_tables: list, merges: list, dry_run: bool):
  32. """junction_tables: [(table_name, foreign_key_col), ...]
  33. Supports both flat {master_id, duplicate_ids[]} and enriched {master:{id}, duplicates:[{id}]} formats.
  34. """
  35. if not merges:
  36. return
  37. cur = store._get_cursor()
  38. try:
  39. for merge in merges:
  40. # Support both flat and enriched plan formats
  41. if "master_id" in merge:
  42. master_id = merge["master_id"]
  43. dup_ids = merge.get("duplicate_ids", [])
  44. else:
  45. master_id = merge["master"]["id"]
  46. dup_ids = [d["id"] for d in merge.get("duplicates", [])]
  47. if not merge.get("enabled", True):
  48. print(" [SKIP disabled] master=%s" % master_id)
  49. continue
  50. for dup_id in dup_ids:
  51. print(" Merge %s -> %s" % (dup_id, master_id))
  52. if dry_run:
  53. continue
  54. for (jtable, fkcol) in junction_tables:
  55. # INSERT master rows (skip existing)
  56. cur.execute("SELECT * FROM " + jtable + " WHERE " + fkcol + " = %s", (dup_id,))
  57. rows = cur.fetchall()
  58. for row in rows:
  59. try:
  60. cols = [desc[0] for desc in cur.description]
  61. new_vals = []
  62. for c, v in zip(cols, row):
  63. new_vals.append(master_id if c == fkcol else v)
  64. placeholders = ",".join(["%s"] * len(cols))
  65. cur.execute(
  66. "INSERT INTO " + jtable + " (" + ",".join(cols) + ") VALUES (" + placeholders + ") ON CONFLICT DO NOTHING",
  67. new_vals
  68. )
  69. except Exception:
  70. store.conn.rollback()
  71. cur.execute("DELETE FROM " + jtable + " WHERE " + fkcol + " = %s", (dup_id,))
  72. # Instead of deleting, mark them as deprecated based on user request
  73. if entity_table == "resource":
  74. cur.execute("UPDATE " + entity_table + " SET title = COALESCE(title, '') || '-已废弃' WHERE id = %s AND COALESCE(title, '') NOT LIKE '%%-已废弃'", (dup_id,))
  75. else:
  76. cur.execute("UPDATE " + entity_table + " SET name = COALESCE(name, '') || '-已废弃' WHERE id = %s AND COALESCE(name, '') NOT LIKE '%%-已废弃'", (dup_id,))
  77. if not dry_run:
  78. store.conn.commit()
  79. finally:
  80. cur.close()
  81. # ─── Pass 0: Tools ────────────────────────────────────────────────────────────────
  82. async def plan_tool_dedup(tool_store, model="anthropic/claude-sonnet-4.5") -> list:
  83. """
  84. Deduplicate tools using LLM to catch aliases (e.g. "PS" vs "Photoshop").
  85. """
  86. print("\n[Pass 0] Tool dedup (LLM semantic batching)...")
  87. tools = tool_store.list_all(limit=10000)
  88. print(" Total tools: %d" % len(tools))
  89. payload = []
  90. # Deduplicate exact name matching locally first to reduce LLM load
  91. name_map = {}
  92. exact_merges = []
  93. for t in tools:
  94. t_name = t.get("name") or t["id"]
  95. t_lower = t_name.strip().lower()
  96. full_tool = {
  97. "id": t["id"],
  98. "name": t_name,
  99. "introduction": t.get("introduction", ""),
  100. "tutorial": t.get("tutorial", "")
  101. }
  102. if t_lower in name_map:
  103. exact_merges.append((name_map[t_lower], t))
  104. else:
  105. name_map[t_lower] = t
  106. payload.append(full_tool)
  107. print(" Unique entities to send to LLM: %d" % len(payload))
  108. tools_schema = [{
  109. "type": "function",
  110. "function": {
  111. "name": "submit_tool_merges",
  112. "description": "Submit semantically identical tool merge groups along with their synthesized complete field states",
  113. "parameters": {
  114. "type": "object",
  115. "properties": {
  116. "merges": {
  117. "type": "array",
  118. "items": {
  119. "type": "object",
  120. "properties": {
  121. "master_id": {"type": "string", "description": "ID of the tool chosen to be the master"},
  122. "duplicate_ids": {"type": "array", "items": {"type": "string"}, "description": "IDs of the duplicate tools"},
  123. "merged_state": {
  124. "type": "object",
  125. "description": "The ultimate merged state of this tool, combining the best information from all duplicates.",
  126. "properties": {
  127. "name": {"type": "string", "description": "The standardized, optimal industry-consensus name"},
  128. "introduction": {"type": "string", "description": "Combined and normalized introduction text"},
  129. "tutorial": {"type": "string", "description": "Combined tutorial text if any"}
  130. },
  131. "required": ["name"]
  132. }
  133. },
  134. "required": ["master_id", "duplicate_ids", "merged_state"]
  135. }
  136. }
  137. },
  138. "required": ["merges"]
  139. }
  140. }
  141. }]
  142. prompt = (
  143. "你是一个 AI 工具注册表维护专家。下面是一个软件/工具实体的列表。\n"
  144. "你的任务是找出所有因为别名、缩写或微小拼写差异(例如 'PS' 和 'Photoshop')造成的重复工具条目。\n"
  145. "对于每一组极其肯定为重复的项,请指定一个 master_id(保留的主干),列出 duplicate_ids(待废弃的重复项),并合成一个强大的 `merged_state` 对象。\n"
  146. "这个 `merged_state` 必须智能地吸收并结合它们各自的描述与教程信息,并敲定最官方权威的工具名称 `name`。\n\n"
  147. "工具列表:\n" + json.dumps(payload, ensure_ascii=False, indent=2)
  148. )
  149. print(" Calling %s..." % model)
  150. try:
  151. resp = await openrouter_llm_call(
  152. messages=[{"role": "user", "content": prompt}],
  153. model=model,
  154. tools=tools_schema,
  155. tool_choice="required",
  156. max_tokens=4096
  157. )
  158. tool_calls = resp.get("tool_calls", [])
  159. except Exception as e:
  160. print(" LLM call failed: %s" % e)
  161. tool_calls = []
  162. merges = []
  163. # Add the local exact matches first
  164. exact_groups = {}
  165. for master, dup in exact_merges:
  166. exact_groups.setdefault(master["id"], {"master": master, "dups": []})["dups"].append(dup)
  167. for m_id, grp in exact_groups.items():
  168. merges.append({
  169. "enabled": True,
  170. "master": {"id": m_id, "name": grp["master"]["name"]},
  171. "duplicates": [{"id": d["id"], "name": d["name"], "match_reason": "exact_name_local"} for d in grp["dups"]],
  172. "merged_result": "Keep master, redirect relations"
  173. })
  174. # Add the LLM matches
  175. if tool_calls:
  176. try:
  177. args = json.loads(tool_calls[0]["function"]["arguments"])
  178. llm_merges = args.get("merges", [])
  179. print(" LLM returned %d merge groups." % len(llm_merges))
  180. # map back to names
  181. id_name_map = {t["id"]: t["name"] for t in payload}
  182. for m in llm_merges:
  183. m_id = m["master_id"]
  184. d_ids = m["duplicate_ids"]
  185. m_state = m.get("merged_state", {})
  186. if m_id not in id_name_map: continue
  187. dups = []
  188. for d_id in d_ids:
  189. if d_id in id_name_map and d_id != m_id:
  190. dups.append({"id": d_id, "name": id_name_map[d_id], "match_reason": "llm_semantic"})
  191. if dups:
  192. merges.append({
  193. "enabled": True,
  194. "master": {"id": m_id, "name": id_name_map[m_id], "merged_state": m_state},
  195. "duplicates": dups,
  196. "merged_result": "Keep master, update fields to merged_state, redirect relations"
  197. })
  198. except Exception as e:
  199. print(" Error parsing LLM response:", e)
  200. print(" Total Tool clusters to merge: %d" % len(merges))
  201. return merges
  202. # ─── Pass 1: Cases ────────────────────────────────────────────────────────────
  203. def plan_case_dedup(res_store) -> list:
  204. print("\n[Pass 1] Case dedup (source_url exact + title fuzzy >80%%)...")
  205. cases = res_store.list_resources(content_type="case", limit=10000)
  206. print(" Total cases: %d" % len(cases))
  207. def get_url(c):
  208. import json as _json
  209. meta = c.get("metadata") or {}
  210. if isinstance(meta, str):
  211. try: meta = _json.loads(meta)
  212. except Exception: meta = {}
  213. return (meta.get("source_url") or "").strip()
  214. processed = set()
  215. merges = []
  216. # Step 1: Exact URL match
  217. url_groups = {}
  218. for c in cases:
  219. url = get_url(c)
  220. if url:
  221. url_groups.setdefault(url, []).append(c)
  222. for url, group in url_groups.items():
  223. if len(group) < 2:
  224. continue
  225. master = group[0]
  226. dups = []
  227. for b in group[1:]:
  228. if b["id"] in processed or master["id"] in processed:
  229. continue
  230. dups.append({"id": b["id"], "title": b.get("title", ""),
  231. "match_reason": "exact_url", "similarity": 1.0, "source_url": url})
  232. processed.add(b["id"])
  233. if dups:
  234. processed.add(master["id"])
  235. merges.append({
  236. "enabled": True,
  237. "master": {"id": master["id"], "title": master.get("title", ""), "source_url": url},
  238. "duplicates": dups,
  239. "merged_result": "Keep master, redirect all relations to master id"
  240. })
  241. # Step 2: Title fuzzy match fallback
  242. for i, a in enumerate(cases):
  243. if a["id"] in processed:
  244. continue
  245. dups = []
  246. for j in range(i + 1, len(cases)):
  247. b = cases[j]
  248. if b["id"] in processed:
  249. continue
  250. score = sim(a.get("title", ""), b.get("title", ""))
  251. if score > 0.80:
  252. dups.append({"id": b["id"], "title": b.get("title", ""),
  253. "match_reason": "title_fuzzy", "similarity": round(score, 3)})
  254. processed.add(b["id"])
  255. if dups:
  256. processed.add(a["id"])
  257. merges.append({
  258. "enabled": True,
  259. "master": {"id": a["id"], "title": a.get("title", ""), "source_url": get_url(a)},
  260. "duplicates": dups,
  261. "merged_result": "Keep master, redirect all relations to master id"
  262. })
  263. url_m = sum(1 for m in merges if any(d.get("match_reason") == "exact_url" for d in m["duplicates"]))
  264. print(" URL exact: %d groups, Title fuzzy: %d groups" % (url_m, len(merges) - url_m))
  265. return merges
  266. # ─── Pass 2: Capabilities ─────────────────────────────────────────────────────
  267. async def plan_cap_dedup(cap_store, model="anthropic/claude-sonnet-4.5") -> list:
  268. print("\n[Pass 2] Capability dedup (LLM Global Clustering + LLM Detailed Merging)...")
  269. caps = cap_store.list_all(limit=10000)
  270. print(" Total capabilities: %d" % len(caps))
  271. # Phase 1: Global LLM Clustering
  272. print(" Running Phase 1: Global capability clustering via LLM...")
  273. minimal_caps = [{"id": c["id"], "name": c.get("name", "")} for c in caps]
  274. cluster_schema = [{
  275. "type": "function",
  276. "function": {
  277. "name": "generate_candidate_clusters",
  278. "description": "Group identical or highly similar capability IDs together.",
  279. "parameters": {
  280. "type": "object",
  281. "properties": {
  282. "clusters": {
  283. "type": "array",
  284. "items": {
  285. "type": "array",
  286. "items": {"type": "string", "description": "capability ID"}
  287. }
  288. }
  289. },
  290. "required": ["clusters"]
  291. }
  292. }
  293. }]
  294. cluster_prompt = (
  295. "你是一个顶级的全景归类AI。下面是数据库中现存的所有「原子能力」极简清单。\n"
  296. "你的任务是仔细阅读它们的名字,找出所有**在业务意义上完全相同或可以合并**的能力,哪怕它们的字面长短不一、用词差异极大。\n"
  297. "将那些应当被认为是完全重复或高度同质化的原子能力ID打包在一起,形成聚类数组。\n"
  298. "注意:只有包含 2 个或以上名字的能力圈子才需要返回。你必须尽可能不遗漏任何有合并价值的核心节点。\n\n"
  299. "能力清单:\n" + json.dumps(minimal_caps, ensure_ascii=False)
  300. )
  301. try:
  302. resp = await openrouter_llm_call(
  303. messages=[{"role": "user", "content": cluster_prompt}],
  304. model=model,
  305. tools=cluster_schema,
  306. tool_choice="required",
  307. max_tokens=8192
  308. )
  309. t_calls = resp.get("tool_calls")
  310. if not t_calls:
  311. print(" Phase 1 returned no tool calls/clusters.")
  312. return []
  313. args = json.loads(t_calls[0]["function"]["arguments"])
  314. cluster_ids = args.get("clusters", [])
  315. except Exception as e:
  316. print(" [Error] Phase 1 Global Clustering failed: %s" % e)
  317. return []
  318. print(" Phase 1 raw LLM clustering returned %d clusters." % len(cluster_ids))
  319. # Map IDs back to detailed capability objects
  320. caps_map = {c["id"]: c for c in caps}
  321. candidate_clusters = []
  322. for id_list in cluster_ids:
  323. real_objs = [caps_map[cid] for cid in id_list if cid in caps_map]
  324. if len(real_objs) > 1:
  325. candidate_clusters.append(real_objs)
  326. print(" Valid candidate clusters hydrated: %d (total items: %d)" % (
  327. len(candidate_clusters), sum(len(c) for c in candidate_clusters)))
  328. if not candidate_clusters:
  329. return []
  330. print(" Running Phase 2: Detailed merging of candidate clusters...")
  331. # Build payload for Qwen - candidate clusters equipped with details
  332. payload = []
  333. for cluster in candidate_clusters:
  334. grp = []
  335. for c in cluster:
  336. effects_str = c.get("effects") or "[]"
  337. try:
  338. if isinstance(effects_str, str): effects_obj = json.loads(effects_str)
  339. else: effects_obj = effects_str
  340. except: effects_obj = []
  341. implements_str = c.get("implements") or "{}"
  342. try:
  343. if isinstance(implements_str, str): imps_obj = json.loads(implements_str)
  344. else: imps_obj = implements_str
  345. except: imps_obj = {}
  346. criterion_str = c.get("criterion") or "[]"
  347. try:
  348. if isinstance(criterion_str, str): crit_obj = json.loads(criterion_str)
  349. else: crit_obj = criterion_str
  350. except: crit_obj = []
  351. grp.append({
  352. "id": c["id"],
  353. "name": c.get("name", ""),
  354. "description": c.get("description", ""),
  355. "effects": effects_obj,
  356. "implements": imps_obj,
  357. "criterion": crit_obj
  358. })
  359. payload.append(grp)
  360. tools_schema = [{
  361. "type": "function",
  362. "function": {
  363. "name": "submit_capability_merges",
  364. "description": "Submit semantically IDENTICAL capability merge groups, complete with synthesized fields",
  365. "parameters": {
  366. "type": "object",
  367. "properties": {
  368. "merges": {
  369. "type": "array",
  370. "items": {
  371. "type": "object",
  372. "properties": {
  373. "master_id": {"type": "string"},
  374. "duplicate_ids": {"type": "array", "items": {"type": "string"}},
  375. "merged_description": {"type": "string", "description": "A comprehensive, combined description integrating details from all merged capabilities."},
  376. "synthesized_effects": {
  377. "type": "array",
  378. "items": {"type": "object"},
  379. "description": "A deduplicated, logically combined array of all effects from the master and duplicates."
  380. },
  381. "synthesized_implements": {
  382. "type": "object",
  383. "description": "A deduplicated, logically combined dictionary map of all implements/parameters from the master and duplicates."
  384. },
  385. "synthesized_criterion": {
  386. "type": "array",
  387. "items": {"type": "object"},
  388. "description": "A deduplicated, logically combined array of evaluation criteria from the master and duplicates."
  389. }
  390. },
  391. "required": ["master_id", "duplicate_ids", "merged_description", "synthesized_effects", "synthesized_implements", "synthesized_criterion"]
  392. }
  393. }
  394. },
  395. "required": ["merges"]
  396. }
  397. }
  398. }]
  399. prompt_template = (
  400. "你是一个高级 AI 知识图谱工程师。下面是若干组经过粗筛的候选「原子能力」簇。\n"
  401. "请仔细分析每个簇中的能力,判断它们是否属于语义和操作本质上完全相同的原子动作。\n"
  402. "如果它们的核心本质和底层操作逻辑根本就是一回事(即使举例或关联对象有微小差异),你也**必须**将它们合并!\n"
  403. "对于每决定好的一组合并:选择一个最佳的 master_id,将其他的塞进 duplicate_ids。\n"
  404. "核心任务:你必须亲自操刀,合成一份大一统综合描述 `merged_description`;\n"
  405. "生成一个去重后的影响力效果数组 `synthesized_effects`;\n"
  406. "生成一个整合了所有实现级参数与独立用法的字典 `synthesized_implements`;\n"
  407. "最后,生成一个保留全部评判标准的去重评估数组 `synthesized_criterion`。\n\n"
  408. "候选原子能力簇:\n{payload_json}"
  409. )
  410. BATCH_SIZE = 5
  411. batches = [payload[i:i + BATCH_SIZE] for i in range(0, len(payload), BATCH_SIZE)]
  412. print(" Calling %s in %d batches..." % (model, len(batches)))
  413. async def process_batch(batch):
  414. prompt = prompt_template.format(payload_json=json.dumps(batch, ensure_ascii=False, indent=2))
  415. try:
  416. resp = await openrouter_llm_call(messages=[{"role": "user", "content": prompt}], model=model, tools=tools_schema, tool_choice="required", max_tokens=4096)
  417. t_calls = resp.get("tool_calls")
  418. if not t_calls: return []
  419. args = json.loads(t_calls[0]["function"]["arguments"])
  420. return args.get("merges", [])
  421. except Exception as e:
  422. print(" [Error] LLM batch failed: %s" % e)
  423. return []
  424. results = await asyncio.gather(*(process_batch(b) for b in batches))
  425. final_merges = []
  426. total_merges = 0
  427. for merges in results:
  428. total_merges += len(merges)
  429. for m in merges:
  430. final_merges.append({
  431. "enabled": True,
  432. "master": {
  433. "id": m["master_id"],
  434. "merged_description": m.get("merged_description", ""),
  435. "synthesized_effects": m.get("synthesized_effects", []),
  436. "synthesized_implements": m.get("synthesized_implements", {}),
  437. "synthesized_criterion": m.get("synthesized_criterion", [])
  438. },
  439. "duplicates": [{"id": d_id} for d_id in m["duplicate_ids"]],
  440. "merged_result": "Keep master, update fields, redirect relations"
  441. })
  442. print(" LLM suggested %d total merge groups across all batches." % total_merges)
  443. return final_merges
  444. # ─── Pass 3: Strategies (Jaccard on capability_ids) ───────────────────────────
  445. async def plan_strategy_dedup(strat_store, model="anthropic/claude-sonnet-4.5", threshold=0.45) -> list:
  446. print("\n[Pass 3] Strategy dedup (Jaccard > %.0f%% + LLM workflow refinement)..." % (threshold * 100))
  447. strats = strat_store.list_all(limit=10000)
  448. print(" Total strategies: %d" % len(strats))
  449. cap_sets = {}
  450. for s in strats:
  451. ids = set(s.get("capability_ids") or [])
  452. if ids: cap_sets[s["id"]] = ids
  453. processed = set()
  454. candidate_clusters = []
  455. strat_list = [s for s in strats if s["id"] in cap_sets]
  456. # Pre-filter clusters
  457. for i, a in enumerate(strat_list):
  458. if a["id"] in processed: continue
  459. cluster = [a]
  460. sa = cap_sets[a["id"]]
  461. for j in range(i + 1, len(strat_list)):
  462. b = strat_list[j]
  463. if b["id"] in processed: continue
  464. sb = cap_sets[b["id"]]
  465. if not sa and not sb: continue
  466. jaccard = len(sa & sb) / len(sa | sb)
  467. if jaccard >= threshold:
  468. cluster.append(b)
  469. processed.add(b["id"])
  470. if len(cluster) > 1:
  471. processed.add(a["id"])
  472. candidate_clusters.append(cluster)
  473. print(" Candidate strategy clusters (Jaccard > %.0f%%): %d" % (threshold * 100, len(candidate_clusters)))
  474. if not candidate_clusters: return []
  475. payload = []
  476. for cluster in candidate_clusters:
  477. grp = []
  478. for s in cluster:
  479. body_str = s.get("body") or "{}"
  480. try:
  481. if isinstance(body_str, str): b_obj = json.loads(body_str)
  482. else: b_obj = body_str
  483. except: b_obj = {"_raw": body_str}
  484. # Extract just the steps to save tokens (user requested mainly description)
  485. workflow = b_obj.get("workflow", [])
  486. condensed_workflow = [{"phase": w.get("phase", ""), "description": w.get("description", "")} for w in workflow]
  487. grp.append({
  488. "id": s["id"],
  489. "name": s.get("name", "Unnamed Strategy"),
  490. "workflow_summary": condensed_workflow
  491. })
  492. payload.append(grp)
  493. strat_schema = [{
  494. "type": "function",
  495. "function": {
  496. "name": "submit_strategy_merges",
  497. "description": "Submit semantically identical strategy workflows, consolidating their step instructions.",
  498. "parameters": {
  499. "type": "object",
  500. "properties": {
  501. "merges": {
  502. "type": "array",
  503. "items": {
  504. "type": "object",
  505. "properties": {
  506. "master_id": {"type": "string"},
  507. "duplicate_ids": {"type": "array", "items": {"type": "string"}},
  508. "merged_workflow": {
  509. "type": "array",
  510. "items": {"type": "object"},
  511. "description": "A logically synthesized, unified pipeline array combining the best steps from the merged strategies."
  512. }
  513. },
  514. "required": ["master_id", "duplicate_ids", "merged_workflow"]
  515. }
  516. }
  517. },
  518. "required": ["merges"]
  519. }
  520. }
  521. }]
  522. prompt_template = (
  523. "你是一个资深人工智能工作流引擎架构师。下面是一批疑似重合的候选「执行工序(Strategy)」簇。\n"
  524. "每个簇里包含多个工作流定义,请极其苛刻地审视它们的步骤顺序与逻辑意图。\n"
  525. "如果多个工作流指挥的是一段**从根本上完全一样的核心处理链路**(即使修饰词、举例、参数或个别无关紧要的废话步骤有别),你也**应该**将它们合并。\n"
  526. "注意:如果同一个簇里的工序代表了截然不同的概念流派分支,你可以分别输出多组合并(或直接决绝合并,不返回内容)。\n"
  527. "对于同意合并的组:指定一个 master_id、列出 duplicate_ids,并提炼一个 `merged_workflow`(合并后的工作流数组)。\n"
  528. "在这个 `merged_workflow` 里,请存放经过你清洗、压缩、取长补短后所形成的最干货、最权威的完整操作指令步骤。\n\n"
  529. "候选工序簇:\n{payload_json}"
  530. )
  531. BATCH_SIZE = 8
  532. batches = [payload[i:i + BATCH_SIZE] for i in range(0, len(payload), BATCH_SIZE)]
  533. print(" Calling %s in %d batches..." % (model, len(batches)))
  534. async def process_batch(batch):
  535. prompt = prompt_template.format(payload_json=json.dumps(batch, ensure_ascii=False, indent=2))
  536. try:
  537. resp = await openrouter_llm_call(messages=[{"role": "user", "content": prompt}], model=model, tools=strat_schema, tool_choice="required", max_tokens=4096)
  538. t_calls = resp.get("tool_calls")
  539. if not t_calls: return []
  540. args = json.loads(t_calls[0]["function"]["arguments"])
  541. return args.get("merges", [])
  542. except Exception as e:
  543. print(" [Error] LLM batch failed: %s" % e)
  544. return []
  545. results = await asyncio.gather(*(process_batch(b) for b in batches))
  546. merges = []
  547. total_merges = 0
  548. for llm_merges in results:
  549. total_merges += len(llm_merges)
  550. for m in llm_merges:
  551. merges.append({
  552. "enabled": True,
  553. "master": {
  554. "id": m["master_id"],
  555. "merged_workflow": m.get("merged_workflow", [])
  556. },
  557. "duplicates": [{"id": d_id} for d_id in m["duplicate_ids"]],
  558. "merged_result": "Keep master, update workflow body, reparent links"
  559. })
  560. print(" LLM suggested %d total merge groups across all batches." % total_merges)
  561. return merges
  562. # ─── Main ─────────────────────────────────────────────────────────────────────
  563. async def run_analysis(res_store, cap_store, strat_store, req_store, tool_store, caps_only=False):
  564. plan = {}
  565. plan["tools"] = []
  566. plan["cases"] = []
  567. plan["capabilities"] = await plan_cap_dedup(cap_store)
  568. if not caps_only:
  569. plan["strategies"] = await plan_strategy_dedup(strat_store)
  570. else:
  571. plan["strategies"] = []
  572. return plan
  573. def execute_plan(plan, res_store, cap_store, strat_store, req_store, tool_store, dry_run: bool):
  574. tag = "[DRY-RUN] " if dry_run else ""
  575. # 先跳过 tools 和 cases
  576. plan["tools"] = []
  577. plan["cases"] = []
  578. print("\n%s=== Applying Tool Merges & Synthesized Fields ===" % tag)
  579. tool_merges = plan.get("tools", [])
  580. if tool_merges:
  581. cur = tool_store._get_cursor()
  582. try:
  583. for m in tool_merges:
  584. if not m.get("enabled", True): continue
  585. st = m["master"].get("merged_state")
  586. if st:
  587. print(" Update fields for %s -> %s" % (m["master"]["id"], st.get("name")))
  588. if not dry_run:
  589. cur.execute(
  590. "UPDATE tool SET name = COALESCE(%s, name), introduction = COALESCE(%s, introduction), tutorial = COALESCE(%s, tutorial) WHERE id = %s",
  591. (st.get("name"), st.get("introduction"), st.get("tutorial"), m["master"]["id"])
  592. )
  593. if not dry_run:
  594. tool_store.conn.commit()
  595. finally:
  596. cur.close()
  597. apply_merges(tool_store, "tool", [
  598. ("capability_tool", "tool_id"),
  599. ("tool_knowledge", "tool_id"),
  600. ("tool_provider", "tool_id"),
  601. ], tool_merges, dry_run)
  602. print("\n%s=== Applying Case Merges ===" % tag)
  603. apply_merges(res_store, "resource", [
  604. ("requirement_resource", "resource_id"),
  605. ("strategy_resource", "resource_id"),
  606. ("capability_resource", "resource_id"),
  607. ], plan.get("cases", []), dry_run)
  608. print("\n%s=== Applying Capability Merges & Synthesized Fields ===" % tag)
  609. cap_merges = plan.get("capabilities", [])
  610. if cap_merges:
  611. cur = cap_store._get_cursor()
  612. try:
  613. for m in cap_merges:
  614. if not m.get("enabled", True): continue
  615. m_desc = m["master"].get("merged_description")
  616. s_effs = m["master"].get("synthesized_effects")
  617. s_imps = m["master"].get("synthesized_implements")
  618. s_crit = m["master"].get("synthesized_criterion")
  619. if m_desc or s_effs or s_crit:
  620. print(" Update %s -> new fields" % m["master"]["id"])
  621. if not dry_run:
  622. cur.execute("UPDATE capability SET description = COALESCE(%s, description), effects = COALESCE(%s::jsonb, effects), criterion = COALESCE(%s, criterion) WHERE id = %s",
  623. (m_desc, json.dumps(s_effs, ensure_ascii=False) if s_effs else None, json.dumps(s_crit, ensure_ascii=False) if s_crit else None, m["master"]["id"]))
  624. # implements 是关联表 capability_tool 中的 description
  625. if s_imps:
  626. for t_id, desc in s_imps.items():
  627. cur.execute("UPDATE capability_tool SET description = %s WHERE tool_id = %s AND capability_id IN %s",
  628. (desc, t_id, tuple([m["master"]["id"]] + [d["id"] for d in m.get("duplicates", [])])))
  629. if not dry_run:
  630. cap_store.conn.commit()
  631. finally:
  632. cur.close()
  633. apply_merges(cap_store, "capability", [
  634. ("requirement_capability", "capability_id"),
  635. ("strategy_capability", "capability_id"),
  636. ("capability_tool", "capability_id"),
  637. ("capability_knowledge", "capability_id"),
  638. ("capability_resource", "capability_id"),
  639. ], plan.get("capabilities", []), dry_run)
  640. print("\n%s=== Applying Strategy Merges & Workflows ===" % tag)
  641. strat_merges = plan.get("strategies", [])
  642. if strat_merges:
  643. cur = strat_store._get_cursor()
  644. try:
  645. for m in strat_merges:
  646. if not m.get("enabled", True): continue
  647. m_wf = m["master"].get("merged_workflow")
  648. if m_wf:
  649. print(" Update %s -> new workflow array" % m["master"]["id"])
  650. if not dry_run:
  651. # Strategy body is a jsonb. We update body -> 'workflow'
  652. cur.execute("""
  653. UPDATE strategy
  654. SET body = jsonb_set(body, '{workflow}', %s::jsonb)
  655. WHERE id = %s
  656. """, (json.dumps(m_wf, ensure_ascii=False), m["master"]["id"]))
  657. if not dry_run:
  658. strat_store.conn.commit()
  659. finally:
  660. cur.close()
  661. apply_merges(strat_store, "strategy", [
  662. ("requirement_strategy", "strategy_id"),
  663. ("strategy_capability", "strategy_id"),
  664. ("strategy_knowledge", "strategy_id"),
  665. ("strategy_resource", "strategy_id"),
  666. ], plan.get("strategies", []), dry_run)
  667. async def main():
  668. parser = argparse.ArgumentParser(description="Semantic dedup for KnowHub DB")
  669. parser.add_argument("--dry-run", action="store_true", help="Analyze only, save plan, do not write DB")
  670. parser.add_argument("--execute", action="store_true", help="Execute saved dedup_plan.json without re-analyzing")
  671. parser.add_argument("--caps-only", action="store_true", help="Only run capability deduplication and skip strategies")
  672. args = parser.parse_args()
  673. print("Connecting to DB...")
  674. res_store = PostgreSQLResourceStore()
  675. cap_store = PostgreSQLCapabilityStore()
  676. strat_store = PostgreSQLStrategyStore()
  677. req_store = PostgreSQLRequirementStore()
  678. tool_store = PostgreSQLToolStore()
  679. try:
  680. if args.execute:
  681. if not PLAN_FILE.exists():
  682. print("ERROR: dedup_plan.json not found. Run --dry-run first.")
  683. return
  684. print("Loading plan from %s ..." % PLAN_FILE)
  685. plan = json.loads(PLAN_FILE.read_text(encoding="utf-8"))
  686. execute_plan(plan, res_store, cap_store, strat_store, req_store, tool_store, dry_run=False)
  687. print("\nAll merges applied.")
  688. else:
  689. plan = await run_analysis(res_store, cap_store, strat_store, req_store, tool_store, caps_only=args.caps_only)
  690. PLAN_FILE.write_text(json.dumps(plan, ensure_ascii=False, indent=2), encoding="utf-8")
  691. print("\nPlan saved to: %s" % PLAN_FILE)
  692. total_merges = sum(len(plan.get(k, [])) for k in ("tools", "cases", "capabilities", "strategies"))
  693. print("Total merge groups planned: %d" % total_merges)
  694. if args.dry_run:
  695. print("\n[DRY-RUN] No DB changes made. Review dedup_plan.json then run --execute.")
  696. execute_plan(plan, res_store, cap_store, strat_store, req_store, tool_store, dry_run=True)
  697. else:
  698. execute_plan(plan, res_store, cap_store, strat_store, req_store, tool_store, dry_run=False)
  699. print("\nAll merges applied.")
  700. finally:
  701. res_store.close()
  702. cap_store.close()
  703. strat_store.close()
  704. req_store.close()
  705. tool_store.close()
  706. if __name__ == "__main__":
  707. asyncio.run(main())