| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- #!/usr/bin/env python3
- """
- 补齐 94 个 alt knowledge 的 knowledge_capability 行。
- 原因:bk_20260422_strategy_capability 是 Phase 2 之前的快照,只有 99 个 selected strategy 的 cap。
- Phase 2 插入了 94 个 alt 的 cap 进 strategy_capability,Phase 4 迁移时读的是 live(包括 alt),
- 但 phase5_add_knowledge_capability.py 用的是 backup(丢 alt)。
- 修复方式:重新从源 strategy.json 解析每条 alt 的 workflow_outline caps,
- 然后用 (req_id, strategy_name) 匹配 knowledge,INSERT knowledge_capability。
- """
- import hashlib
- import json
- import re
- import sys
- from pathlib import Path
- sys.path.insert(0, str(Path(__file__).parent.parent.parent))
- from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
- from knowhub.scripts.merge_capabilities import MERGE_CLUSTERS
- from knowhub.scripts.rename_merged_capabilities import RENAMES
- from knowhub.scripts.llm_renames import LLM_RENAMES
- from knowhub.scripts.salvage_placeholder_strategies import LEGACY_REFS
# Root directory scanned by main(): each sub-directory is expected to hold a
# strategy.json file.
OUTPUT_DIR = Path('/Users/sunlit/Downloads/output 2')
# Alternate root: for the folder names listed in RERUN_FOLDERS, main() reads
# RERUN_DIR/<name> instead of OUTPUT_DIR/<name>.
RERUN_DIR = Path('/Users/sunlit/Downloads/5')
# Names of the sub-directories whose data is taken from RERUN_DIR.
RERUN_FOLDERS = {'032', '046', '069', '085', '097'}
def norm(s):
    """Return *s* stripped of surrounding whitespace and lower-cased.

    Falsy input (``None``, ``''``, ``0``) yields ``''``. A truthy value that
    is not a string is converted with ``str()`` first, so an unexpected JSON
    type (for example a numeric name) produces a lookup key that simply
    does not match instead of raising ``AttributeError``.
    """
    if not s:
        return ''
    return str(s).strip().lower()
def build_alias_and_member(cur):
    """Build the lookup tables used to resolve capability references.

    Args:
        cur: database cursor; rows returned by ``fetchall()`` are indexed by
            column name (``r['id']``, ``r['name']``).

    Returns:
        A 3-tuple ``(alias, m2c, db_caps)``:

        * ``alias``   - normalised capability name -> capability id, filled
          from the ``capability`` table, then ``RENAMES``, then
          ``LLM_RENAMES`` (later sources overwrite earlier ones on the same
          normalised name).
        * ``m2c``     - merged member id -> final canonical id, derived from
          ``MERGE_CLUSTERS`` with chains followed to their end.
        * ``db_caps`` - capability id -> name as currently stored in the
          ``capability`` table.
    """
    # Direct member -> canonical mapping as declared by the merge clusters.
    m2c = {}
    for canonical, members in MERGE_CLUSTERS.items():
        for member in members:
            m2c[member] = canonical

    def final(cid, limit=10):
        """Follow the member -> canonical chain starting at *cid*.

        Stops when the id is no longer a merged member, when an id repeats
        (cycle), or after *limit* hops, and returns the id reached.
        """
        seen = set()
        while cid in m2c and cid not in seen and limit > 0:
            seen.add(cid)
            cid = m2c[cid]
            limit -= 1
        return cid

    # Collapse chains so that every member points straight at its final id.
    # Only values are reassigned, but iterate over a snapshot of the keys to
    # keep the loop obviously safe.
    for member in list(m2c):
        m2c[member] = final(member)

    alias = {}
    cur.execute('SELECT id, name FROM capability')
    db_caps = {row['id']: row['name'] for row in cur.fetchall()}
    for cid, name in db_caps.items():
        alias[norm(name)] = cid
    # RENAMES values are 2-tuples; only the first element (the new name) is used.
    for cid, (new_name, _) in RENAMES.items():
        alias[norm(new_name)] = final(cid)
    for llm_name, canonical in LLM_RENAMES.items():
        alias[norm(llm_name)] = final(canonical)
    return alias, m2c, db_caps
def resolve_cap_ref(cap_ref, alias, m2c, db_caps):
    """Resolve one capability reference to a capability id, or ``None``.

    Args:
        cap_ref: either a dict (its ``'id'`` and ``'name'`` keys are read) or
            a string. Anything else, and any falsy value, resolves to ``None``.
        alias: normalised name -> capability id.
        m2c: merged member id -> canonical id.
        db_caps: capability id -> name for ids present in the database.

    Returns:
        The resolved capability id, or ``None`` when nothing matches.

    Resolution order for a dict: ``id`` present in ``db_caps``; ``id`` in
    ``LEGACY_REFS``; ``id`` in ``m2c``; ``name`` through ``alias`` (accepted
    only if the result is in ``db_caps``).

    Resolution order for a string: the whole string in ``LEGACY_REFS``; a
    leading ``CAP-...`` token in ``db_caps`` or ``LEGACY_REFS``; the whole
    string through ``alias`` (accepted only if the result is in ``db_caps``).
    """
    if not cap_ref:
        return None

    if isinstance(cap_ref, dict):
        cid = cap_ref.get('id')
        if cid and cid in db_caps:
            return cid
        if cid and cid in LEGACY_REFS:
            return LEGACY_REFS[cid]
        if cid in m2c:
            return m2c[cid]
        name = cap_ref.get('name', '')
        # Only string names can be alias keys; other JSON types are treated
        # as "no name" rather than crashing during normalisation.
        if name and isinstance(name, str):
            cand = alias.get(norm(name))
            if cand and cand in db_caps:
                return cand
        return None

    if isinstance(cap_ref, str):
        if cap_ref in LEGACY_REFS:
            return LEGACY_REFS[cap_ref]
        m = re.match(r'^(CAP-[\w\-]+)', cap_ref)
        if m:
            cap_token = m.group(1)
            if cap_token in db_caps:
                return cap_token
            if cap_token in LEGACY_REFS:
                return LEGACY_REFS[cap_token]
            # NOTE(review): unlike the dict branch, a merged id given in
            # string form is not looked up in m2c - confirm this asymmetry
            # is intended.
        cand = alias.get(norm(cap_ref))
        if cand and cand in db_caps:
            return cand

    return None
def extract_strat_caps(s_data, alias, m2c, db_caps):
    """Collect the resolved capability ids referenced by one strategy.

    Args:
        s_data: strategy dict; its ``'workflow_outline'`` entry is read. Only
            a list outline is processed, and only dict phases inside it.
        alias, m2c, db_caps: lookup tables passed through unchanged to
            ``resolve_cap_ref``.

    Returns:
        A set of capability ids; references that do not resolve are dropped.
    """
    caps = set()
    outline = s_data.get('workflow_outline') or []
    if not isinstance(outline, list):
        return caps

    for phase in outline:
        if not isinstance(phase, dict):
            continue
        refs = phase.get('capabilities') or []
        if isinstance(refs, str):
            # A bare string is a single reference; iterating it would feed
            # individual characters to the resolver.
            refs = [refs]
        try:
            ref_iter = iter(refs)
        except TypeError:
            # Non-iterable value (e.g. a number): nothing to resolve.
            continue
        for ref in ref_iter:
            resolved = resolve_cap_ref(ref, alias, m2c, db_caps)
            if resolved:
                caps.add(resolved)
    return caps
def _load_source_cap_map(cur, alias, m2c, db_caps):
    """Map (requirement_id, strategy_name) -> set(capability_id) from the strategy.json files."""
    cur.execute('SELECT id, description FROM requirement')
    # Reverse index for exact-text lookup. setdefault keeps the first id when
    # several requirements share one description, matching a first-match scan.
    # Assumes description is a hashable (text) value - TODO confirm.
    req_id_by_text = {}
    for row in cur.fetchall():
        req_id_by_text.setdefault(row['description'], row['id'])

    # Folders named in RERUN_FOLDERS are read from RERUN_DIR instead.
    folders = [
        (RERUN_DIR / d.name) if d.name in RERUN_FOLDERS else d
        for d in sorted(OUTPUT_DIR.iterdir())
        if d.is_dir()
    ]

    source_cap_map = {}
    for folder in folders:
        strat_path = folder / 'strategy.json'
        if not strat_path.exists():
            continue
        try:
            sd = json.loads(strat_path.read_text(encoding='utf-8'))
        except (OSError, ValueError) as exc:
            # Unreadable / undecodable / invalid JSON: skip the folder, but
            # say so instead of hiding it.
            print(f'skip {strat_path}: {exc}', file=sys.stderr, flush=True)
            continue
        if not isinstance(sd, dict):
            continue
        req_text = sd.get('requirement', '')
        # A non-string requirement can never equal a stored description.
        if not req_text or not isinstance(req_text, str):
            continue
        matched_req = req_id_by_text.get(req_text)
        if not matched_req:
            continue
        # `or []` tolerates an explicit null "strategies" entry.
        for s_data in sd.get('strategies') or []:
            if not isinstance(s_data, dict):
                continue
            name = s_data.get('name', '')
            source_cap_map[(matched_req, name)] = extract_strat_caps(
                s_data, alias, m2c, db_caps)
    return source_cap_map


def _insert_missing_links(cur, zero_cap_knowledge, source_cap_map):
    """Insert knowledge_capability rows for each knowledge row; return (attempted_inserts, not_matched)."""
    total_ins = 0
    not_matched = []
    for k in zero_cap_knowledge:
        # tags carries original_strategy_name; it may arrive as a dict or as
        # JSON text depending on the driver.
        tags = k['tags']
        tags = tags if isinstance(tags, dict) else json.loads(tags or '{}')
        orig_name = tags.get('original_strategy_name', '')

        # The requirement id comes from the requirement_knowledge link table;
        # only the first returned row is used.
        cur.execute("""SELECT requirement_id FROM requirement_knowledge
                       WHERE knowledge_id = %s""", (k['id'],))
        row = cur.fetchone()
        if not row:
            continue
        req_id = row['requirement_id']

        caps = source_cap_map.get((req_id, orig_name), set())
        if not caps:
            # Either no source entry, or the entry resolved to zero caps.
            not_matched.append((k['id'], req_id, orig_name))
            continue
        for cap_id in caps:
            cur.execute("""INSERT INTO knowledge_capability (knowledge_id, capability_id)
                           VALUES (%s, %s) ON CONFLICT DO NOTHING""", (k['id'], cap_id))
            # Counts executed statements, including any skipped by ON CONFLICT.
            total_ins += 1
    return total_ins, not_matched


def _print_verification(cur):
    """Print post-run counts: remaining zero-link rows, constraint violations, table size."""
    cur.execute("""SELECT COUNT(*) c FROM knowledge
                   WHERE version='howard_strategy_instance'
                     AND NOT EXISTS (SELECT 1 FROM knowledge_capability kc
                                     WHERE kc.knowledge_id = knowledge.id)""")
    still_zero = cur.fetchone()['c']
    print(f'\n仍 0 cap 的 knowledge: {still_zero}', flush=True)

    # Rows where a knowledge capability is not among the capabilities of the
    # requirement that the knowledge is linked to.
    cur.execute("""SELECT COUNT(*) c FROM knowledge_capability kc
                   JOIN requirement_knowledge rk ON rk.knowledge_id = kc.knowledge_id
                   LEFT JOIN requirement_capability rc
                     ON rc.requirement_id = rk.requirement_id
                    AND rc.capability_id = kc.capability_id
                   WHERE rc.capability_id IS NULL""")
    print(f'knowledge_cap ⊄ req_cap 违规: {cur.fetchone()["c"]}', flush=True)

    cur.execute('SELECT COUNT(*) c FROM knowledge_capability')
    print(f'knowledge_capability 总行数: {cur.fetchone()["c"]}', flush=True)


def main():
    """Backfill knowledge_capability rows for knowledge entries that have none.

    Steps: build the capability lookup tables, select the
    ``howard_strategy_instance`` knowledge rows without any capability link,
    re-derive capabilities per (requirement, strategy name) from the source
    strategy.json files, insert the links, then print verification counts.

    NOTE(review): no commit is issued here. Persistence of the inserts
    depends on how PostgreSQLCapabilityStore configures its connection
    (e.g. autocommit) - confirm before relying on this script.
    """
    s = PostgreSQLCapabilityStore()
    try:
        cur = s._get_cursor()
        try:
            alias, m2c, db_caps = build_alias_and_member(cur)
            print(f'alias={len(alias)}, m2c={len(m2c)}, db_caps={len(db_caps)}', flush=True)

            # Knowledge rows with zero capability links; tags are fetched in
            # the same query so no per-row lookup is needed later.
            cur.execute("""SELECT k.id, k.tags FROM knowledge k
                           WHERE k.version='howard_strategy_instance'
                             AND NOT EXISTS (SELECT 1 FROM knowledge_capability kc
                                             WHERE kc.knowledge_id = k.id)""")
            zero_cap_knowledge = cur.fetchall()
            print(f'knowledge with 0 cap links: {len(zero_cap_knowledge)}', flush=True)

            # 1) (req_id, strategy_name) -> caps from the source files.
            source_cap_map = _load_source_cap_map(cur, alias, m2c, db_caps)
            print(f'source (req, name) → caps 映射共 {len(source_cap_map)} 条', flush=True)

            # 2) Match the zero-link knowledge rows and insert the links.
            total_ins, not_matched = _insert_missing_links(
                cur, zero_cap_knowledge, source_cap_map)
            print(f'\ninserted: {total_ins}', flush=True)
            print(f'still not matched: {len(not_matched)}', flush=True)
            for item in not_matched[:8]:
                print(f' {item}', flush=True)

            # 3) Verification counts.
            _print_verification(cur)
        finally:
            cur.close()
    finally:
        # Runs even if obtaining or closing the cursor raised.
        s.close()
# Run the backfill only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|