| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 |
- #!/usr/bin/env python3
- """
- 补全 knowledge.tools 的反向映射:
- 从 tool_table 的 tool_knowledge / case_knowledge / process_knowledge 提取知识引用,
- 反向构建 knowledge_id -> [tool_ids] 的映射,写入 knowledge.tools 字段。
- 用法:
- python fill_knowledge_tools.py # 正常执行
- python fill_knowledge_tools.py --dry-run # 仅预览,不写入数据库
- """
- import os
- import sys
- import io
- import json
- import psycopg2
- from psycopg2.extras import RealDictCursor
- from dotenv import load_dotenv
# Force UTF-8 output (with undecodable characters replaced) so printing
# non-ASCII text does not fail on Windows consoles.
# Prefer reconfigure(): it changes the existing stream in place and keeps its
# buffering mode. Streams that lack it fall back to wrapping the raw buffer;
# replaced streams with neither attribute (or None) are left untouched
# instead of raising AttributeError.
for _stream_name in ('stdout', 'stderr'):
    _stream = getattr(sys, _stream_name)
    if hasattr(_stream, 'reconfigure'):
        _stream.reconfigure(encoding='utf-8', errors='replace')
    elif hasattr(_stream, 'buffer'):
        setattr(
            sys,
            _stream_name,
            io.TextIOWrapper(_stream.buffer, encoding='utf-8', errors='replace'),
        )

# Load environment variables from the .env file two directories above
# the directory containing this script.
_dir = os.path.dirname(os.path.abspath(__file__))
_root = os.path.normpath(os.path.join(_dir, '..', '..'))
load_dotenv(os.path.join(_root, '.env'))
def get_conn():
    """Open a PostgreSQL connection described by KNOWHUB_* environment variables.

    Reads KNOWHUB_DB (host), KNOWHUB_PORT (defaults to 5432), KNOWHUB_USER,
    KNOWHUB_PASSWORD and KNOWHUB_DB_NAME (database name), and connects with
    a 10-second connect timeout.

    Returns:
        A psycopg2 connection with autocommit enabled, so each statement
        takes effect immediately without an explicit commit.
    """
    settings = {
        'host': os.getenv('KNOWHUB_DB'),
        'port': int(os.getenv('KNOWHUB_PORT', 5432)),
        'user': os.getenv('KNOWHUB_USER'),
        'password': os.getenv('KNOWHUB_PASSWORD'),
        'database': os.getenv('KNOWHUB_DB_NAME'),
        'connect_timeout': 10,
    }
    connection = psycopg2.connect(**settings)
    connection.autocommit = True
    return connection
def parse_jsonb_list(val):
    """Coerce a JSONB field value into a list without raising.

    Args:
        val: The raw field value. A list is returned unchanged (same
            object); a string is decoded as JSON.

    Returns:
        The list itself, the decoded list when the string holds a JSON
        array, or an empty list for anything else (invalid JSON, JSON that
        is not an array, None, or any other type).
    """
    if isinstance(val, list):
        return val
    if not isinstance(val, str):
        return []
    try:
        decoded = json.loads(val)
    except json.JSONDecodeError:
        return []
    if isinstance(decoded, list):
        return decoded
    return []
def main():
    """Rebuild ``knowledge.tools`` from the knowledge references stored on tools.

    Steps:
      1. Load every row of ``tool_table`` with its ``tool_knowledge``,
         ``case_knowledge`` and ``process_knowledge`` columns.
      2. Invert those references into a ``knowledge_id -> {tool_ids}`` mapping.
      3. Compare the referenced IDs against the IDs present in ``knowledge``
         and report the ones that do not exist.
      4. Write the sorted tool-ID list (as JSON) into ``knowledge.tools`` for
         every referenced knowledge row that exists.
      5. Print verification statistics read back from the database.

    When ``--dry-run`` is present in ``sys.argv`` no UPDATE is issued and the
    verification step is skipped; the counters still report what would be
    written. Knowledge rows not referenced by any tool are not modified.

    The cursor and the connection are released even if a step raises.
    """
    dry_run = '--dry-run' in sys.argv

    conn = get_conn()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            print("Connected.\n")

            # -- Step 1: load all tools and their knowledge references --
            print("=== [1] Loading tool_table knowledge references ===")
            cur.execute("""
                SELECT id, name, tool_knowledge, case_knowledge, process_knowledge
                FROM tool_table
                ORDER BY id
            """)
            tools = cur.fetchall()
            print(f" Found {len(tools)} tools")

            # -- Step 2: build the reverse mapping knowledge_id -> tool_ids --
            print("\n=== [2] Building reverse mapping: knowledge_id -> tool_ids ===")
            knowledge_to_tools = {}  # knowledge_id -> set of tool ids
            tool_with_refs = 0
            total_refs = 0
            for tool in tools:
                # Merge the three reference columns; the set drops an ID
                # that appears in more than one column of the same tool.
                all_knowledge_ids = set(
                    parse_jsonb_list(tool['tool_knowledge'])
                    + parse_jsonb_list(tool['case_knowledge'])
                    + parse_jsonb_list(tool['process_knowledge'])
                )
                if not all_knowledge_ids:
                    continue
                tool_with_refs += 1
                total_refs += len(all_knowledge_ids)
                for kid in all_knowledge_ids:
                    knowledge_to_tools.setdefault(kid, set()).add(tool['id'])
            print(f" Tools with knowledge refs: {tool_with_refs}")
            print(f" Total tool->knowledge references: {total_refs}")
            print(f" Unique knowledge IDs referenced: {len(knowledge_to_tools)}")

            # -- Step 3: check that referenced IDs exist in the knowledge table --
            print("\n=== [3] Validating knowledge IDs ===")
            cur.execute("SELECT id FROM knowledge")
            existing_knowledge_ids = {r['id'] for r in cur.fetchall()}
            print(f" Knowledge entries in DB: {len(existing_knowledge_ids)}")
            referenced_ids = set(knowledge_to_tools)
            missing = referenced_ids - existing_knowledge_ids
            valid = referenced_ids & existing_knowledge_ids
            if missing:
                print(f" [!] {len(missing)} knowledge IDs referenced but not found in DB:")
                # Show at most 10 missing IDs, each with up to 3 referencing tools.
                for kid in sorted(missing)[:10]:
                    tools_ref = sorted(knowledge_to_tools[kid])
                    print(f" {kid} (referenced by {tools_ref[:3]}{'...' if len(tools_ref)>3 else ''})")
                if len(missing) > 10:
                    print(f" ... and {len(missing) - 10} more")
            else:
                print(f" All {len(valid)} referenced knowledge IDs exist in DB")

            # -- Step 4: write knowledge.tools --
            print(f"\n=== [4] Updating knowledge.tools {'(DRY RUN)' if dry_run else ''} ===")
            updated = 0
            for kid in sorted(valid):
                tool_ids = sorted(knowledge_to_tools[kid])
                if not dry_run:
                    cur.execute(
                        "UPDATE knowledge SET tools = %s WHERE id = %s",
                        (json.dumps(tool_ids), kid)
                    )
                # Counted in dry-run mode too, so the preview shows the
                # number of rows that would be written.
                updated += 1
            skipped = len(missing)
            print(f" Updated: {updated} knowledge entries")
            print(f" Skipped: {skipped} (knowledge ID not found)")

            # -- Step 5: verification (only meaningful after a real write) --
            if not dry_run:
                print("\n=== [5] Verification ===")
                cur.execute("""
                    SELECT COUNT(*) as cnt FROM knowledge
                    WHERE tools != '[]'::jsonb AND tools IS NOT NULL
                """)
                filled = cur.fetchone()['cnt']
                print(f" Knowledge entries with non-empty tools: {filled}")

                print("\n -- Sample (first 10) --")
                cur.execute("""
                    SELECT id, tools
                    FROM knowledge
                    WHERE tools != '[]'::jsonb AND tools IS NOT NULL
                    ORDER BY id LIMIT 10
                """)
                for r in cur.fetchall():
                    tools_list = parse_jsonb_list(r['tools'])
                    print(f" {r['id']}: tools={tools_list}")

                # Distribution of the number of tools per knowledge entry.
                cur.execute("""
                    SELECT jsonb_array_length(tools) as tool_count, COUNT(*) as cnt
                    FROM knowledge
                    WHERE tools != '[]'::jsonb AND tools IS NOT NULL
                    GROUP BY jsonb_array_length(tools)
                    ORDER BY tool_count
                """)
                print("\n -- Distribution: how many tools per knowledge --")
                for r in cur.fetchall():
                    print(f" {r['tool_count']} tool(s): {r['cnt']} knowledge entries")

            # -- Summary --
            print("\n=== Summary ===")
            print(f" Knowledge entries updated: {updated}")
            print(f" Missing knowledge IDs: {skipped}")
            if dry_run:
                print("\n (DRY RUN mode - no changes written to database)")
    finally:
        # Always release the connection, including when a query fails.
        conn.close()
    print("\nDone.")
# Run the backfill only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|