taodev_ingest.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. #!/usr/bin/env python3
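"""
tao_dev version: ingest the raw data of all 99 folders under
/Users/sunlit/Downloads/output-new into the database in full.

Principle: no de-duplication. If CAP-001 appears in several folders, each
folder produces its own independent cap row.

Key details (revised):
- In the LLM output, caps with is_new=True often have no id or an empty id.
  They are stored under the synthesized ID `{orig_req_id}::NEW-{idx}` so that
  no data is dropped.
- Entries of strategy.workflow_outline.capabilities[] reference a cap in one
  of three ways:
    a) has an id             -> look it up by id among this folder's caps
    b) no id but has a name  -> look it up by name among this folder's caps
    c) neither               -> skip and count it
- case_references may use either a full-width (Chinese) or an ASCII colon as
  the separator; both are accepted.

ID scheme:
    requirement: {orig_req_id}__td
    capability : {orig_req_id}::{raw_cap_id or 'NEW-<idx>'}
    strategy   : strategy-taodev-{orig_req_id}-{idx}
    resource   : resource/taodev/{orig_req_id}/{platform}/{case_id}

Idempotency: every INSERT carries ON CONFLICT DO NOTHING, so re-running the
script does not create duplicates.
"""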
  20. import json
  21. import re
  22. import sys
  23. import time
  24. from pathlib import Path
  25. import psycopg2.extras
  26. sys.path.insert(0, str(Path(__file__).parent.parent.parent))
  27. from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
# Root directory scanned by main(); each immediate sub-directory is treated as
# one folder of raw output to ingest.
OUTPUT = Path('/Users/sunlit/Downloads/output-new')
# Value written to the `version` column of every row this script inserts, and
# used as the filter when wiping or counting rows.
VERSION = 'tao_dev'
  30. CASE_REF_RE = re.compile(r'^([a-z]+)/(case_\w+)[::\s]')
  31. def parse_case_ref(ref: str):
  32. if not isinstance(ref, str):
  33. return None, None
  34. m = CASE_REF_RE.match(ref.strip())
  35. if m:
  36. return m.group(1), m.group(2)
  37. return None, None
  38. def norm_name(n):
  39. return (n or '').strip()
  40. def main():
  41. wipe_first = '--wipe' in sys.argv
  42. s = PostgreSQLCapabilityStore()
  43. cur = s._get_cursor()
  44. try:
  45. cur.execute("SET statement_timeout = '300s'")
  46. cur.execute("""SELECT pid FROM pg_stat_activity WHERE state='idle in transaction'
  47. AND pid!=pg_backend_pid() AND datname=current_database()""")
  48. for r in cur.fetchall():
  49. cur.execute('SELECT pg_terminate_backend(%s)', (r['pid'],))
  50. if wipe_first:
  51. print('⚠ --wipe: 清空现有 tao_dev 数据', flush=True)
  52. # 清 junction(按 version 过滤父表)
  53. for j, parent, fk in [
  54. ('strategy_capability', 'strategy', 'strategy_id'),
  55. ('strategy_resource', 'strategy', 'strategy_id'),
  56. ('requirement_strategy', 'strategy', 'strategy_id'),
  57. ('requirement_capability', 'capability', 'capability_id'),
  58. ('requirement_resource', 'resource', 'resource_id'),
  59. ]:
  60. cur.execute(f"""DELETE FROM {j} WHERE {fk} IN
  61. (SELECT id FROM {parent} WHERE version=%s)""", (VERSION,))
  62. print(f' cleared {j}: {cur.rowcount}', flush=True)
  63. for t in ['strategy', 'capability', 'resource', 'requirement']:
  64. cur.execute(f'DELETE FROM {t} WHERE version=%s', (VERSION,))
  65. print(f' cleared {t}: {cur.rowcount}', flush=True)
  66. cur.execute('SELECT id, description, source_nodes, status, match_result FROM requirement WHERE version=%s', ('v0',))
  67. req_map = {r['description']: r for r in cur.fetchall()}
  68. print(f'v0 req 映射: {len(req_map)}', flush=True)
  69. folders = sorted([f for f in OUTPUT.iterdir() if f.is_dir()])
  70. print(f'folders: {len(folders)}', flush=True)
  71. stats = {'req': 0, 'cap': 0, 'cap_synth_id': 0, 'strat': 0, 'res': 0,
  72. 'req_cap': 0, 'req_strat': 0, 'req_res': 0,
  73. 'strat_cap_by_id': 0, 'strat_cap_by_name': 0, 'strat_cap_skip': 0,
  74. 'strat_res': 0, 'strat_res_skip': 0}
  75. for folder in folders:
  76. t0 = time.time()
  77. sd = json.loads((folder / 'strategy.json').read_text(encoding='utf-8'))
  78. cd = json.loads((folder / 'capabilities_extracted.json').read_text(encoding='utf-8'))
  79. req_text = sd.get('requirement') or cd.get('requirement')
  80. r0 = req_map.get(req_text)
  81. if not r0:
  82. print(f' [{folder.name}] 无法匹配 req,跳过', flush=True); continue
  83. orig_req = r0['id']
  84. new_req_id = f'{orig_req}__td'
  85. # ── 1. requirement ───────────────────────────────
  86. cur.execute("""INSERT INTO requirement (id, description, source_nodes, status,
  87. match_result, version)
  88. VALUES (%s,%s,%s,%s,%s,%s) ON CONFLICT (id) DO NOTHING""",
  89. (new_req_id, r0['description'],
  90. psycopg2.extras.Json(r0['source_nodes']) if r0['source_nodes'] is not None else None,
  91. r0['status'], r0['match_result'], VERSION))
  92. stats['req'] += cur.rowcount or 0
  93. # ── 2. capabilities(含无 id 的新能力)─────────────
  94. folder_cap_by_id = {}
  95. folder_cap_by_name = {}
  96. for idx, c in enumerate(cd.get('extracted_capabilities', [])):
  97. if not isinstance(c, dict): continue
  98. raw_id = c.get('id') or ''
  99. if raw_id.strip():
  100. cap_id = f'{orig_req}::{raw_id}'
  101. else:
  102. cap_id = f'{orig_req}::NEW-{idx}'
  103. stats['cap_synth_id'] += 1
  104. nm = norm_name(c.get('name'))
  105. if nm: folder_cap_by_name[nm] = cap_id
  106. if raw_id.strip(): folder_cap_by_id[raw_id] = cap_id
  107. effects = c.get('effects')
  108. effects_json = psycopg2.extras.Json(
  109. {'items': effects} if isinstance(effects, list) else effects
  110. ) if effects is not None else None
  111. cur.execute("""INSERT INTO capability (id, name, criterion, description, effects, version)
  112. VALUES (%s,%s,%s,%s,%s,%s)
  113. ON CONFLICT (id) DO NOTHING""",
  114. (cap_id, nm, c.get('criterion', ''), c.get('description', ''),
  115. effects_json, VERSION))
  116. stats['cap'] += cur.rowcount or 0
  117. cur.execute("""INSERT INTO requirement_capability (requirement_id, capability_id)
  118. VALUES (%s,%s) ON CONFLICT DO NOTHING""",
  119. (new_req_id, cap_id))
  120. stats['req_cap'] += cur.rowcount or 0
  121. # ── 3. resources(raw_cases)──────────────────────
  122. folder_res_ids = {}
  123. rc_dir = folder / 'raw_cases'
  124. if rc_dir.exists():
  125. for cf in sorted(rc_dir.iterdir()):
  126. if cf.suffix != '.json': continue
  127. try:
  128. cj = json.loads(cf.read_text(encoding='utf-8'))
  129. except Exception:
  130. continue
  131. for case in cj.get('cases', []):
  132. platform = case.get('platform', '')
  133. cid = case.get('id', '')
  134. if not platform or not cid: continue
  135. res_id = f'resource/taodev/{orig_req}/{platform}/{cid}'
  136. folder_res_ids[(platform, cid)] = res_id
  137. metadata = {
  138. 'source_url': case.get('source_url'),
  139. 'metrics': case.get('metrics'),
  140. 'user_feedback': case.get('user_feedback'),
  141. 'input_details': case.get('input_details'),
  142. 'output_details': case.get('output_details'),
  143. 'platform': platform,
  144. 'original_case_id': cid,
  145. }
  146. now_ts = int(time.time())
  147. cur.execute("""INSERT INTO resource (id, title, body, secure_body, content_type,
  148. metadata, sort_order, submitted_by, created_at,
  149. updated_at, images, version)
  150. VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
  151. ON CONFLICT (id) DO NOTHING""",
  152. (res_id, case.get('title', ''),
  153. case.get('workflow_process', ''),
  154. None, 'research_case',
  155. psycopg2.extras.Json(metadata), 0, None,
  156. now_ts, now_ts,
  157. psycopg2.extras.Json(case.get('images') or []),
  158. VERSION))
  159. stats['res'] += cur.rowcount or 0
  160. cur.execute("""INSERT INTO requirement_resource (requirement_id, resource_id)
  161. VALUES (%s,%s) ON CONFLICT DO NOTHING""",
  162. (new_req_id, res_id))
  163. stats['req_res'] += cur.rowcount or 0
  164. # ── 4. strategies ─────────────────────────────────
  165. for idx, st in enumerate(sd.get('strategies', [])):
  166. if not isinstance(st, dict): continue
  167. strat_id = f'strategy-taodev-{orig_req}-{idx}'
  168. is_sel = bool(st.get('is_selected'))
  169. body = {
  170. 'source': st.get('source'),
  171. 'workflow_outline': st.get('workflow_outline', []),
  172. 'original_strategy_name': st.get('name', ''),
  173. }
  174. now_ts = int(time.time())
  175. cur.execute("""INSERT INTO strategy (id, name, description, body, status,
  176. created_at, updated_at, version)
  177. VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
  178. ON CONFLICT (id) DO NOTHING""",
  179. (strat_id, (st.get('name', '') or '')[:250],
  180. st.get('source', ''),
  181. psycopg2.extras.Json(body),
  182. 'approved', now_ts, now_ts, VERSION))
  183. stats['strat'] += cur.rowcount or 0
  184. cur.execute("""INSERT INTO requirement_strategy
  185. (requirement_id, strategy_id, is_selected)
  186. VALUES (%s,%s,%s) ON CONFLICT DO NOTHING""",
  187. (new_req_id, strat_id, is_sel))
  188. stats['req_strat'] += cur.rowcount or 0
  189. for ph in st.get('workflow_outline', []) or []:
  190. if not isinstance(ph, dict): continue
  191. for c in ph.get('capabilities', []) or []:
  192. if not isinstance(c, dict): continue
  193. raw_id = (c.get('id') or '').strip()
  194. nm = norm_name(c.get('name'))
  195. # 先 id 查,再 name 查
  196. mapped = folder_cap_by_id.get(raw_id) if raw_id else None
  197. source_kind = 'by_id'
  198. if not mapped and nm:
  199. mapped = folder_cap_by_name.get(nm)
  200. source_kind = 'by_name'
  201. if not mapped:
  202. stats['strat_cap_skip'] += 1; continue
  203. cur.execute("""INSERT INTO strategy_capability (strategy_id, capability_id)
  204. VALUES (%s,%s) ON CONFLICT DO NOTHING""",
  205. (strat_id, mapped))
  206. stats[f'strat_cap_{source_kind}'] += cur.rowcount or 0
  207. for ref in c.get('case_references', []) or []:
  208. platform, case_id = parse_case_ref(ref)
  209. if not platform or not case_id:
  210. stats['strat_res_skip'] += 1; continue
  211. res_id = folder_res_ids.get((platform, case_id))
  212. if not res_id:
  213. stats['strat_res_skip'] += 1; continue
  214. cur.execute("""INSERT INTO strategy_resource (strategy_id, resource_id)
  215. VALUES (%s,%s) ON CONFLICT DO NOTHING""",
  216. (strat_id, res_id))
  217. stats['strat_res'] += cur.rowcount or 0
  218. print(f' [{folder.name}] {orig_req}: {time.time()-t0:.1f}s', flush=True)
  219. print('\n=== 入库统计 ===', flush=True)
  220. for k, v in stats.items():
  221. print(f' {k}: {v}', flush=True)
  222. print('\n=== tao_dev 版本分布 ===', flush=True)
  223. for tbl in ['requirement', 'capability', 'strategy', 'resource']:
  224. cur.execute(f'SELECT COUNT(*) c FROM {tbl} WHERE version=%s', (VERSION,))
  225. print(f' {tbl}: {cur.fetchone()["c"]}', flush=True)
  226. finally:
  227. cur.close(); s.close()
  228. if __name__ == '__main__':
  229. main()