|
|
@@ -8,14 +8,13 @@ FastAPI + Milvus Lite(知识)+ SQLite(资源),单文件部署。
|
|
|
import os
|
|
|
import re
|
|
|
import json
|
|
|
-import sqlite3
|
|
|
import asyncio
|
|
|
import base64
|
|
|
import time
|
|
|
import uuid
|
|
|
from contextlib import asynccontextmanager
|
|
|
from datetime import datetime, timezone
|
|
|
-from typing import Optional, List
|
|
|
+from typing import Optional, List, Dict
|
|
|
from pathlib import Path
|
|
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
|
|
|
|
|
@@ -37,7 +36,8 @@ _dedup_llm = create_openrouter_llm_call(model="google/gemini-2.5-flash-lite")
|
|
|
_tool_analysis_llm = create_qwen_llm_call(model="qwen3.5-plus")
|
|
|
|
|
|
# 导入向量存储和 embedding
|
|
|
-from knowhub.vector_store import MilvusStore
|
|
|
+from knowhub.knowhub_db.pg_store import PostgreSQLStore
|
|
|
+from knowhub.knowhub_db.pg_resource_store import PostgreSQLResourceStore
|
|
|
from knowhub.embeddings import get_embedding, get_embeddings_batch
|
|
|
|
|
|
BRAND_NAME = os.getenv("BRAND_NAME", "KnowHub")
|
|
|
@@ -54,19 +54,10 @@ if ORG_KEYS_RAW:
|
|
|
ORG_KEYS[org.strip()] = key_b64.strip()
|
|
|
|
|
|
DB_PATH = Path(__file__).parent / BRAND_DB
|
|
|
-MILVUS_DATA_DIR = Path(__file__).parent / "milvus_data"
|
|
|
-
|
|
|
-# 全局 Milvus 存储实例
|
|
|
-milvus_store: Optional[MilvusStore] = None
|
|
|
-
|
|
|
-# --- 数据库 ---
|
|
|
-
|
|
|
-def get_db() -> sqlite3.Connection:
|
|
|
- conn = sqlite3.connect(str(DB_PATH))
|
|
|
- conn.row_factory = sqlite3.Row
|
|
|
- conn.execute("PRAGMA journal_mode=WAL")
|
|
|
- return conn
|
|
|
|
|
|
+# 全局 PostgreSQL 存储实例
|
|
|
+pg_store: Optional[PostgreSQLStore] = None
|
|
|
+pg_resource_store: Optional[PostgreSQLResourceStore] = None
|
|
|
|
|
|
# --- 加密/解密 ---
|
|
|
|
|
|
@@ -183,53 +174,6 @@ def serialize_milvus_result(data):
|
|
|
return None
|
|
|
|
|
|
|
|
|
-def init_db():
|
|
|
- """初始化 SQLite(仅用于 resources)"""
|
|
|
- conn = get_db()
|
|
|
- conn.execute("""
|
|
|
- CREATE TABLE IF NOT EXISTS experiences (
|
|
|
- id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
|
- name TEXT NOT NULL,
|
|
|
- url TEXT DEFAULT '',
|
|
|
- category TEXT DEFAULT '',
|
|
|
- task TEXT NOT NULL,
|
|
|
- score INTEGER CHECK(score BETWEEN 1 AND 5),
|
|
|
- outcome TEXT DEFAULT '',
|
|
|
- tips TEXT DEFAULT '',
|
|
|
- content_id TEXT DEFAULT '',
|
|
|
- submitted_by TEXT DEFAULT '',
|
|
|
- created_at TEXT NOT NULL
|
|
|
- )
|
|
|
- """)
|
|
|
- conn.execute("CREATE INDEX IF NOT EXISTS idx_name ON experiences(name)")
|
|
|
-
|
|
|
- conn.execute("""
|
|
|
- CREATE TABLE IF NOT EXISTS resources (
|
|
|
- id TEXT PRIMARY KEY,
|
|
|
- title TEXT DEFAULT '',
|
|
|
- body TEXT NOT NULL,
|
|
|
- secure_body TEXT DEFAULT '',
|
|
|
- content_type TEXT DEFAULT 'text',
|
|
|
- metadata TEXT DEFAULT '{}',
|
|
|
- sort_order INTEGER DEFAULT 0,
|
|
|
- submitted_by TEXT DEFAULT '',
|
|
|
- created_at TEXT NOT NULL,
|
|
|
- updated_at TEXT DEFAULT ''
|
|
|
- )
|
|
|
- """)
|
|
|
-
|
|
|
- conn.execute("""
|
|
|
- CREATE TABLE IF NOT EXISTS relation_cache (
|
|
|
- id INTEGER PRIMARY KEY CHECK(id = 1),
|
|
|
- data TEXT NOT NULL DEFAULT '{}'
|
|
|
- )
|
|
|
- """)
|
|
|
- conn.execute("INSERT OR IGNORE INTO relation_cache(id, data) VALUES(1, '{}')")
|
|
|
-
|
|
|
- conn.commit()
|
|
|
- conn.close()
|
|
|
-
|
|
|
-
|
|
|
# --- Models ---
|
|
|
|
|
|
class ResourceIn(BaseModel):
|
|
|
@@ -459,31 +403,22 @@ content: {content}
|
|
|
# --- Dedup: RelationCache ---
|
|
|
|
|
|
class RelationCache:
|
|
|
- """关系缓存,存储在 SQLite relation_cache 表(单行 JSON)"""
|
|
|
+ """关系缓存,存储在内存中"""
|
|
|
+
|
|
|
+ def __init__(self):
|
|
|
+ self._cache: Dict[str, List[str]] = {}
|
|
|
|
|
|
def load(self) -> dict:
|
|
|
- conn = get_db()
|
|
|
- try:
|
|
|
- row = conn.execute("SELECT data FROM relation_cache WHERE id=1").fetchone()
|
|
|
- return json.loads(row["data"]) if row else {}
|
|
|
- finally:
|
|
|
- conn.close()
|
|
|
+ return self._cache
|
|
|
|
|
|
def save(self, cache: dict):
|
|
|
- conn = get_db()
|
|
|
- try:
|
|
|
- conn.execute("UPDATE relation_cache SET data=? WHERE id=1", (json.dumps(cache),))
|
|
|
- conn.commit()
|
|
|
- finally:
|
|
|
- conn.close()
|
|
|
+ self._cache = cache
|
|
|
|
|
|
def add_relation(self, relation_type: str, knowledge_id: str):
|
|
|
- cache = self.load()
|
|
|
- if relation_type not in cache:
|
|
|
- cache[relation_type] = []
|
|
|
- if knowledge_id not in cache[relation_type]:
|
|
|
- cache[relation_type].append(knowledge_id)
|
|
|
- self.save(cache)
|
|
|
+ if relation_type not in self._cache:
|
|
|
+ self._cache[relation_type] = []
|
|
|
+ if knowledge_id not in self._cache[relation_type]:
|
|
|
+ self._cache[relation_type].append(knowledge_id)
|
|
|
|
|
|
|
|
|
# --- Dedup: KnowledgeProcessor ---
|
|
|
@@ -501,7 +436,7 @@ class KnowledgeProcessor:
|
|
|
# 第一阶段:处理 pending(去重)
|
|
|
while True:
|
|
|
try:
|
|
|
- pending = milvus_store.query('status == "pending"', limit=50)
|
|
|
+ pending = pg_store.query('status == "pending"', limit=50)
|
|
|
except Exception as e:
|
|
|
print(f"[KnowledgeProcessor] 查询 pending 失败: {e}")
|
|
|
break
|
|
|
@@ -512,7 +447,7 @@ class KnowledgeProcessor:
|
|
|
# 第二阶段:处理 dedup_passed(工具关联)
|
|
|
while True:
|
|
|
try:
|
|
|
- dedup_passed = milvus_store.query('status == "dedup_passed"', limit=50)
|
|
|
+ dedup_passed = pg_store.query('status == "dedup_passed"', limit=50)
|
|
|
except Exception as e:
|
|
|
print(f"[KnowledgeProcessor] 查询 dedup_passed 失败: {e}")
|
|
|
break
|
|
|
@@ -526,7 +461,7 @@ class KnowledgeProcessor:
|
|
|
now = int(time.time())
|
|
|
# 乐观锁:pending → processing(时间戳存秒级)
|
|
|
try:
|
|
|
- milvus_store.update(kid, {"status": "processing", "updated_at": now})
|
|
|
+ pg_store.update(kid, {"status": "processing", "updated_at": now})
|
|
|
except Exception as e:
|
|
|
print(f"[KnowledgeProcessor] 锁定 {kid} 失败: {e}")
|
|
|
return
|
|
|
@@ -535,7 +470,7 @@ class KnowledgeProcessor:
|
|
|
embedding = knowledge.get("embedding")
|
|
|
if not embedding:
|
|
|
embedding = await get_embedding(knowledge["task"])
|
|
|
- candidates = milvus_store.search(
|
|
|
+ candidates = pg_store.search(
|
|
|
query_embedding=embedding,
|
|
|
filters='(status == "approved" or status == "checked")',
|
|
|
limit=10
|
|
|
@@ -545,7 +480,7 @@ class KnowledgeProcessor:
|
|
|
candidates = [c for c in candidates if c.get("score", 0) >= 0.75]
|
|
|
|
|
|
if not candidates:
|
|
|
- milvus_store.update(kid, {"status": "dedup_passed", "updated_at": now})
|
|
|
+ pg_store.update(kid, {"status": "dedup_passed", "updated_at": now})
|
|
|
return
|
|
|
|
|
|
llm_result = await self._llm_judge_relations(knowledge, candidates)
|
|
|
@@ -554,7 +489,7 @@ class KnowledgeProcessor:
|
|
|
except Exception as e:
|
|
|
print(f"[KnowledgeProcessor] 处理 {kid} 失败: {e},回退到 pending")
|
|
|
try:
|
|
|
- milvus_store.update(kid, {"status": "pending", "updated_at": int(time.time())})
|
|
|
+ pg_store.update(kid, {"status": "pending", "updated_at": int(time.time())})
|
|
|
except Exception:
|
|
|
pass
|
|
|
|
|
|
@@ -617,7 +552,7 @@ class KnowledgeProcessor:
|
|
|
rejected_relationships.append({"type": rel_type, "target": old_id})
|
|
|
if rel_type in ("duplicate", "subset") and old_id:
|
|
|
try:
|
|
|
- old = milvus_store.get_by_id(old_id)
|
|
|
+ old = pg_store.get_by_id(old_id)
|
|
|
if not old:
|
|
|
continue
|
|
|
eval_data = old.get("eval") or {}
|
|
|
@@ -630,10 +565,10 @@ class KnowledgeProcessor:
|
|
|
"timestamp": now
|
|
|
})
|
|
|
eval_data["helpful_history"] = helpful_history
|
|
|
- milvus_store.update(old_id, {"eval": eval_data, "updated_at": now})
|
|
|
+ pg_store.update(old_id, {"eval": eval_data, "updated_at": now})
|
|
|
except Exception as e:
|
|
|
print(f"[Apply Decision] 更新旧知识 {old_id} helpful 失败: {e}")
|
|
|
- milvus_store.update(kid, {"status": "rejected", "relationships": json.dumps(rejected_relationships), "updated_at": now})
|
|
|
+ pg_store.update(kid, {"status": "rejected", "relationships": json.dumps(rejected_relationships), "updated_at": now})
|
|
|
else:
|
|
|
new_relationships = []
|
|
|
for rel in relations:
|
|
|
@@ -647,16 +582,16 @@ class KnowledgeProcessor:
|
|
|
self._relation_cache.add_relation(rel_type, old_id)
|
|
|
if reverse_type and reverse_type != "none":
|
|
|
try:
|
|
|
- old = milvus_store.get_by_id(old_id)
|
|
|
+ old = pg_store.get_by_id(old_id)
|
|
|
if old:
|
|
|
old_rels = old.get("relationships") or []
|
|
|
old_rels.append({"type": reverse_type, "target": kid})
|
|
|
- milvus_store.update(old_id, {"relationships": json.dumps(old_rels), "updated_at": now})
|
|
|
+ pg_store.update(old_id, {"relationships": json.dumps(old_rels), "updated_at": now})
|
|
|
self._relation_cache.add_relation(reverse_type, old_id)
|
|
|
self._relation_cache.add_relation(reverse_type, kid)
|
|
|
except Exception as e:
|
|
|
print(f"[Apply Decision] 更新旧知识关系 {old_id} 失败: {e}")
|
|
|
- milvus_store.update(kid, {
|
|
|
+ pg_store.update(kid, {
|
|
|
"status": "dedup_passed",
|
|
|
"relationships": json.dumps(new_relationships),
|
|
|
"updated_at": now
|
|
|
@@ -691,62 +626,61 @@ class KnowledgeProcessor:
|
|
|
raise
|
|
|
|
|
|
async def _create_or_get_tool_resource(self, tool_info: dict) -> Optional[str]:
|
|
|
- """创建或获取工具资源"""
|
|
|
+ """创建或获取工具资源(存入 PostgreSQL tool_table)"""
|
|
|
category = tool_info.get("category", "other")
|
|
|
slug = tool_info.get("slug", "")
|
|
|
if not slug:
|
|
|
return None
|
|
|
tool_id = f"tools/{category}/{slug}"
|
|
|
- conn = get_db()
|
|
|
+ now_ts = int(time.time())
|
|
|
+ cursor = pg_store._get_cursor()
|
|
|
try:
|
|
|
- row = conn.execute("SELECT id FROM resources WHERE id = ?", (tool_id,)).fetchone()
|
|
|
- if row:
|
|
|
+ cursor.execute("SELECT id FROM tool_table WHERE id = %s", (tool_id,))
|
|
|
+ if cursor.fetchone():
|
|
|
return tool_id
|
|
|
- now_str = datetime.now(timezone.utc).isoformat()
|
|
|
- metadata = {
|
|
|
- "tool_name": tool_info.get("name", ""),
|
|
|
- "tool_slug": slug,
|
|
|
- "category": category,
|
|
|
- "version": tool_info.get("version", ""),
|
|
|
- "description": tool_info.get("description", ""),
|
|
|
- "usage": tool_info.get("usage", ""),
|
|
|
- "scenarios": tool_info.get("scenarios", []),
|
|
|
- "input": tool_info.get("input", ""),
|
|
|
- "output": tool_info.get("output", ""),
|
|
|
- "status": tool_info.get("status", "未接入"),
|
|
|
- "knowledge_ids": []
|
|
|
- }
|
|
|
- conn.execute(
|
|
|
- "INSERT INTO resources (id, title, body, content_type, metadata, submitted_by, created_at, updated_at)"
|
|
|
- " VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
|
|
|
- (tool_id, tool_info.get("name", slug), "", "tool",
|
|
|
- json.dumps(metadata), "knowledge_processor", now_str, now_str),
|
|
|
- )
|
|
|
- conn.commit()
|
|
|
+ cursor.execute("""
|
|
|
+ INSERT INTO tool_table (id, name, version, introduction, tutorial, input, output,
|
|
|
+ updated_time, status, knowledge, case_knowledge, process_knowledge)
|
|
|
+ VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
|
+ """, (
|
|
|
+ tool_id,
|
|
|
+ tool_info.get("name", slug),
|
|
|
+ tool_info.get("version") or None,
|
|
|
+ tool_info.get("description", ""),
|
|
|
+ tool_info.get("usage", ""),
|
|
|
+ json.dumps(tool_info.get("input", "")),
|
|
|
+ json.dumps(tool_info.get("output", "")),
|
|
|
+ now_ts,
|
|
|
+ tool_info.get("status", "未接入"),
|
|
|
+ json.dumps([]),
|
|
|
+ json.dumps([]),
|
|
|
+ json.dumps([]),
|
|
|
+ ))
|
|
|
+ pg_store.conn.commit()
|
|
|
print(f"[Tool Resource] 创建新工具: {tool_id}")
|
|
|
return tool_id
|
|
|
finally:
|
|
|
- conn.close()
|
|
|
+ cursor.close()
|
|
|
|
|
|
async def _update_tool_knowledge_index(self, tool_id: str, knowledge_id: str):
|
|
|
- """更新工具资源的 knowledge_ids 索引"""
|
|
|
- conn = get_db()
|
|
|
+ """更新工具的 knowledge 关联索引(PostgreSQL tool_table)"""
|
|
|
+ now_ts = int(time.time())
|
|
|
+ cursor = pg_store._get_cursor()
|
|
|
try:
|
|
|
- row = conn.execute("SELECT metadata FROM resources WHERE id = ?", (tool_id,)).fetchone()
|
|
|
+ cursor.execute("SELECT knowledge FROM tool_table WHERE id = %s", (tool_id,))
|
|
|
+ row = cursor.fetchone()
|
|
|
if not row:
|
|
|
return
|
|
|
- metadata = json.loads(row["metadata"] or "{}")
|
|
|
- knowledge_ids = metadata.get("knowledge_ids", [])
|
|
|
+ knowledge_ids = row["knowledge"] if isinstance(row["knowledge"], list) else json.loads(row["knowledge"] or "[]")
|
|
|
if knowledge_id not in knowledge_ids:
|
|
|
knowledge_ids.append(knowledge_id)
|
|
|
- metadata["knowledge_ids"] = knowledge_ids
|
|
|
- conn.execute(
|
|
|
- "UPDATE resources SET metadata = ?, updated_at = ? WHERE id = ?",
|
|
|
- (json.dumps(metadata), datetime.now(timezone.utc).isoformat(), tool_id)
|
|
|
+ cursor.execute(
|
|
|
+ "UPDATE tool_table SET knowledge = %s, updated_time = %s WHERE id = %s",
|
|
|
+ (json.dumps(knowledge_ids), now_ts, tool_id)
|
|
|
)
|
|
|
- conn.commit()
|
|
|
+ pg_store.conn.commit()
|
|
|
finally:
|
|
|
- conn.close()
|
|
|
+ cursor.close()
|
|
|
|
|
|
async def _analyze_tool_relation(self, knowledge: dict):
|
|
|
"""分析知识与工具的关联关系"""
|
|
|
@@ -754,7 +688,7 @@ class KnowledgeProcessor:
|
|
|
now = int(time.time())
|
|
|
# 乐观锁:dedup_passed → analyzing
|
|
|
try:
|
|
|
- milvus_store.update(kid, {"status": "analyzing", "updated_at": now})
|
|
|
+ pg_store.update(kid, {"status": "analyzing", "updated_at": now})
|
|
|
except Exception as e:
|
|
|
print(f"[Tool Analysis] 锁定 {kid} 失败: {e}")
|
|
|
return
|
|
|
@@ -772,13 +706,13 @@ class KnowledgeProcessor:
|
|
|
has_tools = bool(tool_analysis and tool_analysis.get("has_tools"))
|
|
|
# 重新分析后仍然不一致 → 知识模糊,rejected
|
|
|
if not has_tools:
|
|
|
- milvus_store.update(kid, {"status": "rejected", "updated_at": now})
|
|
|
+ pg_store.update(kid, {"status": "rejected", "updated_at": now})
|
|
|
print(f"[Tool Analysis] {kid} 两次判定不一致,知识模糊,rejected")
|
|
|
return
|
|
|
|
|
|
# 情况2:无工具且无 tool tag → 直接 approved
|
|
|
if not has_tools:
|
|
|
- milvus_store.update(kid, {"status": "approved", "updated_at": now})
|
|
|
+ pg_store.update(kid, {"status": "approved", "updated_at": now})
|
|
|
return
|
|
|
|
|
|
# 情况3/4:有工具 → 创建资源并关联
|
|
|
@@ -803,7 +737,7 @@ class KnowledgeProcessor:
|
|
|
updates["tags"] = updated_tags
|
|
|
print(f"[Tool Analysis] {kid} 添加 tool tag")
|
|
|
|
|
|
- milvus_store.update(kid, updates)
|
|
|
+ pg_store.update(kid, updates)
|
|
|
|
|
|
for tool_id in tool_ids:
|
|
|
await self._update_tool_knowledge_index(tool_id, kid)
|
|
|
@@ -813,7 +747,7 @@ class KnowledgeProcessor:
|
|
|
except Exception as e:
|
|
|
print(f"[Tool Analysis] {kid} 分析失败: {e},回退到 dedup_passed")
|
|
|
try:
|
|
|
- milvus_store.update(kid, {"status": "dedup_passed", "updated_at": int(time.time())})
|
|
|
+ pg_store.update(kid, {"status": "dedup_passed", "updated_at": int(time.time())})
|
|
|
except Exception:
|
|
|
pass
|
|
|
|
|
|
@@ -826,22 +760,22 @@ async def _periodic_processor():
|
|
|
now = int(time.time())
|
|
|
# 回滚超时的 processing(5分钟 → pending)
|
|
|
timeout_5min = now - 300
|
|
|
- processing = milvus_store.query('status == "processing"', limit=200)
|
|
|
+ processing = pg_store.query('status == "processing"', limit=200)
|
|
|
for item in processing:
|
|
|
updated_at = item.get("updated_at", 0) or 0
|
|
|
updated_at_sec = updated_at // 1000 if updated_at > 1_000_000_000_000 else updated_at
|
|
|
if updated_at_sec < timeout_5min:
|
|
|
print(f"[Periodic] 回滚超时 processing → pending: {item['id']}")
|
|
|
- milvus_store.update(item["id"], {"status": "pending", "updated_at": int(time.time())})
|
|
|
+ pg_store.update(item["id"], {"status": "pending", "updated_at": int(time.time())})
|
|
|
# 回滚超时的 analyzing(10分钟 → dedup_passed)
|
|
|
timeout_10min = now - 600
|
|
|
- analyzing = milvus_store.query('status == "analyzing"', limit=200)
|
|
|
+ analyzing = pg_store.query('status == "analyzing"', limit=200)
|
|
|
for item in analyzing:
|
|
|
updated_at = item.get("updated_at", 0) or 0
|
|
|
updated_at_sec = updated_at // 1000 if updated_at > 1_000_000_000_000 else updated_at
|
|
|
if updated_at_sec < timeout_10min:
|
|
|
print(f"[Periodic] 回滚超时 analyzing → dedup_passed: {item['id']}")
|
|
|
- milvus_store.update(item["id"], {"status": "dedup_passed", "updated_at": int(time.time())})
|
|
|
+ pg_store.update(item["id"], {"status": "dedup_passed", "updated_at": int(time.time())})
|
|
|
except Exception as e:
|
|
|
print(f"[Periodic] 定时任务错误: {e}")
|
|
|
|
|
|
@@ -850,13 +784,11 @@ async def _periodic_processor():
|
|
|
|
|
|
@asynccontextmanager
|
|
|
async def lifespan(app: FastAPI):
|
|
|
- global milvus_store, knowledge_processor
|
|
|
+ global pg_store, pg_resource_store, knowledge_processor
|
|
|
|
|
|
- # 初始化 SQLite(resources)
|
|
|
- init_db()
|
|
|
-
|
|
|
- # 初始化 Milvus Lite(knowledge)
|
|
|
- milvus_store = MilvusStore(data_dir=str(MILVUS_DATA_DIR))
|
|
|
+ # 初始化 PostgreSQL(knowledge + resources)
|
|
|
+ pg_store = PostgreSQLStore()
|
|
|
+ pg_resource_store = PostgreSQLResourceStore()
|
|
|
|
|
|
# 初始化去重处理器 + 启动定时兜底任务
|
|
|
knowledge_processor = KnowledgeProcessor()
|
|
|
@@ -870,6 +802,8 @@ async def lifespan(app: FastAPI):
|
|
|
await periodic_task
|
|
|
except asyncio.CancelledError:
|
|
|
pass
|
|
|
+ pg_store.close()
|
|
|
+ pg_resource_store.close()
|
|
|
|
|
|
|
|
|
app = FastAPI(title=BRAND_NAME, lifespan=lifespan)
|
|
|
@@ -879,52 +813,36 @@ app = FastAPI(title=BRAND_NAME, lifespan=lifespan)
|
|
|
|
|
|
@app.post("/api/resource", status_code=201)
|
|
|
def submit_resource(resource: ResourceIn):
|
|
|
- conn = get_db()
|
|
|
+ """提交资源(存入 PostgreSQL resources 表)"""
|
|
|
try:
|
|
|
- now = datetime.now(timezone.utc).isoformat()
|
|
|
-
|
|
|
# 加密敏感内容
|
|
|
encrypted_secure_body = encrypt_content(resource.id, resource.secure_body)
|
|
|
|
|
|
- conn.execute(
|
|
|
- "INSERT OR REPLACE INTO resources"
|
|
|
- "(id, title, body, secure_body, content_type, metadata, sort_order, submitted_by, created_at, updated_at)"
|
|
|
- " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
|
|
- (
|
|
|
- resource.id,
|
|
|
- resource.title,
|
|
|
- resource.body,
|
|
|
- encrypted_secure_body,
|
|
|
- resource.content_type,
|
|
|
- json.dumps(resource.metadata),
|
|
|
- resource.sort_order,
|
|
|
- resource.submitted_by,
|
|
|
- now,
|
|
|
- now,
|
|
|
- ),
|
|
|
- )
|
|
|
- conn.commit()
|
|
|
+ pg_resource_store.insert_or_update({
|
|
|
+ 'id': resource.id,
|
|
|
+ 'title': resource.title,
|
|
|
+ 'body': resource.body,
|
|
|
+ 'secure_body': encrypted_secure_body,
|
|
|
+ 'content_type': resource.content_type,
|
|
|
+ 'metadata': resource.metadata,
|
|
|
+ 'sort_order': resource.sort_order,
|
|
|
+ 'submitted_by': resource.submitted_by
|
|
|
+ })
|
|
|
return {"status": "ok", "id": resource.id}
|
|
|
- finally:
|
|
|
- conn.close()
|
|
|
+ except Exception as e:
|
|
|
+ raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
|
|
|
@app.get("/api/resource/{resource_id:path}", response_model=ResourceOut)
|
|
|
def get_resource(resource_id: str, x_org_key: Optional[str] = Header(None)):
|
|
|
- conn = get_db()
|
|
|
+ """获取资源详情(从 PostgreSQL)"""
|
|
|
try:
|
|
|
- row = conn.execute(
|
|
|
- "SELECT id, title, body, secure_body, content_type, metadata, sort_order FROM resources WHERE id = ?",
|
|
|
- (resource_id,),
|
|
|
- ).fetchone()
|
|
|
+ row = pg_resource_store.get_by_id(resource_id)
|
|
|
if not row:
|
|
|
raise HTTPException(status_code=404, detail=f"Resource not found: {resource_id}")
|
|
|
|
|
|
# 解密敏感内容
|
|
|
- secure_body = decrypt_content(resource_id, row["secure_body"] or "", x_org_key)
|
|
|
-
|
|
|
- # 解析metadata
|
|
|
- metadata = json.loads(row["metadata"] or "{}")
|
|
|
+        secure_body = decrypt_content(resource_id, row.get("secure_body") or "", x_org_key)
|
|
|
|
|
|
# 计算导航上下文
|
|
|
root_id = resource_id.split("/")[0] if "/" in resource_id else resource_id
|
|
|
@@ -932,36 +850,19 @@ def get_resource(resource_id: str, x_org_key: Optional[str] = Header(None)):
|
|
|
# TOC (根节点)
|
|
|
toc = None
|
|
|
if "/" in resource_id:
|
|
|
- toc_row = conn.execute(
|
|
|
- "SELECT id, title FROM resources WHERE id = ?",
|
|
|
- (root_id,),
|
|
|
- ).fetchone()
|
|
|
+ toc_row = pg_resource_store.get_by_id(root_id)
|
|
|
if toc_row:
|
|
|
toc = ResourceNode(id=toc_row["id"], title=toc_row["title"])
|
|
|
|
|
|
# Children (子节点)
|
|
|
- children = []
|
|
|
- children_rows = conn.execute(
|
|
|
- "SELECT id, title FROM resources WHERE id LIKE ? AND id != ? ORDER BY sort_order",
|
|
|
- (f"{resource_id}/%", resource_id),
|
|
|
- ).fetchall()
|
|
|
- children = [ResourceNode(id=r["id"], title=r["title"]) for r in children_rows]
|
|
|
+ children_rows = pg_resource_store.list_resources(prefix=f"{resource_id}/", limit=1000)
|
|
|
+ children = [ResourceNode(id=r["id"], title=r["title"]) for r in children_rows
|
|
|
+ if r["id"].count("/") == resource_id.count("/") + 1]
|
|
|
|
|
|
# Prev/Next (同级节点)
|
|
|
- prev_node = None
|
|
|
- next_node = None
|
|
|
- if "/" in resource_id:
|
|
|
- siblings = conn.execute(
|
|
|
- "SELECT id, title, sort_order FROM resources WHERE id LIKE ? AND id NOT LIKE ? ORDER BY sort_order",
|
|
|
- (f"{root_id}/%", f"{root_id}/%/%"),
|
|
|
- ).fetchall()
|
|
|
- for i, sib in enumerate(siblings):
|
|
|
- if sib["id"] == resource_id:
|
|
|
- if i > 0:
|
|
|
- prev_node = ResourceNode(id=siblings[i-1]["id"], title=siblings[i-1]["title"])
|
|
|
- if i < len(siblings) - 1:
|
|
|
- next_node = ResourceNode(id=siblings[i+1]["id"], title=siblings[i+1]["title"])
|
|
|
- break
|
|
|
+ prev_node, next_node = pg_resource_store.get_siblings(resource_id)
|
|
|
+ prev = ResourceNode(id=prev_node["id"], title=prev_node["title"]) if prev_node else None
|
|
|
+ next = ResourceNode(id=next_node["id"], title=next_node["title"]) if next_node else None
|
|
|
|
|
|
return ResourceOut(
|
|
|
id=row["id"],
|
|
|
@@ -969,67 +870,49 @@ def get_resource(resource_id: str, x_org_key: Optional[str] = Header(None)):
|
|
|
body=row["body"],
|
|
|
secure_body=secure_body,
|
|
|
content_type=row["content_type"],
|
|
|
- metadata=metadata,
|
|
|
+            metadata=row.get("metadata") or {},
|
|
|
toc=toc,
|
|
|
children=children,
|
|
|
- prev=prev_node,
|
|
|
- next=next_node,
|
|
|
+ prev=prev,
|
|
|
+ next=next,
|
|
|
)
|
|
|
- finally:
|
|
|
- conn.close()
|
|
|
+ except HTTPException:
|
|
|
+ raise
|
|
|
+ except Exception as e:
|
|
|
+ raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
|
|
|
@app.patch("/api/resource/{resource_id:path}")
|
|
|
def patch_resource(resource_id: str, patch: ResourcePatchIn):
|
|
|
- """更新resource字段"""
|
|
|
- conn = get_db()
|
|
|
+ """更新resource字段(PostgreSQL)"""
|
|
|
try:
|
|
|
# 检查是否存在
|
|
|
- row = conn.execute("SELECT id FROM resources WHERE id = ?", (resource_id,)).fetchone()
|
|
|
- if not row:
|
|
|
+ if not pg_resource_store.get_by_id(resource_id):
|
|
|
raise HTTPException(status_code=404, detail=f"Resource not found: {resource_id}")
|
|
|
|
|
|
- # 构建更新语句
|
|
|
- updates = []
|
|
|
- params = []
|
|
|
+ # 构建更新字典
|
|
|
+ updates = {}
|
|
|
|
|
|
if patch.title is not None:
|
|
|
- updates.append("title = ?")
|
|
|
- params.append(patch.title)
|
|
|
-
|
|
|
+ updates['title'] = patch.title
|
|
|
if patch.body is not None:
|
|
|
- updates.append("body = ?")
|
|
|
- params.append(patch.body)
|
|
|
-
|
|
|
+ updates['body'] = patch.body
|
|
|
if patch.secure_body is not None:
|
|
|
- encrypted = encrypt_content(resource_id, patch.secure_body)
|
|
|
- updates.append("secure_body = ?")
|
|
|
- params.append(encrypted)
|
|
|
-
|
|
|
+ updates['secure_body'] = encrypt_content(resource_id, patch.secure_body)
|
|
|
if patch.content_type is not None:
|
|
|
- updates.append("content_type = ?")
|
|
|
- params.append(patch.content_type)
|
|
|
-
|
|
|
+ updates['content_type'] = patch.content_type
|
|
|
if patch.metadata is not None:
|
|
|
- updates.append("metadata = ?")
|
|
|
- params.append(json.dumps(patch.metadata))
|
|
|
+ updates['metadata'] = patch.metadata
|
|
|
|
|
|
if not updates:
|
|
|
return {"status": "ok", "message": "No fields to update"}
|
|
|
|
|
|
- # 添加updated_at
|
|
|
- updates.append("updated_at = ?")
|
|
|
- params.append(datetime.now(timezone.utc).isoformat())
|
|
|
-
|
|
|
- # 执行更新
|
|
|
- params.append(resource_id)
|
|
|
- sql = f"UPDATE resources SET {', '.join(updates)} WHERE id = ?"
|
|
|
- conn.execute(sql, params)
|
|
|
- conn.commit()
|
|
|
-
|
|
|
+ pg_resource_store.update(resource_id, updates)
|
|
|
return {"status": "ok", "id": resource_id}
|
|
|
- finally:
|
|
|
- conn.close()
|
|
|
+ except HTTPException:
|
|
|
+ raise
|
|
|
+ except Exception as e:
|
|
|
+ raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
|
|
|
@app.get("/api/resource")
|
|
|
@@ -1037,50 +920,30 @@ def list_resources(
|
|
|
content_type: Optional[str] = Query(None),
|
|
|
limit: int = Query(100, ge=1, le=1000)
|
|
|
):
|
|
|
- """列出所有resource"""
|
|
|
- conn = get_db()
|
|
|
+ """列出所有resource(PostgreSQL)"""
|
|
|
try:
|
|
|
- sql = "SELECT id, title, content_type, metadata, created_at FROM resources"
|
|
|
- params = []
|
|
|
-
|
|
|
- if content_type:
|
|
|
- sql += " WHERE content_type = ?"
|
|
|
- params.append(content_type)
|
|
|
-
|
|
|
- sql += " ORDER BY id LIMIT ?"
|
|
|
- params.append(limit)
|
|
|
-
|
|
|
- rows = conn.execute(sql, params).fetchall()
|
|
|
-
|
|
|
- results = []
|
|
|
- for row in rows:
|
|
|
- results.append({
|
|
|
- "id": row["id"],
|
|
|
- "title": row["title"],
|
|
|
- "content_type": row["content_type"],
|
|
|
- "metadata": json.loads(row["metadata"] or "{}"),
|
|
|
- "created_at": row["created_at"],
|
|
|
- })
|
|
|
-
|
|
|
+ results = pg_resource_store.list_resources(
|
|
|
+ content_type=content_type,
|
|
|
+ limit=limit
|
|
|
+ )
|
|
|
return {"results": results, "count": len(results)}
|
|
|
- finally:
|
|
|
- conn.close()
|
|
|
+ except Exception as e:
|
|
|
+ raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
|
|
|
@app.delete("/api/resource/{resource_id:path}")
|
|
|
def delete_resource(resource_id: str):
|
|
|
- """删除单个resource"""
|
|
|
- conn = get_db()
|
|
|
+ """删除单个resource(PostgreSQL)"""
|
|
|
try:
|
|
|
- row = conn.execute("SELECT id FROM resources WHERE id = ?", (resource_id,)).fetchone()
|
|
|
- if not row:
|
|
|
+ if not pg_resource_store.get_by_id(resource_id):
|
|
|
raise HTTPException(status_code=404, detail=f"Resource not found: {resource_id}")
|
|
|
|
|
|
- conn.execute("DELETE FROM resources WHERE id = ?", (resource_id,))
|
|
|
- conn.commit()
|
|
|
+ pg_resource_store.delete(resource_id)
|
|
|
return {"status": "ok", "id": resource_id}
|
|
|
- finally:
|
|
|
- conn.close()
|
|
|
+ except HTTPException:
|
|
|
+ raise
|
|
|
+ except Exception as e:
|
|
|
+ raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
|
|
|
# --- Knowledge API ---
|
|
|
@@ -1177,7 +1040,7 @@ async def search_knowledge_api(
|
|
|
|
|
|
# 3. 向量召回(3*k 个候选)
|
|
|
recall_limit = top_k * 3
|
|
|
- candidates = milvus_store.search(
|
|
|
+ candidates = pg_store.search(
|
|
|
query_embedding=query_embedding,
|
|
|
filters=filter_expr,
|
|
|
limit=recall_limit
|
|
|
@@ -1272,7 +1135,7 @@ async def save_knowledge(knowledge: KnowledgeIn, background_tasks: BackgroundTas
|
|
|
print(f"[Save Knowledge] 插入数据: {json.dumps({k: v for k, v in insert_data.items() if k != 'embedding'}, ensure_ascii=False)}")
|
|
|
|
|
|
# 插入 Milvus
|
|
|
- milvus_store.insert(insert_data)
|
|
|
+ pg_store.insert(insert_data)
|
|
|
|
|
|
# 触发后台去重处理
|
|
|
background_tasks.add_task(knowledge_processor.process_pending)
|
|
|
@@ -1334,7 +1197,7 @@ def list_knowledge(
|
|
|
# 查询 Milvus(先获取所有符合条件的数据)
|
|
|
# Milvus 的 limit 是总数限制,我们需要获取足够多的数据来支持分页
|
|
|
max_limit = 10000 # 设置一个合理的上限
|
|
|
- results = milvus_store.query(filter_expr, limit=max_limit)
|
|
|
+ results = pg_store.query(filter_expr, limit=max_limit)
|
|
|
|
|
|
# 转换为可序列化的格式
|
|
|
serialized_results = [serialize_milvus_result(r) for r in results]
|
|
|
@@ -1369,7 +1232,7 @@ def get_all_tags():
|
|
|
"""获取所有已有的 tags"""
|
|
|
try:
|
|
|
# 查询所有知识
|
|
|
- results = milvus_store.query('id != ""', limit=10000)
|
|
|
+ results = pg_store.query('id != ""', limit=10000)
|
|
|
|
|
|
all_tags = set()
|
|
|
for item in results:
|
|
|
@@ -1391,7 +1254,7 @@ def get_all_tags():
|
|
|
def get_pending_knowledge(limit: int = Query(default=50, ge=1, le=200)):
|
|
|
"""查询待处理队列(pending + processing + dedup_passed + analyzing)"""
|
|
|
try:
|
|
|
- pending = milvus_store.query(
|
|
|
+ pending = pg_store.query(
|
|
|
'status == "pending" or status == "processing" or status == "dedup_passed" or status == "analyzing"',
|
|
|
limit=limit
|
|
|
)
|
|
|
@@ -1407,13 +1270,13 @@ async def trigger_process(force: bool = Query(default=False)):
|
|
|
"""手动触发去重处理。force=true 时先回滚所有 processing → pending,analyzing → dedup_passed"""
|
|
|
try:
|
|
|
if force:
|
|
|
- processing = milvus_store.query('status == "processing"', limit=200)
|
|
|
+ processing = pg_store.query('status == "processing"', limit=200)
|
|
|
for item in processing:
|
|
|
- milvus_store.update(item["id"], {"status": "pending", "updated_at": int(time.time())})
|
|
|
+ pg_store.update(item["id"], {"status": "pending", "updated_at": int(time.time())})
|
|
|
print(f"[Manual Process] 回滚 {len(processing)} 条 processing → pending")
|
|
|
- analyzing = milvus_store.query('status == "analyzing"', limit=200)
|
|
|
+ analyzing = pg_store.query('status == "analyzing"', limit=200)
|
|
|
for item in analyzing:
|
|
|
- milvus_store.update(item["id"], {"status": "dedup_passed", "updated_at": int(time.time())})
|
|
|
+ pg_store.update(item["id"], {"status": "dedup_passed", "updated_at": int(time.time())})
|
|
|
print(f"[Manual Process] 回滚 {len(analyzing)} 条 analyzing → dedup_passed")
|
|
|
asyncio.create_task(knowledge_processor.process_pending())
|
|
|
return {"status": "ok", "message": "处理任务已触发"}
|
|
|
@@ -1424,20 +1287,15 @@ async def trigger_process(force: bool = Query(default=False)):
|
|
|
|
|
|
@app.post("/api/knowledge/migrate")
|
|
|
async def migrate_knowledge_schema():
|
|
|
- """手动触发 schema 迁移(中转 collection 模式,将旧数据升级到含 status/relationships 的新 schema)"""
|
|
|
- try:
|
|
|
- count = milvus_store.migrate_schema()
|
|
|
- return {"status": "ok", "migrated": count, "message": f"迁移完成,共迁移 {count} 条知识"}
|
|
|
- except Exception as e:
|
|
|
- print(f"[Migrate] 错误: {e}")
|
|
|
- raise HTTPException(status_code=500, detail=str(e))
|
|
|
+ """手动触发 schema 迁移(PostgreSQL不需要此功能)"""
|
|
|
+    return {"status": "ok", "migrated": 0, "message": "PostgreSQL不需要schema迁移"}
|
|
|
|
|
|
|
|
|
@app.get("/api/knowledge/status/{knowledge_id}")
|
|
|
def get_knowledge_status(knowledge_id: str):
|
|
|
"""查询单条知识的处理状态和关系"""
|
|
|
try:
|
|
|
- result = milvus_store.get_by_id(knowledge_id)
|
|
|
+ result = pg_store.get_by_id(knowledge_id)
|
|
|
if not result:
|
|
|
raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")
|
|
|
serialized = serialize_milvus_result(result)
|
|
|
@@ -1459,7 +1317,7 @@ def get_knowledge_status(knowledge_id: str):
|
|
|
def get_knowledge(knowledge_id: str):
|
|
|
"""获取单条知识"""
|
|
|
try:
|
|
|
- result = milvus_store.get_by_id(knowledge_id)
|
|
|
+ result = pg_store.get_by_id(knowledge_id)
|
|
|
if not result:
|
|
|
raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")
|
|
|
|
|
|
@@ -1506,7 +1364,7 @@ async def update_knowledge(knowledge_id: str, update: KnowledgeUpdateIn):
|
|
|
"""更新知识评估,支持知识进化"""
|
|
|
try:
|
|
|
# 获取现有知识
|
|
|
- existing = milvus_store.get_by_id(knowledge_id)
|
|
|
+ existing = pg_store.get_by_id(knowledge_id)
|
|
|
if not existing:
|
|
|
raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")
|
|
|
|
|
|
@@ -1551,7 +1409,7 @@ async def update_knowledge(knowledge_id: str, update: KnowledgeUpdateIn):
|
|
|
updates["embedding"] = embedding
|
|
|
|
|
|
# 更新 Milvus
|
|
|
- milvus_store.update(knowledge_id, updates)
|
|
|
+ pg_store.update(knowledge_id, updates)
|
|
|
|
|
|
return {"status": "ok", "knowledge_id": knowledge_id}
|
|
|
|
|
|
@@ -1567,7 +1425,7 @@ async def patch_knowledge(knowledge_id: str, patch: KnowledgePatchIn):
|
|
|
"""直接编辑知识字段"""
|
|
|
try:
|
|
|
# 获取现有知识
|
|
|
- existing = milvus_store.get_by_id(knowledge_id)
|
|
|
+ existing = pg_store.get_by_id(knowledge_id)
|
|
|
if not existing:
|
|
|
raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")
|
|
|
|
|
|
@@ -1606,7 +1464,7 @@ async def patch_knowledge(knowledge_id: str, patch: KnowledgePatchIn):
|
|
|
updates["embedding"] = embedding
|
|
|
|
|
|
# 更新 Milvus
|
|
|
- milvus_store.update(knowledge_id, updates)
|
|
|
+ pg_store.update(knowledge_id, updates)
|
|
|
|
|
|
return {"status": "ok", "knowledge_id": knowledge_id}
|
|
|
|
|
|
@@ -1622,12 +1480,12 @@ def delete_knowledge(knowledge_id: str):
|
|
|
"""删除单条知识"""
|
|
|
try:
|
|
|
# 检查知识是否存在
|
|
|
- existing = milvus_store.get_by_id(knowledge_id)
|
|
|
+ existing = pg_store.get_by_id(knowledge_id)
|
|
|
if not existing:
|
|
|
raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")
|
|
|
|
|
|
- # 从 Milvus 删除
|
|
|
- milvus_store.collection.delete(expr=f'id == "{knowledge_id}"')
|
|
|
+ # 从 PostgreSQL 删除
|
|
|
+ pg_store.delete(knowledge_id)
|
|
|
print(f"[Delete Knowledge] 已删除知识: {knowledge_id}")
|
|
|
|
|
|
return {"status": "ok", "knowledge_id": knowledge_id}
|
|
|
@@ -1646,15 +1504,19 @@ def batch_delete_knowledge(knowledge_ids: List[str] = Body(...)):
|
|
|
if not knowledge_ids:
|
|
|
raise HTTPException(status_code=400, detail="knowledge_ids cannot be empty")
|
|
|
|
|
|
- # 构建删除表达式
|
|
|
- ids_str = '", "'.join(knowledge_ids)
|
|
|
- expr = f'id in ["{ids_str}"]'
|
|
|
-
|
|
|
# 批量删除
|
|
|
- milvus_store.collection.delete(expr=expr)
|
|
|
- print(f"[Batch Delete] 已删除 {len(knowledge_ids)} 条知识")
|
|
|
-
|
|
|
- return {"status": "ok", "deleted_count": len(knowledge_ids)}
|
|
|
+ cursor = pg_store._get_cursor()
|
|
|
+ try:
|
|
|
+ cursor.execute(
|
|
|
+ "DELETE FROM knowledge WHERE id = ANY(%s)",
|
|
|
+ (knowledge_ids,)
|
|
|
+ )
|
|
|
+ pg_store.conn.commit()
|
|
|
+ deleted_count = cursor.rowcount
|
|
|
+ print(f"[Batch Delete] 已删除 {deleted_count} 条知识")
|
|
|
+ return {"status": "ok", "deleted_count": deleted_count}
|
|
|
+ finally:
|
|
|
+ cursor.close()
|
|
|
|
|
|
except HTTPException:
|
|
|
raise
|
|
|
@@ -1674,7 +1536,7 @@ async def batch_verify_knowledge(batch: KnowledgeBatchVerifyIn):
|
|
|
updated_count = 0
|
|
|
|
|
|
for kid in batch.knowledge_ids:
|
|
|
- existing = milvus_store.get_by_id(kid)
|
|
|
+ existing = pg_store.get_by_id(kid)
|
|
|
if not existing:
|
|
|
continue
|
|
|
|
|
|
@@ -1687,7 +1549,7 @@ async def batch_verify_knowledge(batch: KnowledgeBatchVerifyIn):
|
|
|
"issue_type": None,
|
|
|
"issue_action": None,
|
|
|
}
|
|
|
- milvus_store.update(kid, {"eval": eval_data, "status": "checked", "updated_at": int(time.time())})
|
|
|
+ pg_store.update(kid, {"eval": eval_data, "status": "checked", "updated_at": int(time.time())})
|
|
|
updated_count += 1
|
|
|
|
|
|
return {"status": "ok", "updated": updated_count}
|
|
|
@@ -1701,7 +1563,7 @@ async def batch_verify_knowledge(batch: KnowledgeBatchVerifyIn):
|
|
|
async def verify_knowledge(knowledge_id: str, verify: KnowledgeVerifyIn):
|
|
|
"""知识验证:approve 切换 approved↔checked,reject 设为 rejected"""
|
|
|
try:
|
|
|
- existing = milvus_store.get_by_id(knowledge_id)
|
|
|
+ existing = pg_store.get_by_id(knowledge_id)
|
|
|
if not existing:
|
|
|
raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")
|
|
|
|
|
|
@@ -1710,7 +1572,7 @@ async def verify_knowledge(knowledge_id: str, verify: KnowledgeVerifyIn):
|
|
|
if verify.action == "approve":
|
|
|
# checked → approved(取消验证),其他 → checked
|
|
|
new_status = "approved" if current_status == "checked" else "checked"
|
|
|
- milvus_store.update(knowledge_id, {
|
|
|
+ pg_store.update(knowledge_id, {
|
|
|
"status": new_status,
|
|
|
"updated_at": int(time.time())
|
|
|
})
|
|
|
@@ -1718,7 +1580,7 @@ async def verify_knowledge(knowledge_id: str, verify: KnowledgeVerifyIn):
|
|
|
"message": "已取消验证" if new_status == "approved" else "验证通过"}
|
|
|
|
|
|
elif verify.action == "reject":
|
|
|
- milvus_store.update(knowledge_id, {
|
|
|
+ pg_store.update(knowledge_id, {
|
|
|
"status": "rejected",
|
|
|
"updated_at": int(time.time())
|
|
|
})
|
|
|
@@ -1753,7 +1615,7 @@ async def batch_update_knowledge(batch: KnowledgeBatchUpdateIn):
|
|
|
if not knowledge_id:
|
|
|
continue
|
|
|
|
|
|
- existing = milvus_store.get_by_id(knowledge_id)
|
|
|
+ existing = pg_store.get_by_id(knowledge_id)
|
|
|
if not existing:
|
|
|
continue
|
|
|
|
|
|
@@ -1771,7 +1633,7 @@ async def batch_update_knowledge(batch: KnowledgeBatchUpdateIn):
|
|
|
else:
|
|
|
eval_data["harmful"] = eval_data.get("harmful", 0) + 1
|
|
|
|
|
|
- milvus_store.update(knowledge_id, {"eval": eval_data})
|
|
|
+ pg_store.update(knowledge_id, {"eval": eval_data})
|
|
|
|
|
|
# 并发执行知识进化
|
|
|
if evolution_tasks:
|
|
|
@@ -1786,7 +1648,7 @@ async def batch_update_knowledge(batch: KnowledgeBatchUpdateIn):
|
|
|
# 重新生成向量(只基于 task)
|
|
|
embedding = await get_embedding(task)
|
|
|
|
|
|
- milvus_store.update(knowledge_id, {
|
|
|
+ pg_store.update(knowledge_id, {
|
|
|
"content": evolved_content,
|
|
|
"eval": eval_data,
|
|
|
"embedding": embedding
|
|
|
@@ -1804,7 +1666,7 @@ async def slim_knowledge(model: str = "google/gemini-2.5-flash-lite"):
|
|
|
"""知识库瘦身:合并语义相似知识"""
|
|
|
try:
|
|
|
# 获取所有知识
|
|
|
- all_knowledge = milvus_store.query('id != ""', limit=10000)
|
|
|
+ all_knowledge = pg_store.query('id != ""', limit=10000)
|
|
|
# 转换为可序列化的格式
|
|
|
all_knowledge = [serialize_milvus_result(item) for item in all_knowledge]
|
|
|
|
|
|
@@ -1928,10 +1790,13 @@ REPORT: 原有 X 条,合并后 Y 条,精简了 Z 条。
|
|
|
texts = [e['task'] for e in new_entries]
|
|
|
embeddings = await get_embeddings_batch(texts)
|
|
|
|
|
|
- # 清空并重建
|
|
|
- now = int(time.time())
|
|
|
- milvus_store.drop_collection()
|
|
|
- milvus_store._init_collection()
|
|
|
+ # 清空并重建(PostgreSQL使用TRUNCATE)
|
|
|
+ cursor = pg_store._get_cursor()
|
|
|
+ try:
|
|
|
+ cursor.execute("TRUNCATE TABLE knowledge")
|
|
|
+ pg_store.conn.commit()
|
|
|
+ finally:
|
|
|
+ cursor.close()
|
|
|
|
|
|
knowledge_list = []
|
|
|
for e, embedding in zip(new_entries, embeddings):
|
|
|
@@ -1971,7 +1836,7 @@ REPORT: 原有 X 条,合并后 Y 条,精简了 Z 条。
|
|
|
"relationships": json.dumps([])
|
|
|
})
|
|
|
|
|
|
- milvus_store.insert_batch(knowledge_list)
|
|
|
+ pg_store.insert_batch(knowledge_list)
|
|
|
|
|
|
result_msg = f"瘦身完成:{len(all_knowledge)} → {len(new_entries)} 条知识"
|
|
|
if report_line:
|
|
|
@@ -2136,7 +2001,7 @@ async def extract_knowledge_from_messages(extract_req: MessageExtractIn, backgro
|
|
|
|
|
|
# 批量插入
|
|
|
if knowledge_list:
|
|
|
- milvus_store.insert_batch(knowledge_list)
|
|
|
+ pg_store.insert_batch(knowledge_list)
|
|
|
background_tasks.add_task(knowledge_processor.process_pending)
|
|
|
|
|
|
print(f"[Extract] 成功提取并保存 {len(knowledge_ids)} 条知识")
|