Sfoglia il codice sorgente

feat: strategy table in knowhub & relation type

Talegorithm 1 giorno fa
parent
commit
1f44e5fa76

+ 145 - 0
knowhub/docs/db-operations.md

@@ -0,0 +1,145 @@
+# 数据库操作规范
+
+**对象**:阿里云 AnalyticDB for PostgreSQL(基于 Greenplum,MPP 架构)
+
+**用途**:写 migration、运维脚本、排查数据库卡死问题时**先读这篇**。所有条目都来自踩过的坑,不是推测。
+
+---
+
+## 1. 致命操作(永远不要做)
+
+| 操作 | 后果 | 替代方案 |
+|------|------|---------|
+| `ALTER TABLE ... RENAME` | 表损坏,需重启实例 | `CREATE TABLE AS SELECT` → `DROP TABLE 旧表` |
+| `ALTER TABLE ... DROP COLUMN` | 表损坏 | 同上 |
+| 事务里执行 DDL | DDL 回滚不完整,部分持久化 | `autocommit=True`,每条 DDL 独立 |
+| `FOREIGN KEY ... ON DELETE CASCADE` | 不支持(底层依赖 trigger) | 应用层级联:`knowhub_db/cascade.py` |
+
+**改表结构的安全模式**:`CREATE TABLE 新表 → INSERT 数据 → DROP TABLE 旧表`。参考 `migrate_v3_junction_tables.py`。
+
+---
+
+## 2. DDL 需谨慎的操作
+
+| 操作 | 风险 | 推荐做法 |
+|------|------|---------|
+| `ADD COLUMN ... NOT NULL DEFAULT 'X'` | 连接可能被服务端杀掉 | 拆成两步:`ADD COLUMN ... DEFAULT 'X'` → 另起连接 `ALTER COLUMN ... SET NOT NULL` |
+| 任何 DDL(ALTER/CREATE INDEX 等) | 需要 `AccessExclusiveLock`,任何 `idle in transaction` 会话都会阻塞无限等 | 跑前先 kill `idle in transaction`;用 `statement_timeout='30s'` 防挂起 |
+| 批量 DDL 连发 | 每次开新连接会累积 TCP 会话 | 一个长连接跑全部 DDL |
+
+---
+
+## 3. Store / 应用端连接规范
+
+### 3.1 永远 `autocommit = True`
+
+```python
+self.conn = psycopg2.connect(...)
+self.conn.autocommit = True   # 必须
+```
+
+**原因**:`autocommit = False` 下,执行一次 SELECT 就会开启一个隐式事务,如果不显式 commit/rollback,连接停在 `idle in transaction` 状态——**永久持有 AccessShareLock**,阻塞后续所有需要 AccessExclusiveLock 的 DDL。这是最难诊断、最容易卡整个系统的坑。
+
+**代价**:多语句写(entity INSERT + 若干 junction 写)失去事务原子性。但我们的写模式是 `DELETE + INSERT ON CONFLICT DO NOTHING`——幂等,失败重跑即可。
+
+详见 `decisions.md §17`。
+
+### 3.2 Migration 脚本模板
+
+```python
+import os, time, psycopg2
+from dotenv import load_dotenv
+load_dotenv()
+
+conn = psycopg2.connect(..., connect_timeout=10)
+conn.autocommit = True      # 不要开事务
+cur = conn.cursor()
+cur.execute("SET statement_timeout = '30s'")  # 卡超过 30s 自动失败
+
+# 动手前先清 idle-in-tx,防止 DDL 等锁无限阻塞
+cur.execute("""SELECT pid FROM pg_stat_activity WHERE state='idle in transaction' 
+               AND pid!=pg_backend_pid() AND datname=current_database()""")
+for (pid,) in cur.fetchall():
+    cur.execute("SELECT pg_terminate_backend(%s)", (pid,))
+
+# 每条 DDL 前打 flush 的 print(否则卡住时看不到到哪一步)
+for t in TARGETS:
+    print(f"[{time.strftime('%H:%M:%S')}] ALTER {t}...", flush=True)
+    cur.execute(f"ALTER TABLE {t} ...")
+    print(f"  ✓ done", flush=True)
+```
+
+**关键**:
+- `flush=True` 永远带上——挂起时最需要看到卡在哪一步
+- `SET statement_timeout`——宁可快速失败,不要让 client 无限等
+- 先清 idle-in-tx——否则你做的 DDL 会被别人的长事务卡住
+- 每条 DDL 前后打时间戳
+
+---
+
+## 4. 排查手册
+
+### 症状:连接失败 `remaining connection slots are reserved`
+
+**原因**:连接池打满。通常因为:
+- 有 Python 脚本被 kill 但 TCP 会话服务端未释放(等 idle timeout,可能十几分钟)
+- 生产服务(knowhub server)连接泄漏
+
+**排查**:
+```bash
+lsof -i | grep gpdbmaster   # 看本地有没有残留连接
+```
+
+**恢复**(按成本从低到高):
+1. **等**:Alibaba 的 idle session timeout 会自然释放——但"active"或"idle in transaction"不走超时
+2. **阿里云控制台**:实例管理 → 会话管理 → 手动 terminate。**首选**
+3. **重启实例**:所有连接清零,影响 1-2 分钟
+
+### 症状:DDL 挂起几十秒后 timeout 或被连接断开
+
+**原因 99%**:有 `idle in transaction` 会话持有目标表的 `AccessShareLock`。
+
+**查证**:
+```sql
+SELECT l.pid, l.mode, l.granted, c.relname, a.state, now()-a.query_start AS dur
+FROM pg_locks l
+JOIN pg_class c ON l.relation=c.oid
+LEFT JOIN pg_stat_activity a ON l.pid=a.pid
+WHERE c.relname IN ('your_target_table')
+  AND l.pid != pg_backend_pid();
+```
+
+**解决**:
+```sql
+SELECT pg_terminate_backend(<pid>);
+```
+
+### 症状:脚本静默挂住,没有任何输出
+
+**原因**:Python 的 print buffer 没 flush,execute() 已经在 wait 了。
+
+**预防**:所有 print 带 `flush=True`;`python -u` 或设 `PYTHONUNBUFFERED=1`。
+
+---
+
+## 5. 诊断脚本(都在 `knowhub/knowhub_db/scripts/`)
+
+| 脚本 | 用途 |
+|------|------|
+| `kill_db_locks.py` | 列出所有非 idle 会话 + 杀 idle-in-tx |
+| `clear_locks.py` | 清锁(轻量版) |
+| `check_table_structure.py` | 看表结构和行数 |
+| `check_extensions.py` | 看 PG 扩展(pgvector/fastann 等) |
+
+---
+
+## 6. 金句(30 秒能记住的)
+
+- **DDL 前先 `autocommit=True`**;改 store 也必须 `autocommit=True`
+- **每个 print 带 `flush=True`**
+- **`SET statement_timeout='30s'`**——宁可失败别挂死
+- **跑 DDL 前先 kill idle-in-tx**
+- **禁用 RENAME / DROP COLUMN**
+- **禁用 FK ON DELETE CASCADE**——用 `cascade.py`
+- **别在事务里跑 DDL**
+- 连接满了?**去控制台杀会话**,本地 kill 进程只关本地 socket

+ 47 - 0
knowhub/docs/decisions.md

@@ -463,3 +463,50 @@ async def llm_rerank(query: str, candidates: List[dict], top_k: int):
 - `agent/trace/store.py` - 日志管理
 
 **文档**:`knowhub/docs/feedback-timing-design.md`
+
+---
+
+## 16. Schema v4:strategy 实体 + relation_type 语义标签
+
+**日期**:2026-04-15
+
+**背景**:
+- 现有 schema 没有表达「制作策略」概念——一组原子能力的组合 + 可执行正文 + 来源知识
+- `*_knowledge` 边只表达「相关」,无法区分"这个知识构建了能力"还是"这个知识是能力的应用实例"
+- requirement / capability 想直接挂原始 resource(不经过 knowledge 整理层),现有模型不支持
+
+**决策**:
+
+1. **新增 `strategy` 实体**:`id, name, description, body, status, embedding + timestamps`。通过 `requirement_strategy` 边表达"满足关系"——strategy 是给 requirement 服务的。
+
+2. **`*_knowledge` 边加 `relation_type VARCHAR(32) DEFAULT 'related'`**:枚举值 {`source`, `case`, `compose`, `related`}。不加 DB 侧 CHECK 约束,便于扩展。PK 保持不变(一对 entity-knowledge 仍是唯一的分类)。
+
+3. **新增直连 resource 边**:`capability_resource`、`requirement_resource`、`strategy_resource`——承认 knowledge 不是 resource 的必经整理层,实体可直接归纳于原始素材。无 relation_type(存在即 source 语义)。
+
+**为什么不把 strategy 做成 capability 的一种(`capability.kind='strategy'`)**:
+- strategy 的可执行 `body` 字段语义差异大
+- strategy 是 capability 的**消费者**(通过 `strategy_capability` 组合),不是 capability 本身
+- 将来 strategy 会有自己的版本、状态、评估维度,独立表更可维护
+
+**实现位置**:
+- `knowhub/knowhub_db/migrations/migrate_v4_strategy_and_relation_types.py`
+- `knowhub/knowhub_db/pg_strategy_store.py`
+- `knowhub/knowhub_db/pg_store.py`(及其他 store 的 `knowledge_links` 子查询)
+
+---
+
+## 17. Store 连接 autocommit=True
+
+**日期**:2026-04-15
+
+**背景**:执行 v4 schema migration 时发现,`ALTER TABLE` 在 AnalyticDB 上挂起 30-60s 后 timeout。排查发现:所有 `PostgreSQLXxxStore` 的 `self.conn.autocommit = False`,SELECT 执行后连接停在 `idle in transaction` 状态,**永久持有 `AccessShareLock`**——阻塞任何需要 `AccessExclusiveLock` 的 DDL。
+
+**决策**:所有 store `__init__` 和 `_reconnect` 都改为 `self.conn.autocommit = True`。
+
+**权衡**:
+- 失:多语句写路径失去事务原子性(如 insert entity + DELETE junctions + INSERT junctions)
+- 得:任何 SELECT 不再持长锁;未来 DDL 不会再被阻塞
+
+**为什么可以接受失去原子性**:所有 junction 写入都是 `DELETE FROM ... WHERE eid=X` + `INSERT ... ON CONFLICT DO NOTHING`——**幂等**。部分失败后重跑等价于从头跑。实体主键也用 `ON CONFLICT (id) DO UPDATE`。没有"中间状态"需要被事务保护。
+
+**实现位置**:6 个 store 文件(`pg_store.py`, `pg_resource_store.py`, `pg_tool_store.py`, `pg_capability_store.py`, `pg_requirement_store.py`, `pg_strategy_store.py`)。保留原有 `self.conn.commit()` 调用作为 no-op(最小侵入)。

+ 2 - 0
knowhub/docs/schema-migration-plan.md

@@ -1,5 +1,7 @@
 # Schema 迁移方案:新库 knowhub
 
+> 历史记录:这是 v3 一次性迁移(knowledge_hub → knowhub)的方案。**跑 DDL 前的通用规范**参见 [db-operations.md](./db-operations.md)。
+
 ## 背景
 
 旧库 `knowledge_hub` 使用 JSONB 数组存储实体间关系,存在一致性、索引、命名问题。迁移目标:在新库 `knowhub` 中建立干净的关联表结构。

+ 105 - 23
knowhub/docs/schema.md

@@ -32,22 +32,29 @@ Embedding 模型:`google/gemini-2.5-flash-lite`(通过 OpenRouter)。实
 ## 表间关系
 
 ```
-           requirement
-          ╱           ╲
- capability_ids    knowledge_ids
-        ╱                 ╲
-  capability            knowledge ← knowledge_resource → resource
-        ╲                 ╱  ╲
-     tool_ids      knowledge_ids  knowledge_relation
-        ╲            ╱              (知识间关系)
-         tool
-          |
-    tool_provider (执行层索引)
+              requirement ──── requirement_strategy ──── strategy
+            ╱     ╲   ╲                                 ╱  │  ╲
+  capability_ids  knowledge_ids  resource_ids   capability │  knowledge / resource
+           ╱              ╲         ╲                      compose
+     capability          knowledge ← knowledge_resource → resource
+         │  ╲              ╱  ╲                            ↑
+         │  tool_ids  knowledge_ids  knowledge_relation    │
+         │     ╲         ╱            (知识间关系)          │
+         │      tool    ...          ← capability_resource ┤
+    capability_resource                ← requirement_resource
+                                       ← strategy_resource
+                     tool_provider (执行层索引)
 ```
 
+所有 `*_knowledge` / `knowledge_relation` / `strategy_capability` / `strategy_knowledge` 边带 `relation_type` 语义标签:
+- `source` — 知识/资料是该实体的构建来源
+- `case`   — 知识是该实体的应用实例
+- `compose` — 组合关系(strategy → capability)
+- `related` — 默认/未分类
+
 ---
 
-## 实体表(5)
+## 实体表(6
 
 ### knowledge — 知识表
 
@@ -72,10 +79,10 @@ Embedding 模型:`google/gemini-2.5-flash-lite`(通过 OpenRouter)。实
 | `updated_at` | BIGINT | 更新时间戳(秒) |
 | `status` | VARCHAR | pending → approved / rejected / checked |
 
-关联(通过关联表,API 返回时聚合为 `{entity}_ids`):
-- `requirement_ids` ← requirement_knowledge
-- `capability_ids` ← capability_knowledge
-- `tool_ids` ← tool_knowledge
+关联(通过关联表,API 返回时聚合为 `{entity}_ids` 扁平列表 + `{entity}_links` 带 relation_type):
+- `requirement_ids` / `requirement_links` ← requirement_knowledge
+- `capability_ids` / `capability_links` ← capability_knowledge
+- `tool_ids` / `tool_links` ← tool_knowledge
 - `resource_ids` ← knowledge_resource
 - `relations` ← knowledge_relation
 
@@ -111,7 +118,9 @@ Embedding 模型:`google/gemini-2.5-flash-lite`(通过 OpenRouter)。实
 
 关联:
 - `capability_ids` ← requirement_capability
-- `knowledge_ids` ← requirement_knowledge
+- `knowledge_ids` / `knowledge_links` ← requirement_knowledge
+- `resource_ids` ← requirement_resource
+- `strategy_ids` ← requirement_strategy(满足该需求的 strategy)
 
 ### capability — 原子能力表
 
@@ -128,7 +137,8 @@ Embedding 模型:`google/gemini-2.5-flash-lite`(通过 OpenRouter)。实
 关联:
 - `requirement_ids` ← requirement_capability
 - `tool_ids` ← capability_tool
-- `knowledge_ids` ← capability_knowledge
+- `knowledge_ids` / `knowledge_links` ← capability_knowledge
+- `resource_ids` ← capability_resource
 
 ### tool — 工具表
 
@@ -149,14 +159,35 @@ Embedding 模型:`google/gemini-2.5-flash-lite`(通过 OpenRouter)。实
 
 关联:
 - `capability_ids` ← capability_tool
-- `knowledge_ids` ← tool_knowledge
+- `knowledge_ids` / `knowledge_links` ← tool_knowledge
 - `provider_ids` ← tool_provider
 
+### strategy — 制作策略表
+
+一组原子 capability 的组合,附带自身的可执行 `body`(如工作流脚本、流程描述)与 source 知识。strategy 设计用来满足 requirement。
+
+| 字段 | 类型 | 说明 |
+|------|------|------|
+| `id` | VARCHAR PK | `"strategy-{...}"` |
+| `name` | VARCHAR | 策略名称 |
+| `description` | TEXT | 策略描述 |
+| `body` | TEXT | 可执行正文(工作流/脚本/流程步骤) |
+| `status` | VARCHAR | draft / approved / deprecated 等 |
+| `created_at` | BIGINT | 创建时间戳(秒) |
+| `updated_at` | BIGINT | 更新时间戳(秒) |
+| `embedding` | float4[] | name + description 的向量 |
+
+关联:
+- `requirement_ids` ← requirement_strategy(这个 strategy 被设计用来满足哪些 requirement)
+- `capability_ids` / `capability_links` ← strategy_capability(compose 组合)
+- `knowledge_ids` / `knowledge_links` ← strategy_knowledge(source / case 等)
+- `resource_ids` ← strategy_resource(直接原始素材)
+
 ---
 
-## 关联表(8)
+## 关联表(14
 
-### 实体链(2)
+### 实体链(3
 
 **requirement_capability** — 需求分解为能力
 
@@ -177,7 +208,18 @@ PK: (requirement_id, capability_id)
 
 PK: (capability_id, tool_id)
 
-### 知识链(3)
+**requirement_strategy** — 需求由 strategy 满足
+
+| 字段 | 类型 | 说明 |
+|------|------|------|
+| `requirement_id` | VARCHAR | → requirement.id |
+| `strategy_id` | VARCHAR | → strategy.id |
+
+PK: (requirement_id, strategy_id)
+
+### 知识链(4)
+
+所有 `*_knowledge` 表及 `strategy_knowledge` 都带 `relation_type VARCHAR(32)` 列,值 ∈ {`source`, `case`, `related`}。DEFAULT `'related'`(历史数据)。
 
 **requirement_knowledge** — 需求的方案策略、完成方法
 
@@ -185,6 +227,7 @@ PK: (capability_id, tool_id)
 |------|------|------|
 | `requirement_id` | VARCHAR | → requirement.id |
 | `knowledge_id` | VARCHAR | → knowledge.id |
+| `relation_type` | VARCHAR(32) | source / case / related(默认) |
 
 PK: (requirement_id, knowledge_id)
 
@@ -194,6 +237,7 @@ PK: (requirement_id, knowledge_id)
 |------|------|------|
 | `capability_id` | VARCHAR | → capability.id |
 | `knowledge_id` | VARCHAR | → knowledge.id |
+| `relation_type` | VARCHAR(32) | source / case / related(默认) |
 
 PK: (capability_id, knowledge_id)
 
@@ -203,10 +247,33 @@ PK: (capability_id, knowledge_id)
 |------|------|------|
 | `tool_id` | VARCHAR | → tool.id |
 | `knowledge_id` | VARCHAR | → knowledge.id |
+| `relation_type` | VARCHAR(32) | source / case / related(默认) |
 
 PK: (tool_id, knowledge_id)
 
-### 来源链(1)
+**strategy_knowledge** — strategy 的知识来源
+
+| 字段 | 类型 | 说明 |
+|------|------|------|
+| `strategy_id` | VARCHAR | → strategy.id |
+| `knowledge_id` | VARCHAR | → knowledge.id |
+| `relation_type` | VARCHAR(32) | 默认 `source`;可为 `case` 等 |
+
+PK: (strategy_id, knowledge_id)
+
+### 组合关系(1)
+
+**strategy_capability** — strategy 组合哪些原子能力
+
+| 字段 | 类型 | 说明 |
+|------|------|------|
+| `strategy_id` | VARCHAR | → strategy.id |
+| `capability_id` | VARCHAR | → capability.id |
+| `relation_type` | VARCHAR(32) | 默认 `compose` |
+
+PK: (strategy_id, capability_id)
+
+### 来源链(5)
 
 **knowledge_resource** — 知识的原始来源
 
@@ -217,6 +284,20 @@ PK: (tool_id, knowledge_id)
 
 PK: (knowledge_id, resource_id)
 
+**requirement_resource** — 需求直接来自的原始素材(绕过 knowledge 整理层)
+
+PK: (requirement_id, resource_id)
+
+**capability_resource** — capability 直接来自的原始素材
+
+PK: (capability_id, resource_id)
+
+**strategy_resource** — strategy 直接来自的原始素材
+
+PK: (strategy_id, resource_id)
+
+`*_resource` 边**无 relation_type**——存在本身即"source"语义,不需额外标签。
+
 ### 知识间关系(1)
 
 **knowledge_relation** — 知识之间的关系(替代、扩展、矛盾等)
@@ -251,4 +332,5 @@ PK: (tool_id, provider_id)
 | tool | `embedding` | name + introduction |
 | capability | `embedding` | name + description |
 | requirement | `embedding` | description |
+| strategy | `embedding` | name + description |
 

+ 55 - 2
knowhub/knowhub_db/README.md

@@ -2,6 +2,18 @@
 
 PostgreSQL 数据库的封装层。表结构和数据模型详见 [docs/schema.md](../docs/schema.md)。
 
+> ⚠️ **动手前先读 [docs/db-operations.md](../docs/db-operations.md)**——DDL / migration / 排查卡死的操作规范,全是踩过的坑。
+
+---
+
+## 连接约定
+
+所有 Store 使用 **`autocommit = True`**(每条语句独立事务)。原因:
+- `autocommit = False` 时 SELECT 后连接停在 `idle in transaction`,永久持有 `AccessShareLock`,阻塞未来的 DDL(ALTER / CREATE INDEX 等)
+- 我们的多语句写路径(`DELETE + INSERT ON CONFLICT DO NOTHING`)**幂等**,失去事务原子性影响可控——重试即可恢复
+
+各 Store 内仍有 `self.conn.commit()` 调用,autocommit 模式下为 no-op,保留不删是为了最小侵入。
+
 ---
 
 ## 封装类
@@ -39,7 +51,7 @@ tool 表的 CRUD + 向量检索。关联表:capability_tool, tool_knowledge。
 capability 表的 CRUD + 向量检索。关联表:requirement_capability, capability_tool, capability_knowledge。
 
 ### `PostgreSQLRequirementStore` (`pg_requirement_store.py`)
-requirement 表的 CRUD + 向量检索。关联表:requirement_capability, requirement_knowledge。
+requirement 表的 CRUD + 向量检索。关联表:requirement_capability, requirement_knowledge, requirement_resource, requirement_strategy
 
 | 方法 | 功能 |
 |------|------|
@@ -48,6 +60,45 @@ requirement 表的 CRUD + 向量检索。关联表:requirement_capability, req
 | `search(embedding, limit)` | 向量检索 |
 | `list_all(limit)` | 列出所有需求 |
 | `count()` | 统计总数 |
+| `add_knowledge(req_id, kid, relation_type='related')` | 增量挂接知识 |
+| `add_resource(req_id, resource_id)` | 增量挂接原始素材 |
+| `add_strategy(req_id, strategy_id)` | 增量挂接 strategy |
+
+### `PostgreSQLStrategyStore` (`pg_strategy_store.py`)
+strategy 表(制作策略 = 原子能力的组合 + 可执行 body)。关联表:strategy_capability, strategy_knowledge, strategy_resource, requirement_strategy。
+
+| 方法 | 功能 |
+|------|------|
+| `insert_or_update(strategy)` | 插入或更新 |
+| `get_by_id(id)` | 按 ID 查询 |
+| `search(embedding, limit)` | 向量检索 |
+| `list_all(limit, status)` | 列表查询 |
+| `update(id, updates)` | 更新 |
+| `delete(id)` | 删除(含级联) |
+| `count(status)` | 统计总数 |
+| `add_capability(sid, cap_id, relation_type='compose')` | 增量组合能力 |
+| `add_knowledge(sid, kid, relation_type='source')` | 增量挂接来源知识 |
+| `add_resource(sid, resource_id)` | 增量挂接原始素材 |
+| `add_requirement(sid, req_id)` | 增量挂接所满足的 requirement |
+
+---
+
+## 关联关系类型(relation_type)
+
+`*_knowledge` 和 `strategy_*` 边携带 `relation_type VARCHAR(32)` 语义标签:
+
+| 值 | 含义 | 使用场景 |
+|----|------|---------|
+| `source` | 构建该实体的知识/资料来源 | 研究产出的能力、策略的理论依据 |
+| `case` | 该实体的应用实例 | 工具/能力/策略的使用案例 |
+| `compose` | 组合关系 | 仅 strategy → capability |
+| `related` | 默认/未分类 | 历史数据、弱关联 |
+
+Store 读取时同时返回两种视图:
+- `{entity}_ids: [id1, id2, ...]` —— 扁平 ID 列表(向后兼容)
+- `{entity}_links: [{id, relation_type}, ...]` —— 含类型
+
+写入时两种格式都接受:传扁平 ids 则默认 `'related'`,传 links 则使用指定 type。
 
 ---
 
@@ -57,16 +108,18 @@ requirement 表的 CRUD + 向量检索。关联表:requirement_capability, req
 knowhub_db/
 ├── pg_store.py                # knowledge 表
 ├── pg_resource_store.py       # resource 表
-├── cascade.py                 # 级联删除(应用层,替代 FK ON DELETE CASCADE)
 ├── pg_tool_store.py           # tool 表
 ├── pg_capability_store.py     # capability 表
 ├── pg_requirement_store.py    # requirement 表
+├── pg_strategy_store.py       # strategy 表
+├── cascade.py                 # 级联删除(应用层,替代 FK ON DELETE CASCADE)
 ├── README.md
 ├── migrations/                # 一次性迁移脚本(已执行,保留备查)
 └── scripts/                   # 诊断和运维脚本
     ├── check_table_structure.py   # 查看表结构和行数
     ├── check_extensions.py        # 查看 PostgreSQL 扩展
     ├── clear_locks.py             # 清理数据库锁
+    ├── kill_db_locks.py           # 清理数据库锁
     ├── clean_invalid_knowledge_refs.py  # 清理失效引用
     └── ...
 ```

+ 14 - 0
knowhub/knowhub_db/cascade.py

@@ -15,6 +15,7 @@ _JUNCTIONS = {
         ('knowledge_resource', 'knowledge_id'),
         ('knowledge_relation', 'source_id'),
         ('knowledge_relation', 'target_id'),
+        ('strategy_knowledge', 'knowledge_id'),
     ],
     'tool': [
         ('capability_tool', 'tool_id'),
@@ -25,13 +26,26 @@ _JUNCTIONS = {
         ('requirement_capability', 'capability_id'),
         ('capability_tool', 'capability_id'),
         ('capability_knowledge', 'capability_id'),
+        ('capability_resource', 'capability_id'),
+        ('strategy_capability', 'capability_id'),
     ],
     'requirement': [
         ('requirement_capability', 'requirement_id'),
         ('requirement_knowledge', 'requirement_id'),
+        ('requirement_resource', 'requirement_id'),
+        ('requirement_strategy', 'requirement_id'),
     ],
     'resource': [
         ('knowledge_resource', 'resource_id'),
+        ('capability_resource', 'resource_id'),
+        ('requirement_resource', 'resource_id'),
+        ('strategy_resource', 'resource_id'),
+    ],
+    'strategy': [
+        ('strategy_capability', 'strategy_id'),
+        ('strategy_knowledge', 'strategy_id'),
+        ('strategy_resource', 'strategy_id'),
+        ('requirement_strategy', 'strategy_id'),
     ],
 }
 

+ 216 - 0
knowhub/knowhub_db/migrations/migrate_v4_strategy_and_relation_types.py

@@ -0,0 +1,216 @@
+#!/usr/bin/env python3
+"""
+数据库迁移 v4:新增 strategy 实体 + 扩展 relation_type
+
+本次变更:
+1. 新增实体表 strategy
+2. 新增 junction 表:
+   - strategy_capability(默认 relation_type='compose')
+   - strategy_knowledge(默认 relation_type='source')
+   - strategy_resource
+   - capability_resource
+   - requirement_resource
+   - requirement_strategy(strategy 满足哪些 requirement)
+3. 为现有 junction 表加 relation_type 列(DEFAULT 'related'):
+   - requirement_knowledge
+   - capability_knowledge
+   - tool_knowledge
+
+关于 PK:不修改现有 *_knowledge 的 PK——保持 (entity_id, knowledge_id) 唯一。
+relation_type 作为分类属性(一对 entity-knowledge 只有一个语义)。
+
+关于枚举:应用层保证写入值属于 {source, case, compose, related}。
+不加 DB 侧 CHECK 约束,为将来扩展保留空间。
+
+幂等:反复执行不破坏已有数据。
+"""
+
+import os
+import psycopg2
+from psycopg2.extras import RealDictCursor
+from dotenv import load_dotenv
+
+_script_dir = os.path.dirname(os.path.abspath(__file__))
+_project_root = os.path.normpath(os.path.join(_script_dir, '..', '..', '..'))
+load_dotenv(os.path.join(_project_root, '.env'))
+
+
+def get_connection():
+    conn = psycopg2.connect(
+        host=os.getenv('KNOWHUB_DB'),
+        port=int(os.getenv('KNOWHUB_PORT', 5432)),
+        user=os.getenv('KNOWHUB_USER'),
+        password=os.getenv('KNOWHUB_PASSWORD'),
+        database=os.getenv('KNOWHUB_DB_NAME'),
+        connect_timeout=10
+    )
+    conn.autocommit = True
+    return conn
+
+
+def table_exists(cursor, name):
+    cursor.execute("SELECT 1 FROM information_schema.tables WHERE table_name = %s", (name,))
+    return cursor.fetchone() is not None
+
+
+def column_exists(cursor, table, column):
+    cursor.execute(
+        "SELECT 1 FROM information_schema.columns WHERE table_name = %s AND column_name = %s",
+        (table, column))
+    return cursor.fetchone() is not None
+
+
+# ─── Step 1: 新增实体表 strategy ─────────────────────────────────────────────
+
+CREATE_STRATEGY = """
+CREATE TABLE IF NOT EXISTS strategy (
+    id          VARCHAR PRIMARY KEY,
+    name        VARCHAR,
+    description TEXT,
+    body        TEXT,
+    status      VARCHAR DEFAULT 'draft',
+    created_at  BIGINT,
+    updated_at  BIGINT,
+    embedding   float4[]
+)
+"""
+
+# ─── Step 2: 新增 junction 表 ─────────────────────────────────────────────────
+
+CREATE_NEW_JUNCTIONS = [
+    # strategy 的组合关系:strategy → capability(compose)
+    """
+    CREATE TABLE IF NOT EXISTS strategy_capability (
+        strategy_id   VARCHAR NOT NULL,
+        capability_id VARCHAR NOT NULL,
+        relation_type VARCHAR(32) NOT NULL DEFAULT 'compose',
+        PRIMARY KEY (strategy_id, capability_id)
+    )
+    """,
+    # strategy 的知识来源:strategy → knowledge(默认 source,也可 case 等)
+    """
+    CREATE TABLE IF NOT EXISTS strategy_knowledge (
+        strategy_id   VARCHAR NOT NULL,
+        knowledge_id  VARCHAR NOT NULL,
+        relation_type VARCHAR(32) NOT NULL DEFAULT 'source',
+        PRIMARY KEY (strategy_id, knowledge_id)
+    )
+    """,
+    # strategy 的原始素材(直接来源,无 type)
+    """
+    CREATE TABLE IF NOT EXISTS strategy_resource (
+        strategy_id VARCHAR NOT NULL,
+        resource_id VARCHAR NOT NULL,
+        PRIMARY KEY (strategy_id, resource_id)
+    )
+    """,
+    # capability 的原始素材(直接来源,无 type)
+    """
+    CREATE TABLE IF NOT EXISTS capability_resource (
+        capability_id VARCHAR NOT NULL,
+        resource_id   VARCHAR NOT NULL,
+        PRIMARY KEY (capability_id, resource_id)
+    )
+    """,
+    # requirement 的原始素材(直接来源,无 type)
+    """
+    CREATE TABLE IF NOT EXISTS requirement_resource (
+        requirement_id VARCHAR NOT NULL,
+        resource_id    VARCHAR NOT NULL,
+        PRIMARY KEY (requirement_id, resource_id)
+    )
+    """,
+    # strategy 被设计用来满足哪些 requirement
+    """
+    CREATE TABLE IF NOT EXISTS requirement_strategy (
+        requirement_id VARCHAR NOT NULL,
+        strategy_id    VARCHAR NOT NULL,
+        PRIMARY KEY (requirement_id, strategy_id)
+    )
+    """,
+]
+
+# ─── Step 3: 为现有 *_knowledge 加 relation_type 列 ───────────────────────────
+
+TABLES_NEEDING_RELATION_TYPE = [
+    'requirement_knowledge',
+    'capability_knowledge',
+    'tool_knowledge',
+]
+
+
+def add_relation_type_column(cursor, table: str):
+    """幂等:若列已存在则跳过"""
+    if column_exists(cursor, table, 'relation_type'):
+        print(f"  {table}.relation_type 已存在,跳过")
+        return
+    cursor.execute(f"""
+        ALTER TABLE {table}
+        ADD COLUMN relation_type VARCHAR(32) NOT NULL DEFAULT 'related'
+    """)
+    print(f"  ✓ {table}.relation_type 已添加(DEFAULT 'related')")
+
+
+# ─── 主流程 ───────────────────────────────────────────────────────────────────
+
+def main():
+    print("=" * 60)
+    print("KnowHub 迁移 v4: strategy + relation_type")
+    print("=" * 60)
+
+    conn = get_connection()
+    cursor = conn.cursor(cursor_factory=RealDictCursor)
+
+    # Step 1: strategy 实体表
+    print("\n[1/3] 创建 strategy 实体表...")
+    cursor.execute(CREATE_STRATEGY)
+    print("  ✓ strategy")
+
+    # Step 2: 5 张新 junction 表
+    print("\n[2/3] 创建新 junction 表...")
+    for sql in CREATE_NEW_JUNCTIONS:
+        cursor.execute(sql)
+    for t in ('strategy_capability', 'strategy_knowledge', 'strategy_resource',
+              'capability_resource', 'requirement_resource', 'requirement_strategy'):
+        print(f"  ✓ {t}")
+
+    # Step 3: 为现有 *_knowledge 加 relation_type 列
+    print("\n[3/3] 为现有 *_knowledge 添加 relation_type 列...")
+    for t in TABLES_NEEDING_RELATION_TYPE:
+        add_relation_type_column(cursor, t)
+
+    # 验证
+    print("\n" + "=" * 60)
+    print("最终表结构验证:")
+    print("=" * 60)
+    check_tables = [
+        'strategy',
+        'strategy_capability', 'strategy_knowledge', 'strategy_resource',
+        'capability_resource', 'requirement_resource', 'requirement_strategy',
+        'requirement_knowledge', 'capability_knowledge', 'tool_knowledge',
+        'knowledge_relation',
+    ]
+    for t in check_tables:
+        try:
+            cursor.execute(f"""
+                SELECT column_name FROM information_schema.columns
+                WHERE table_name = %s ORDER BY ordinal_position
+            """, (t,))
+            cols = [r['column_name'] for r in cursor.fetchall()]
+            cursor.execute(f"SELECT COUNT(*) as count FROM {t}")
+            count = cursor.fetchone()['count']
+            print(f"\n  {t} ({count} rows)")
+            print(f"    {', '.join(cols)}")
+        except Exception as e:
+            print(f"\n  {t}: ERROR - {e}")
+
+    print("\n" + "=" * 60)
+    print("迁移成功!")
+    print("=" * 60)
+
+    cursor.close()
+    conn.close()
+
+
+if __name__ == '__main__':
+    main()

+ 64 - 9
knowhub/knowhub_db/pg_capability_store.py

@@ -25,7 +25,13 @@ _REL_SUBQUERIES = """
         json_object_agg(ct2.tool_id, ct2.description), '{}'::json)
      FROM capability_tool ct2 WHERE ct2.capability_id = capability.id AND ct2.description != '') AS implements,
     (SELECT COALESCE(json_agg(ck.knowledge_id), '[]'::json)
-     FROM capability_knowledge ck WHERE ck.capability_id = capability.id) AS knowledge_ids
+     FROM capability_knowledge ck WHERE ck.capability_id = capability.id) AS knowledge_ids,
+    (SELECT COALESCE(json_agg(json_build_object(
+        'id', ck2.knowledge_id, 'relation_type', ck2.relation_type
+     )), '[]'::json)
+     FROM capability_knowledge ck2 WHERE ck2.capability_id = capability.id) AS knowledge_links,
+    (SELECT COALESCE(json_agg(cr.resource_id), '[]'::json)
+     FROM capability_resource cr WHERE cr.capability_id = capability.id) AS resource_ids
 """
 
 _BASE_FIELDS = "id, name, criterion, description"
@@ -33,6 +39,21 @@ _BASE_FIELDS = "id, name, criterion, description"
 _SELECT_FIELDS = f"{_BASE_FIELDS}, {_REL_SUBQUERIES}"
 
 
+def _normalize_links(data: Dict, links_key: str, ids_key: str, default_type: str):
+    """两种输入格式统一:{links_key: [{id, relation_type}]} 或 {ids_key: [id]}"""
+    if links_key in data and data[links_key] is not None:
+        out = []
+        for item in data[links_key]:
+            if isinstance(item, dict):
+                out.append((item['id'], item.get('relation_type', default_type)))
+            else:
+                out.append((item, default_type))
+        return out
+    if ids_key in data and data[ids_key] is not None:
+        return [(i, default_type) for i in data[ids_key]]
+    return None
+
+
 class PostgreSQLCapabilityStore:
     def __init__(self):
         """初始化 PostgreSQL 连接"""
@@ -43,7 +64,7 @@ class PostgreSQLCapabilityStore:
             password=os.getenv('KNOWHUB_PASSWORD'),
             database=os.getenv('KNOWHUB_DB_NAME')
         )
-        self.conn.autocommit = False
+        self.conn.autocommit = True
         print(f"[PostgreSQL Capability] 已连接到远程数据库: {os.getenv('KNOWHUB_DB')}")
 
     def _reconnect(self):
@@ -54,7 +75,7 @@ class PostgreSQLCapabilityStore:
             password=os.getenv('KNOWHUB_PASSWORD'),
             database=os.getenv('KNOWHUB_DB_NAME')
         )
-        self.conn.autocommit = False
+        self.conn.autocommit = True
 
     def _ensure_connection(self):
         if self.conn.closed != 0:
@@ -98,12 +119,21 @@ class PostgreSQLCapabilityStore:
                         "INSERT INTO capability_tool (capability_id, tool_id, description) VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
                         (cap_id, tool_id, desc))
 
-        if 'knowledge_ids' in data:
+        k_links = _normalize_links(data, 'knowledge_links', 'knowledge_ids', 'related')
+        if k_links is not None:
             cursor.execute("DELETE FROM capability_knowledge WHERE capability_id = %s", (cap_id,))
-            for kid in data['knowledge_ids']:
+            for kid, rtype in k_links:
+                cursor.execute(
+                    "INSERT INTO capability_knowledge (capability_id, knowledge_id, relation_type) "
+                    "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
+                    (cap_id, kid, rtype))
+
+        if 'resource_ids' in data and data['resource_ids'] is not None:
+            cursor.execute("DELETE FROM capability_resource WHERE capability_id = %s", (cap_id,))
+            for rid in data['resource_ids']:
                 cursor.execute(
-                    "INSERT INTO capability_knowledge (capability_id, knowledge_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
-                    (cap_id, kid))
+                    "INSERT INTO capability_resource (capability_id, resource_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
+                    (cap_id, rid))
 
     def insert_or_update(self, cap: Dict):
         """插入或更新原子能力"""
@@ -181,7 +211,8 @@ class PostgreSQLCapabilityStore:
         try:
             # 分离关联字段
             rel_fields = {}
-            for key in ('requirement_ids', 'implements', 'tool_ids', 'knowledge_ids'):
+            for key in ('requirement_ids', 'implements', 'tool_ids',
+                        'knowledge_ids', 'knowledge_links', 'resource_ids'):
                 if key in updates:
                     rel_fields[key] = updates.pop(key)
 
@@ -227,7 +258,8 @@ class PostgreSQLCapabilityStore:
         if not row:
             return None
         result = dict(row)
-        for field in ('requirement_ids', 'tool_ids', 'knowledge_ids'):
+        for field in ('requirement_ids', 'tool_ids', 'knowledge_ids',
+                      'resource_ids', 'knowledge_links'):
             if field in result and isinstance(result[field], str):
                 result[field] = json.loads(result[field])
             elif field in result and result[field] is None:
@@ -239,6 +271,29 @@ class PostgreSQLCapabilityStore:
                 result['implements'] = {}
         return result
 
+    def add_knowledge(self, cap_id: str, knowledge_id: str, relation_type: str = 'related'):
+        """增量挂接 capability-knowledge 边"""
+        cursor = self._get_cursor()
+        try:
+            cursor.execute(
+                "INSERT INTO capability_knowledge (capability_id, knowledge_id, relation_type) "
+                "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
+                (cap_id, knowledge_id, relation_type))
+            self.conn.commit()
+        finally:
+            cursor.close()
+
+    def add_resource(self, cap_id: str, resource_id: str):
+        """增量挂接 capability-resource 边"""
+        cursor = self._get_cursor()
+        try:
+            cursor.execute(
+                "INSERT INTO capability_resource (capability_id, resource_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
+                (cap_id, resource_id))
+            self.conn.commit()
+        finally:
+            cursor.close()
+
     def close(self):
         if self.conn:
             self.conn.close()

+ 105 - 14
knowhub/knowhub_db/pg_requirement_store.py

@@ -15,12 +15,20 @@ from knowhub.knowhub_db.cascade import cascade_delete
 
 load_dotenv()
 
-# 关联字段子查询
+# 关联字段子查询。knowledge 边暴露两种视图:knowledge_ids(扁平)+ knowledge_links(含 type)
 _REL_SUBQUERY = """
     (SELECT COALESCE(json_agg(rc.capability_id), '[]'::json)
      FROM requirement_capability rc WHERE rc.requirement_id = requirement.id) AS capability_ids,
     (SELECT COALESCE(json_agg(rk.knowledge_id), '[]'::json)
-     FROM requirement_knowledge rk WHERE rk.requirement_id = requirement.id) AS knowledge_ids
+     FROM requirement_knowledge rk WHERE rk.requirement_id = requirement.id) AS knowledge_ids,
+    (SELECT COALESCE(json_agg(json_build_object(
+        'id', rk2.knowledge_id, 'relation_type', rk2.relation_type
+     )), '[]'::json)
+     FROM requirement_knowledge rk2 WHERE rk2.requirement_id = requirement.id) AS knowledge_links,
+    (SELECT COALESCE(json_agg(rr.resource_id), '[]'::json)
+     FROM requirement_resource rr WHERE rr.requirement_id = requirement.id) AS resource_ids,
+    (SELECT COALESCE(json_agg(rs.strategy_id), '[]'::json)
+     FROM requirement_strategy rs WHERE rs.requirement_id = requirement.id) AS strategy_ids
 """
 
 _BASE_FIELDS = "id, description, source_nodes, status, match_result"
@@ -28,6 +36,21 @@ _BASE_FIELDS = "id, description, source_nodes, status, match_result"
 _SELECT_FIELDS = f"{_BASE_FIELDS}, {_REL_SUBQUERY}"
 
 
+def _normalize_links(data: Dict, links_key: str, ids_key: str, default_type: str):
+    """两种输入格式统一:{links_key: [{id, relation_type}]} 或 {ids_key: [id]}"""
+    if links_key in data and data[links_key] is not None:
+        out = []
+        for item in data[links_key]:
+            if isinstance(item, dict):
+                out.append((item['id'], item.get('relation_type', default_type)))
+            else:
+                out.append((item, default_type))
+        return out
+    if ids_key in data and data[ids_key] is not None:
+        return [(i, default_type) for i in data[ids_key]]
+    return None
+
+
 class PostgreSQLRequirementStore:
     def __init__(self):
         """初始化 PostgreSQL 连接"""
@@ -38,7 +61,7 @@ class PostgreSQLRequirementStore:
             password=os.getenv('KNOWHUB_PASSWORD'),
             database=os.getenv('KNOWHUB_DB_NAME')
         )
-        self.conn.autocommit = False
+        self.conn.autocommit = True
         print(f"[PostgreSQL Requirement] 已连接到远程数据库: {os.getenv('KNOWHUB_DB')}")
 
     def _reconnect(self):
@@ -49,7 +72,7 @@ class PostgreSQLRequirementStore:
             password=os.getenv('KNOWHUB_PASSWORD'),
             database=os.getenv('KNOWHUB_DB_NAME')
         )
-        self.conn.autocommit = False
+        self.conn.autocommit = True
 
     def _ensure_connection(self):
         if self.conn.closed != 0:
@@ -96,12 +119,26 @@ class PostgreSQLRequirementStore:
                     cursor.execute(
                         "INSERT INTO requirement_capability (requirement_id, capability_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
                         (req_id, cap_id))
-            if 'knowledge_ids' in requirement:
+            k_links = _normalize_links(requirement, 'knowledge_links', 'knowledge_ids', 'related')
+            if k_links is not None:
                 cursor.execute("DELETE FROM requirement_knowledge WHERE requirement_id = %s", (req_id,))
-                for kid in requirement['knowledge_ids']:
+                for kid, rtype in k_links:
+                    cursor.execute(
+                        "INSERT INTO requirement_knowledge (requirement_id, knowledge_id, relation_type) "
+                        "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
+                        (req_id, kid, rtype))
+            if 'resource_ids' in requirement and requirement['resource_ids'] is not None:
+                cursor.execute("DELETE FROM requirement_resource WHERE requirement_id = %s", (req_id,))
+                for rid in requirement['resource_ids']:
                     cursor.execute(
-                        "INSERT INTO requirement_knowledge (requirement_id, knowledge_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
-                        (req_id, kid))
+                        "INSERT INTO requirement_resource (requirement_id, resource_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
+                        (req_id, rid))
+            if 'strategy_ids' in requirement and requirement['strategy_ids'] is not None:
+                cursor.execute("DELETE FROM requirement_strategy WHERE requirement_id = %s", (req_id,))
+                for sid in requirement['strategy_ids']:
+                    cursor.execute(
+                        "INSERT INTO requirement_strategy (requirement_id, strategy_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
+                        (req_id, sid))
             self.conn.commit()
         finally:
             cursor.close()
@@ -166,7 +203,11 @@ class PostgreSQLRequirementStore:
         try:
             # 分离关联字段
             cap_ids = updates.pop('capability_ids', None)
-            knowledge_ids = updates.pop('knowledge_ids', None)
+            strategy_ids = updates.pop('strategy_ids', None)
+            rel_data = {}
+            for k in ('knowledge_ids', 'knowledge_links', 'resource_ids'):
+                if k in updates:
+                    rel_data[k] = updates.pop(k)
 
             if updates:
                 set_parts = []
@@ -191,13 +232,63 @@ class PostgreSQLRequirementStore:
                         "INSERT INTO requirement_capability (requirement_id, capability_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
                         (req_id, cap_id))
 
-            if knowledge_ids is not None:
+            k_links = _normalize_links(rel_data, 'knowledge_links', 'knowledge_ids', 'related')
+            if k_links is not None:
                 cursor.execute("DELETE FROM requirement_knowledge WHERE requirement_id = %s", (req_id,))
-                for kid in knowledge_ids:
+                for kid, rtype in k_links:
+                    cursor.execute(
+                        "INSERT INTO requirement_knowledge (requirement_id, knowledge_id, relation_type) "
+                        "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
+                        (req_id, kid, rtype))
+
+            if 'resource_ids' in rel_data and rel_data['resource_ids'] is not None:
+                cursor.execute("DELETE FROM requirement_resource WHERE requirement_id = %s", (req_id,))
+                for rid in rel_data['resource_ids']:
                     cursor.execute(
-                        "INSERT INTO requirement_knowledge (requirement_id, knowledge_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
-                        (req_id, kid))
+                        "INSERT INTO requirement_resource (requirement_id, resource_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
+                        (req_id, rid))
 
+            if strategy_ids is not None:
+                cursor.execute("DELETE FROM requirement_strategy WHERE requirement_id = %s", (req_id,))
+                for sid in strategy_ids:
+                    cursor.execute(
+                        "INSERT INTO requirement_strategy (requirement_id, strategy_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
+                        (req_id, sid))
+
+            self.conn.commit()
+        finally:
+            cursor.close()
+
+    def add_knowledge(self, req_id: str, knowledge_id: str, relation_type: str = 'related'):
+        """增量挂接 requirement-knowledge 边"""
+        cursor = self._get_cursor()
+        try:
+            cursor.execute(
+                "INSERT INTO requirement_knowledge (requirement_id, knowledge_id, relation_type) "
+                "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
+                (req_id, knowledge_id, relation_type))
+            self.conn.commit()
+        finally:
+            cursor.close()
+
+    def add_resource(self, req_id: str, resource_id: str):
+        """增量挂接 requirement-resource 边"""
+        cursor = self._get_cursor()
+        try:
+            cursor.execute(
+                "INSERT INTO requirement_resource (requirement_id, resource_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
+                (req_id, resource_id))
+            self.conn.commit()
+        finally:
+            cursor.close()
+
+    def add_strategy(self, req_id: str, strategy_id: str):
+        """增量挂接 requirement-strategy 边(该 strategy 满足此 requirement)"""
+        cursor = self._get_cursor()
+        try:
+            cursor.execute(
+                "INSERT INTO requirement_strategy (requirement_id, strategy_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
+                (req_id, strategy_id))
             self.conn.commit()
         finally:
             cursor.close()
@@ -231,7 +322,7 @@ class PostgreSQLRequirementStore:
         if 'source_nodes' in result and isinstance(result['source_nodes'], str):
             result['source_nodes'] = json.loads(result['source_nodes'])
         # 关联字段(来自 junction table 子查询)
-        for field in ('capability_ids', 'knowledge_ids'):
+        for field in ('capability_ids', 'knowledge_ids', 'resource_ids', 'strategy_ids', 'knowledge_links'):
             if field in result and isinstance(result[field], str):
                 result[field] = json.loads(result[field])
             elif field in result and result[field] is None:

+ 2 - 2
knowhub/knowhub_db/pg_resource_store.py

@@ -25,7 +25,7 @@ class PostgreSQLResourceStore:
             password=os.getenv('KNOWHUB_PASSWORD'),
             database=os.getenv('KNOWHUB_DB_NAME')
         )
-        self.conn.autocommit = False
+        self.conn.autocommit = True
 
     def _reconnect(self):
         self.conn = psycopg2.connect(
@@ -35,7 +35,7 @@ class PostgreSQLResourceStore:
             password=os.getenv('KNOWHUB_PASSWORD'),
             database=os.getenv('KNOWHUB_DB_NAME')
         )
-        self.conn.autocommit = False
+        self.conn.autocommit = True
 
     def _ensure_connection(self):
         if self.conn.closed != 0:

+ 133 - 40
knowhub/knowhub_db/pg_store.py

@@ -14,14 +14,29 @@ from knowhub.knowhub_db.cascade import cascade_delete
 
 load_dotenv()
 
-# 关联字段的子查询(从 junction table 读取,返回 JSON 数组)
+# 关联字段的子查询(从 junction table 读取)
+# 对于带 relation_type 的 *_knowledge 边,同时暴露两种视图:
+#   - *_ids       : 扁平 ID 列表(向后兼容,不含 type)
+#   - *_links     : [{id, relation_type}](含 type)
 _REL_SUBQUERIES = """
     (SELECT COALESCE(json_agg(rk.requirement_id), '[]'::json)
      FROM requirement_knowledge rk WHERE rk.knowledge_id = knowledge.id) AS requirement_ids,
+    (SELECT COALESCE(json_agg(json_build_object(
+        'id', rk2.requirement_id, 'relation_type', rk2.relation_type
+     )), '[]'::json)
+     FROM requirement_knowledge rk2 WHERE rk2.knowledge_id = knowledge.id) AS requirement_links,
     (SELECT COALESCE(json_agg(ck.capability_id), '[]'::json)
      FROM capability_knowledge ck WHERE ck.knowledge_id = knowledge.id) AS capability_ids,
+    (SELECT COALESCE(json_agg(json_build_object(
+        'id', ck2.capability_id, 'relation_type', ck2.relation_type
+     )), '[]'::json)
+     FROM capability_knowledge ck2 WHERE ck2.knowledge_id = knowledge.id) AS capability_links,
     (SELECT COALESCE(json_agg(tk.tool_id), '[]'::json)
      FROM tool_knowledge tk WHERE tk.knowledge_id = knowledge.id) AS tool_ids,
+    (SELECT COALESCE(json_agg(json_build_object(
+        'id', tk2.tool_id, 'relation_type', tk2.relation_type
+     )), '[]'::json)
+     FROM tool_knowledge tk2 WHERE tk2.knowledge_id = knowledge.id) AS tool_links,
     (SELECT COALESCE(json_agg(kr.resource_id), '[]'::json)
      FROM knowledge_resource kr WHERE kr.knowledge_id = knowledge.id) AS resource_ids,
     (SELECT COALESCE(json_agg(json_build_object(
@@ -44,6 +59,26 @@ _SELECT_FIELDS = f"{_BASE_FIELDS}, {_REL_SUBQUERIES}"
 _SELECT_FIELDS_WITH_EMB = f"task_embedding, content_embedding, {_SELECT_FIELDS}"
 
 
+def _normalize_links(data: Dict, links_key: str, ids_key: str, default_type: str):
+    """
+    统一两种输入格式:
+    - {links_key: [{id, relation_type}, ...]}  → 使用指定 type
+    - {ids_key: [id1, id2, ...]}               → 使用 default_type
+    两个 key 都没有返回 None(不更新)
+    """
+    if links_key in data and data[links_key] is not None:
+        out = []
+        for item in data[links_key]:
+            if isinstance(item, dict):
+                out.append((item['id'], item.get('relation_type', default_type)))
+            else:
+                out.append((item, default_type))
+        return out
+    if ids_key in data and data[ids_key] is not None:
+        return [(i, default_type) for i in data[ids_key]]
+    return None
+
+
 class PostgreSQLStore:
     def __init__(self):
         """初始化 PostgreSQL 连接"""
@@ -54,7 +89,7 @@ class PostgreSQLStore:
             password=os.getenv('KNOWHUB_PASSWORD'),
             database=os.getenv('KNOWHUB_DB_NAME')
         )
-        self.conn.autocommit = False
+        self.conn.autocommit = True
         print(f"[PostgreSQL] 已连接到远程数据库: {os.getenv('KNOWHUB_DB')}")
 
     def _reconnect(self):
@@ -65,7 +100,7 @@ class PostgreSQLStore:
             password=os.getenv('KNOWHUB_PASSWORD'),
             database=os.getenv('KNOWHUB_DB_NAME')
         )
-        self.conn.autocommit = False
+        self.conn.autocommit = True
 
     def _ensure_connection(self):
         if self.conn.closed != 0:
@@ -113,18 +148,24 @@ class PostgreSQLStore:
             ))
             # 写入关联表
             kid = knowledge['id']
-            for req_id in knowledge.get('requirement_ids', []):
+            req_links = _normalize_links(knowledge, 'requirement_links', 'requirement_ids', 'related') or []
+            for req_id, rtype in req_links:
                 cursor.execute(
-                    "INSERT INTO requirement_knowledge (requirement_id, knowledge_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
-                    (req_id, kid))
-            for cap_id in knowledge.get('capability_ids', []):
+                    "INSERT INTO requirement_knowledge (requirement_id, knowledge_id, relation_type) "
+                    "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
+                    (req_id, kid, rtype))
+            cap_links = _normalize_links(knowledge, 'capability_links', 'capability_ids', 'related') or []
+            for cap_id, rtype in cap_links:
                 cursor.execute(
-                    "INSERT INTO capability_knowledge (capability_id, knowledge_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
-                    (cap_id, kid))
-            for tool_id in knowledge.get('tool_ids', []):
+                    "INSERT INTO capability_knowledge (capability_id, knowledge_id, relation_type) "
+                    "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
+                    (cap_id, kid, rtype))
+            tool_links = _normalize_links(knowledge, 'tool_links', 'tool_ids', 'related') or []
+            for tool_id, rtype in tool_links:
                 cursor.execute(
-                    "INSERT INTO tool_knowledge (tool_id, knowledge_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
-                    (tool_id, kid))
+                    "INSERT INTO tool_knowledge (tool_id, knowledge_id, relation_type) "
+                    "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
+                    (tool_id, kid, rtype))
             for res_id in knowledge.get('resource_ids', []):
                 cursor.execute(
                     "INSERT INTO knowledge_resource (knowledge_id, resource_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
@@ -220,10 +261,10 @@ class PostgreSQLStore:
         cursor = self._get_cursor()
         try:
             # 分离关联字段和实体字段
-            req_ids = updates.pop('requirement_ids', None)
-            cap_ids = updates.pop('capability_ids', None)
-            tool_ids = updates.pop('tool_ids', None)
-            resource_ids = updates.pop('resource_ids', None)
+            rel_keys = ('requirement_ids', 'requirement_links',
+                        'capability_ids', 'capability_links',
+                        'tool_ids', 'tool_links', 'resource_ids')
+            rel_data = {k: updates.pop(k) for k in rel_keys if k in updates}
 
             if updates:
                 set_parts = []
@@ -240,30 +281,36 @@ class PostgreSQLStore:
                 cursor.execute(sql, params)
 
             # 更新关联表(全量替换)
-            if req_ids is not None:
+            req_links = _normalize_links(rel_data, 'requirement_links', 'requirement_ids', 'related')
+            if req_links is not None:
                 cursor.execute("DELETE FROM requirement_knowledge WHERE knowledge_id = %s", (knowledge_id,))
-                for req_id in req_ids:
+                for req_id, rtype in req_links:
                     cursor.execute(
-                        "INSERT INTO requirement_knowledge (requirement_id, knowledge_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
-                        (req_id, knowledge_id))
+                        "INSERT INTO requirement_knowledge (requirement_id, knowledge_id, relation_type) "
+                        "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
+                        (req_id, knowledge_id, rtype))
 
-            if cap_ids is not None:
+            cap_links = _normalize_links(rel_data, 'capability_links', 'capability_ids', 'related')
+            if cap_links is not None:
                 cursor.execute("DELETE FROM capability_knowledge WHERE knowledge_id = %s", (knowledge_id,))
-                for cap_id in cap_ids:
+                for cap_id, rtype in cap_links:
                     cursor.execute(
-                        "INSERT INTO capability_knowledge (capability_id, knowledge_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
-                        (cap_id, knowledge_id))
+                        "INSERT INTO capability_knowledge (capability_id, knowledge_id, relation_type) "
+                        "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
+                        (cap_id, knowledge_id, rtype))
 
-            if tool_ids is not None:
+            tool_links = _normalize_links(rel_data, 'tool_links', 'tool_ids', 'related')
+            if tool_links is not None:
                 cursor.execute("DELETE FROM tool_knowledge WHERE knowledge_id = %s", (knowledge_id,))
-                for tool_id in tool_ids:
+                for tool_id, rtype in tool_links:
                     cursor.execute(
-                        "INSERT INTO tool_knowledge (tool_id, knowledge_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
-                        (tool_id, knowledge_id))
+                        "INSERT INTO tool_knowledge (tool_id, knowledge_id, relation_type) "
+                        "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
+                        (tool_id, knowledge_id, rtype))
 
-            if resource_ids is not None:
+            if 'resource_ids' in rel_data and rel_data['resource_ids'] is not None:
                 cursor.execute("DELETE FROM knowledge_resource WHERE knowledge_id = %s", (knowledge_id,))
-                for res_id in resource_ids:
+                for res_id in rel_data['resource_ids']:
                     cursor.execute(
                         "INSERT INTO knowledge_resource (knowledge_id, resource_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
                         (knowledge_id, res_id))
@@ -303,6 +350,45 @@ class PostgreSQLStore:
         finally:
             cursor.close()
 
+    def add_requirement(self, knowledge_id: str, requirement_id: str,
+                        relation_type: str = 'related'):
+        """增量挂接 requirement-knowledge 边"""
+        cursor = self._get_cursor()
+        try:
+            cursor.execute(
+                "INSERT INTO requirement_knowledge (requirement_id, knowledge_id, relation_type) "
+                "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
+                (requirement_id, knowledge_id, relation_type))
+            self.conn.commit()
+        finally:
+            cursor.close()
+
+    def add_capability(self, knowledge_id: str, capability_id: str,
+                       relation_type: str = 'related'):
+        """增量挂接 capability-knowledge 边"""
+        cursor = self._get_cursor()
+        try:
+            cursor.execute(
+                "INSERT INTO capability_knowledge (capability_id, knowledge_id, relation_type) "
+                "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
+                (capability_id, knowledge_id, relation_type))
+            self.conn.commit()
+        finally:
+            cursor.close()
+
+    def add_tool(self, knowledge_id: str, tool_id: str,
+                 relation_type: str = 'related'):
+        """增量挂接 tool-knowledge 边"""
+        cursor = self._get_cursor()
+        try:
+            cursor.execute(
+                "INSERT INTO tool_knowledge (tool_id, knowledge_id, relation_type) "
+                "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
+                (tool_id, knowledge_id, relation_type))
+            self.conn.commit()
+        finally:
+            cursor.close()
+
     def count(self) -> int:
         """返回知识总数"""
         cursor = self._get_cursor()
@@ -349,7 +435,8 @@ class PostgreSQLStore:
         if 'eval' in result and isinstance(result['eval'], str):
             result['eval'] = json.loads(result['eval'])
         # 关联字段(来自 junction table 子查询,可能是 JSON 字符串或已解析的列表)
-        for field in ('requirement_ids', 'capability_ids', 'tool_ids', 'resource_ids'):
+        for field in ('requirement_ids', 'capability_ids', 'tool_ids', 'resource_ids',
+                      'requirement_links', 'capability_links', 'tool_links'):
             if field in result and isinstance(result[field], str):
                 result[field] = json.loads(result[field])
             elif field in result and result[field] is None:
@@ -400,18 +487,24 @@ class PostgreSQLStore:
             # 批量写入关联表
             for k in knowledge_list:
                 kid = k['id']
-                for req_id in k.get('requirement_ids', []):
+                req_links = _normalize_links(k, 'requirement_links', 'requirement_ids', 'related') or []
+                for req_id, rtype in req_links:
                     cursor.execute(
-                        "INSERT INTO requirement_knowledge (requirement_id, knowledge_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
-                        (req_id, kid))
-                for cap_id in k.get('capability_ids', []):
+                        "INSERT INTO requirement_knowledge (requirement_id, knowledge_id, relation_type) "
+                        "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
+                        (req_id, kid, rtype))
+                cap_links = _normalize_links(k, 'capability_links', 'capability_ids', 'related') or []
+                for cap_id, rtype in cap_links:
                     cursor.execute(
-                        "INSERT INTO capability_knowledge (capability_id, knowledge_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
-                        (cap_id, kid))
-                for tool_id in k.get('tool_ids', []):
+                        "INSERT INTO capability_knowledge (capability_id, knowledge_id, relation_type) "
+                        "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
+                        (cap_id, kid, rtype))
+                tool_links = _normalize_links(k, 'tool_links', 'tool_ids', 'related') or []
+                for tool_id, rtype in tool_links:
                     cursor.execute(
-                        "INSERT INTO tool_knowledge (tool_id, knowledge_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
-                        (tool_id, kid))
+                        "INSERT INTO tool_knowledge (tool_id, knowledge_id, relation_type) "
+                        "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
+                        (tool_id, kid, rtype))
                 for res_id in k.get('resource_ids', []):
                     cursor.execute(
                         "INSERT INTO knowledge_resource (knowledge_id, resource_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",

+ 353 - 0
knowhub/knowhub_db/pg_strategy_store.py

@@ -0,0 +1,353 @@
+"""
+PostgreSQL strategy 存储封装
+
+用于存储和检索「制作策略」。strategy 是一组原子 capability 的组合,
+附带自身的 body(可执行描述)与 source 知识。
+
+关联:
+- strategy_capability(默认 relation_type='compose')
+- strategy_knowledge(默认 relation_type='source',也可为 'case' 等)
+- strategy_resource(直接素材,无 type)
+"""
+
+import os
+import psycopg2
+from psycopg2.extras import RealDictCursor
+from typing import List, Dict, Optional
+from dotenv import load_dotenv
+from knowhub.knowhub_db.cascade import cascade_delete
+
+load_dotenv()
+
+# 读取路径:同时暴露扁平 ids 和带 type 的 links
+_REL_SUBQUERIES = """
+    (SELECT COALESCE(json_agg(rs.requirement_id), '[]'::json)
+     FROM requirement_strategy rs WHERE rs.strategy_id = strategy.id) AS requirement_ids,
+    (SELECT COALESCE(json_agg(sc.capability_id), '[]'::json)
+     FROM strategy_capability sc WHERE sc.strategy_id = strategy.id) AS capability_ids,
+    (SELECT COALESCE(json_agg(json_build_object(
+        'id', sc2.capability_id, 'relation_type', sc2.relation_type
+     )), '[]'::json)
+     FROM strategy_capability sc2 WHERE sc2.strategy_id = strategy.id) AS capability_links,
+    (SELECT COALESCE(json_agg(sk.knowledge_id), '[]'::json)
+     FROM strategy_knowledge sk WHERE sk.strategy_id = strategy.id) AS knowledge_ids,
+    (SELECT COALESCE(json_agg(json_build_object(
+        'id', sk2.knowledge_id, 'relation_type', sk2.relation_type
+     )), '[]'::json)
+     FROM strategy_knowledge sk2 WHERE sk2.strategy_id = strategy.id) AS knowledge_links,
+    (SELECT COALESCE(json_agg(sr.resource_id), '[]'::json)
+     FROM strategy_resource sr WHERE sr.strategy_id = strategy.id) AS resource_ids
+"""
+
+_BASE_FIELDS = "id, name, description, body, status, created_at, updated_at"
+_SELECT_FIELDS = f"{_BASE_FIELDS}, {_REL_SUBQUERIES}"
+
+
+class PostgreSQLStrategyStore:
+    def __init__(self):
+        self.conn = psycopg2.connect(
+            host=os.getenv('KNOWHUB_DB'),
+            port=int(os.getenv('KNOWHUB_PORT', 5432)),
+            user=os.getenv('KNOWHUB_USER'),
+            password=os.getenv('KNOWHUB_PASSWORD'),
+            database=os.getenv('KNOWHUB_DB_NAME')
+        )
+        self.conn.autocommit = True
+        print(f"[PostgreSQL Strategy] 已连接到远程数据库: {os.getenv('KNOWHUB_DB')}")
+
+    def _reconnect(self):
+        self.conn = psycopg2.connect(
+            host=os.getenv('KNOWHUB_DB'),
+            port=int(os.getenv('KNOWHUB_PORT', 5432)),
+            user=os.getenv('KNOWHUB_USER'),
+            password=os.getenv('KNOWHUB_PASSWORD'),
+            database=os.getenv('KNOWHUB_DB_NAME')
+        )
+        self.conn.autocommit = True
+
+    def _ensure_connection(self):
+        if self.conn.closed != 0:
+            self._reconnect()
+        else:
+            try:
+                c = self.conn.cursor()
+                c.execute("SELECT 1")
+                c.close()
+            except (psycopg2.OperationalError, psycopg2.InterfaceError):
+                self._reconnect()
+
+    def _get_cursor(self):
+        self._ensure_connection()
+        return self.conn.cursor(cursor_factory=RealDictCursor)
+
+    # ─── 关联写入 ────────────────────────────────────────────────
+
+    @staticmethod
+    def _normalize_links(data: Dict, links_key: str, ids_key: str, default_type: str):
+        """
+        统一两种输入:
+        - {links_key: [{id, relation_type}, ...]}   → 使用给定 type
+        - {ids_key: [id1, id2, ...]}                → 使用 default_type
+        返回 [(id, relation_type), ...];若两个 key 都不存在返回 None(表示不更新)
+        """
+        if links_key in data and data[links_key] is not None:
+            out = []
+            for item in data[links_key]:
+                if isinstance(item, dict):
+                    out.append((item['id'], item.get('relation_type', default_type)))
+                else:  # 容错:允许混用
+                    out.append((item, default_type))
+            return out
+        if ids_key in data and data[ids_key] is not None:
+            return [(i, default_type) for i in data[ids_key]]
+        return None
+
+    def _save_relations(self, cursor, strategy_id: str, data: Dict):
+        """全量替换 strategy 的 junction"""
+        cap_links = self._normalize_links(data, 'capability_links', 'capability_ids', 'compose')
+        if cap_links is not None:
+            cursor.execute("DELETE FROM strategy_capability WHERE strategy_id = %s", (strategy_id,))
+            for cap_id, rtype in cap_links:
+                cursor.execute(
+                    "INSERT INTO strategy_capability (strategy_id, capability_id, relation_type) "
+                    "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
+                    (strategy_id, cap_id, rtype))
+
+        k_links = self._normalize_links(data, 'knowledge_links', 'knowledge_ids', 'source')
+        if k_links is not None:
+            cursor.execute("DELETE FROM strategy_knowledge WHERE strategy_id = %s", (strategy_id,))
+            for kid, rtype in k_links:
+                cursor.execute(
+                    "INSERT INTO strategy_knowledge (strategy_id, knowledge_id, relation_type) "
+                    "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
+                    (strategy_id, kid, rtype))
+
+        if 'resource_ids' in data and data['resource_ids'] is not None:
+            cursor.execute("DELETE FROM strategy_resource WHERE strategy_id = %s", (strategy_id,))
+            for rid in data['resource_ids']:
+                cursor.execute(
+                    "INSERT INTO strategy_resource (strategy_id, resource_id) "
+                    "VALUES (%s, %s) ON CONFLICT DO NOTHING",
+                    (strategy_id, rid))
+
+        if 'requirement_ids' in data and data['requirement_ids'] is not None:
+            cursor.execute("DELETE FROM requirement_strategy WHERE strategy_id = %s", (strategy_id,))
+            for req_id in data['requirement_ids']:
+                cursor.execute(
+                    "INSERT INTO requirement_strategy (requirement_id, strategy_id) "
+                    "VALUES (%s, %s) ON CONFLICT DO NOTHING",
+                    (req_id, strategy_id))
+
+    # ─── 核心 CRUD ───────────────────────────────────────────────
+
+    def insert_or_update(self, strategy: Dict):
+        """插入或更新 strategy(含关联)"""
+        cursor = self._get_cursor()
+        try:
+            cursor.execute("""
+                INSERT INTO strategy (
+                    id, name, description, body, status, created_at, updated_at, embedding
+                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
+                ON CONFLICT (id) DO UPDATE SET
+                    name = EXCLUDED.name,
+                    description = EXCLUDED.description,
+                    body = EXCLUDED.body,
+                    status = EXCLUDED.status,
+                    updated_at = EXCLUDED.updated_at,
+                    embedding = EXCLUDED.embedding
+            """, (
+                strategy['id'],
+                strategy.get('name', ''),
+                strategy.get('description', ''),
+                strategy.get('body', ''),
+                strategy.get('status', 'draft'),
+                strategy.get('created_at'),
+                strategy.get('updated_at'),
+                strategy.get('embedding'),
+            ))
+            self._save_relations(cursor, strategy['id'], strategy)
+            self.conn.commit()
+        finally:
+            cursor.close()
+
+    def get_by_id(self, strategy_id: str) -> Optional[Dict]:
+        cursor = self._get_cursor()
+        try:
+            cursor.execute(f"SELECT {_SELECT_FIELDS} FROM strategy WHERE id = %s", (strategy_id,))
+            result = cursor.fetchone()
+            return self._format_result(result) if result else None
+        finally:
+            cursor.close()
+
+    def search(self, query_embedding: List[float], limit: int = 10,
+               status: Optional[str] = None) -> List[Dict]:
+        """向量检索 strategy"""
+        cursor = self._get_cursor()
+        try:
+            if status:
+                sql = f"""
+                    SELECT {_SELECT_FIELDS},
+                           1 - (embedding <=> %s::real[]) as score
+                    FROM strategy
+                    WHERE embedding IS NOT NULL AND status = %s
+                    ORDER BY embedding <=> %s::real[]
+                    LIMIT %s
+                """
+                params = (query_embedding, status, query_embedding, limit)
+            else:
+                sql = f"""
+                    SELECT {_SELECT_FIELDS},
+                           1 - (embedding <=> %s::real[]) as score
+                    FROM strategy
+                    WHERE embedding IS NOT NULL
+                    ORDER BY embedding <=> %s::real[]
+                    LIMIT %s
+                """
+                params = (query_embedding, query_embedding, limit)
+            cursor.execute(sql, params)
+            results = cursor.fetchall()
+            return [self._format_result(r) for r in results]
+        finally:
+            cursor.close()
+
+    def list_all(self, limit: int = 100, offset: int = 0,
+                 status: Optional[str] = None) -> List[Dict]:
+        cursor = self._get_cursor()
+        try:
+            if status:
+                cursor.execute(f"""
+                    SELECT {_SELECT_FIELDS} FROM strategy
+                    WHERE status = %s
+                    ORDER BY id
+                    LIMIT %s OFFSET %s
+                """, (status, limit, offset))
+            else:
+                cursor.execute(f"""
+                    SELECT {_SELECT_FIELDS} FROM strategy
+                    ORDER BY id
+                    LIMIT %s OFFSET %s
+                """, (limit, offset))
+            results = cursor.fetchall()
+            return [self._format_result(r) for r in results]
+        finally:
+            cursor.close()
+
+    def update(self, strategy_id: str, updates: Dict):
+        """更新 strategy(关联字段可选)"""
+        cursor = self._get_cursor()
+        try:
+            # 分离关联字段
+            rel_keys = ('requirement_ids',
+                        'capability_ids', 'capability_links',
+                        'knowledge_ids', 'knowledge_links', 'resource_ids')
+            rel_fields = {k: updates.pop(k) for k in rel_keys if k in updates}
+
+            if updates:
+                set_parts = []
+                params = []
+                for key, value in updates.items():
+                    set_parts.append(f"{key} = %s")
+                    params.append(value)
+                params.append(strategy_id)
+                cursor.execute(
+                    f"UPDATE strategy SET {', '.join(set_parts)} WHERE id = %s",
+                    params)
+
+            if rel_fields:
+                self._save_relations(cursor, strategy_id, rel_fields)
+
+            self.conn.commit()
+        finally:
+            cursor.close()
+
+    def delete(self, strategy_id: str):
+        """删除 strategy 及其所有 junction 行"""
+        cursor = self._get_cursor()
+        try:
+            cascade_delete(cursor, 'strategy', strategy_id)
+            self.conn.commit()
+        finally:
+            cursor.close()
+
+    def count(self, status: Optional[str] = None) -> int:
+        cursor = self._get_cursor()
+        try:
+            if status:
+                cursor.execute("SELECT COUNT(*) as count FROM strategy WHERE status = %s", (status,))
+            else:
+                cursor.execute("SELECT COUNT(*) as count FROM strategy")
+            return cursor.fetchone()['count']
+        finally:
+            cursor.close()
+
+    # ─── 增量关联 API(不删已有)─────────────────────────────────
+
+    def add_capability(self, strategy_id: str, capability_id: str,
+                       relation_type: str = 'compose'):
+        cursor = self._get_cursor()
+        try:
+            cursor.execute(
+                "INSERT INTO strategy_capability (strategy_id, capability_id, relation_type) "
+                "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
+                (strategy_id, capability_id, relation_type))
+            self.conn.commit()
+        finally:
+            cursor.close()
+
+    def add_knowledge(self, strategy_id: str, knowledge_id: str,
+                      relation_type: str = 'source'):
+        cursor = self._get_cursor()
+        try:
+            cursor.execute(
+                "INSERT INTO strategy_knowledge (strategy_id, knowledge_id, relation_type) "
+                "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
+                (strategy_id, knowledge_id, relation_type))
+            self.conn.commit()
+        finally:
+            cursor.close()
+
+    def add_resource(self, strategy_id: str, resource_id: str):
+        cursor = self._get_cursor()
+        try:
+            cursor.execute(
+                "INSERT INTO strategy_resource (strategy_id, resource_id) "
+                "VALUES (%s, %s) ON CONFLICT DO NOTHING",
+                (strategy_id, resource_id))
+            self.conn.commit()
+        finally:
+            cursor.close()
+
+    def add_requirement(self, strategy_id: str, requirement_id: str):
+        """增量挂接 requirement-strategy 边(这个 strategy 满足该 requirement)"""
+        cursor = self._get_cursor()
+        try:
+            cursor.execute(
+                "INSERT INTO requirement_strategy (requirement_id, strategy_id) "
+                "VALUES (%s, %s) ON CONFLICT DO NOTHING",
+                (requirement_id, strategy_id))
+            self.conn.commit()
+        finally:
+            cursor.close()
+
+    # ─── 辅助 ────────────────────────────────────────────────────
+
+    def _format_result(self, row: Dict) -> Optional[Dict]:
+        if not row:
+            return None
+        import json
+        result = dict(row)
+        for field in ('requirement_ids', 'capability_ids', 'knowledge_ids', 'resource_ids'):
+            if field in result and isinstance(result[field], str):
+                result[field] = json.loads(result[field])
+            elif field in result and result[field] is None:
+                result[field] = []
+        for field in ('capability_links', 'knowledge_links'):
+            if field in result and isinstance(result[field], str):
+                result[field] = json.loads(result[field])
+            elif field in result and result[field] is None:
+                result[field] = []
+        return result
+
+    def close(self):
+        if self.conn:
+            self.conn.close()

+ 42 - 15
knowhub/knowhub_db/pg_tool_store.py

@@ -15,12 +15,16 @@ from knowhub.knowhub_db.cascade import cascade_delete
 
 load_dotenv()
 
-# 关联字段子查询
+# 关联字段子查询。knowledge 边暴露两种视图:knowledge_ids(扁平)+ knowledge_links(含 type)
 _REL_SUBQUERIES = """
     (SELECT COALESCE(json_agg(ct.capability_id), '[]'::json)
      FROM capability_tool ct WHERE ct.tool_id = tool.id) AS capability_ids,
     (SELECT COALESCE(json_agg(tk.knowledge_id), '[]'::json)
      FROM tool_knowledge tk WHERE tk.tool_id = tool.id) AS knowledge_ids,
+    (SELECT COALESCE(json_agg(json_build_object(
+        'id', tk2.knowledge_id, 'relation_type', tk2.relation_type
+     )), '[]'::json)
+     FROM tool_knowledge tk2 WHERE tk2.tool_id = tool.id) AS knowledge_links,
     (SELECT COALESCE(json_agg(tp.provider_id), '[]'::json)
      FROM tool_provider tp WHERE tp.tool_id = tool.id) AS provider_ids
 """
@@ -30,6 +34,21 @@ _BASE_FIELDS = "id, name, version, introduction, tutorial, input, output, update
 _SELECT_FIELDS = f"{_BASE_FIELDS}, {_REL_SUBQUERIES}"
 
 
+def _normalize_links(data: Dict, links_key: str, ids_key: str, default_type: str):
+    """两种输入格式统一:{links_key: [{id, relation_type}]} 或 {ids_key: [id]}"""
+    if links_key in data and data[links_key] is not None:
+        out = []
+        for item in data[links_key]:
+            if isinstance(item, dict):
+                out.append((item['id'], item.get('relation_type', default_type)))
+            else:
+                out.append((item, default_type))
+        return out
+    if ids_key in data and data[ids_key] is not None:
+        return [(i, default_type) for i in data[ids_key]]
+    return None
+
+
 class PostgreSQLToolStore:
     def __init__(self):
         """初始化 PostgreSQL 连接"""
@@ -40,7 +59,7 @@ class PostgreSQLToolStore:
             password=os.getenv('KNOWHUB_PASSWORD'),
             database=os.getenv('KNOWHUB_DB_NAME')
         )
-        self.conn.autocommit = False
+        self.conn.autocommit = True
         print(f"[PostgreSQL Tool] 已连接到远程数据库: {os.getenv('KNOWHUB_DB')}")
 
     def _reconnect(self):
@@ -51,7 +70,7 @@ class PostgreSQLToolStore:
             password=os.getenv('KNOWHUB_PASSWORD'),
             database=os.getenv('KNOWHUB_DB_NAME')
         )
-        self.conn.autocommit = False
+        self.conn.autocommit = True
 
     def _ensure_connection(self):
         if self.conn.closed != 0:
@@ -107,12 +126,14 @@ class PostgreSQLToolStore:
                     cursor.execute(
                         "INSERT INTO capability_tool (capability_id, tool_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
                         (cap_id, tool_id))
-            if 'knowledge_ids' in tool:
+            k_links = _normalize_links(tool, 'knowledge_links', 'knowledge_ids', 'related')
+            if k_links is not None:
                 cursor.execute("DELETE FROM tool_knowledge WHERE tool_id = %s", (tool_id,))
-                for kid in tool['knowledge_ids']:
+                for kid, rtype in k_links:
                     cursor.execute(
-                        "INSERT INTO tool_knowledge (tool_id, knowledge_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
-                        (tool_id, kid))
+                        "INSERT INTO tool_knowledge (tool_id, knowledge_id, relation_type) "
+                        "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
+                        (tool_id, kid, rtype))
             if 'provider_ids' in tool:
                 cursor.execute("DELETE FROM tool_provider WHERE tool_id = %s", (tool_id,))
                 for pid in tool['provider_ids']:
@@ -197,6 +218,7 @@ class PostgreSQLToolStore:
             # 分离关联字段
             cap_ids = updates.pop('capability_ids', None)
             knowledge_ids = updates.pop('knowledge_ids', None)
+            knowledge_links = updates.pop('knowledge_links', None)
             provider_ids = updates.pop('provider_ids', None)
 
             if updates:
@@ -223,12 +245,16 @@ class PostgreSQLToolStore:
                         "INSERT INTO capability_tool (capability_id, tool_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
                         (cap_id, tool_id))
 
-            if knowledge_ids is not None:
+            k_links = _normalize_links(
+                {'knowledge_links': knowledge_links, 'knowledge_ids': knowledge_ids},
+                'knowledge_links', 'knowledge_ids', 'related')
+            if k_links is not None:
                 cursor.execute("DELETE FROM tool_knowledge WHERE tool_id = %s", (tool_id,))
-                for kid in knowledge_ids:
+                for kid, rtype in k_links:
                     cursor.execute(
-                        "INSERT INTO tool_knowledge (tool_id, knowledge_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
-                        (tool_id, kid))
+                        "INSERT INTO tool_knowledge (tool_id, knowledge_id, relation_type) "
+                        "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
+                        (tool_id, kid, rtype))
 
             if provider_ids is not None:
                 cursor.execute("DELETE FROM tool_provider WHERE tool_id = %s", (tool_id,))
@@ -241,13 +267,14 @@ class PostgreSQLToolStore:
         finally:
             cursor.close()
 
-    def add_knowledge(self, tool_id: str, knowledge_id: str):
+    def add_knowledge(self, tool_id: str, knowledge_id: str, relation_type: str = 'related'):
         """向工具添加一条知识关联(不删除已有关联)"""
         cursor = self._get_cursor()
         try:
             cursor.execute(
-                "INSERT INTO tool_knowledge (tool_id, knowledge_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
-                (tool_id, knowledge_id))
+                "INSERT INTO tool_knowledge (tool_id, knowledge_id, relation_type) "
+                "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
+                (tool_id, knowledge_id, relation_type))
             self.conn.commit()
         finally:
             cursor.close()
@@ -285,7 +312,7 @@ class PostgreSQLToolStore:
                 except json.JSONDecodeError:
                     result[field] = None
         # 关联字段(来自 junction table 子查询)
-        for field in ('capability_ids', 'knowledge_ids', 'provider_ids'):
+        for field in ('capability_ids', 'knowledge_ids', 'provider_ids', 'knowledge_links'):
             if field in result and isinstance(result[field], str):
                 result[field] = json.loads(result[field])
             elif field in result and result[field] is None: