#!/usr/bin/env python3 """ 补全 knowledge.tools 的反向映射: 从 tool_table 的 tool_knowledge / case_knowledge / process_knowledge 提取知识引用, 反向构建 knowledge_id -> [tool_ids] 的映射,写入 knowledge.tools 字段。 用法: python fill_knowledge_tools.py # 正常执行 python fill_knowledge_tools.py --dry-run # 仅预览,不写入数据库 """ import os import sys import io import json import psycopg2 from psycopg2.extras import RealDictCursor from dotenv import load_dotenv # 解决 Windows 终端编码问题 sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace') _dir = os.path.dirname(os.path.abspath(__file__)) _root = os.path.normpath(os.path.join(_dir, '..', '..')) load_dotenv(os.path.join(_root, '.env')) def get_conn(): conn = psycopg2.connect( host=os.getenv('KNOWHUB_DB'), port=int(os.getenv('KNOWHUB_PORT', 5432)), user=os.getenv('KNOWHUB_USER'), password=os.getenv('KNOWHUB_PASSWORD'), database=os.getenv('KNOWHUB_DB_NAME'), connect_timeout=10 ) conn.autocommit = True return conn def parse_jsonb_list(val): """将 JSONB 字段安全解析为 list""" if isinstance(val, list): return val if isinstance(val, str): try: parsed = json.loads(val) return parsed if isinstance(parsed, list) else [] except json.JSONDecodeError: return [] return [] def main(): dry_run = '--dry-run' in sys.argv conn = get_conn() cur = conn.cursor(cursor_factory=RealDictCursor) print("Connected.\n") # ── Step 1: 加载所有工具及其知识引用 ── print("=== [1] Loading tool_table knowledge references ===") cur.execute(""" SELECT id, name, tool_knowledge, case_knowledge, process_knowledge FROM tool_table ORDER BY id """) tools = cur.fetchall() print(f" Found {len(tools)} tools") # ── Step 2: 构建 knowledge_id -> [tool_ids] 反向映射 ── print("\n=== [2] Building reverse mapping: knowledge_id -> tool_ids ===") # knowledge_id -> set of tool_ids knowledge_to_tools = {} # 统计 tool_with_refs = 0 total_refs = 0 for tool in tools: tool_id = tool['id'] tool_name = tool['name'] # 合并三种知识引用 tk = parse_jsonb_list(tool['tool_knowledge']) ck = parse_jsonb_list(tool['case_knowledge']) pk = parse_jsonb_list(tool['process_knowledge']) all_knowledge_ids = set(tk + ck + pk) if not all_knowledge_ids: continue tool_with_refs += 1 total_refs += len(all_knowledge_ids) for kid in all_knowledge_ids: if kid not in knowledge_to_tools: knowledge_to_tools[kid] = set() knowledge_to_tools[kid].add(tool_id) print(f" Tools with knowledge refs: {tool_with_refs}") print(f" Total tool->knowledge references: {total_refs}") print(f" Unique knowledge IDs referenced: {len(knowledge_to_tools)}") # ── Step 3: 验证知识 ID 是否存在于 knowledge 表 ── print("\n=== [3] Validating knowledge IDs ===") cur.execute("SELECT id FROM knowledge") existing_knowledge_ids = {r['id'] for r in cur.fetchall()} print(f" Knowledge entries in DB: {len(existing_knowledge_ids)}") missing = set(knowledge_to_tools.keys()) - existing_knowledge_ids valid = set(knowledge_to_tools.keys()) & existing_knowledge_ids if missing: print(f" [!] {len(missing)} knowledge IDs referenced but not found in DB:") for kid in sorted(missing)[:10]: tools_ref = sorted(knowledge_to_tools[kid]) print(f" {kid} (referenced by {tools_ref[:3]}{'...' if len(tools_ref)>3 else ''})") if len(missing) > 10: print(f" ... and {len(missing) - 10} more") else: print(f" All {len(valid)} referenced knowledge IDs exist in DB") # ── Step 4: 写入 knowledge.tools ── print(f"\n=== [4] Updating knowledge.tools {'(DRY RUN)' if dry_run else ''} ===") updated = 0 skipped = 0 for kid in sorted(valid): tool_ids = sorted(knowledge_to_tools[kid]) if not dry_run: cur.execute( "UPDATE knowledge SET tools = %s WHERE id = %s", (json.dumps(tool_ids), kid) ) updated += 1 for kid in sorted(missing): skipped += 1 print(f" Updated: {updated} knowledge entries") print(f" Skipped: {skipped} (knowledge ID not found)") # ── Step 5: 验证 ── if not dry_run: print("\n=== [5] Verification ===") cur.execute(""" SELECT COUNT(*) as cnt FROM knowledge WHERE tools != '[]'::jsonb AND tools IS NOT NULL """) filled = cur.fetchone()['cnt'] print(f" Knowledge entries with non-empty tools: {filled}") print("\n -- Sample (first 10) --") cur.execute(""" SELECT id, tools FROM knowledge WHERE tools != '[]'::jsonb AND tools IS NOT NULL ORDER BY id LIMIT 10 """) for r in cur.fetchall(): tools_list = parse_jsonb_list(r['tools']) print(f" {r['id']}: tools={tools_list}") # 统计分布 cur.execute(""" SELECT jsonb_array_length(tools) as tool_count, COUNT(*) as cnt FROM knowledge WHERE tools != '[]'::jsonb AND tools IS NOT NULL GROUP BY jsonb_array_length(tools) ORDER BY tool_count """) print("\n -- Distribution: how many tools per knowledge --") for r in cur.fetchall(): print(f" {r['tool_count']} tool(s): {r['cnt']} knowledge entries") # ── Summary ── print(f"\n=== Summary ===") print(f" Knowledge entries updated: {updated}") print(f" Missing knowledge IDs: {skipped}") if dry_run: print(f"\n (DRY RUN mode - no changes written to database)") cur.close() conn.close() print("\nDone.") if __name__ == '__main__': main()