pg_store.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532
  1. """
  2. PostgreSQL 存储封装(替代 Milvus)
  3. 使用远程 PostgreSQL + pgvector/fastann 存储知识数据
  4. """
  5. import os
  6. import json
  7. import psycopg2
  8. from psycopg2.extras import RealDictCursor, execute_batch
  9. from typing import List, Dict, Optional
  10. from dotenv import load_dotenv
  11. from knowhub.knowhub_db.cascade import cascade_delete
  12. from knowhub.knowhub_db.version_context import version_where
  13. load_dotenv()
  14. # 关联字段的子查询(从 junction table 读取)
  15. # 对于带 relation_type 的 *_knowledge 边,同时暴露两种视图:
  16. # - *_ids : 扁平 ID 列表(向后兼容,不含 type)
  17. # - *_links : [{id, relation_type}](含 type)
  18. _REL_SUBQUERIES = """
  19. (SELECT COALESCE(json_agg(rk.requirement_id), '[]'::json)
  20. FROM requirement_knowledge rk WHERE rk.knowledge_id = knowledge.id) AS requirement_ids,
  21. (SELECT COALESCE(json_agg(json_build_object(
  22. 'id', rk2.requirement_id, 'relation_type', rk2.relation_type
  23. )), '[]'::json)
  24. FROM requirement_knowledge rk2 WHERE rk2.knowledge_id = knowledge.id) AS requirement_links,
  25. (SELECT COALESCE(json_agg(ck.capability_id), '[]'::json)
  26. FROM capability_knowledge ck WHERE ck.knowledge_id = knowledge.id) AS capability_ids,
  27. (SELECT COALESCE(json_agg(json_build_object(
  28. 'id', ck2.capability_id, 'relation_type', ck2.relation_type
  29. )), '[]'::json)
  30. FROM capability_knowledge ck2 WHERE ck2.knowledge_id = knowledge.id) AS capability_links,
  31. (SELECT COALESCE(json_agg(tk.tool_id), '[]'::json)
  32. FROM tool_knowledge tk WHERE tk.knowledge_id = knowledge.id) AS tool_ids,
  33. (SELECT COALESCE(json_agg(json_build_object(
  34. 'id', tk2.tool_id, 'relation_type', tk2.relation_type
  35. )), '[]'::json)
  36. FROM tool_knowledge tk2 WHERE tk2.knowledge_id = knowledge.id) AS tool_links,
  37. (SELECT COALESCE(json_agg(kr.resource_id), '[]'::json)
  38. FROM knowledge_resource kr WHERE kr.knowledge_id = knowledge.id) AS resource_ids,
  39. (SELECT COALESCE(json_agg(json_build_object(
  40. 'target_id', krel.target_id, 'relation_type', krel.relation_type
  41. )), '[]'::json)
  42. FROM knowledge_relation krel WHERE krel.source_id = knowledge.id) AS relations
  43. """
  44. # 基础字段(不含 embedding)
  45. _BASE_FIELDS = (
  46. "id, message_id, task, content, types, tags, tag_keys, "
  47. "scopes, owner, source, eval, "
  48. "created_at, updated_at, status, version"
  49. )
  50. # 完整 SELECT(含关联子查询)
  51. _SELECT_FIELDS = f"{_BASE_FIELDS}, {_REL_SUBQUERIES}"
  52. # 含 embedding 的 SELECT
  53. _SELECT_FIELDS_WITH_EMB = f"task_embedding, content_embedding, {_SELECT_FIELDS}"
  54. def _normalize_links(data: Dict, links_key: str, ids_key: str, default_type: str):
  55. """
  56. 统一两种输入格式:
  57. - {links_key: [{id, relation_type}, ...]} → 使用指定 type
  58. - {ids_key: [id1, id2, ...]} → 使用 default_type
  59. 两个 key 都没有返回 None(不更新)
  60. """
  61. if links_key in data and data[links_key] is not None:
  62. out = []
  63. for item in data[links_key]:
  64. if isinstance(item, dict):
  65. out.append((item['id'], item.get('relation_type', default_type)))
  66. else:
  67. out.append((item, default_type))
  68. return out
  69. if ids_key in data and data[ids_key] is not None:
  70. return [(i, default_type) for i in data[ids_key]]
  71. return None
  72. class PostgreSQLStore:
  73. def __init__(self):
  74. """初始化 PostgreSQL 连接"""
  75. self.conn = psycopg2.connect(
  76. host=os.getenv('KNOWHUB_DB'),
  77. port=int(os.getenv('KNOWHUB_PORT', 5432)),
  78. user=os.getenv('KNOWHUB_USER'),
  79. password=os.getenv('KNOWHUB_PASSWORD'),
  80. database=os.getenv('KNOWHUB_DB_NAME')
  81. )
  82. self.conn.autocommit = True
  83. print(f"[PostgreSQL] 已连接到远程数据库: {os.getenv('KNOWHUB_DB')}")
  84. def _reconnect(self):
  85. self.conn = psycopg2.connect(
  86. host=os.getenv('KNOWHUB_DB'),
  87. port=int(os.getenv('KNOWHUB_PORT', 5432)),
  88. user=os.getenv('KNOWHUB_USER'),
  89. password=os.getenv('KNOWHUB_PASSWORD'),
  90. database=os.getenv('KNOWHUB_DB_NAME')
  91. )
  92. self.conn.autocommit = True
  93. def _ensure_connection(self):
  94. if self.conn.closed != 0:
  95. self._reconnect()
  96. else:
  97. try:
  98. c = self.conn.cursor()
  99. c.execute("SELECT 1")
  100. c.close()
  101. except (psycopg2.OperationalError, psycopg2.InterfaceError):
  102. self._reconnect()
  103. def _get_cursor(self):
  104. """获取游标"""
  105. self._ensure_connection()
  106. return self.conn.cursor(cursor_factory=RealDictCursor)
  107. def insert(self, knowledge: Dict):
  108. """插入单条知识。若同 id 已存在会先删再插(AnalyticDB beam 不支持 ON CONFLICT UPDATE)。"""
  109. cursor = self._get_cursor()
  110. try:
  111. cursor.execute("DELETE FROM knowledge WHERE id = %s", (knowledge['id'],))
  112. cursor.execute("""
  113. INSERT INTO knowledge (
  114. id, task_embedding, content_embedding, message_id, task, content, types, tags,
  115. tag_keys, scopes, owner, source, eval,
  116. created_at, updated_at, status, version
  117. ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  118. """, (
  119. knowledge['id'],
  120. knowledge.get('task_embedding') or knowledge.get('embedding'),
  121. knowledge.get('content_embedding'),
  122. knowledge['message_id'],
  123. knowledge['task'],
  124. knowledge['content'],
  125. knowledge.get('types', []),
  126. json.dumps(knowledge.get('tags', {})),
  127. knowledge.get('tag_keys', []),
  128. knowledge.get('scopes', []),
  129. knowledge['owner'],
  130. json.dumps(knowledge.get('source', {})),
  131. json.dumps(knowledge.get('eval', {})),
  132. knowledge['created_at'],
  133. knowledge['updated_at'],
  134. knowledge.get('status', 'approved'),
  135. knowledge.get('version', 'v0'),
  136. ))
  137. # 写入关联表
  138. kid = knowledge['id']
  139. req_links = _normalize_links(knowledge, 'requirement_links', 'requirement_ids', 'related') or []
  140. for req_id, rtype in req_links:
  141. cursor.execute(
  142. "INSERT INTO requirement_knowledge (requirement_id, knowledge_id, relation_type) "
  143. "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
  144. (req_id, kid, rtype))
  145. cap_links = _normalize_links(knowledge, 'capability_links', 'capability_ids', 'related') or []
  146. for cap_id, rtype in cap_links:
  147. cursor.execute(
  148. "INSERT INTO capability_knowledge (capability_id, knowledge_id, relation_type) "
  149. "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
  150. (cap_id, kid, rtype))
  151. tool_links = _normalize_links(knowledge, 'tool_links', 'tool_ids', 'related') or []
  152. for tool_id, rtype in tool_links:
  153. cursor.execute(
  154. "INSERT INTO tool_knowledge (tool_id, knowledge_id, relation_type) "
  155. "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
  156. (tool_id, kid, rtype))
  157. for res_id in knowledge.get('resource_ids', []):
  158. cursor.execute(
  159. "INSERT INTO knowledge_resource (knowledge_id, resource_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
  160. (kid, res_id))
  161. self.conn.commit()
  162. finally:
  163. cursor.close()
  164. def _apply_relation_filters(self, where_clause: str, relation_filters: Optional[Dict[str, str]], params: list) -> str:
  165. if not relation_filters:
  166. return where_clause
  167. rel_clauses = []
  168. for k, v in relation_filters.items():
  169. if not v: continue
  170. if k == 'requirement_id':
  171. rel_clauses.append("EXISTS (SELECT 1 FROM requirement_knowledge rk WHERE rk.knowledge_id = knowledge.id AND rk.requirement_id = %s)")
  172. params.append(v)
  173. elif k == 'capability_id':
  174. rel_clauses.append("EXISTS (SELECT 1 FROM capability_knowledge ck WHERE ck.knowledge_id = knowledge.id AND ck.capability_id = %s)")
  175. params.append(v)
  176. elif k == 'tool_id':
  177. rel_clauses.append("EXISTS (SELECT 1 FROM tool_knowledge tk WHERE tk.knowledge_id = knowledge.id AND tk.tool_id = %s)")
  178. params.append(v)
  179. if not rel_clauses:
  180. return where_clause
  181. rel_where = " AND ".join(rel_clauses)
  182. if where_clause.strip():
  183. return f"{where_clause} AND {rel_where}"
  184. else:
  185. return f"WHERE {rel_where}"
  186. def _inject_version(self, where_clause: str, params: list) -> str:
  187. """向 where_clause 注入 version 过滤(include_v0=True,knowledge 共享 v0 基层)"""
  188. vf, vp = version_where(include_v0=True)
  189. params.extend(vp)
  190. if where_clause.strip():
  191. # 已有 WHERE x
  192. return f'{where_clause} AND {vf}'
  193. return f'WHERE {vf}'
  194. def search(self, query_embedding: List[float], filters: Optional[str] = None, limit: int = 10, relation_filters: Optional[Dict[str, str]] = None) -> List[Dict]:
  195. """向量检索(使用余弦相似度)"""
  196. cursor = self._get_cursor()
  197. try:
  198. where_clause = self._build_where_clause(filters) if filters else ""
  199. params = []
  200. where_clause = self._apply_relation_filters(where_clause, relation_filters, params)
  201. where_clause = self._inject_version(where_clause, params)
  202. sql = f"""
  203. SELECT {_SELECT_FIELDS},
  204. 1 - (task_embedding <=> %s::real[]) as score
  205. FROM knowledge
  206. {where_clause}
  207. ORDER BY task_embedding <=> %s::real[]
  208. LIMIT %s
  209. """
  210. final_params = [query_embedding] + params + [query_embedding, limit]
  211. cursor.execute(sql, tuple(final_params))
  212. results = cursor.fetchall()
  213. return [self._format_result(r) for r in results]
  214. finally:
  215. cursor.close()
  216. def query(self, filters: str, limit: int = 100, relation_filters: Optional[Dict[str, str]] = None) -> List[Dict]:
  217. """纯标量查询"""
  218. cursor = self._get_cursor()
  219. try:
  220. where_clause = self._build_where_clause(filters) if filters else ""
  221. params = []
  222. where_clause = self._apply_relation_filters(where_clause, relation_filters, params)
  223. where_clause = self._inject_version(where_clause, params)
  224. sql = f"""
  225. SELECT {_SELECT_FIELDS}
  226. FROM knowledge
  227. {where_clause}
  228. LIMIT %s
  229. """
  230. final_params = params + [limit]
  231. cursor.execute(sql, tuple(final_params))
  232. results = cursor.fetchall()
  233. return [self._format_result(r) for r in results]
  234. finally:
  235. cursor.close()
  236. def get_by_id(self, knowledge_id: str, include_embedding: bool = False) -> Optional[Dict]:
  237. """根据ID获取知识(默认不返回embedding以提升性能)"""
  238. cursor = self._get_cursor()
  239. try:
  240. fields = _SELECT_FIELDS_WITH_EMB if include_embedding else _SELECT_FIELDS
  241. vf, vp = version_where(include_v0=True)
  242. cursor.execute(f"""
  243. SELECT {fields}
  244. FROM knowledge WHERE id = %s AND {vf}
  245. """, (knowledge_id, *vp))
  246. result = cursor.fetchone()
  247. return self._format_result(result) if result else None
  248. finally:
  249. cursor.close()
  250. def update(self, knowledge_id: str, updates: Dict):
  251. """更新知识"""
  252. cursor = self._get_cursor()
  253. try:
  254. # 分离关联字段和实体字段
  255. rel_keys = ('requirement_ids', 'requirement_links',
  256. 'capability_ids', 'capability_links',
  257. 'tool_ids', 'tool_links', 'resource_ids')
  258. rel_data = {k: updates.pop(k) for k in rel_keys if k in updates}
  259. if updates:
  260. set_parts = []
  261. params = []
  262. for key, value in updates.items():
  263. if key in ('tags', 'source', 'eval'):
  264. set_parts.append(f"{key} = %s")
  265. params.append(json.dumps(value))
  266. else:
  267. set_parts.append(f"{key} = %s")
  268. params.append(value)
  269. params.append(knowledge_id)
  270. sql = f"UPDATE knowledge SET {', '.join(set_parts)} WHERE id = %s"
  271. cursor.execute(sql, params)
  272. # 更新关联表(全量替换)
  273. req_links = _normalize_links(rel_data, 'requirement_links', 'requirement_ids', 'related')
  274. if req_links is not None:
  275. cursor.execute("DELETE FROM requirement_knowledge WHERE knowledge_id = %s", (knowledge_id,))
  276. for req_id, rtype in req_links:
  277. cursor.execute(
  278. "INSERT INTO requirement_knowledge (requirement_id, knowledge_id, relation_type) "
  279. "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
  280. (req_id, knowledge_id, rtype))
  281. cap_links = _normalize_links(rel_data, 'capability_links', 'capability_ids', 'related')
  282. if cap_links is not None:
  283. cursor.execute("DELETE FROM capability_knowledge WHERE knowledge_id = %s", (knowledge_id,))
  284. for cap_id, rtype in cap_links:
  285. cursor.execute(
  286. "INSERT INTO capability_knowledge (capability_id, knowledge_id, relation_type) "
  287. "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
  288. (cap_id, knowledge_id, rtype))
  289. tool_links = _normalize_links(rel_data, 'tool_links', 'tool_ids', 'related')
  290. if tool_links is not None:
  291. cursor.execute("DELETE FROM tool_knowledge WHERE knowledge_id = %s", (knowledge_id,))
  292. for tool_id, rtype in tool_links:
  293. cursor.execute(
  294. "INSERT INTO tool_knowledge (tool_id, knowledge_id, relation_type) "
  295. "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
  296. (tool_id, knowledge_id, rtype))
  297. if 'resource_ids' in rel_data and rel_data['resource_ids'] is not None:
  298. cursor.execute("DELETE FROM knowledge_resource WHERE knowledge_id = %s", (knowledge_id,))
  299. for res_id in rel_data['resource_ids']:
  300. cursor.execute(
  301. "INSERT INTO knowledge_resource (knowledge_id, resource_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
  302. (knowledge_id, res_id))
  303. self.conn.commit()
  304. finally:
  305. cursor.close()
  306. def delete(self, knowledge_id: str):
  307. """删除知识及其关联表记录"""
  308. cursor = self._get_cursor()
  309. try:
  310. cascade_delete(cursor, 'knowledge', knowledge_id)
  311. self.conn.commit()
  312. finally:
  313. cursor.close()
  314. def add_relation(self, source_id: str, target_id: str, relation_type: str):
  315. """添加一条知识间关系(不删除已有关系)"""
  316. cursor = self._get_cursor()
  317. try:
  318. cursor.execute(
  319. "INSERT INTO knowledge_relation (source_id, target_id, relation_type) VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
  320. (source_id, target_id, relation_type))
  321. self.conn.commit()
  322. finally:
  323. cursor.close()
  324. def add_resource(self, knowledge_id: str, resource_id: str):
  325. """添加一条知识-资源关联(不删除已有关联)"""
  326. cursor = self._get_cursor()
  327. try:
  328. cursor.execute(
  329. "INSERT INTO knowledge_resource (knowledge_id, resource_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
  330. (knowledge_id, resource_id))
  331. self.conn.commit()
  332. finally:
  333. cursor.close()
  334. def add_requirement(self, knowledge_id: str, requirement_id: str,
  335. relation_type: str = 'related'):
  336. """增量挂接 requirement-knowledge 边"""
  337. cursor = self._get_cursor()
  338. try:
  339. cursor.execute(
  340. "INSERT INTO requirement_knowledge (requirement_id, knowledge_id, relation_type) "
  341. "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
  342. (requirement_id, knowledge_id, relation_type))
  343. self.conn.commit()
  344. finally:
  345. cursor.close()
  346. def add_capability(self, knowledge_id: str, capability_id: str,
  347. relation_type: str = 'related'):
  348. """增量挂接 capability-knowledge 边"""
  349. cursor = self._get_cursor()
  350. try:
  351. cursor.execute(
  352. "INSERT INTO capability_knowledge (capability_id, knowledge_id, relation_type) "
  353. "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
  354. (capability_id, knowledge_id, relation_type))
  355. self.conn.commit()
  356. finally:
  357. cursor.close()
  358. def add_tool(self, knowledge_id: str, tool_id: str,
  359. relation_type: str = 'related'):
  360. """增量挂接 tool-knowledge 边"""
  361. cursor = self._get_cursor()
  362. try:
  363. cursor.execute(
  364. "INSERT INTO tool_knowledge (tool_id, knowledge_id, relation_type) "
  365. "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
  366. (tool_id, knowledge_id, relation_type))
  367. self.conn.commit()
  368. finally:
  369. cursor.close()
  370. def count(self) -> int:
  371. """返回知识总数"""
  372. cursor = self._get_cursor()
  373. try:
  374. vf, vp = version_where(include_v0=True)
  375. cursor.execute(f"SELECT COUNT(*) as count FROM knowledge WHERE {vf}", vp)
  376. return cursor.fetchone()['count']
  377. finally:
  378. cursor.close()
  379. def _build_where_clause(self, filters: str) -> str:
  380. """将Milvus风格的过滤表达式转换为PostgreSQL WHERE子句"""
  381. if not filters:
  382. return ""
  383. where = filters
  384. import re
  385. # 替换操作符
  386. where = where.replace(' == ', ' = ')
  387. where = where.replace(' or ', ' OR ')
  388. where = where.replace(' and ', ' AND ')
  389. # 处理数组包含操作
  390. where = re.sub(r'array_contains\((\w+),\s*"([^"]+)"\)', r"\1 @> ARRAY['\2']", where)
  391. # 处理 eval["score"] 语法
  392. where = where.replace('eval["score"]', "(eval->>'score')::int")
  393. # 把所有剩余的双引号字符串值替换为单引号(PostgreSQL标准)
  394. where = re.sub(r'"([^"]*)"', r"'\1'", where)
  395. return f"WHERE {where}"
  396. def _format_result(self, row: Dict) -> Dict:
  397. """格式化查询结果"""
  398. if not row:
  399. return None
  400. result = dict(row)
  401. if 'tags' in result and isinstance(result['tags'], str):
  402. result['tags'] = json.loads(result['tags'])
  403. if 'source' in result and isinstance(result['source'], str):
  404. result['source'] = json.loads(result['source'])
  405. if 'eval' in result and isinstance(result['eval'], str):
  406. result['eval'] = json.loads(result['eval'])
  407. # 关联字段(来自 junction table 子查询,可能是 JSON 字符串或已解析的列表)
  408. for field in ('requirement_ids', 'capability_ids', 'tool_ids', 'resource_ids',
  409. 'requirement_links', 'capability_links', 'tool_links'):
  410. if field in result and isinstance(result[field], str):
  411. result[field] = json.loads(result[field])
  412. elif field in result and result[field] is None:
  413. result[field] = []
  414. if 'relations' in result and isinstance(result['relations'], str):
  415. result['relations'] = json.loads(result['relations'])
  416. elif 'relations' in result and result['relations'] is None:
  417. result['relations'] = []
  418. if 'created_at' in result and result['created_at']:
  419. result['created_at'] = result['created_at'] * 1000
  420. if 'updated_at' in result and result['updated_at']:
  421. result['updated_at'] = result['updated_at'] * 1000
  422. return result
  423. def close(self):
  424. """关闭连接"""
  425. if self.conn:
  426. self.conn.close()
  427. def insert_batch(self, knowledge_list: List[Dict]):
  428. """批量插入知识"""
  429. if not knowledge_list:
  430. return
  431. cursor = self._get_cursor()
  432. try:
  433. data = []
  434. for k in knowledge_list:
  435. data.append((
  436. k['id'], k.get('task_embedding') or k.get('embedding'),
  437. k.get('content_embedding'),
  438. k['message_id'], k['task'],
  439. k['content'], k.get('types', []),
  440. json.dumps(k.get('tags', {})), k.get('tag_keys', []),
  441. k.get('scopes', []), k['owner'],
  442. json.dumps(k.get('source', {})), json.dumps(k.get('eval', {})),
  443. k['created_at'], k['updated_at'], k.get('status', 'approved'),
  444. k.get('version', 'v0'),
  445. ))
  446. execute_batch(cursor, """
  447. INSERT INTO knowledge (
  448. id, task_embedding, content_embedding, message_id, task, content, types, tags,
  449. tag_keys, scopes, owner, source, eval,
  450. created_at, updated_at, status, version
  451. ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  452. """, data)
  453. # 批量写入关联表
  454. for k in knowledge_list:
  455. kid = k['id']
  456. req_links = _normalize_links(k, 'requirement_links', 'requirement_ids', 'related') or []
  457. for req_id, rtype in req_links:
  458. cursor.execute(
  459. "INSERT INTO requirement_knowledge (requirement_id, knowledge_id, relation_type) "
  460. "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
  461. (req_id, kid, rtype))
  462. cap_links = _normalize_links(k, 'capability_links', 'capability_ids', 'related') or []
  463. for cap_id, rtype in cap_links:
  464. cursor.execute(
  465. "INSERT INTO capability_knowledge (capability_id, knowledge_id, relation_type) "
  466. "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
  467. (cap_id, kid, rtype))
  468. tool_links = _normalize_links(k, 'tool_links', 'tool_ids', 'related') or []
  469. for tool_id, rtype in tool_links:
  470. cursor.execute(
  471. "INSERT INTO tool_knowledge (tool_id, knowledge_id, relation_type) "
  472. "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
  473. (tool_id, kid, rtype))
  474. for res_id in k.get('resource_ids', []):
  475. cursor.execute(
  476. "INSERT INTO knowledge_resource (knowledge_id, resource_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
  477. (kid, res_id))
  478. self.conn.commit()
  479. finally:
  480. cursor.close()