| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803 |
"""
dedup_db_records.py

Semantic de-duplication of KnowHub records: tools, cases (resources),
capabilities and strategies.

Usage:
    --dry-run    Analyze, save dedup_plan.json, and print the planned changes
                 without writing to the DB.
    --execute    Load an existing dedup_plan.json and apply it (no LLM calls).
    --caps-only  During analysis, plan capability merges only (skip strategies).
    (no flags)   Analyze, save dedup_plan.json, then apply it immediately.

Passes:
    Pass 0 - Tool: exact (trimmed, case-insensitive) name match, then one LLM
             call to catch aliases such as "PS" vs "Photoshop".
    Pass 1 - Case: exact metadata.source_url match, then difflib title
             similarity > 80%.
    Pass 2 - Capability: LLM clustering over all capability names, then
             batched LLM merging that synthesizes description / effects /
             implements / criterion for each master.
    Pass 3 - Strategy: Jaccard similarity of capability_ids sets (default
             threshold 0.45) as a pre-filter, then LLM workflow consolidation.
             Must run after Pass 2 because it relies on capability IDs.

NOTE: Pass 0 and Pass 1 are implemented but currently disabled: run_analysis()
always emits empty "tools"/"cases" sections and execute_plan() does not apply
them.

Duplicates are not deleted. Their junction-table rows are re-pointed to the
master and their name (title for resources) gets the suffix "-已废弃"
("deprecated").
"""
import os
import sys
import json
import asyncio
import argparse
import difflib
from pathlib import Path

# Make the repository root (four levels up from this file) importable so the
# `knowhub` / `agent` packages resolve when the script is run directly.
# abspath() guards against a relative __file__ (possible before Python 3.9),
# where the .parent chain would collapse to ".".
repo_root = str(Path(os.path.abspath(__file__)).parent.parent.parent.parent)
if repo_root not in sys.path:
    sys.path.insert(0, repo_root)

from dotenv import load_dotenv  # noqa: E402

# Load .env before importing the stores, which may read connection settings.
load_dotenv()

from knowhub.knowhub_db.pg_requirement_store import PostgreSQLRequirementStore  # noqa: E402
from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore  # noqa: E402
from knowhub.knowhub_db.pg_strategy_store import PostgreSQLStrategyStore  # noqa: E402
from knowhub.knowhub_db.pg_resource_store import PostgreSQLResourceStore  # noqa: E402
from knowhub.knowhub_db.pg_tool_store import PostgreSQLToolStore  # noqa: E402
from agent.llm.openrouter import openrouter_llm_call  # noqa: E402

# Plan written by the analysis step and read back by --execute; lives next to this script.
PLAN_FILE = Path(__file__).parent / "dedup_plan.json"
- # ─── helpers ─────────────────────────────────────────────────────────────────
def sim(a: str, b: str) -> float:
    """Return the case-insensitive difflib similarity ratio of *a* and *b*, in [0, 1].

    ``None`` is treated as an empty string, so raw ``dict.get`` results from
    nullable DB columns can be passed directly. Note that difflib rates two
    empty strings as fully similar (1.0).
    """
    a = (a or "").lower()
    b = (b or "").lower()
    return difflib.SequenceMatcher(None, a, b).ratio()
def _move_junction_rows(cur, jtable, fkcol, dup_id, master_id):
    """Copy every *jtable* row whose *fkcol* equals *dup_id* onto *master_id*,
    then delete the duplicate's rows. If any row fails to copy, the duplicate's
    rows in this table are kept so the relation is not lost."""
    # Table/column names are concatenated into the SQL, so callers must pass
    # trusted identifiers (execute_plan passes string literals).
    cur.execute("SELECT * FROM " + jtable + " WHERE " + fkcol + " = %s", (dup_id,))
    # Read the column names right after the SELECT: executing an INSERT resets
    # cur.description to None.
    cols = [desc[0] for desc in cur.description]
    rows = cur.fetchall()
    insert_sql = (
        "INSERT INTO " + jtable + " (" + ",".join(cols) + ") VALUES ("
        + ",".join(["%s"] * len(cols)) + ") ON CONFLICT DO NOTHING"
    )
    failed = 0
    for row in rows:
        new_vals = [master_id if c == fkcol else v for c, v in zip(cols, row)]
        # A savepoint confines a failing INSERT to this row instead of aborting
        # (or rolling back) the whole merge transaction.
        cur.execute("SAVEPOINT dedup_row")
        try:
            cur.execute(insert_sql, new_vals)
        except Exception as e:
            cur.execute("ROLLBACK TO SAVEPOINT dedup_row")
            failed += 1
            print("  [WARN] %s: copy %s -> %s failed: %s" % (jtable, dup_id, master_id, e))
        cur.execute("RELEASE SAVEPOINT dedup_row")
    if failed:
        print("  [WARN] %s: kept %d row(s) of %s because they could not be moved" % (jtable, failed, dup_id))
        return
    cur.execute("DELETE FROM " + jtable + " WHERE " + fkcol + " = %s", (dup_id,))


def apply_merges(store, entity_table: str, junction_tables: list, merges: list, dry_run: bool):
    """Redirect junction-table rows from duplicate entities to their master and
    mark the duplicates as deprecated.

    Args:
        store: Store exposing ``_get_cursor()`` and ``conn`` (commit/rollback).
        entity_table: Entity table; ``"resource"`` rows are marked through
            ``title``, all other tables through ``name``.
        junction_tables: ``[(table_name, foreign_key_col), ...]``.
        merges: Merge groups in either the flat ``{master_id, duplicate_ids[]}``
            or the enriched ``{master: {id}, duplicates: [{id}]}`` format.
            Groups with ``enabled: False`` are skipped.
        dry_run: Only print the planned merges; no SQL is executed.

    All writes happen in one transaction that is committed at the end. An
    unexpected error rolls the transaction back and is re-raised.
    """
    if not merges:
        return
    cur = store._get_cursor()
    try:
        for merge in merges:
            # Support both flat and enriched plan formats.
            if "master_id" in merge:
                master_id = merge["master_id"]
                dup_ids = merge.get("duplicate_ids", [])
            else:
                master_id = merge["master"]["id"]
                dup_ids = [d["id"] for d in merge.get("duplicates", [])]
            if not merge.get("enabled", True):
                print(" [SKIP disabled] master=%s" % master_id)
                continue
            for dup_id in dup_ids:
                if dup_id == master_id:
                    # Merging an entity into itself would delete its links and
                    # deprecate it; LLM-produced plans can contain this.
                    print(" [SKIP self-merge] %s" % dup_id)
                    continue
                print(" Merge %s -> %s" % (dup_id, master_id))
                if dry_run:
                    continue
                for jtable, fkcol in junction_tables:
                    _move_junction_rows(cur, jtable, fkcol, dup_id, master_id)
                # Duplicates are marked deprecated instead of deleted (user request);
                # the NOT LIKE guard keeps the suffix from being appended twice.
                if entity_table == "resource":
                    cur.execute("UPDATE " + entity_table + " SET title = COALESCE(title, '') || '-已废弃' WHERE id = %s AND COALESCE(title, '') NOT LIKE '%%-已废弃'", (dup_id,))
                else:
                    cur.execute("UPDATE " + entity_table + " SET name = COALESCE(name, '') || '-已废弃' WHERE id = %s AND COALESCE(name, '') NOT LIKE '%%-已废弃'", (dup_id,))
        if not dry_run:
            store.conn.commit()
    except Exception:
        store.conn.rollback()
        raise
    finally:
        cur.close()
- # ─── Pass 0: Tools ────────────────────────────────────────────────────────────────
async def plan_tool_dedup(tool_store, model="anthropic/claude-sonnet-4.5") -> list:
    """Plan merges of duplicate tools (Pass 0).

    Tools whose names match exactly (trimmed, case-insensitive) are grouped
    locally. The remaining unique tools are sent to the LLM in a single call to
    catch aliases (e.g. "PS" vs "Photoshop"). For each master the LLM also
    proposes a ``merged_state`` (name / introduction / tutorial).

    LLM groups are validated. Unknown, non-string and repeated IDs are dropped,
    as are self-merges. An ID already listed as a duplicate by an earlier group
    is neither listed again nor accepted as a master.

    Returns:
        Enriched merge groups ``{enabled, master, duplicates, merged_result}``.
        If the LLM call or its parsing fails, only the exact-match groups are returned.
    """
    print("\n[Pass 0] Tool dedup (LLM semantic batching)...")
    tools = tool_store.list_all(limit=10000)
    print(" Total tools: %d" % len(tools))

    # Group exact name matches locally first to reduce LLM load.
    payload = []
    name_map = {}      # normalized name -> first payload entry with that name
    exact_merges = []  # (master entry, duplicate entry)
    for t in tools:
        t_name = t.get("name") or t["id"]
        entry = {
            "id": t["id"],
            "name": t_name,
            "introduction": t.get("introduction", ""),
            "tutorial": t.get("tutorial", ""),
        }
        key = t_name.strip().lower()
        if key in name_map:
            exact_merges.append((name_map[key], entry))
        else:
            name_map[key] = entry
            payload.append(entry)
    print(" Unique entities to send to LLM: %d" % len(payload))

    tools_schema = [{
        "type": "function",
        "function": {
            "name": "submit_tool_merges",
            "description": "Submit semantically identical tool merge groups along with their synthesized complete field states",
            "parameters": {
                "type": "object",
                "properties": {
                    "merges": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "master_id": {"type": "string", "description": "ID of the tool chosen to be the master"},
                                "duplicate_ids": {"type": "array", "items": {"type": "string"}, "description": "IDs of the duplicate tools"},
                                "merged_state": {
                                    "type": "object",
                                    "description": "The ultimate merged state of this tool, combining the best information from all duplicates.",
                                    "properties": {
                                        "name": {"type": "string", "description": "The standardized, optimal industry-consensus name"},
                                        "introduction": {"type": "string", "description": "Combined and normalized introduction text"},
                                        "tutorial": {"type": "string", "description": "Combined tutorial text if any"}
                                    },
                                    "required": ["name"]
                                }
                            },
                            "required": ["master_id", "duplicate_ids", "merged_state"]
                        }
                    }
                },
                "required": ["merges"]
            }
        }
    }]
    prompt = (
        "你是一个 AI 工具注册表维护专家。下面是一个软件/工具实体的列表。\n"
        "你的任务是找出所有因为别名、缩写或微小拼写差异(例如 'PS' 和 'Photoshop')造成的重复工具条目。\n"
        "对于每一组极其肯定为重复的项,请指定一个 master_id(保留的主干),列出 duplicate_ids(待废弃的重复项),并合成一个强大的 `merged_state` 对象。\n"
        "这个 `merged_state` 必须智能地吸收并结合它们各自的描述与教程信息,并敲定最官方权威的工具名称 `name`。\n\n"
        "工具列表:\n" + json.dumps(payload, ensure_ascii=False, indent=2)
    )
    print(" Calling %s..." % model)
    try:
        resp = await openrouter_llm_call(
            messages=[{"role": "user", "content": prompt}],
            model=model,
            tools=tools_schema,
            tool_choice="required",
            max_tokens=4096
        )
        tool_calls = resp.get("tool_calls") or []
    except Exception as e:
        print(" LLM call failed: %s" % e)
        tool_calls = []

    merges = []
    deprecated = set()  # IDs already scheduled to be merged away

    # Local exact-name matches come first.
    exact_groups = {}
    for master, dup in exact_merges:
        exact_groups.setdefault(master["id"], {"master": master, "dups": []})["dups"].append(dup)
    for m_id, grp in exact_groups.items():
        merges.append({
            "enabled": True,
            "master": {"id": m_id, "name": grp["master"]["name"]},
            "duplicates": [{"id": d["id"], "name": d["name"], "match_reason": "exact_name_local"} for d in grp["dups"]],
            "merged_result": "Keep master, redirect relations"
        })
        deprecated.update(d["id"] for d in grp["dups"])

    # Then the LLM matches, validated against the IDs the model was shown.
    if tool_calls:
        try:
            args = json.loads(tool_calls[0]["function"]["arguments"])
            llm_merges = args.get("merges") or []
            print(" LLM returned %d merge groups." % len(llm_merges))

            id_name_map = {t["id"]: t["name"] for t in payload}
            for m in llm_merges:
                if not isinstance(m, dict):
                    continue
                m_id = m.get("master_id")
                if not isinstance(m_id, str) or m_id not in id_name_map or m_id in deprecated:
                    continue
                m_state = m.get("merged_state")
                if not isinstance(m_state, dict):
                    m_state = {}
                raw_dups = [x for x in (m.get("duplicate_ids") or []) if isinstance(x, str)]
                dups = [
                    {"id": d_id, "name": id_name_map[d_id], "match_reason": "llm_semantic"}
                    for d_id in dict.fromkeys(raw_dups)
                    if d_id in id_name_map and d_id != m_id and d_id not in deprecated
                ]
                if dups:
                    deprecated.update(d["id"] for d in dups)
                    merges.append({
                        "enabled": True,
                        "master": {"id": m_id, "name": id_name_map[m_id], "merged_state": m_state},
                        "duplicates": dups,
                        "merged_result": "Keep master, update fields to merged_state, redirect relations"
                    })
        except Exception as e:
            print(" Error parsing LLM response:", e)
    print(" Total Tool clusters to merge: %d" % len(merges))
    return merges
- # ─── Pass 1: Cases ────────────────────────────────────────────────────────────
def plan_case_dedup(res_store) -> list:
    """Plan merges of duplicate case resources (Pass 1).

    Step 1 groups cases that share the same non-empty ``metadata.source_url``
    (``metadata`` may be a dict or a JSON string). Step 2 compares the cases
    Step 1 left unmerged, pairwise, and groups those whose lower-cased titles
    have a difflib similarity ratio above 0.80.

    Cases with a blank or missing title are excluded from Step 2. difflib rates
    two empty strings as identical (1.0), which would merge every untitled case
    together.

    Returns:
        Enriched merge groups ``{enabled, master, duplicates, merged_result}``.
    """
    print("\n[Pass 1] Case dedup (source_url exact + title fuzzy >80%)...")
    cases = res_store.list_resources(content_type="case", limit=10000)
    print(" Total cases: %d" % len(cases))

    def get_url(c):
        meta = c.get("metadata") or {}
        if isinstance(meta, str):
            try:
                meta = json.loads(meta)
            except ValueError:
                meta = {}
        url = meta.get("source_url") if isinstance(meta, dict) else None
        return url.strip() if isinstance(url, str) else ""

    processed = set()
    merges = []

    # Step 1: exact source_url match.
    url_groups = {}
    for c in cases:
        url = get_url(c)
        if url:
            url_groups.setdefault(url, []).append(c)
    for url, group in url_groups.items():
        if len(group) < 2:
            continue
        master = group[0]
        dups = []
        for b in group[1:]:
            if b["id"] in processed or master["id"] in processed:
                continue
            dups.append({"id": b["id"], "title": b.get("title", ""),
                         "match_reason": "exact_url", "similarity": 1.0, "source_url": url})
            processed.add(b["id"])
        if dups:
            processed.add(master["id"])
            merges.append({
                "enabled": True,
                "master": {"id": master["id"], "title": master.get("title", ""), "source_url": url},
                "duplicates": dups,
                "merged_result": "Keep master, redirect all relations to master id"
            })

    # Step 2: fuzzy title match among the remaining cases.
    titles = [(c.get("title") or "").lower() for c in cases]
    for i, a in enumerate(cases):
        if a["id"] in processed or not titles[i].strip():
            continue
        dups = []
        for j in range(i + 1, len(cases)):
            b = cases[j]
            if b["id"] in processed or not titles[j].strip():
                continue
            matcher = difflib.SequenceMatcher(None, titles[i], titles[j])
            # real_quick_ratio() and quick_ratio() are cheap upper bounds on
            # ratio(); failing either proves the pair cannot pass the threshold.
            if matcher.real_quick_ratio() <= 0.80 or matcher.quick_ratio() <= 0.80:
                continue
            score = matcher.ratio()
            if score > 0.80:
                dups.append({"id": b["id"], "title": b.get("title", ""),
                             "match_reason": "title_fuzzy", "similarity": round(score, 3)})
                processed.add(b["id"])
        if dups:
            processed.add(a["id"])
            merges.append({
                "enabled": True,
                "master": {"id": a["id"], "title": a.get("title", ""), "source_url": get_url(a)},
                "duplicates": dups,
                "merged_result": "Keep master, redirect all relations to master id"
            })
    url_m = sum(1 for m in merges if any(d.get("match_reason") == "exact_url" for d in m["duplicates"]))
    print(" URL exact: %d groups, Title fuzzy: %d groups" % (url_m, len(merges) - url_m))
    return merges
- # ─── Pass 2: Capabilities ─────────────────────────────────────────────────────
def _parse_json_field(value, default):
    """Return *value* decoded from JSON when it is a string, else *value* itself.

    Falsy values and strings that are not valid JSON yield *default*.
    """
    if not value:
        return default
    if isinstance(value, str):
        try:
            return json.loads(value)
        except ValueError:
            return default
    return value


async def plan_cap_dedup(cap_store, model="anthropic/claude-sonnet-4.5") -> list:
    """Plan merges of semantically identical capabilities (Pass 2).

    Phase 1 sends every capability's id and name to the LLM in one call and asks
    for clusters of likely duplicates. Phase 2 sends the hydrated clusters
    (description / effects / implements / criterion) in concurrent batches of 5.
    It asks the LLM to choose a master, list duplicates and synthesize merged
    fields.

    Validation:
      - Unknown, non-string and repeated IDs are dropped.
      - Each capability joins at most one Phase 1 cluster.
      - Phase 2 may only reference IDs it was shown.
      - An ID already listed as a duplicate is neither listed again nor
        accepted as a master.

    Returns:
        Enriched merge groups. Returns ``[]`` if Phase 1 fails or yields no clusters.
    """
    print("\n[Pass 2] Capability dedup (LLM Global Clustering + LLM Detailed Merging)...")
    caps = cap_store.list_all(limit=10000)
    print(" Total capabilities: %d" % len(caps))
    # Phase 1: Global LLM Clustering
    print(" Running Phase 1: Global capability clustering via LLM...")
    minimal_caps = [{"id": c["id"], "name": c.get("name", "")} for c in caps]

    cluster_schema = [{
        "type": "function",
        "function": {
            "name": "generate_candidate_clusters",
            "description": "Group identical or highly similar capability IDs together.",
            "parameters": {
                "type": "object",
                "properties": {
                    "clusters": {
                        "type": "array",
                        "items": {
                            "type": "array",
                            "items": {"type": "string", "description": "capability ID"}
                        }
                    }
                },
                "required": ["clusters"]
            }
        }
    }]

    cluster_prompt = (
        "你是一个顶级的全景归类AI。下面是数据库中现存的所有「原子能力」极简清单。\n"
        "你的任务是仔细阅读它们的名字,找出所有**在业务意义上完全相同或可以合并**的能力,哪怕它们的字面长短不一、用词差异极大。\n"
        "将那些应当被认为是完全重复或高度同质化的原子能力ID打包在一起,形成聚类数组。\n"
        "注意:只有包含 2 个或以上名字的能力圈子才需要返回。你必须尽可能不遗漏任何有合并价值的核心节点。\n\n"
        "能力清单:\n" + json.dumps(minimal_caps, ensure_ascii=False)
    )

    try:
        resp = await openrouter_llm_call(
            messages=[{"role": "user", "content": cluster_prompt}],
            model=model,
            tools=cluster_schema,
            tool_choice="required",
            max_tokens=8192
        )
        t_calls = resp.get("tool_calls")
        if not t_calls:
            print(" Phase 1 returned no tool calls/clusters.")
            return []
        args = json.loads(t_calls[0]["function"]["arguments"])
        cluster_ids = args.get("clusters") or []
    except Exception as e:
        print(" [Error] Phase 1 Global Clustering failed: %s" % e)
        return []

    print(" Phase 1 raw LLM clustering returned %d clusters." % len(cluster_ids))

    # Map IDs back to capability objects; each capability joins at most one cluster.
    caps_map = {c["id"]: c for c in caps}
    candidate_clusters = []
    clustered = set()
    for id_list in cluster_ids:
        if not isinstance(id_list, list):
            continue
        str_ids = [x for x in id_list if isinstance(x, str)]
        fresh = [cid for cid in dict.fromkeys(str_ids) if cid in caps_map and cid not in clustered]
        if len(fresh) > 1:
            clustered.update(fresh)
            candidate_clusters.append([caps_map[cid] for cid in fresh])
    print(" Valid candidate clusters hydrated: %d (total items: %d)" % (
        len(candidate_clusters), sum(len(c) for c in candidate_clusters)))
    if not candidate_clusters:
        return []
    print(" Running Phase 2: Detailed merging of candidate clusters...")
    # Build the Phase 2 payload: candidate clusters with decoded detail fields.
    payload = [
        [
            {
                "id": c["id"],
                "name": c.get("name", ""),
                "description": c.get("description", ""),
                "effects": _parse_json_field(c.get("effects"), []),
                "implements": _parse_json_field(c.get("implements"), {}),
                "criterion": _parse_json_field(c.get("criterion"), []),
            }
            for c in cluster
        ]
        for cluster in candidate_clusters
    ]
    tools_schema = [{
        "type": "function",
        "function": {
            "name": "submit_capability_merges",
            "description": "Submit semantically IDENTICAL capability merge groups, complete with synthesized fields",
            "parameters": {
                "type": "object",
                "properties": {
                    "merges": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "master_id": {"type": "string"},
                                "duplicate_ids": {"type": "array", "items": {"type": "string"}},
                                "merged_description": {"type": "string", "description": "A comprehensive, combined description integrating details from all merged capabilities."},
                                "synthesized_effects": {
                                    "type": "array",
                                    "items": {"type": "object"},
                                    "description": "A deduplicated, logically combined array of all effects from the master and duplicates."
                                },
                                "synthesized_implements": {
                                    "type": "object",
                                    "description": "A deduplicated, logically combined dictionary map of all implements/parameters from the master and duplicates."
                                },
                                "synthesized_criterion": {
                                    "type": "array",
                                    "items": {"type": "object"},
                                    "description": "A deduplicated, logically combined array of evaluation criteria from the master and duplicates."
                                }
                            },
                            "required": ["master_id", "duplicate_ids", "merged_description", "synthesized_effects", "synthesized_implements", "synthesized_criterion"]
                        }
                    }
                },
                "required": ["merges"]
            }
        }
    }]
    prompt_template = (
        "你是一个高级 AI 知识图谱工程师。下面是若干组经过粗筛的候选「原子能力」簇。\n"
        "请仔细分析每个簇中的能力,判断它们是否属于语义和操作本质上完全相同的原子动作。\n"
        "如果它们的核心本质和底层操作逻辑根本就是一回事(即使举例或关联对象有微小差异),你也**必须**将它们合并!\n"
        "对于每决定好的一组合并:选择一个最佳的 master_id,将其他的塞进 duplicate_ids。\n"
        "核心任务:你必须亲自操刀,合成一份大一统综合描述 `merged_description`;\n"
        "生成一个去重后的影响力效果数组 `synthesized_effects`;\n"
        "生成一个整合了所有实现级参数与独立用法的字典 `synthesized_implements`;\n"
        "最后,生成一个保留全部评判标准的去重评估数组 `synthesized_criterion`。\n\n"
        "候选原子能力簇:\n{payload_json}"
    )
    BATCH_SIZE = 5
    batches = [payload[i:i + BATCH_SIZE] for i in range(0, len(payload), BATCH_SIZE)]
    print(" Calling %s in %d batches..." % (model, len(batches)))

    async def process_batch(batch):
        prompt = prompt_template.format(payload_json=json.dumps(batch, ensure_ascii=False, indent=2))
        try:
            resp = await openrouter_llm_call(messages=[{"role": "user", "content": prompt}], model=model, tools=tools_schema, tool_choice="required", max_tokens=4096)
            t_calls = resp.get("tool_calls")
            if not t_calls:
                return []
            args = json.loads(t_calls[0]["function"]["arguments"])
            merges = args.get("merges") or []
            return merges if isinstance(merges, list) else []
        except Exception as e:
            print(" [Error] LLM batch failed: %s" % e)
            return []

    results = await asyncio.gather(*(process_batch(b) for b in batches))

    # Accept only IDs the Phase 2 model was actually shown.
    candidate_ids = {c["id"] for cluster in candidate_clusters for c in cluster}
    deprecated = set()  # IDs already scheduled to be merged away
    final_merges = []
    total_merges = 0
    for merges in results:
        total_merges += len(merges)
        for m in merges:
            if not isinstance(m, dict):
                continue
            master_id = m.get("master_id")
            if not isinstance(master_id, str) or master_id not in candidate_ids or master_id in deprecated:
                continue
            raw_dups = [x for x in (m.get("duplicate_ids") or []) if isinstance(x, str)]
            dup_ids = [
                d_id for d_id in dict.fromkeys(raw_dups)
                if d_id in candidate_ids and d_id != master_id and d_id not in deprecated
            ]
            if not dup_ids:
                continue
            deprecated.update(dup_ids)
            final_merges.append({
                "enabled": True,
                "master": {
                    "id": master_id,
                    "name": caps_map[master_id].get("name", ""),
                    "merged_description": m.get("merged_description", ""),
                    "synthesized_effects": m.get("synthesized_effects", []),
                    "synthesized_implements": m.get("synthesized_implements", {}),
                    "synthesized_criterion": m.get("synthesized_criterion", [])
                },
                "duplicates": [{"id": d_id, "name": caps_map[d_id].get("name", "")} for d_id in dup_ids],
                "merged_result": "Keep master, update fields, redirect relations"
            })

    print(" LLM suggested %d total merge groups across all batches." % total_merges)
    print(" Kept %d merge groups after ID validation." % len(final_merges))
    return final_merges
- # ─── Pass 3: Strategies (Jaccard on capability_ids) ───────────────────────────
async def plan_strategy_dedup(strat_store, model="anthropic/claude-sonnet-4.5", threshold=0.45) -> list:
    """Plan merges of strategies with equivalent workflows (Pass 3).

    Strategies are pre-clustered greedily by the Jaccard similarity of their
    ``capability_ids`` sets (``>= threshold``); strategies without capability
    IDs are skipped. The clusters, reduced to each workflow step's phase and
    description, are then sent to the LLM in concurrent batches of 8. The LLM
    decides which strategies to merge and writes a consolidated workflow.

    LLM groups are validated. Only IDs from the candidate clusters are accepted;
    repeated IDs and self-merges are dropped. An ID already listed as a duplicate
    is neither listed again nor accepted as a master.

    Returns:
        Enriched merge groups ``{enabled, master, duplicates, merged_result}``.
    """
    print("\n[Pass 3] Strategy dedup (Jaccard >= %.0f%% + LLM workflow refinement)..." % (threshold * 100))
    strats = strat_store.list_all(limit=10000)
    print(" Total strategies: %d" % len(strats))
    cap_sets = {}
    for s in strats:
        ids = set(s.get("capability_ids") or [])
        if ids:
            cap_sets[s["id"]] = ids
    strat_list = [s for s in strats if s["id"] in cap_sets]

    # Greedy single-pass clustering: each unclustered strategy seeds a cluster and
    # absorbs every later unclustered strategy similar enough to the seed. All
    # sets here are non-empty, so the union (the divisor) is never empty.
    processed = set()
    candidate_clusters = []
    for i, a in enumerate(strat_list):
        if a["id"] in processed:
            continue
        sa = cap_sets[a["id"]]
        cluster = [a]
        for b in strat_list[i + 1:]:
            if b["id"] in processed:
                continue
            sb = cap_sets[b["id"]]
            if len(sa & sb) / len(sa | sb) >= threshold:
                cluster.append(b)
                processed.add(b["id"])
        if len(cluster) > 1:
            processed.add(a["id"])
            candidate_clusters.append(cluster)
    print(" Candidate strategy clusters (Jaccard >= %.0f%%): %d" % (threshold * 100, len(candidate_clusters)))
    if not candidate_clusters:
        return []

    payload = []
    for cluster in candidate_clusters:
        grp = []
        for s in cluster:
            body = s.get("body") or "{}"
            if isinstance(body, str):
                try:
                    body = json.loads(body)
                except ValueError:
                    body = {"_raw": body}
            # Send only each step's phase/description to save tokens; tolerate
            # bodies or workflows that are not the expected dict/list shapes.
            workflow = body.get("workflow") if isinstance(body, dict) else None
            if not isinstance(workflow, list):
                workflow = []
            condensed_workflow = [
                {"phase": w.get("phase", ""), "description": w.get("description", "")}
                for w in workflow
                if isinstance(w, dict)
            ]
            grp.append({
                "id": s["id"],
                "name": s.get("name", "Unnamed Strategy"),
                "workflow_summary": condensed_workflow
            })
        payload.append(grp)
    strat_schema = [{
        "type": "function",
        "function": {
            "name": "submit_strategy_merges",
            "description": "Submit semantically identical strategy workflows, consolidating their step instructions.",
            "parameters": {
                "type": "object",
                "properties": {
                    "merges": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "master_id": {"type": "string"},
                                "duplicate_ids": {"type": "array", "items": {"type": "string"}},
                                "merged_workflow": {
                                    "type": "array",
                                    "items": {"type": "object"},
                                    "description": "A logically synthesized, unified pipeline array combining the best steps from the merged strategies."
                                }
                            },
                            "required": ["master_id", "duplicate_ids", "merged_workflow"]
                        }
                    }
                },
                "required": ["merges"]
            }
        }
    }]
    prompt_template = (
        "你是一个资深人工智能工作流引擎架构师。下面是一批疑似重合的候选「执行工序(Strategy)」簇。\n"
        "每个簇里包含多个工作流定义,请极其苛刻地审视它们的步骤顺序与逻辑意图。\n"
        "如果多个工作流指挥的是一段**从根本上完全一样的核心处理链路**(即使修饰词、举例、参数或个别无关紧要的废话步骤有别),你也**应该**将它们合并。\n"
        "注意:如果同一个簇里的工序代表了截然不同的概念流派分支,你可以分别输出多组合并(或直接决绝合并,不返回内容)。\n"
        "对于同意合并的组:指定一个 master_id、列出 duplicate_ids,并提炼一个 `merged_workflow`(合并后的工作流数组)。\n"
        "在这个 `merged_workflow` 里,请存放经过你清洗、压缩、取长补短后所形成的最干货、最权威的完整操作指令步骤。\n\n"
        "候选工序簇:\n{payload_json}"
    )
    BATCH_SIZE = 8
    batches = [payload[i:i + BATCH_SIZE] for i in range(0, len(payload), BATCH_SIZE)]
    print(" Calling %s in %d batches..." % (model, len(batches)))

    async def process_batch(batch):
        prompt = prompt_template.format(payload_json=json.dumps(batch, ensure_ascii=False, indent=2))
        try:
            resp = await openrouter_llm_call(messages=[{"role": "user", "content": prompt}], model=model, tools=strat_schema, tool_choice="required", max_tokens=4096)
            t_calls = resp.get("tool_calls")
            if not t_calls:
                return []
            args = json.loads(t_calls[0]["function"]["arguments"])
            merges = args.get("merges") or []
            return merges if isinstance(merges, list) else []
        except Exception as e:
            print(" [Error] LLM batch failed: %s" % e)
            return []

    results = await asyncio.gather(*(process_batch(b) for b in batches))

    # Accept only IDs the model was actually shown.
    strat_map = {s["id"]: s for cluster in candidate_clusters for s in cluster}
    deprecated = set()  # IDs already scheduled to be merged away
    merges = []
    total_merges = 0
    for llm_merges in results:
        total_merges += len(llm_merges)
        for m in llm_merges:
            if not isinstance(m, dict):
                continue
            master_id = m.get("master_id")
            if not isinstance(master_id, str) or master_id not in strat_map or master_id in deprecated:
                continue
            raw_dups = [x for x in (m.get("duplicate_ids") or []) if isinstance(x, str)]
            dup_ids = [
                d_id for d_id in dict.fromkeys(raw_dups)
                if d_id in strat_map and d_id != master_id and d_id not in deprecated
            ]
            if not dup_ids:
                continue
            deprecated.update(dup_ids)
            workflow = m.get("merged_workflow")
            merges.append({
                "enabled": True,
                "master": {
                    "id": master_id,
                    "name": strat_map[master_id].get("name", ""),
                    "merged_workflow": workflow if isinstance(workflow, list) else []
                },
                "duplicates": [{"id": d_id, "name": strat_map[d_id].get("name", "")} for d_id in dup_ids],
                "merged_result": "Keep master, update workflow body, reparent links"
            })

    print(" LLM suggested %d total merge groups across all batches." % total_merges)
    print(" Kept %d merge groups after ID validation." % len(merges))
    return merges
- # ─── Main ─────────────────────────────────────────────────────────────────────
async def run_analysis(res_store, cap_store, strat_store, req_store, tool_store, caps_only=False):
    """Build the dedup plan by running the enabled analysis passes.

    Only the capability pass always runs. The strategy pass runs unless
    *caps_only* is set. The "tools" and "cases" sections are always emitted
    empty; res_store, req_store and tool_store are accepted but unused here.

    Returns:
        dict with keys "tools", "cases", "capabilities", "strategies", in that
        insertion order, which is the order they are serialized to the plan file.
    """
    capability_merges = await plan_cap_dedup(cap_store)
    strategy_merges = [] if caps_only else await plan_strategy_dedup(strat_store)
    return {
        "tools": [],
        "cases": [],
        "capabilities": capability_merges,
        "strategies": strategy_merges,
    }
def _master_id_of(merge):
    """Return (master dict, master id) of an enriched merge group; flat groups yield ({}, None)."""
    master = merge.get("master") or {}
    return master, master.get("id")


def _update_tool_fields(tool_store, merges, dry_run):
    """Write each enabled master's LLM ``merged_state`` onto its tool row.

    Empty or missing values keep the current column value, because COALESCE only
    falls back on NULL.
    """
    if not merges:
        return
    cur = tool_store._get_cursor()
    try:
        for m in merges:
            if not m.get("enabled", True):
                continue
            master, master_id = _master_id_of(m)
            st = master.get("merged_state")
            if not st or not master_id:
                continue
            print(" Update fields for %s -> %s" % (master_id, st.get("name")))
            if dry_run:
                continue
            cur.execute(
                "UPDATE tool SET name = COALESCE(%s, name), introduction = COALESCE(%s, introduction), tutorial = COALESCE(%s, tutorial) WHERE id = %s",
                (st.get("name") or None, st.get("introduction") or None, st.get("tutorial") or None, master_id)
            )
        if not dry_run:
            tool_store.conn.commit()
    except Exception:
        tool_store.conn.rollback()
        raise
    finally:
        cur.close()


def _update_capability_fields(cap_store, merges, dry_run):
    """Write each enabled master's synthesized description/effects/criterion onto
    the capability row, and the synthesized implements onto capability_tool rows."""
    if not merges:
        return
    cur = cap_store._get_cursor()
    try:
        for m in merges:
            if not m.get("enabled", True):
                continue
            master, master_id = _master_id_of(m)
            if not master_id:
                continue
            # `or None` so empty LLM output keeps the stored value instead of blanking it.
            m_desc = master.get("merged_description") or None
            s_effs = master.get("synthesized_effects")
            s_imps = master.get("synthesized_implements")
            s_crit = master.get("synthesized_criterion")
            if not (m_desc or s_effs or s_imps or s_crit):
                continue
            print(" Update %s -> new fields" % master_id)
            if dry_run:
                continue
            if m_desc or s_effs or s_crit:
                cur.execute(
                    "UPDATE capability SET description = COALESCE(%s, description), effects = COALESCE(%s::jsonb, effects), criterion = COALESCE(%s, criterion) WHERE id = %s",
                    (m_desc,
                     json.dumps(s_effs, ensure_ascii=False) if s_effs else None,
                     json.dumps(s_crit, ensure_ascii=False) if s_crit else None,
                     master_id))
            # implements lives in the capability_tool junction table (description
            # column); apply it to the master and its duplicates before their rows
            # are re-pointed to the master.
            if isinstance(s_imps, dict) and s_imps:
                member_ids = tuple([master_id] + [d["id"] for d in m.get("duplicates", [])])
                for t_id, desc in s_imps.items():
                    if not isinstance(desc, str):
                        desc = json.dumps(desc, ensure_ascii=False)
                    cur.execute(
                        "UPDATE capability_tool SET description = %s WHERE tool_id = %s AND capability_id IN %s",
                        (desc, t_id, member_ids))
        if not dry_run:
            cap_store.conn.commit()
    except Exception:
        cap_store.conn.rollback()
        raise
    finally:
        cur.close()


def _update_strategy_workflows(strat_store, merges, dry_run):
    """Replace ``body -> 'workflow'`` of each enabled master with its merged workflow."""
    if not merges:
        return
    cur = strat_store._get_cursor()
    try:
        for m in merges:
            if not m.get("enabled", True):
                continue
            master, master_id = _master_id_of(m)
            m_wf = master.get("merged_workflow")
            if not m_wf or not master_id:
                continue
            print(" Update %s -> new workflow array" % master_id)
            if dry_run:
                continue
            # body is jsonb; COALESCE keeps jsonb_set from turning a NULL body into NULL.
            cur.execute("""
                UPDATE strategy
                SET body = jsonb_set(COALESCE(body, '{}'::jsonb), '{workflow}', %s::jsonb)
                WHERE id = %s
            """, (json.dumps(m_wf, ensure_ascii=False), master_id))
        if not dry_run:
            strat_store.conn.commit()
    except Exception:
        strat_store.conn.rollback()
        raise
    finally:
        cur.close()


def execute_plan(plan, res_store, cap_store, strat_store, req_store, tool_store, dry_run: bool,
                 *, apply_tools=False, apply_cases=False):
    """Apply (or, with *dry_run*, only print) a dedup plan.

    For each entity type the masters are first updated with the LLM-synthesized
    fields, then apply_merges re-points junction-table rows from duplicates to
    the master and marks the duplicates deprecated. Each phase commits separately.

    Args:
        plan: dict with optional "tools", "cases", "capabilities" and "strategies"
            lists of merge groups. It is not modified.
        dry_run: Print what would change without writing.
        apply_tools, apply_cases: Tool and case merges are skipped by default
            (they are disabled in this script for now); pass True to apply them.
        req_store: Currently unused.
    """
    tag = "[DRY-RUN] " if dry_run else ""
    # Tools and cases are skipped for now unless explicitly enabled.
    tool_merges = plan.get("tools", []) if apply_tools else []
    case_merges = plan.get("cases", []) if apply_cases else []

    print("\n%s=== Applying Tool Merges & Synthesized Fields ===" % tag)
    _update_tool_fields(tool_store, tool_merges, dry_run)
    apply_merges(tool_store, "tool", [
        ("capability_tool", "tool_id"),
        ("tool_knowledge", "tool_id"),
        ("tool_provider", "tool_id"),
    ], tool_merges, dry_run)

    print("\n%s=== Applying Case Merges ===" % tag)
    apply_merges(res_store, "resource", [
        ("requirement_resource", "resource_id"),
        ("strategy_resource", "resource_id"),
        ("capability_resource", "resource_id"),
    ], case_merges, dry_run)

    print("\n%s=== Applying Capability Merges & Synthesized Fields ===" % tag)
    cap_merges = plan.get("capabilities", [])
    _update_capability_fields(cap_store, cap_merges, dry_run)
    apply_merges(cap_store, "capability", [
        ("requirement_capability", "capability_id"),
        ("strategy_capability", "capability_id"),
        ("capability_tool", "capability_id"),
        ("capability_knowledge", "capability_id"),
        ("capability_resource", "capability_id"),
    ], cap_merges, dry_run)

    print("\n%s=== Applying Strategy Merges & Workflows ===" % tag)
    strat_merges = plan.get("strategies", [])
    _update_strategy_workflows(strat_store, strat_merges, dry_run)
    apply_merges(strat_store, "strategy", [
        ("requirement_strategy", "strategy_id"),
        ("strategy_capability", "strategy_id"),
        ("strategy_knowledge", "strategy_id"),
        ("strategy_resource", "strategy_id"),
    ], strat_merges, dry_run)
async def main():
    """Command-line entry point; see the module docstring for the modes."""
    parser = argparse.ArgumentParser(description="Semantic dedup for KnowHub DB")
    # Passing both --dry-run and --execute would otherwise ignore --dry-run and write to the DB.
    mode = parser.add_mutually_exclusive_group()
    mode.add_argument("--dry-run", action="store_true", help="Analyze only, save plan, do not write DB")
    mode.add_argument("--execute", action="store_true", help="Execute saved dedup_plan.json without re-analyzing")
    parser.add_argument("--caps-only", action="store_true", help="Only run capability deduplication and skip strategies")
    args = parser.parse_args()

    print("Connecting to DB...")
    stores = []  # every store opened so far; all are closed in `finally`
    try:
        res_store = PostgreSQLResourceStore()
        stores.append(res_store)
        cap_store = PostgreSQLCapabilityStore()
        stores.append(cap_store)
        strat_store = PostgreSQLStrategyStore()
        stores.append(strat_store)
        req_store = PostgreSQLRequirementStore()
        stores.append(req_store)
        tool_store = PostgreSQLToolStore()
        stores.append(tool_store)

        if args.execute:
            if not PLAN_FILE.exists():
                print("ERROR: dedup_plan.json not found. Run --dry-run first.")
                return
            print("Loading plan from %s ..." % PLAN_FILE)
            plan = json.loads(PLAN_FILE.read_text(encoding="utf-8"))
            execute_plan(plan, res_store, cap_store, strat_store, req_store, tool_store, dry_run=False)
            print("\nAll merges applied.")
            return

        plan = await run_analysis(res_store, cap_store, strat_store, req_store, tool_store, caps_only=args.caps_only)
        PLAN_FILE.write_text(json.dumps(plan, ensure_ascii=False, indent=2), encoding="utf-8")
        print("\nPlan saved to: %s" % PLAN_FILE)
        total_merges = sum(len(plan.get(k, [])) for k in ("tools", "cases", "capabilities", "strategies"))
        print("Total merge groups planned: %d" % total_merges)
        if args.dry_run:
            execute_plan(plan, res_store, cap_store, strat_store, req_store, tool_store, dry_run=True)
            print("\n[DRY-RUN] No DB changes made. Review dedup_plan.json then run --execute.")
        else:
            execute_plan(plan, res_store, cap_store, strat_store, req_store, tool_store, dry_run=False)
            print("\nAll merges applied.")
    finally:
        for store in stores:
            try:
                store.close()
            except Exception as e:
                print("[WARN] Failed to close %s: %s" % (type(store).__name__, e))


if __name__ == "__main__":
    asyncio.run(main())
|