"""dedup_db_records.py

Semantic deduplication of KnowHub DB records: atomic capabilities,
production strategies, case materials (resources) and tools.

Usage:
    --dry-run    Analyze, save dedup_plan.json and print the merges that would
                 be applied, without writing to the DB.
    --execute    Load dedup_plan.json and apply it (no new LLM calls).
    --caps-only  During analysis, plan capability merges only (skip strategies).
    (no flags)   Analyze, save dedup_plan.json, then apply it right away.

Passes:
    Pass 0 - Tool: exact case-insensitive name match locally, then LLM grouping
             of aliases (e.g. "PS" vs "Photoshop").
    Pass 1 - Case: exact metadata.source_url match, then difflib title
             similarity > 80%.
    Pass 2 - Capability: LLM global clustering on names, then LLM detailed
             merging (with synthesized fields) per candidate cluster.
    Pass 3 - Strategy: Jaccard similarity of capability_id sets (>= 45% by
             default) as a pre-filter, then LLM workflow merging.

Passes 0 and 1 are currently switched off in run_analysis / execute_plan.
Strategy merging depends on capability ids; note that a single analysis run
computes both plans against the pre-merge database state.

Duplicates are never deleted: their junction-table rows are re-pointed to the
master and their name/title gets the suffix "-已废弃" ("deprecated").
"""
import argparse
import asyncio
import difflib
import json
import os
import sys
from pathlib import Path

# Make the repository root importable when this script is run directly.
repo_root = str(Path(__file__).parent.parent.parent.parent)
if repo_root not in sys.path:
    sys.path.insert(0, repo_root)

from dotenv import load_dotenv

load_dotenv()

from knowhub.knowhub_db.pg_requirement_store import PostgreSQLRequirementStore
from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
from knowhub.knowhub_db.pg_strategy_store import PostgreSQLStrategyStore
from knowhub.knowhub_db.pg_resource_store import PostgreSQLResourceStore
from knowhub.knowhub_db.pg_tool_store import PostgreSQLToolStore
from agent.llm.openrouter import openrouter_llm_call

PLAN_FILE = Path(__file__).parent / "dedup_plan.json"


# ─── helpers ─────────────────────────────────────────────────────────────────

def sim(a: str, b: str) -> float:
    """Return the case-insensitive difflib similarity ratio of *a* and *b*.

    ``None`` is treated as an empty string.
    """
    return difflib.SequenceMatcher(None, (a or "").lower(), (b or "").lower()).ratio()


def _parse_json(value, default):
    """Decode *value* when it is a JSON string; pass other values through.

    Falsy values and strings that are not valid JSON yield *default*.
    """
    if not value:
        return default
    if isinstance(value, str):
        try:
            return json.loads(value)
        except ValueError:
            return default
    return value


def _merge_ids(merge):
    """Return ``(master_id, duplicate_ids)`` from a flat or enriched merge entry."""
    if "master_id" in merge:
        return merge["master_id"], merge.get("duplicate_ids", [])
    return merge["master"]["id"], [d["id"] for d in merge.get("duplicates", [])]


def _sanitize_llm_merges(llm_merges, valid_ids, claimed=None) -> list:
    """Drop unsafe or conflicting parts of LLM-proposed merge groups.

    A group is kept only if its master is a known id that has not already been
    scheduled for deprecation. Each duplicate must be a known id, differ from
    the master and not already belong to an accepted group. Without these
    checks a hallucinated id, a master listed among its own duplicates (which
    would make apply_merges delete the master's relations), or an id claimed
    by two groups (merge chains) could reach the database.

    Args:
        llm_merges: raw ``merges`` list from an LLM tool call; entries are
            expected to carry ``master_id`` and ``duplicate_ids``.
        valid_ids: iterable of the real ids the LLM was shown. Returned ids
            are matched on their string form and mapped back to these.
        claimed: optional dict ``id -> "master" | "dup"`` shared across calls
            so several batches do not conflict; updated in place.

    Returns:
        list of ``(group, master_id, duplicate_ids)`` tuples, where ``group``
        is the original LLM dict.
    """
    lookup = {str(i): i for i in valid_ids}
    if claimed is None:
        claimed = {}
    accepted = []
    for group in llm_merges or []:
        if not isinstance(group, dict):
            continue
        master_id = lookup.get(str(group.get("master_id")))
        if master_id is None or claimed.get(master_id) == "dup":
            continue
        raw_dups = group.get("duplicate_ids")
        if not isinstance(raw_dups, list):
            continue
        dup_ids = []
        for raw in raw_dups:
            d_id = lookup.get(str(raw))
            if d_id is None or d_id == master_id or d_id in claimed or d_id in dup_ids:
                continue
            dup_ids.append(d_id)
        if not dup_ids:
            continue
        claimed.setdefault(master_id, "master")
        for d_id in dup_ids:
            claimed[d_id] = "dup"
        accepted.append((group, master_id, dup_ids))
    return accepted


async def _request_merges(prompt: str, model: str, schema: list, max_tokens: int = 4096) -> list:
    """Send *prompt* with a forced tool call and return its ``merges`` argument.

    Any failure (transport error, missing tool call, bad JSON) is printed and
    yields an empty list so one bad batch does not abort the whole pass.
    """
    try:
        resp = await openrouter_llm_call(messages=[{"role": "user", "content": prompt}], model=model,
                                         tools=schema, tool_choice="required", max_tokens=max_tokens)
        t_calls = resp.get("tool_calls")
        if not t_calls:
            return []
        args = json.loads(t_calls[0]["function"]["arguments"])
        return args.get("merges", [])
    except Exception as e:
        print(" [Error] LLM batch failed: %s" % e)
        return []


def _repoint_junction_rows(conn, cur, jtable: str, fkcol: str, dup_id, master_id):
    """Copy every *jtable* row whose *fkcol* is *dup_id* onto *master_id*, then delete the originals.

    Rows that already exist for the master are skipped via ON CONFLICT DO NOTHING.
    NOTE(review): if a junction table has its own surrogate primary key, the
    copied row keeps the duplicate's key value, conflicts, and is then dropped
    by the DELETE below - confirm the junction tables are keyed on their
    foreign-key columns.
    """
    cur.execute("SELECT * FROM " + jtable + " WHERE " + fkcol + " = %s", (dup_id,))
    rows = cur.fetchall()
    if rows:
        # Read column names now: after an INSERT, cur.description no longer
        # describes this SELECT (it is None), which broke every row after the first.
        cols = [desc[0] for desc in cur.description]
        insert_sql = (
            "INSERT INTO " + jtable + " (" + ",".join(cols) + ") VALUES ("
            + ",".join(["%s"] * len(cols)) + ") ON CONFLICT DO NOTHING"
        )
        # SAVEPOINT is only valid inside a transaction block (not in autocommit).
        use_savepoint = not getattr(conn, "autocommit", False)
        for row in rows:
            # Dict-style rows iterate over keys, so take their values explicitly.
            values = list(row.values()) if isinstance(row, dict) else list(row)
            new_vals = [master_id if c == fkcol else v for c, v in zip(cols, values)]
            if use_savepoint:
                cur.execute("SAVEPOINT dedup_row")
            try:
                cur.execute(insert_sql, new_vals)
            except Exception as e:
                # Undo only this row; rolling back the connection would discard
                # every earlier merge of the current transaction.
                if use_savepoint:
                    cur.execute("ROLLBACK TO SAVEPOINT dedup_row")
                print(" [WARN] copy %s row %s -> %s failed: %s" % (jtable, dup_id, master_id, e))
            else:
                if use_savepoint:
                    cur.execute("RELEASE SAVEPOINT dedup_row")
    cur.execute("DELETE FROM " + jtable + " WHERE " + fkcol + " = %s", (dup_id,))


def apply_merges(store, entity_table: str, junction_tables: list, merges: list, dry_run: bool):
    """Re-point junction rows from duplicates to their master and mark duplicates deprecated.

    Args:
        store: a PostgreSQL store exposing ``_get_cursor()`` and ``conn``.
        entity_table: table holding the merged entities; ``resource`` rows are
            marked through ``title``, every other table through ``name``.
        junction_tables: ``[(table_name, foreign_key_col), ...]`` whose rows
            pointing at a duplicate are copied onto the master and then deleted.
        merges: entries in either the flat ``{master_id, duplicate_ids[]}`` or
            the enriched ``{master: {id}, duplicates: [{id}]}`` format; entries
            with ``enabled`` false are skipped.
        dry_run: when true, only print what would happen.

    All changes are committed once at the end (when not a dry run).
    """
    if not merges:
        return
    name_col = "title" if entity_table == "resource" else "name"
    # The driver turns "%%" into a literal "%" when it substitutes parameters.
    deprecate_sql = (
        "UPDATE {table} SET {col} = COALESCE({col}, '') || '-已废弃' "
        "WHERE id = %s AND COALESCE({col}, '') NOT LIKE '%%-已废弃'"
    ).format(table=entity_table, col=name_col)
    cur = store._get_cursor()
    try:
        for merge in merges:
            master_id, dup_ids = _merge_ids(merge)
            if not merge.get("enabled", True):
                print(" [SKIP disabled] master=%s" % master_id)
                continue
            for dup_id in dup_ids:
                if dup_id == master_id:
                    # Merging a row into itself would delete all of its relations.
                    print(" [SKIP self-merge] %s" % dup_id)
                    continue
                print(" Merge %s -> %s" % (dup_id, master_id))
                if dry_run:
                    continue
                for jtable, fkcol in junction_tables:
                    _repoint_junction_rows(store.conn, cur, jtable, fkcol, dup_id, master_id)
                # Deprecate instead of deleting (requested by the data owners).
                cur.execute(deprecate_sql, (dup_id,))
        if not dry_run:
            store.conn.commit()
    finally:
        cur.close()


# ─── Pass 0: Tools ────────────────────────────────────────────────────────────

async def plan_tool_dedup(tool_store, model="anthropic/claude-sonnet-4.5") -> list:
    """Plan tool merges: exact name matches locally, aliases (e.g. "PS" vs "Photoshop") via LLM.

    Returns enriched merge entries; LLM-sourced masters carry a ``merged_state``
    dict (name / introduction / tutorial) to be written onto the master.
    """
    print("\n[Pass 0] Tool dedup (LLM semantic batching)...")
    tools = tool_store.list_all(limit=10000)
    print(" Total tools: %d" % len(tools))

    # Merge exact (case-insensitive) name matches locally to shrink the LLM prompt.
    payload = []
    name_map = {}
    exact_merges = []
    for t in tools:
        t_name = t.get("name") or t["id"]
        t_lower = t_name.strip().lower()
        full_tool = {
            "id": t["id"],
            "name": t_name,
            "introduction": t.get("introduction", ""),
            "tutorial": t.get("tutorial", ""),
        }
        if t_lower in name_map:
            exact_merges.append((name_map[t_lower], full_tool))
        else:
            name_map[t_lower] = full_tool
            payload.append(full_tool)

    print(" Unique entities to send to LLM: %d" % len(payload))

    tools_schema = [{
        "type": "function",
        "function": {
            "name": "submit_tool_merges",
            "description": "Submit semantically identical tool merge groups along with their synthesized complete field states",
            "parameters": {
                "type": "object",
                "properties": {
                    "merges": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "master_id": {"type": "string", "description": "ID of the tool chosen to be the master"},
                                "duplicate_ids": {"type": "array", "items": {"type": "string"}, "description": "IDs of the duplicate tools"},
                                "merged_state": {
                                    "type": "object",
                                    "description": "The ultimate merged state of this tool, combining the best information from all duplicates.",
                                    "properties": {
                                        "name": {"type": "string", "description": "The standardized, optimal industry-consensus name"},
                                        "introduction": {"type": "string", "description": "Combined and normalized introduction text"},
                                        "tutorial": {"type": "string", "description": "Combined tutorial text if any"}
                                    },
                                    "required": ["name"]
                                }
                            },
                            "required": ["master_id", "duplicate_ids", "merged_state"]
                        }
                    }
                },
                "required": ["merges"]
            }
        }
    }]

    prompt = (
        "你是一个 AI 工具注册表维护专家。下面是一个软件/工具实体的列表。\n"
        "你的任务是找出所有因为别名、缩写或微小拼写差异(例如 'PS' 和 'Photoshop')造成的重复工具条目。\n"
        "对于每一组极其肯定为重复的项,请指定一个 master_id(保留的主干),列出 duplicate_ids(待废弃的重复项),并合成一个强大的 `merged_state` 对象。\n"
        "这个 `merged_state` 必须智能地吸收并结合它们各自的描述与教程信息,并敲定最官方权威的工具名称 `name`。\n\n"
        "工具列表:\n" + json.dumps(payload, ensure_ascii=False, indent=2)
    )

    print(" Calling %s..." % model)
    try:
        resp = await openrouter_llm_call(
            messages=[{"role": "user", "content": prompt}],
            model=model,
            tools=tools_schema,
            tool_choice="required",
            max_tokens=4096
        )
        tool_calls = resp.get("tool_calls", [])
    except Exception as e:
        print(" LLM call failed: %s" % e)
        tool_calls = []

    merges = []

    # Local exact matches first.
    exact_groups = {}
    for master, dup in exact_merges:
        exact_groups.setdefault(master["id"], {"master": master, "dups": []})["dups"].append(dup)
    claimed = {}
    for m_id, grp in exact_groups.items():
        claimed[m_id] = "master"
        for d in grp["dups"]:
            claimed[d["id"]] = "dup"
        merges.append({
            "enabled": True,
            "master": {"id": m_id, "name": grp["master"]["name"]},
            "duplicates": [{"id": d["id"], "name": d["name"], "match_reason": "exact_name_local"} for d in grp["dups"]],
            "merged_result": "Keep master, redirect relations"
        })

    # Then the LLM matches, restricted to ids that were actually sent.
    if tool_calls:
        try:
            args = json.loads(tool_calls[0]["function"]["arguments"])
            llm_merges = args.get("merges", [])
            print(" LLM returned %d merge groups." % len(llm_merges))
            id_name_map = {t["id"]: t["name"] for t in payload}
            for m, m_id, d_ids in _sanitize_llm_merges(llm_merges, id_name_map, claimed):
                merges.append({
                    "enabled": True,
                    "master": {"id": m_id, "name": id_name_map[m_id], "merged_state": m.get("merged_state", {})},
                    "duplicates": [{"id": d_id, "name": id_name_map[d_id], "match_reason": "llm_semantic"} for d_id in d_ids],
                    "merged_result": "Keep master, update fields to merged_state, redirect relations"
                })
        except Exception as e:
            print(" Error parsing LLM response:", e)

    print(" Total Tool clusters to merge: %d" % len(merges))
    return merges


# ─── Pass 1: Cases ────────────────────────────────────────────────────────────

def plan_case_dedup(res_store) -> list:
    """Plan case merges by exact ``metadata.source_url``, then by title similarity > 0.80.

    Cases without a title never take part in the fuzzy step (two empty titles
    would otherwise score a perfect 1.0).
    """
    print("\n[Pass 1] Case dedup (source_url exact + title fuzzy >80%)...")
    cases = res_store.list_resources(content_type="case", limit=10000)
    print(" Total cases: %d" % len(cases))

    def get_url(c):
        meta = _parse_json(c.get("metadata"), {})
        if not isinstance(meta, dict):
            return ""
        return (meta.get("source_url") or "").strip()

    processed = set()
    merges = []

    # Step 1: exact URL match.
    url_groups = {}
    for c in cases:
        url = get_url(c)
        if url:
            url_groups.setdefault(url, []).append(c)

    for url, group in url_groups.items():
        if len(group) < 2:
            continue
        master = group[0]
        dups = []
        for b in group[1:]:
            if b["id"] in processed or master["id"] in processed:
                continue
            dups.append({"id": b["id"], "title": b.get("title", ""), "match_reason": "exact_url", "similarity": 1.0, "source_url": url})
            processed.add(b["id"])
        if dups:
            processed.add(master["id"])
            merges.append({
                "enabled": True,
                "master": {"id": master["id"], "title": master.get("title", ""), "source_url": url},
                "duplicates": dups,
                "merged_result": "Keep master, redirect all relations to master id"
            })

    # Step 2: fuzzy title match as a fallback.
    for i, a in enumerate(cases):
        if a["id"] in processed:
            continue
        a_title = a.get("title") or ""
        if not a_title.strip():
            continue
        dups = []
        for b in cases[i + 1:]:
            if b["id"] in processed:
                continue
            b_title = b.get("title") or ""
            if not b_title.strip():
                continue
            score = sim(a_title, b_title)
            if score > 0.80:
                dups.append({"id": b["id"], "title": b_title, "match_reason": "title_fuzzy", "similarity": round(score, 3)})
                processed.add(b["id"])
        if dups:
            processed.add(a["id"])
            merges.append({
                "enabled": True,
                "master": {"id": a["id"], "title": a_title, "source_url": get_url(a)},
                "duplicates": dups,
                "merged_result": "Keep master, redirect all relations to master id"
            })

    url_m = sum(1 for m in merges if any(d.get("match_reason") == "exact_url" for d in m["duplicates"]))
    print(" URL exact: %d groups, Title fuzzy: %d groups" % (url_m, len(merges) - url_m))
    return merges


# ─── Pass 2: Capabilities ─────────────────────────────────────────────────────

async def plan_cap_dedup(cap_store, model="anthropic/claude-sonnet-4.5") -> list:
    """Plan capability merges in two LLM phases.

    Phase 1 clusters all capability names globally; Phase 2 sends the
    hydrated clusters in batches and asks for merge groups plus synthesized
    description / effects / implements / criterion for each master.
    """
    print("\n[Pass 2] Capability dedup (LLM Global Clustering + LLM Detailed Merging)...")
    caps = cap_store.list_all(limit=10000)
    print(" Total capabilities: %d" % len(caps))

    # Phase 1: global clustering over names only (keeps the prompt small).
    print(" Running Phase 1: Global capability clustering via LLM...")
    minimal_caps = [{"id": c["id"], "name": c.get("name", "")} for c in caps]

    cluster_schema = [{
        "type": "function",
        "function": {
            "name": "generate_candidate_clusters",
            "description": "Group identical or highly similar capability IDs together.",
            "parameters": {
                "type": "object",
                "properties": {
                    "clusters": {
                        "type": "array",
                        "items": {
                            "type": "array",
                            "items": {"type": "string", "description": "capability ID"}
                        }
                    }
                },
                "required": ["clusters"]
            }
        }
    }]

    cluster_prompt = (
        "你是一个顶级的全景归类AI。下面是数据库中现存的所有「原子能力」极简清单。\n"
        "你的任务是仔细阅读它们的名字,找出所有**在业务意义上完全相同或可以合并**的能力,哪怕它们的字面长短不一、用词差异极大。\n"
        "将那些应当被认为是完全重复或高度同质化的原子能力ID打包在一起,形成聚类数组。\n"
        "注意:只有包含 2 个或以上名字的能力圈子才需要返回。你必须尽可能不遗漏任何有合并价值的核心节点。\n\n"
        "能力清单:\n" + json.dumps(minimal_caps, ensure_ascii=False)
    )

    try:
        resp = await openrouter_llm_call(
            messages=[{"role": "user", "content": cluster_prompt}],
            model=model,
            tools=cluster_schema,
            tool_choice="required",
            max_tokens=8192
        )
        t_calls = resp.get("tool_calls")
        if not t_calls:
            print(" Phase 1 returned no tool calls/clusters.")
            return []
        args = json.loads(t_calls[0]["function"]["arguments"])
        cluster_ids = args.get("clusters") or []
    except Exception as e:
        print(" [Error] Phase 1 Global Clustering failed: %s" % e)
        return []

    print(" Phase 1 raw LLM clustering returned %d clusters." % len(cluster_ids))

    # Map ids back to full capability rows; drop unknown and repeated ids.
    caps_map = {c["id"]: c for c in caps}
    candidate_clusters = []
    for id_list in cluster_ids:
        if not isinstance(id_list, list):
            continue
        known = [cid for cid in id_list if isinstance(cid, str) and cid in caps_map]
        real_objs = [caps_map[cid] for cid in dict.fromkeys(known)]
        if len(real_objs) > 1:
            candidate_clusters.append(real_objs)

    print(" Valid candidate clusters hydrated: %d (total items: %d)" % (
        len(candidate_clusters), sum(len(c) for c in candidate_clusters)))

    if not candidate_clusters:
        return []

    print(" Running Phase 2: Detailed merging of candidate clusters...")

    # Phase 2 payload: candidate clusters with decoded detail fields.
    payload = []
    for cluster in candidate_clusters:
        payload.append([{
            "id": c["id"],
            "name": c.get("name", ""),
            "description": c.get("description", ""),
            "effects": _parse_json(c.get("effects"), []),
            "implements": _parse_json(c.get("implements"), {}),
            "criterion": _parse_json(c.get("criterion"), [])
        } for c in cluster])

    tools_schema = [{
        "type": "function",
        "function": {
            "name": "submit_capability_merges",
            "description": "Submit semantically IDENTICAL capability merge groups, complete with synthesized fields",
            "parameters": {
                "type": "object",
                "properties": {
                    "merges": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "master_id": {"type": "string"},
                                "duplicate_ids": {"type": "array", "items": {"type": "string"}},
                                "merged_description": {"type": "string", "description": "A comprehensive, combined description integrating details from all merged capabilities."},
                                "synthesized_effects": {
                                    "type": "array",
                                    "items": {"type": "object"},
                                    "description": "A deduplicated, logically combined array of all effects from the master and duplicates."
                                },
                                "synthesized_implements": {
                                    "type": "object",
                                    "description": "A deduplicated, logically combined dictionary map of all implements/parameters from the master and duplicates."
                                },
                                "synthesized_criterion": {
                                    "type": "array",
                                    "items": {"type": "object"},
                                    "description": "A deduplicated, logically combined array of evaluation criteria from the master and duplicates."
                                }
                            },
                            "required": ["master_id", "duplicate_ids", "merged_description", "synthesized_effects", "synthesized_implements", "synthesized_criterion"]
                        }
                    }
                },
                "required": ["merges"]
            }
        }
    }]

    prompt_template = (
        "你是一个高级 AI 知识图谱工程师。下面是若干组经过粗筛的候选「原子能力」簇。\n"
        "请仔细分析每个簇中的能力,判断它们是否属于语义和操作本质上完全相同的原子动作。\n"
        "如果它们的核心本质和底层操作逻辑根本就是一回事(即使举例或关联对象有微小差异),你也**必须**将它们合并!\n"
        "对于每决定好的一组合并:选择一个最佳的 master_id,将其他的塞进 duplicate_ids。\n"
        "核心任务:你必须亲自操刀,合成一份大一统综合描述 `merged_description`;\n"
        "生成一个去重后的影响力效果数组 `synthesized_effects`;\n"
        "生成一个整合了所有实现级参数与独立用法的字典 `synthesized_implements`;\n"
        "最后,生成一个保留全部评判标准的去重评估数组 `synthesized_criterion`。\n\n"
        "候选原子能力簇:\n{payload_json}"
    )

    BATCH_SIZE = 5
    batches = [payload[i:i + BATCH_SIZE] for i in range(0, len(payload), BATCH_SIZE)]
    print(" Calling %s in %d batches..." % (model, len(batches)))

    results = await asyncio.gather(*(
        _request_merges(prompt_template.format(payload_json=json.dumps(b, ensure_ascii=False, indent=2)),
                        model, tools_schema)
        for b in batches
    ))

    final_merges = []
    total_merges = 0
    claimed = {}  # shared across batches so no capability ends up in two groups
    for batch, llm_merges in zip(batches, results):
        total_merges += len(llm_merges)
        batch_ids = [c["id"] for grp in batch for c in grp]
        for m, master_id, dup_ids in _sanitize_llm_merges(llm_merges, batch_ids, claimed):
            final_merges.append({
                "enabled": True,
                "master": {
                    "id": master_id,
                    "merged_description": m.get("merged_description", ""),
                    "synthesized_effects": m.get("synthesized_effects", []),
                    "synthesized_implements": m.get("synthesized_implements", {}),
                    "synthesized_criterion": m.get("synthesized_criterion", [])
                },
                "duplicates": [{"id": d_id} for d_id in dup_ids],
                "merged_result": "Keep master, update fields, redirect relations"
            })

    print(" LLM suggested %d total merge groups across all batches." % total_merges)
    print(" Accepted %d merge groups after ID validation." % len(final_merges))
    return final_merges


# ─── Pass 3: Strategies (Jaccard on capability_ids) ───────────────────────────

async def plan_strategy_dedup(strat_store, model="anthropic/claude-sonnet-4.5", threshold=0.45) -> list:
    """Plan strategy merges.

    Strategies whose ``capability_ids`` sets have Jaccard similarity
    >= *threshold* are greedily clustered; each cluster's condensed workflow
    (phase + description per step) is then sent to the LLM, which returns
    merge groups and a ``merged_workflow`` for the master.
    """
    print("\n[Pass 3] Strategy dedup (Jaccard > %.0f%% + LLM workflow refinement)..." % (threshold * 100))
    strats = strat_store.list_all(limit=10000)
    print(" Total strategies: %d" % len(strats))

    cap_sets = {}
    for s in strats:
        ids = set(s.get("capability_ids") or [])
        if ids:
            cap_sets[s["id"]] = ids

    # Greedy pre-clustering: each unprocessed strategy absorbs later similar ones.
    processed = set()
    candidate_clusters = []
    strat_list = [s for s in strats if s["id"] in cap_sets]
    for i, a in enumerate(strat_list):
        if a["id"] in processed:
            continue
        cluster = [a]
        sa = cap_sets[a["id"]]
        for b in strat_list[i + 1:]:
            if b["id"] in processed:
                continue
            sb = cap_sets[b["id"]]
            union = sa | sb
            if not union:
                continue
            if len(sa & sb) / len(union) >= threshold:
                cluster.append(b)
                processed.add(b["id"])
        if len(cluster) > 1:
            processed.add(a["id"])
            candidate_clusters.append(cluster)

    print(" Candidate strategy clusters (Jaccard > %.0f%%): %d" % (threshold * 100, len(candidate_clusters)))
    if not candidate_clusters:
        return []

    payload = []
    for cluster in candidate_clusters:
        grp = []
        for s in cluster:
            b_obj = _parse_json(s.get("body"), {})
            workflow = b_obj.get("workflow", []) if isinstance(b_obj, dict) else []
            if not isinstance(workflow, list):
                workflow = []
            # Send only phase + description per step to save tokens.
            condensed_workflow = [{"phase": w.get("phase", ""), "description": w.get("description", "")}
                                  for w in workflow if isinstance(w, dict)]
            grp.append({
                "id": s["id"],
                "name": s.get("name", "Unnamed Strategy"),
                "workflow_summary": condensed_workflow
            })
        payload.append(grp)

    strat_schema = [{
        "type": "function",
        "function": {
            "name": "submit_strategy_merges",
            "description": "Submit semantically identical strategy workflows, consolidating their step instructions.",
            "parameters": {
                "type": "object",
                "properties": {
                    "merges": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "master_id": {"type": "string"},
                                "duplicate_ids": {"type": "array", "items": {"type": "string"}},
                                "merged_workflow": {
                                    "type": "array",
                                    "items": {"type": "object"},
                                    "description": "A logically synthesized, unified pipeline array combining the best steps from the merged strategies."
                                }
                            },
                            "required": ["master_id", "duplicate_ids", "merged_workflow"]
                        }
                    }
                },
                "required": ["merges"]
            }
        }
    }]

    prompt_template = (
        "你是一个资深人工智能工作流引擎架构师。下面是一批疑似重合的候选「执行工序(Strategy)」簇。\n"
        "每个簇里包含多个工作流定义,请极其苛刻地审视它们的步骤顺序与逻辑意图。\n"
        "如果多个工作流指挥的是一段**从根本上完全一样的核心处理链路**(即使修饰词、举例、参数或个别无关紧要的废话步骤有别),你也**应该**将它们合并。\n"
        "注意:如果同一个簇里的工序代表了截然不同的概念流派分支,你可以分别输出多组合并(或直接决绝合并,不返回内容)。\n"
        "对于同意合并的组:指定一个 master_id、列出 duplicate_ids,并提炼一个 `merged_workflow`(合并后的工作流数组)。\n"
        "在这个 `merged_workflow` 里,请存放经过你清洗、压缩、取长补短后所形成的最干货、最权威的完整操作指令步骤。\n\n"
        "候选工序簇:\n{payload_json}"
    )

    BATCH_SIZE = 8
    batches = [payload[i:i + BATCH_SIZE] for i in range(0, len(payload), BATCH_SIZE)]
    print(" Calling %s in %d batches..." % (model, len(batches)))

    results = await asyncio.gather(*(
        _request_merges(prompt_template.format(payload_json=json.dumps(b, ensure_ascii=False, indent=2)),
                        model, strat_schema)
        for b in batches
    ))

    merges = []
    total_merges = 0
    claimed = {}  # shared across batches so no strategy ends up in two groups
    for batch, llm_merges in zip(batches, results):
        total_merges += len(llm_merges)
        batch_ids = [s["id"] for grp in batch for s in grp]
        for m, master_id, dup_ids in _sanitize_llm_merges(llm_merges, batch_ids, claimed):
            merges.append({
                "enabled": True,
                "master": {
                    "id": master_id,
                    "merged_workflow": m.get("merged_workflow", [])
                },
                "duplicates": [{"id": d_id} for d_id in dup_ids],
                "merged_result": "Keep master, update workflow body, reparent links"
            })

    print(" LLM suggested %d total merge groups across all batches." % total_merges)
    print(" Accepted %d merge groups after ID validation." % len(merges))
    return merges


# ─── Main ─────────────────────────────────────────────────────────────────────

async def run_analysis(res_store, cap_store, strat_store, req_store, tool_store, caps_only=False):
    """Build the dedup plan dict (keys: tools, cases, capabilities, strategies).

    Tool and case passes are currently disabled and yield empty lists; the
    strategy pass is skipped when *caps_only* is true.
    """
    plan = {}
    plan["tools"] = []
    plan["cases"] = []
    plan["capabilities"] = await plan_cap_dedup(cap_store)
    if not caps_only:
        plan["strategies"] = await plan_strategy_dedup(strat_store)
    else:
        plan["strategies"] = []
    return plan


def execute_plan(plan, res_store, cap_store, strat_store, req_store, tool_store, dry_run: bool):
    """Apply *plan* to the database, or only print it when *dry_run* is true.

    For each entity type the master's synthesized fields are written first,
    then apply_merges re-points relations and deprecates the duplicates.
    """
    tag = "[DRY-RUN] " if dry_run else ""

    # Tools and cases are skipped for now (this also clears them in the caller's plan).
    plan["tools"] = []
    plan["cases"] = []

    print("\n%s=== Applying Tool Merges & Synthesized Fields ===" % tag)
    tool_merges = plan.get("tools", [])
    if tool_merges:
        cur = tool_store._get_cursor()
        try:
            for m in tool_merges:
                if not m.get("enabled", True):
                    continue
                st = m["master"].get("merged_state")
                if st:
                    print(" Update fields for %s -> %s" % (m["master"]["id"], st.get("name")))
                    if not dry_run:
                        cur.execute(
                            "UPDATE tool SET name = COALESCE(%s, name), introduction = COALESCE(%s, introduction), tutorial = COALESCE(%s, tutorial) WHERE id = %s",
                            (st.get("name"), st.get("introduction"), st.get("tutorial"), m["master"]["id"])
                        )
            if not dry_run:
                tool_store.conn.commit()
        finally:
            cur.close()

    apply_merges(tool_store, "tool", [
        ("capability_tool", "tool_id"),
        ("tool_knowledge", "tool_id"),
        ("tool_provider", "tool_id"),
    ], tool_merges, dry_run)

    print("\n%s=== Applying Case Merges ===" % tag)
    apply_merges(res_store, "resource", [
        ("requirement_resource", "resource_id"),
        ("strategy_resource", "resource_id"),
        ("capability_resource", "resource_id"),
    ], plan.get("cases", []), dry_run)

    print("\n%s=== Applying Capability Merges & Synthesized Fields ===" % tag)
    cap_merges = plan.get("capabilities", [])
    if cap_merges:
        cur = cap_store._get_cursor()
        try:
            for m in cap_merges:
                if not m.get("enabled", True):
                    continue
                master = m["master"]
                m_desc = master.get("merged_description")
                s_effs = master.get("synthesized_effects")
                s_imps = master.get("synthesized_implements")
                s_crit = master.get("synthesized_criterion")
                if m_desc or s_effs or s_crit:
                    print(" Update %s -> new fields" % master["id"])
                    if not dry_run:
                        # Empty values become NULL so COALESCE keeps the existing column.
                        cur.execute("UPDATE capability SET description = COALESCE(%s, description), effects = COALESCE(%s::jsonb, effects), criterion = COALESCE(%s, criterion) WHERE id = %s",
                                    (m_desc or None,
                                     json.dumps(s_effs, ensure_ascii=False) if s_effs else None,
                                     json.dumps(s_crit, ensure_ascii=False) if s_crit else None,
                                     master["id"]))
                # "implements" live in the capability_tool junction table's description column.
                if s_imps and isinstance(s_imps, dict) and not dry_run:
                    member_ids = tuple([master["id"]] + [d["id"] for d in m.get("duplicates", [])])
                    for t_id, desc in s_imps.items():
                        desc_text = desc if isinstance(desc, str) else json.dumps(desc, ensure_ascii=False)
                        cur.execute("UPDATE capability_tool SET description = %s WHERE tool_id = %s AND capability_id IN %s",
                                    (desc_text, t_id, member_ids))
            if not dry_run:
                cap_store.conn.commit()
        finally:
            cur.close()

    apply_merges(cap_store, "capability", [
        ("requirement_capability", "capability_id"),
        ("strategy_capability", "capability_id"),
        ("capability_tool", "capability_id"),
        ("capability_knowledge", "capability_id"),
        ("capability_resource", "capability_id"),
    ], plan.get("capabilities", []), dry_run)

    print("\n%s=== Applying Strategy Merges & Workflows ===" % tag)
    strat_merges = plan.get("strategies", [])
    if strat_merges:
        cur = strat_store._get_cursor()
        try:
            for m in strat_merges:
                if not m.get("enabled", True):
                    continue
                m_wf = m["master"].get("merged_workflow")
                if m_wf:
                    print(" Update %s -> new workflow array" % m["master"]["id"])
                    if not dry_run:
                        # body is jsonb; jsonb_set on a NULL body returns NULL, so start from '{}'.
                        cur.execute("""
                            UPDATE strategy
                            SET body = jsonb_set(COALESCE(body, '{}'::jsonb), '{workflow}', %s::jsonb)
                            WHERE id = %s
                        """, (json.dumps(m_wf, ensure_ascii=False), m["master"]["id"]))
            if not dry_run:
                strat_store.conn.commit()
        finally:
            cur.close()

    apply_merges(strat_store, "strategy", [
        ("requirement_strategy", "strategy_id"),
        ("strategy_capability", "strategy_id"),
        ("strategy_knowledge", "strategy_id"),
        ("strategy_resource", "strategy_id"),
    ], plan.get("strategies", []), dry_run)


async def main():
    """CLI entry point: analyze and/or execute the dedup plan (see module docstring)."""
    parser = argparse.ArgumentParser(description="Semantic dedup for KnowHub DB")
    parser.add_argument("--dry-run", action="store_true", help="Analyze only, save plan, do not write DB")
    parser.add_argument("--execute", action="store_true", help="Execute saved dedup_plan.json without re-analyzing")
    parser.add_argument("--caps-only", action="store_true", help="Only run capability deduplication and skip strategies")
    args = parser.parse_args()

    print("Connecting to DB...")
    res_store = PostgreSQLResourceStore()
    cap_store = PostgreSQLCapabilityStore()
    strat_store = PostgreSQLStrategyStore()
    req_store = PostgreSQLRequirementStore()
    tool_store = PostgreSQLToolStore()

    try:
        if args.execute:
            if not PLAN_FILE.exists():
                print("ERROR: dedup_plan.json not found. Run --dry-run first.")
                return
            print("Loading plan from %s ..." % PLAN_FILE)
            plan = json.loads(PLAN_FILE.read_text(encoding="utf-8"))
            execute_plan(plan, res_store, cap_store, strat_store, req_store, tool_store, dry_run=False)
            print("\nAll merges applied.")
        else:
            plan = await run_analysis(res_store, cap_store, strat_store, req_store, tool_store, caps_only=args.caps_only)
            PLAN_FILE.write_text(json.dumps(plan, ensure_ascii=False, indent=2), encoding="utf-8")
            print("\nPlan saved to: %s" % PLAN_FILE)
            total_merges = sum(len(plan.get(k, [])) for k in ("tools", "cases", "capabilities", "strategies"))
            print("Total merge groups planned: %d" % total_merges)
            if args.dry_run:
                print("\n[DRY-RUN] No DB changes made. Review dedup_plan.json then run --execute.")
                execute_plan(plan, res_store, cap_store, strat_store, req_store, tool_store, dry_run=True)
            else:
                execute_plan(plan, res_store, cap_store, strat_store, req_store, tool_store, dry_run=False)
                print("\nAll merges applied.")
    finally:
        res_store.close()
        cap_store.close()
        strat_store.close()
        req_store.close()
        tool_store.close()


if __name__ == "__main__":
    asyncio.run(main())