#!/usr/bin/env python3
# ingest_all_strategies.py
"""
Phase 2: ingest every strategy (alternates included) and fill in coverage data.

Source data:
- /Users/sunlit/Downloads/output 2/               the 94 standard folders (the 5 reruns excluded)
- /Users/sunlit/Downloads/5/                      the 5 rerun folders
- /Users/sunlit/Downloads/coverage_scores.json    coverage scores

Behaviour:
1. For each folder, read ALL strategies from strategy.json (no is_selected filter).
2. Compute strategy_id = "strategy-" + hash8(req_text + "|" + strategy_name).
3. If the strategy already exists, update it (the original body is kept and the
   coverage_score / coverage_explanation fields are added to it);
   otherwise insert a new strategy row (an alternate).
4. Write/refresh the requirement_strategy row: is_selected / coverage_score /
   coverage_explanation.
5. Rebuild the strategy_capability rows of each ingested strategy (this is what
   gives the newly created alternates their capabilities).

Note: the 5 placeholder folders (004/031/053/066/070) were already normalised in
Phase 1 and are not scanned by this script. Their strategy.json is non-standard:
only the selected entry can be salvaged, and the alternates' blueprints cannot be
extracted reliably, so this phase does not attempt it. Only the
requirement_strategy row of their existing strategy is rewritten.

Coverage matching (per requirement, first rule that applies wins):
- exact strategy name
- character 2-gram Jaccard similarity (>= 0.25)
- a single still-unmatched selected strategy takes the first remaining selected
  coverage entry
- positional fallback for whatever is left
"""
  22. import hashlib
  23. import json
  24. import re
  25. import sys
  26. import time
  27. from pathlib import Path
  28. sys.path.insert(0, str(Path(__file__).parent.parent.parent))
  29. from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
  30. from knowhub.scripts.merge_capabilities import MERGE_CLUSTERS
  31. from knowhub.scripts.rename_merged_capabilities import RENAMES
  32. from knowhub.scripts.llm_renames import LLM_RENAMES
  33. OUTPUT_DIR = Path('/Users/sunlit/Downloads/output 2')
  34. RERUN_DIR = Path('/Users/sunlit/Downloads/5')
  35. COVERAGE_FILE = Path('/Users/sunlit/Downloads/coverage_scores.json')
  36. RERUN_FOLDERS = {'032', '046', '069', '085', '097'}
  37. PLACEHOLDER_FOLDERS = {'004', '031', '053', '066', '070'} # 已 Phase 1 处理,跳过
  38. DEDUP_VERSION = 'howard_dedup'
  39. def norm(s):
  40. return (s or '').strip().lower()
  41. def hash8(text):
  42. return hashlib.sha256(text.encode('utf-8')).hexdigest()[:8]
  43. def gen_strategy_id(req_text, strategy_name):
  44. return f'strategy-{hash8((req_text or "") + "|" + (strategy_name or ""))}'
  45. def ch_bigrams(s):
  46. """Character 2-grams — Chinese 名字相似度用。"""
  47. s = re.sub(r'\s+', '', s or '')
  48. return set(s[i:i+2] for i in range(len(s) - 1))
  49. def name_similarity(a, b):
  50. """Jaccard on char 2-grams."""
  51. if not a or not b: return 0
  52. ba, bb = ch_bigrams(a), ch_bigrams(b)
  53. if not ba or not bb: return 0
  54. return len(ba & bb) / len(ba | bb)
# ═══════════════════════════════════════════════════════════
  56. def build_alias_and_member(cur):
  57. m2c = {}
  58. for canonical, members in MERGE_CLUSTERS.items():
  59. for m in members:
  60. m2c[m] = canonical
  61. def final(cid, limit=10):
  62. seen = set()
  63. while cid in m2c and cid not in seen and limit > 0:
  64. seen.add(cid); cid = m2c[cid]; limit -= 1
  65. return cid
  66. for m in list(m2c.keys()):
  67. m2c[m] = final(m)
  68. alias = {}
  69. cur.execute('SELECT id, name FROM capability')
  70. db_caps = {r['id']: r['name'] for r in cur.fetchall()}
  71. for cid, name in db_caps.items():
  72. alias[norm(name)] = cid
  73. for cid, (new_name, _) in RENAMES.items():
  74. alias[norm(new_name)] = final(cid)
  75. for llm_name, canonical in LLM_RENAMES.items():
  76. alias[norm(llm_name)] = final(canonical)
  77. return alias, db_caps
  78. def resolve_cap_ref(cap_ref, alias, db_caps):
  79. """cap_ref = {id, name} dict 或 string。"""
  80. if not cap_ref: return None
  81. if isinstance(cap_ref, dict):
  82. cid = cap_ref.get('id')
  83. if cid and cid in db_caps: return cid
  84. name = cap_ref.get('name', '')
  85. if name:
  86. cand = alias.get(norm(name))
  87. if cand and cand in db_caps: return cand
  88. return None
  89. if isinstance(cap_ref, str):
  90. # "CAP-xxx name" or pure name
  91. m = re.match(r'^(CAP-[\w\-]+)', cap_ref)
  92. if m and m.group(1) in db_caps: return m.group(1)
  93. cand = alias.get(norm(cap_ref))
  94. if cand and cand in db_caps: return cand
  95. return None
  96. def extract_strat_caps(strategy_dict, alias, db_caps):
  97. """workflow_outline 里的 caps → set of canonical ids。"""
  98. wo = strategy_dict.get('workflow_outline') or []
  99. cap_ids = set()
  100. if isinstance(wo, list):
  101. for ph in wo:
  102. if not isinstance(ph, dict): continue
  103. for c in ph.get('capabilities', []) or []:
  104. r = resolve_cap_ref(c, alias, db_caps)
  105. if r: cap_ids.add(r)
  106. return cap_ids
# ═══════════════════════════════════════════════════════════
  108. def match_coverage_for_req(db_strats_with_is_sel, coverage_strats):
  109. """
  110. Return dict: db_strat_idx → coverage entry.
  111. 不再按 is_selected 分区,因为 coverage 文件有时所有条目都标 is_selected=True。
  112. 改为:exact name → fuzzy name → selected 兜底 → 顺序兜底
  113. """
  114. matched = {}
  115. if not coverage_strats: return matched
  116. remaining_cov = list(coverage_strats)
  117. # 1. exact name match(最强信号)
  118. for i, db_s in enumerate(db_strats_with_is_sel):
  119. db_name = db_s.get('name', '')
  120. if not db_name: continue
  121. m = next((c for c in remaining_cov if c.get('strategy_name') == db_name), None)
  122. if m:
  123. matched[i] = m
  124. remaining_cov.remove(m)
  125. # 2. fuzzy name match(2-gram Jaccard >= 0.25)
  126. for i, db_s in enumerate(db_strats_with_is_sel):
  127. if i in matched: continue
  128. db_name = db_s.get('name', '')
  129. if not db_name or not remaining_cov: continue
  130. best = None; best_sim = 0
  131. for c in remaining_cov:
  132. sim = name_similarity(c.get('strategy_name', ''), db_name)
  133. if sim > best_sim:
  134. best = c; best_sim = sim
  135. if best and best_sim >= 0.25:
  136. matched[i] = best
  137. remaining_cov.remove(best)
  138. # 3. selected 特殊对齐:db 和 cov 各剩 1 条 selected,互相对齐
  139. db_unsel_sel = [(i, s) for i, s in enumerate(db_strats_with_is_sel)
  140. if s.get('is_selected') and i not in matched]
  141. cov_sel_rem = [c for c in remaining_cov if c.get('is_selected')]
  142. if len(db_unsel_sel) == 1 and len(cov_sel_rem) >= 1:
  143. matched[db_unsel_sel[0][0]] = cov_sel_rem[0]
  144. remaining_cov.remove(cov_sel_rem[0])
  145. # 4. 顺序兜底
  146. unmatched_db_idx = [i for i, _ in enumerate(db_strats_with_is_sel) if i not in matched]
  147. for j, c in enumerate(remaining_cov):
  148. if j < len(unmatched_db_idx):
  149. matched[unmatched_db_idx[j]] = c
  150. return matched
# ═══════════════════════════════════════════════════════════
  152. def ingest_folder(folder, cur, alias, db_caps, coverage_map, stats):
  153. """处理一个 folder,返回 {strategy_id: {inserted, updated, is_selected, ...}} """
  154. folder_key = folder.name
  155. # 1. req_text + req_id
  156. req_text = ''
  157. for fn in ['blueprint.json', 'strategy.json', 'capabilities_extracted.json']:
  158. fp = folder / fn
  159. if not fp.exists(): continue
  160. try:
  161. d = json.loads(fp.read_text(encoding='utf-8'))
  162. if isinstance(d, dict):
  163. rt = d.get('requirement', '')
  164. if rt:
  165. req_text = rt
  166. break
  167. except Exception:
  168. continue
  169. if not req_text:
  170. stats['no_req_text'].append(folder_key)
  171. return
  172. cur.execute('SELECT id FROM requirement WHERE description = %s LIMIT 1', (req_text,))
  173. row = cur.fetchone()
  174. if not row:
  175. stats['no_req_match'].append(folder_key)
  176. return
  177. req_id = row['id']
  178. # 2. load strategy.json
  179. strat_path = folder / 'strategy.json'
  180. try:
  181. sd = json.loads(strat_path.read_text(encoding='utf-8'))
  182. except Exception as e:
  183. stats['bad_strat_file'].append(folder_key)
  184. return
  185. if not isinstance(sd, dict):
  186. stats['bad_strat_file'].append(folder_key)
  187. return
  188. all_strats = sd.get('strategies', [])
  189. if not isinstance(all_strats, list) or not all_strats:
  190. stats['no_strats_list'].append(folder_key)
  191. return
  192. # 3. coverage for this req
  193. cov_entry = coverage_map.get(req_id)
  194. cov_strats = cov_entry.get('strategies', []) if cov_entry else []
  195. match_map = match_coverage_for_req(all_strats, cov_strats)
  196. # 4. ingest each strategy
  197. now = int(time.time())
  198. for idx, s_data in enumerate(all_strats):
  199. if not isinstance(s_data, dict): continue
  200. s_name = s_data.get('name') or f'Strategy-{folder_key}-{idx}'
  201. is_sel = bool(s_data.get('is_selected'))
  202. s_id = gen_strategy_id(req_text, s_name)
  203. # coverage
  204. cov = match_map.get(idx)
  205. cov_score = cov.get('coverage_score') if cov else None
  206. cov_expl = cov.get('explanation') if cov else None
  207. # body 增强:保留原 body 内所有字段,加上 coverage 数据
  208. body = dict(s_data)
  209. body['coverage_score'] = cov_score
  210. body['coverage_explanation'] = cov_expl
  211. # 4a. strategy 表:存在则 UPDATE,不存在则 INSERT
  212. cur.execute('SELECT id FROM strategy WHERE id = %s', (s_id,))
  213. exists = cur.fetchone() is not None
  214. if exists:
  215. cur.execute("""UPDATE strategy SET
  216. name = %s, description = %s, body = %s, updated_at = %s
  217. WHERE id = %s""",
  218. (s_name, (s_data.get('reasoning') or '')[:2000],
  219. json.dumps(body, ensure_ascii=False), now, s_id))
  220. stats['strat_updated'] += 1
  221. else:
  222. cur.execute("""INSERT INTO strategy (id, name, description, body,
  223. status, created_at, updated_at, version)
  224. VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
  225. (s_id, s_name, (s_data.get('reasoning') or '')[:2000],
  226. json.dumps(body, ensure_ascii=False),
  227. 'draft', now, now, DEDUP_VERSION))
  228. stats['strat_inserted'] += 1
  229. # 4b. requirement_strategy:DELETE + INSERT
  230. cur.execute("""DELETE FROM requirement_strategy
  231. WHERE requirement_id = %s AND strategy_id = %s""", (req_id, s_id))
  232. cur.execute("""INSERT INTO requirement_strategy
  233. (requirement_id, strategy_id, is_selected,
  234. coverage_score, coverage_explanation)
  235. VALUES (%s, %s, %s, %s, %s)""",
  236. (req_id, s_id, is_sel, cov_score, cov_expl))
  237. stats['req_strat_rows'] += 1
  238. # 4c. strategy_capability:清空重建(防止旧 junction 不准)
  239. cur.execute('DELETE FROM strategy_capability WHERE strategy_id = %s', (s_id,))
  240. strat_caps = extract_strat_caps(s_data, alias, db_caps)
  241. for cid in strat_caps:
  242. cur.execute("""INSERT INTO strategy_capability
  243. (strategy_id, capability_id, relation_type)
  244. VALUES (%s, %s, 'compose') ON CONFLICT DO NOTHING""",
  245. (s_id, cid))
  246. stats['strat_cap_rows'] += len(strat_caps)
  247. # 4d. req_cap superset: 备选的 caps 也算研究发现的,并入 req_cap
  248. for cid in strat_caps:
  249. cur.execute("""INSERT INTO requirement_capability
  250. (requirement_id, capability_id) VALUES (%s, %s)
  251. ON CONFLICT DO NOTHING""", (req_id, cid))
  252. print(f'[{folder_key}] req={req_id} strategies={len(all_strats)} '
  253. f'cov_matched={sum(1 for m in match_map.values() if m is not None)}/{len(cov_strats)}',
  254. flush=True)
# ═══════════════════════════════════════════════════════════
  256. def fill_placeholder_req_strats(cur, coverage_map, stats):
  257. """为 5 个 placeholder folder 的已有 strategy 填充 requirement_strategy 的新字段。"""
  258. for req_id in ['REQ_004', 'REQ_031', 'REQ_053', 'REQ_066', 'REQ_070']:
  259. cur.execute("""SELECT s.id, s.name FROM strategy s
  260. JOIN requirement_strategy rs ON rs.strategy_id = s.id
  261. WHERE rs.requirement_id = %s""", (req_id,))
  262. row = cur.fetchone()
  263. if not row: continue
  264. # placeholder folder 本来就是 is_selected = true(只有这一条)
  265. cur.execute("""DELETE FROM requirement_strategy
  266. WHERE requirement_id = %s AND strategy_id = %s""",
  267. (req_id, row['id']))
  268. cur.execute("""INSERT INTO requirement_strategy
  269. (requirement_id, strategy_id, is_selected,
  270. coverage_score, coverage_explanation)
  271. VALUES (%s, %s, %s, NULL, NULL)""",
  272. (req_id, row['id'], True))
  273. stats['placeholder_filled'] += 1
# ═══════════════════════════════════════════════════════════
  275. def main():
  276. coverage_map = json.loads(COVERAGE_FILE.read_text(encoding='utf-8'))
  277. print(f'Loaded coverage for {len(coverage_map)} reqs', flush=True)
  278. s = PostgreSQLCapabilityStore()
  279. cur = s._get_cursor()
  280. try:
  281. alias, db_caps = build_alias_and_member(cur)
  282. print(f'Alias entries: {len(alias)}, DB caps: {len(db_caps)}', flush=True)
  283. stats = {'strat_inserted': 0, 'strat_updated': 0,
  284. 'req_strat_rows': 0, 'strat_cap_rows': 0,
  285. 'placeholder_filled': 0,
  286. 'no_req_text': [], 'no_req_match': [], 'bad_strat_file': [],
  287. 'no_strats_list': []}
  288. # 处理 output 2/ 所有 folder(除 rerun 5 用 downloads/5/ 替代;5 placeholder 跳过)
  289. folders = []
  290. for d in sorted(OUTPUT_DIR.iterdir()):
  291. if not d.is_dir(): continue
  292. if d.name in PLACEHOLDER_FOLDERS:
  293. continue
  294. if d.name in RERUN_FOLDERS:
  295. folders.append(RERUN_DIR / d.name)
  296. else:
  297. folders.append(d)
  298. print(f'\nProcessing {len(folders)} folders...', flush=True)
  299. for folder in folders:
  300. ingest_folder(folder, cur, alias, db_caps, coverage_map, stats)
  301. # placeholder folder 的 strategy 只填 requirement_strategy 新字段
  302. print('\n=== 填充 5 个 placeholder req 的 requirement_strategy 新字段 ===', flush=True)
  303. fill_placeholder_req_strats(cur, coverage_map, stats)
  304. # ═════ 验证输出 ═════
  305. print(f'\n{"="*60}\nStats:', flush=True)
  306. for k, v in stats.items():
  307. if isinstance(v, list):
  308. print(f' {k}: {len(v)} {v[:5]}', flush=True)
  309. else:
  310. print(f' {k}: {v}', flush=True)
  311. cur.execute("""SELECT
  312. (SELECT COUNT(*) FROM strategy) s_total,
  313. (SELECT COUNT(*) FROM requirement_strategy) rs_total,
  314. (SELECT COUNT(*) FROM requirement_strategy WHERE is_selected=TRUE) rs_sel,
  315. (SELECT COUNT(*) FROM requirement_strategy WHERE is_selected=FALSE) rs_alt,
  316. (SELECT COUNT(*) FROM requirement_strategy WHERE coverage_score IS NOT NULL) rs_covered,
  317. (SELECT COUNT(*) FROM strategy_capability) sc_total,
  318. (SELECT COUNT(*) FROM requirement_capability) rc_total""")
  319. r = cur.fetchone()
  320. print(f'\n最终:', flush=True)
  321. for k, v in dict(r).items(): print(f' {k}: {v}', flush=True)
  322. # 每 req 的 strategy 数分布
  323. cur.execute("""SELECT strat_count, COUNT(*) reqs FROM
  324. (SELECT requirement_id, COUNT(*) strat_count FROM requirement_strategy
  325. GROUP BY requirement_id) t
  326. GROUP BY strat_count ORDER BY strat_count""")
  327. print(f'\n每 req 的 strategy 数分布:', flush=True)
  328. for r in cur.fetchall():
  329. print(f' {r["strat_count"]} strategies → {r["reqs"]} reqs', flush=True)
  330. finally:
  331. cur.close()
  332. s.close()
  333. if __name__ == '__main__':
  334. main()