| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391 |
- #!/usr/bin/env python3
- """
- Phase 2:入库全部 strategy(含备选),填入 coverage 数据。
- 源数据:
- - /Users/sunlit/Downloads/output 2/ 中 94 个标准 folder(不含 5 rerun)
- - /Users/sunlit/Downloads/5/ 中 5 个 rerun folder
- - /Users/sunlit/Downloads/coverage_scores.json coverage 评分
- 行为:
- 1. 对每个 folder,读 strategy.json 全部 strategies(不过滤 is_selected)
- 2. 计算 strategy_id = hash8(req_text + "|" + strategy_name)
- 3. 已存在则更新 body(保留原 body,加 coverage_score / coverage_explanation 字段)
- 不存在则新建 strategy 行(是 alt)
- 4. 写/更新 requirement_strategy 行:is_selected / coverage_score / coverage_explanation
- 5. 为新建的 alt 写 strategy_capability
- 注意:5 个占位 folder(004/031/053/066/070)已被 Phase 1 正规化,不在此脚本扫描范围
- (它们的 strategy.json 非标准,只有 selected 那一条能救;其 alt blueprint 难以可靠提取,
- 本阶段暂不尝试)。
- coverage 匹配:
- - selected:按 (req_id, is_selected=true) 唯一对齐
- - alt:名字精确 > 2-gram 字符重合度 > 顺序兜底
- """
- import hashlib
- import json
- import re
- import sys
- import time
- from pathlib import Path
- sys.path.insert(0, str(Path(__file__).parent.parent.parent))
- from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
- from knowhub.scripts.merge_capabilities import MERGE_CLUSTERS
- from knowhub.scripts.rename_merged_capabilities import RENAMES
- from knowhub.scripts.llm_renames import LLM_RENAMES
# Folder ids (as they appear in directory names) that need special handling.
# Re-run folders: the copies under RERUN_DIR replace the ones in OUTPUT_DIR.
RERUN_FOLDERS = {'032', '046', '069', '085', '097'}
# Placeholder folders were already handled in Phase 1; this script skips them.
PLACEHOLDER_FOLDERS = {'004', '031', '053', '066', '070'}

# Source locations (machine-specific absolute paths).
OUTPUT_DIR = Path('/Users/sunlit/Downloads/output 2')
RERUN_DIR = Path('/Users/sunlit/Downloads/5')
COVERAGE_FILE = Path('/Users/sunlit/Downloads/coverage_scores.json')

# Written to strategy.version for newly inserted strategy rows.
DEDUP_VERSION = 'howard_dedup'
def norm(s):
    """Return *s* with surrounding whitespace removed and lower-cased.

    Any falsy input (``None``, ``''``) yields ``''``.
    """
    if not s:
        return ''
    return s.strip().lower()
def hash8(text):
    """Return the first 8 hex digits of SHA-256 over the UTF-8 bytes of *text*."""
    digest = hashlib.sha256(text.encode('utf-8'))
    return digest.hexdigest()[:8]
def gen_strategy_id(req_text, strategy_name):
    """Return a deterministic strategy id: ``'strategy-' + hash8(req_text + '|' + strategy_name)``.

    ``None`` (or any falsy value) for either argument is treated as ``''``.
    """
    key = '|'.join((req_text or '', strategy_name or ''))
    return 'strategy-' + hash8(key)
def ch_bigrams(s):
    """Return the set of adjacent character pairs of *s* after deleting all whitespace.

    Used for (Chinese) name similarity; ``None`` and strings shorter than two
    characters give an empty set.
    """
    compact = re.sub(r'\s+', '', s or '')
    return {first + second for first, second in zip(compact, compact[1:])}
def name_similarity(a, b):
    """Jaccard similarity between the character-bigram sets of *a* and *b*.

    Returns ``0`` when either name is empty or yields no bigrams.
    """
    if not (a and b):
        return 0
    grams_a = ch_bigrams(a)
    grams_b = ch_bigrams(b)
    if not (grams_a and grams_b):
        return 0
    union = grams_a | grams_b
    return len(grams_a & grams_b) / len(union)
- # ═══════════════════════════════════════════════════════════
def build_alias_and_member(cur):
    """Build a normalized-name → capability-id alias table and load DB capabilities.

    MERGE_CLUSTERS (canonical → members) is flattened into a member → canonical
    map, and each member is collapsed onto the end of its merge chain.

    Alias sources are applied in order, with later ones overriding earlier ones:
    current DB capability names, then RENAMES new names, then LLM_RENAMES names.
    The last two are mapped to their final canonical id.

    Returns:
        (alias, db_caps): ``alias`` maps ``norm(name)`` to a capability id;
        ``db_caps`` maps each ``capability.id`` to its ``name``.
    """
    member_to_canonical = {
        member: canonical
        for canonical, members in MERGE_CLUSTERS.items()
        for member in members
    }

    def final(cid, limit=10):
        # Follow member → canonical links; `visited` and `limit` stop cycles.
        visited = set()
        while cid in member_to_canonical and cid not in visited and limit > 0:
            visited.add(cid)
            cid = member_to_canonical[cid]
            limit -= 1
        return cid

    # Collapsed in place and in order, so later lookups see earlier results.
    for member in list(member_to_canonical):
        member_to_canonical[member] = final(member)

    cur.execute('SELECT id, name FROM capability')
    db_caps = {row['id']: row['name'] for row in cur.fetchall()}

    alias = {norm(name): cid for cid, name in db_caps.items()}
    alias.update(
        (norm(new_name), final(cid)) for cid, (new_name, _) in RENAMES.items()
    )
    alias.update(
        (norm(llm_name), final(canonical)) for llm_name, canonical in LLM_RENAMES.items()
    )
    return alias, db_caps
def resolve_cap_ref(cap_ref, alias, db_caps):
    """Resolve a capability reference to an id that exists in ``db_caps``.

    Args:
        cap_ref: a dict ``{'id': ..., 'name': ...}`` or a string of the form
            ``"CAP-xxx name"``, ``"CAP-xxx"`` or a bare ``"name"``.
        alias: ``norm(name)`` → capability id lookup table.
        db_caps: capability id → name for every capability in the DB.

    Returns:
        The resolved capability id, or ``None`` if nothing resolves to an id in ``db_caps``.
    """
    if not cap_ref:
        return None

    def lookup(name):
        # Name → id via the alias table, only accepted if the id exists in the DB.
        cand = alias.get(norm(name))
        return cand if cand and cand in db_caps else None

    if isinstance(cap_ref, dict):
        cid = cap_ref.get('id')
        if cid and cid in db_caps:
            return cid
        name = cap_ref.get('name', '')
        return lookup(name) if name else None

    if isinstance(cap_ref, str):
        m = re.match(r'^(CAP-[\w\-]+)', cap_ref)
        if m and m.group(1) in db_caps:
            return m.group(1)
        found = lookup(cap_ref)
        if found is None and m:
            # "CAP-xxx name" with an unknown/stale id: the full string never matches a
            # name, so retry with just the name part after the id.
            rest = cap_ref[m.end():].lstrip(' \t:：-—|').strip()
            if rest:
                found = lookup(rest)
        return found

    return None
def extract_strat_caps(strategy_dict, alias, db_caps):
    """Collect the capability ids referenced in a strategy's ``workflow_outline``.

    Only a list-valued ``workflow_outline`` is read; within it, only dict phases
    are considered, and each phase's ``capabilities`` entries are resolved with
    resolve_cap_ref. Unresolvable references are dropped.

    Returns:
        set of capability ids.
    """
    outline = strategy_dict.get('workflow_outline') or []
    if not isinstance(outline, list):
        return set()
    phases = (phase for phase in outline if isinstance(phase, dict))
    resolved = (
        resolve_cap_ref(ref, alias, db_caps)
        for phase in phases
        for ref in (phase.get('capabilities', []) or [])
    )
    return {cid for cid in resolved if cid}
- # ═══════════════════════════════════════════════════════════
def match_coverage_for_req(db_strats_with_is_sel, coverage_strats):
    """Align coverage entries with a requirement's strategies.

    Args:
        db_strats_with_is_sel: strategies from strategy.json (keys used: ``name``,
            ``is_selected``). Non-dict entries are ignored and never get coverage.
        coverage_strats: coverage entries (keys used: ``strategy_name``,
            ``is_selected``). Non-dict entries are ignored.

    Returns:
        dict mapping an index into ``db_strats_with_is_sel`` to its coverage entry;
        each coverage entry is used at most once.

    Entries are not partitioned by ``is_selected``, because the coverage file
    sometimes flags every entry as selected. Matching runs in passes:
    exact name → fuzzy name (char-bigram Jaccard >= 0.25) → selected fallback →
    positional fallback.
    """
    matched = {}
    if not coverage_strats:
        return matched
    remaining_cov = [c for c in coverage_strats if isinstance(c, dict)]
    # The caller skips non-dict strategies, so they must not consume coverage entries.
    candidates = [(i, s) for i, s in enumerate(db_strats_with_is_sel) if isinstance(s, dict)]

    # 1. Exact name match (strongest signal).
    for i, db_s in candidates:
        db_name = db_s.get('name', '')
        if not db_name:
            continue
        m = next((c for c in remaining_cov if c.get('strategy_name') == db_name), None)
        if m:
            matched[i] = m
            remaining_cov.remove(m)

    # 2. Fuzzy name match: best char-bigram Jaccard, accepted at >= 0.25.
    for i, db_s in candidates:
        if i in matched:
            continue
        db_name = db_s.get('name', '')
        if not db_name or not remaining_cov:
            continue
        best, best_sim = None, 0
        for c in remaining_cov:
            sim = name_similarity(c.get('strategy_name', ''), db_name)
            if sim > best_sim:
                best, best_sim = c, sim
        if best and best_sim >= 0.25:
            matched[i] = best
            remaining_cov.remove(best)

    # 3. Selected fallback: when exactly one selected DB strategy is still unmatched,
    #    pair it with the first remaining coverage entry flagged as selected.
    db_sel_unmatched = [i for i, s in candidates if s.get('is_selected') and i not in matched]
    cov_sel_rem = [c for c in remaining_cov if c.get('is_selected')]
    if len(db_sel_unmatched) == 1 and cov_sel_rem:
        matched[db_sel_unmatched[0]] = cov_sel_rem[0]
        remaining_cov.remove(cov_sel_rem[0])

    # 4. Positional fallback: pair the leftovers in order.
    unmatched_db_idx = [i for i, _ in candidates if i not in matched]
    for i, c in zip(unmatched_db_idx, remaining_cov):
        matched[i] = c
    return matched
- # ═══════════════════════════════════════════════════════════
def _read_req_text(folder):
    """Return the first non-empty string ``requirement`` found in the folder's JSON files.

    Tries blueprint.json, strategy.json, capabilities_extracted.json in that order;
    missing, unreadable or malformed files are skipped. Returns '' if none matches.
    """
    for fn in ('blueprint.json', 'strategy.json', 'capabilities_extracted.json'):
        fp = folder / fn
        if not fp.exists():
            continue
        try:
            d = json.loads(fp.read_text(encoding='utf-8'))
        except (OSError, ValueError):
            # Best effort: an unreadable or invalid file just means "try the next one".
            continue
        if isinstance(d, dict):
            rt = d.get('requirement', '')
            # Only a string can be matched against requirement.description in SQL.
            if rt and isinstance(rt, str):
                return rt
    return ''


def _load_strategy_list(folder):
    """Load the ``strategies`` list from folder/strategy.json.

    Returns:
        (strategies, error_key). ``error_key`` is None on success; otherwise it
        names the stats list the folder belongs in ('bad_strat_file' or
        'no_strats_list').
    """
    try:
        sd = json.loads((folder / 'strategy.json').read_text(encoding='utf-8'))
    except (OSError, ValueError):
        return None, 'bad_strat_file'
    if not isinstance(sd, dict):
        return None, 'bad_strat_file'
    all_strats = sd.get('strategies', [])
    if not isinstance(all_strats, list) or not all_strats:
        return None, 'no_strats_list'
    return all_strats, None


def _upsert_strategy(cur, s_id, s_name, s_data, body, now, stats):
    """Update ``strategy`` row *s_id* if it exists, otherwise insert it as a draft.

    The update path sets name/description/body/updated_at only. The insert path
    also sets status='draft', created_at and version=DEDUP_VERSION. Increments
    ``stats['strat_updated']`` or ``stats['strat_inserted']``.
    """
    description = (s_data.get('reasoning') or '')[:2000]
    body_json = json.dumps(body, ensure_ascii=False)
    cur.execute('SELECT id FROM strategy WHERE id = %s', (s_id,))
    if cur.fetchone() is not None:
        cur.execute("""UPDATE strategy SET
                       name = %s, description = %s, body = %s, updated_at = %s
                       WHERE id = %s""",
                    (s_name, description, body_json, now, s_id))
        stats['strat_updated'] += 1
    else:
        cur.execute("""INSERT INTO strategy (id, name, description, body,
                       status, created_at, updated_at, version)
                       VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
                    (s_id, s_name, description, body_json,
                     'draft', now, now, DEDUP_VERSION))
        stats['strat_inserted'] += 1


def ingest_folder(folder, cur, alias, db_caps, coverage_map, stats):
    """Ingest every strategy (selected and alternatives) of one output folder.

    The requirement is found by exact ``requirement.description`` match. Then, for
    each dict entry of strategy.json's ``strategies``:
      * the ``strategy`` row is upserted; its body is the strategy.json entry plus
        coverage_score / coverage_explanation;
      * its ``requirement_strategy`` row is deleted and re-inserted;
      * its ``strategy_capability`` rows are rebuilt from workflow_outline;
      * those capabilities are also added to ``requirement_capability``.

    Folders that cannot be processed are appended to one of the ``stats`` lists
    'no_req_text', 'no_req_match', 'bad_strat_file' or 'no_strats_list' and
    skipped. Counters in ``stats`` are updated in place. Returns None.
    """
    folder_key = folder.name

    # 1. requirement text → requirement id
    req_text = _read_req_text(folder)
    if not req_text:
        stats['no_req_text'].append(folder_key)
        return
    cur.execute('SELECT id FROM requirement WHERE description = %s LIMIT 1', (req_text,))
    row = cur.fetchone()
    if not row:
        stats['no_req_match'].append(folder_key)
        return
    req_id = row['id']

    # 2. all strategies from strategy.json (is_selected is not used as a filter)
    all_strats, error_key = _load_strategy_list(folder)
    if error_key:
        stats[error_key].append(folder_key)
        return

    # 3. coverage entries for this requirement
    cov_entry = coverage_map.get(req_id)
    cov_strats = cov_entry.get('strategies', []) if cov_entry else []
    match_map = match_coverage_for_req(all_strats, cov_strats)

    # 4. ingest each strategy
    now = int(time.time())
    for idx, s_data in enumerate(all_strats):
        if not isinstance(s_data, dict):
            continue
        s_name = s_data.get('name') or f'Strategy-{folder_key}-{idx}'
        is_sel = bool(s_data.get('is_selected'))
        s_id = gen_strategy_id(req_text, s_name)

        cov = match_map.get(idx)
        cov_score = cov.get('coverage_score') if cov else None
        cov_expl = cov.get('explanation') if cov else None

        # Body keeps every field of the strategy.json entry and adds the coverage data.
        body = dict(s_data)
        body['coverage_score'] = cov_score
        body['coverage_explanation'] = cov_expl

        # 4a. strategy row: UPDATE if present, else INSERT
        _upsert_strategy(cur, s_id, s_name, s_data, body, now, stats)

        # 4b. requirement_strategy: DELETE + INSERT
        cur.execute("""DELETE FROM requirement_strategy
                       WHERE requirement_id = %s AND strategy_id = %s""", (req_id, s_id))
        cur.execute("""INSERT INTO requirement_strategy
                       (requirement_id, strategy_id, is_selected,
                        coverage_score, coverage_explanation)
                       VALUES (%s, %s, %s, %s, %s)""",
                    (req_id, s_id, is_sel, cov_score, cov_expl))
        stats['req_strat_rows'] += 1

        # 4c. strategy_capability: clear and rebuild so stale junction rows go away
        cur.execute('DELETE FROM strategy_capability WHERE strategy_id = %s', (s_id,))
        strat_caps = extract_strat_caps(s_data, alias, db_caps)
        for cid in strat_caps:
            cur.execute("""INSERT INTO strategy_capability
                           (strategy_id, capability_id, relation_type)
                           VALUES (%s, %s, 'compose') ON CONFLICT DO NOTHING""",
                        (s_id, cid))
        stats['strat_cap_rows'] += len(strat_caps)

        # 4d. requirement_capability superset: alternatives' capabilities count too
        for cid in strat_caps:
            cur.execute("""INSERT INTO requirement_capability
                           (requirement_id, capability_id) VALUES (%s, %s)
                           ON CONFLICT DO NOTHING""", (req_id, cid))

    # match_map values are always coverage entries, so its size is the matched count.
    print(f'[{folder_key}] req={req_id} strategies={len(all_strats)} '
          f'cov_matched={len(match_map)}/{len(cov_strats)}',
          flush=True)
- # ═══════════════════════════════════════════════════════════
def fill_placeholder_req_strats(cur, coverage_map, stats):
    """Rewrite the requirement_strategy row of each placeholder requirement.

    The five placeholder requirements (REQ_004/031/053/066/070) were normalized in
    Phase 1 and keep only their selected strategy. For each one, the row joining
    the requirement to that strategy is re-inserted with is_selected=TRUE. Its
    coverage_score / coverage_explanation are aligned from ``coverage_map[req_id]``
    using the same matcher as ingest_folder; they stay NULL when the requirement
    has no coverage entries. Increments ``stats['placeholder_filled']`` per row.
    """
    for req_id in ['REQ_004', 'REQ_031', 'REQ_053', 'REQ_066', 'REQ_070']:
        cur.execute("""SELECT s.id, s.name FROM strategy s
                       JOIN requirement_strategy rs ON rs.strategy_id = s.id
                       WHERE rs.requirement_id = %s""", (req_id,))
        row = cur.fetchone()
        if not row:
            continue

        # Placeholder folders have only the selected strategy, so align coverage for it.
        cov_entry = coverage_map.get(req_id)
        cov_strats = cov_entry.get('strategies', []) if cov_entry else []
        match_map = match_coverage_for_req(
            [{'name': row['name'], 'is_selected': True}], cov_strats)
        cov = match_map.get(0)
        cov_score = cov.get('coverage_score') if cov else None
        cov_expl = cov.get('explanation') if cov else None

        cur.execute("""DELETE FROM requirement_strategy
                       WHERE requirement_id = %s AND strategy_id = %s""",
                    (req_id, row['id']))
        cur.execute("""INSERT INTO requirement_strategy
                       (requirement_id, strategy_id, is_selected,
                        coverage_score, coverage_explanation)
                       VALUES (%s, %s, %s, %s, %s)""",
                    (req_id, row['id'], True, cov_score, cov_expl))
        stats['placeholder_filled'] += 1
- # ═══════════════════════════════════════════════════════════
def main():
    """Run Phase 2: ingest all folders, fill placeholder rows, then print summaries.

    Loads COVERAGE_FILE and opens a PostgreSQLCapabilityStore cursor. It processes
    every directory in OUTPUT_DIR: RERUN_FOLDERS are read from RERUN_DIR instead,
    and PLACEHOLDER_FOLDERS are skipped. It then prints the stats plus summary
    counts from the DB. The cursor and store are closed even on error.
    """
    coverage_map = json.loads(COVERAGE_FILE.read_text(encoding='utf-8'))
    print(f'Loaded coverage for {len(coverage_map)} reqs', flush=True)

    store = PostgreSQLCapabilityStore()
    cur = store._get_cursor()
    try:
        alias, db_caps = build_alias_and_member(cur)
        print(f'Alias entries: {len(alias)}, DB caps: {len(db_caps)}', flush=True)

        counter_keys = ('strat_inserted', 'strat_updated', 'req_strat_rows',
                        'strat_cap_rows', 'placeholder_filled')
        problem_keys = ('no_req_text', 'no_req_match', 'bad_strat_file',
                        'no_strats_list')
        stats = {**dict.fromkeys(counter_keys, 0),
                 **{key: [] for key in problem_keys}}

        # output 2/ folders: re-runs come from RERUN_DIR, placeholders are skipped.
        folders = [
            RERUN_DIR / entry.name if entry.name in RERUN_FOLDERS else entry
            for entry in sorted(OUTPUT_DIR.iterdir())
            if entry.is_dir() and entry.name not in PLACEHOLDER_FOLDERS
        ]

        print(f'\nProcessing {len(folders)} folders...', flush=True)
        for folder in folders:
            ingest_folder(folder, cur, alias, db_caps, coverage_map, stats)

        # Placeholder requirements only get their requirement_strategy row rewritten.
        print('\n=== 填充 5 个 placeholder req 的 requirement_strategy 新字段 ===', flush=True)
        fill_placeholder_req_strats(cur, coverage_map, stats)

        # ----- verification output -----
        print(f'\n{"="*60}\nStats:', flush=True)
        for key, value in stats.items():
            if isinstance(value, list):
                print(f' {key}: {len(value)} {value[:5]}', flush=True)
            else:
                print(f' {key}: {value}', flush=True)

        cur.execute("""SELECT
            (SELECT COUNT(*) FROM strategy) s_total,
            (SELECT COUNT(*) FROM requirement_strategy) rs_total,
            (SELECT COUNT(*) FROM requirement_strategy WHERE is_selected=TRUE) rs_sel,
            (SELECT COUNT(*) FROM requirement_strategy WHERE is_selected=FALSE) rs_alt,
            (SELECT COUNT(*) FROM requirement_strategy WHERE coverage_score IS NOT NULL) rs_covered,
            (SELECT COUNT(*) FROM strategy_capability) sc_total,
            (SELECT COUNT(*) FROM requirement_capability) rc_total""")
        totals = cur.fetchone()
        print('\n最终:', flush=True)
        for key, value in dict(totals).items():
            print(f' {key}: {value}', flush=True)

        # Distribution of strategy counts per requirement.
        cur.execute("""SELECT strat_count, COUNT(*) reqs FROM
            (SELECT requirement_id, COUNT(*) strat_count FROM requirement_strategy
             GROUP BY requirement_id) t
            GROUP BY strat_count ORDER BY strat_count""")
        print('\n每 req 的 strategy 数分布:', flush=True)
        for bucket in cur.fetchall():
            print(f' {bucket["strat_count"]} strategies → {bucket["reqs"]} reqs', flush=True)
    finally:
        cur.close()
        store.close()
# Script entry point: run Phase 2 only when executed directly, not when imported.
if __name__ == '__main__':
    main()
|