|
|
@@ -31,7 +31,10 @@ sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
from dotenv import load_dotenv
|
|
|
load_dotenv(Path(__file__).parent.parent / ".env")
|
|
|
|
|
|
-from agent.llm.openrouter import openrouter_llm_call
|
|
|
+from agent.llm import create_openrouter_llm_call, create_qwen_llm_call
|
|
|
+
|
|
|
+_dedup_llm = create_openrouter_llm_call(model="google/gemini-2.5-flash-lite")
|
|
|
+_tool_analysis_llm = create_qwen_llm_call(model="qwen3.5-plus")  # TODO(review): confirm "qwen3.5-plus" is a valid Qwen API model id
|
|
|
|
|
|
# 导入向量存储和 embedding
|
|
|
from knowhub.vector_store import MilvusStore
|
|
|
@@ -413,6 +416,46 @@ Content: {new_content}
|
|
|
"""
|
|
|
|
|
|
|
|
|
+TOOL_ANALYSIS_PROMPT = """\
|
|
|
+分析以下知识条目,判断是否涉及"图像创作或解构任务中使用的工具"。
|
|
|
+
|
|
|
+工具范畴(包括但不限于):
|
|
|
+- AI 生图平台/模型:Midjourney、Stable Diffusion、DALL-E、Flux、ComfyUI
|
|
|
+- SD 插件/节点:ControlNet、IP-Adapter、InstantID、DWPose、DSINE
|
|
|
+- 图像处理库:rembg、PIL/Pillow、OpenCV、scikit-image
|
|
|
+- LoRA/checkpoint 模型、ComfyUI 自定义节点、AI 绘图辅助工具
|
|
|
+
|
|
|
+知识条目:
|
|
|
+task: {task}
|
|
|
+content: {content}
|
|
|
+
|
|
|
+要求:
|
|
|
+- 如果涉及上述工具,提取每个工具的信息并以 JSON 格式返回。
|
|
|
+- 如果不涉及任何工具,返回 {{"has_tools": false}}。
|
|
|
+- 只输出 JSON,不要输出其他内容。
|
|
|
+
|
|
|
+输出格式:
|
|
|
+{{
|
|
|
+ "has_tools": true,
|
|
|
+ "tools": [
|
|
|
+ {{
|
|
|
+ "name": "工具名称(原名)",
|
|
|
+ "slug": "小写英文短名,空格换下划线,如 controlnet、ip_adapter",
|
|
|
+ "category": "image_gen | image_process | model | plugin | workflow | other",
|
|
|
+ "version": "版本号或 null",
|
|
|
+ "description": "一句话功能介绍",
|
|
|
+ "usage": "核心用法",
|
|
|
+ "scenarios": ["应用场景1", "应用场景2"],
|
|
|
+ "input": "输入类型描述或 null",
|
|
|
+ "output": "输出类型描述或 null",
|
|
|
+ "source": "来源/文档链接或 null",
|
|
|
+ "status": "未接入"
|
|
|
+ }}
|
|
|
+ ]
|
|
|
+}}
|
|
|
+"""
|
|
|
+
|
|
|
+
|
|
|
# --- Dedup: RelationCache ---
|
|
|
|
|
|
class RelationCache:
|
|
|
@@ -451,10 +494,11 @@ class KnowledgeProcessor:
|
|
|
self._relation_cache = RelationCache()
|
|
|
|
|
|
async def process_pending(self):
|
|
|
- """持续处理 pending 知识直到队列为空,有锁防并发"""
|
|
|
+ """持续处理 pending 和 dedup_passed 知识直到队列为空,有锁防并发"""
|
|
|
if self._lock.locked():
|
|
|
return
|
|
|
async with self._lock:
|
|
|
+ # 第一阶段:处理 pending(去重)
|
|
|
while True:
|
|
|
try:
|
|
|
pending = milvus_store.query('status == "pending"', limit=50)
|
|
|
@@ -465,6 +509,17 @@ class KnowledgeProcessor:
|
|
|
break
|
|
|
for knowledge in pending:
|
|
|
await self._process_one(knowledge)
|
|
|
+ # 第二阶段:处理 dedup_passed(工具关联)
|
|
|
+ while True:
|
|
|
+ try:
|
|
|
+ dedup_passed = milvus_store.query('status == "dedup_passed"', limit=50)
|
|
|
+ except Exception as e:
|
|
|
+ print(f"[KnowledgeProcessor] 查询 dedup_passed 失败: {e}")
|
|
|
+ break
|
|
|
+ if not dedup_passed:
|
|
|
+ break
|
|
|
+ for knowledge in dedup_passed:
|
|
|
+ await self._analyze_tool_relation(knowledge)
|
|
|
|
|
|
async def _process_one(self, knowledge: dict):
|
|
|
kid = knowledge["id"]
|
|
|
@@ -490,16 +545,16 @@ class KnowledgeProcessor:
|
|
|
candidates = [c for c in candidates if c.get("score", 0) >= 0.75]
|
|
|
|
|
|
if not candidates:
|
|
|
- milvus_store.update(kid, {"status": "approved", "updated_at": now})
|
|
|
+ milvus_store.update(kid, {"status": "dedup_passed", "updated_at": now})
|
|
|
return
|
|
|
|
|
|
llm_result = await self._llm_judge_relations(knowledge, candidates)
|
|
|
await self._apply_decision(knowledge, llm_result)
|
|
|
|
|
|
except Exception as e:
|
|
|
- print(f"[KnowledgeProcessor] 处理 {kid} 失败: {e},fallback 到 approved")
|
|
|
+ print(f"[KnowledgeProcessor] 处理 {kid} 失败: {e},回退到 pending")
|
|
|
try:
|
|
|
- milvus_store.update(kid, {"status": "approved", "updated_at": int(time.time())})
|
|
|
+                milvus_store.update(kid, {"status": "pending", "updated_at": int(time.time())})  # NOTE(review): process_pending's while-loop re-queries "pending" immediately, so a persistently failing item retries in a tight loop — consider leaving it "processing" and letting the periodic 5-min rollback handle retry
|
|
|
except Exception:
|
|
|
pass
|
|
|
|
|
|
@@ -515,9 +570,8 @@ class KnowledgeProcessor:
|
|
|
)
|
|
|
for attempt in range(3):
|
|
|
try:
|
|
|
- response = await openrouter_llm_call(
|
|
|
+ response = await _dedup_llm(
|
|
|
messages=[{"role": "user", "content": prompt}],
|
|
|
- model="google/gemini-2.5-flash-lite"
|
|
|
)
|
|
|
content = response.get("content", "").strip()
|
|
|
# 清理 markdown 代码块
|
|
|
@@ -603,26 +657,191 @@ class KnowledgeProcessor:
|
|
|
except Exception as e:
|
|
|
print(f"[Apply Decision] 更新旧知识关系 {old_id} 失败: {e}")
|
|
|
milvus_store.update(kid, {
|
|
|
- "status": "approved",
|
|
|
+ "status": "dedup_passed",
|
|
|
"relationships": json.dumps(new_relationships),
|
|
|
"updated_at": now
|
|
|
})
|
|
|
|
|
|
+ async def _llm_analyze_tools(self, knowledge: dict) -> dict:
|
|
|
+ """使用 LLM 分析知识中涉及的工具(复用迁移脚本逻辑)"""
|
|
|
+ task = (knowledge.get("task") or "")[:600]
|
|
|
+ content = (knowledge.get("content") or "")[:1200]
|
|
|
+ prompt = TOOL_ANALYSIS_PROMPT.format(task=task, content=content)
|
|
|
+ try:
|
|
|
+ response = await _tool_analysis_llm(
|
|
|
+ messages=[{"role": "user", "content": prompt}],
|
|
|
+ max_tokens=2048,
|
|
|
+ temperature=0.1,
|
|
|
+ )
|
|
|
+ raw = (response.get("content") or "").strip()
|
|
|
+ if raw.startswith("```"):
|
|
|
+ lines = raw.split("\n")
|
|
|
+ inner = []
|
|
|
+ in_block = False
|
|
|
+ for line in lines:
|
|
|
+ if line.startswith("```"):
|
|
|
+ in_block = not in_block
|
|
|
+ continue
|
|
|
+ if in_block:
|
|
|
+ inner.append(line)
|
|
|
+ raw = "\n".join(inner).strip()
|
|
|
+ return json.loads(raw)
|
|
|
+ except Exception as e:
|
|
|
+ print(f"[Tool Analysis LLM] 调用失败: {e}")
|
|
|
+ raise
|
|
|
+
|
|
|
+ async def _create_or_get_tool_resource(self, tool_info: dict) -> Optional[str]:
|
|
|
+ """创建或获取工具资源"""
|
|
|
+ category = tool_info.get("category", "other")
|
|
|
+ slug = tool_info.get("slug", "")
|
|
|
+ if not slug:
|
|
|
+ return None
|
|
|
+ tool_id = f"tools/{category}/{slug}"
|
|
|
+ conn = get_db()
|
|
|
+ try:
|
|
|
+            row = conn.execute("SELECT id FROM resources WHERE id = ?", (tool_id,)).fetchone()  # NOTE(review): check-then-insert races under concurrent calls — INSERT OR IGNORE would be atomic
|
|
|
+ if row:
|
|
|
+ return tool_id
|
|
|
+ now_str = datetime.now(timezone.utc).isoformat()
|
|
|
+ metadata = {
|
|
|
+ "tool_name": tool_info.get("name", ""),
|
|
|
+ "tool_slug": slug,
|
|
|
+ "category": category,
|
|
|
+ "version": tool_info.get("version", ""),
|
|
|
+ "description": tool_info.get("description", ""),
|
|
|
+ "usage": tool_info.get("usage", ""),
|
|
|
+ "scenarios": tool_info.get("scenarios", []),
|
|
|
+ "input": tool_info.get("input", ""),
|
|
|
+ "output": tool_info.get("output", ""),
|
|
|
+ "status": tool_info.get("status", "未接入"),
|
|
|
+ "knowledge_ids": []
|
|
|
+ }
|
|
|
+ conn.execute(
|
|
|
+ "INSERT INTO resources (id, title, body, content_type, metadata, submitted_by, created_at, updated_at)"
|
|
|
+ " VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
|
|
|
+ (tool_id, tool_info.get("name", slug), "", "tool",
|
|
|
+ json.dumps(metadata), "knowledge_processor", now_str, now_str),
|
|
|
+ )
|
|
|
+ conn.commit()
|
|
|
+ print(f"[Tool Resource] 创建新工具: {tool_id}")
|
|
|
+ return tool_id
|
|
|
+ finally:
|
|
|
+ conn.close()
|
|
|
+
|
|
|
+ async def _update_tool_knowledge_index(self, tool_id: str, knowledge_id: str):
|
|
|
+ """更新工具资源的 knowledge_ids 索引"""
|
|
|
+ conn = get_db()
|
|
|
+ try:
|
|
|
+ row = conn.execute("SELECT metadata FROM resources WHERE id = ?", (tool_id,)).fetchone()
|
|
|
+ if not row:
|
|
|
+ return
|
|
|
+ metadata = json.loads(row["metadata"] or "{}")
|
|
|
+ knowledge_ids = metadata.get("knowledge_ids", [])
|
|
|
+ if knowledge_id not in knowledge_ids:
|
|
|
+ knowledge_ids.append(knowledge_id)
|
|
|
+ metadata["knowledge_ids"] = knowledge_ids
|
|
|
+ conn.execute(
|
|
|
+ "UPDATE resources SET metadata = ?, updated_at = ? WHERE id = ?",
|
|
|
+ (json.dumps(metadata), datetime.now(timezone.utc).isoformat(), tool_id)
|
|
|
+ )
|
|
|
+ conn.commit()
|
|
|
+ finally:
|
|
|
+ conn.close()
|
|
|
+
|
|
|
+ async def _analyze_tool_relation(self, knowledge: dict):
|
|
|
+ """分析知识与工具的关联关系"""
|
|
|
+ kid = knowledge["id"]
|
|
|
+ now = int(time.time())
|
|
|
+ # 乐观锁:dedup_passed → analyzing
|
|
|
+ try:
|
|
|
+ milvus_store.update(kid, {"status": "analyzing", "updated_at": now})
|
|
|
+ except Exception as e:
|
|
|
+ print(f"[Tool Analysis] 锁定 {kid} 失败: {e}")
|
|
|
+ return
|
|
|
+ try:
|
|
|
+ tool_analysis = await self._llm_analyze_tools(knowledge)
|
|
|
+ has_tools = bool(tool_analysis and tool_analysis.get("has_tools"))
|
|
|
+
|
|
|
+            existing_tags = knowledge.get("tags") or {}  # assumes Milvus returns tags as a dict — TODO confirm it is not a JSON string (a str here would make .get below raise AttributeError)
|
|
|
+ has_tool_tag = existing_tags.get("tool") is True
|
|
|
+
|
|
|
+ # 情况1:LLM 判定无工具,但有 tool tag → 重新分析一次
|
|
|
+ if not has_tools and has_tool_tag:
|
|
|
+ print(f"[Tool Analysis] {kid} LLM 判定无工具但有 tool tag,重新分析")
|
|
|
+ tool_analysis = await self._llm_analyze_tools(knowledge)
|
|
|
+ has_tools = bool(tool_analysis and tool_analysis.get("has_tools"))
|
|
|
+ # 重新分析后仍然不一致 → 知识模糊,rejected
|
|
|
+ if not has_tools:
|
|
|
+ milvus_store.update(kid, {"status": "rejected", "updated_at": now})
|
|
|
+ print(f"[Tool Analysis] {kid} 两次判定不一致,知识模糊,rejected")
|
|
|
+ return
|
|
|
+
|
|
|
+ # 情况2:无工具且无 tool tag → 直接 approved
|
|
|
+ if not has_tools:
|
|
|
+ milvus_store.update(kid, {"status": "approved", "updated_at": now})
|
|
|
+ return
|
|
|
+
|
|
|
+ # 情况3/4:有工具 → 创建资源并关联
|
|
|
+ tool_ids = []
|
|
|
+ for tool_info in (tool_analysis.get("tools") or []):
|
|
|
+ tool_id = await self._create_or_get_tool_resource(tool_info)
|
|
|
+ if tool_id:
|
|
|
+ tool_ids.append(tool_id)
|
|
|
+
|
|
|
+ existing_resource_ids = knowledge.get("resource_ids") or []
|
|
|
+            updated_resource_ids = list(set(existing_resource_ids + tool_ids))  # NOTE(review): set() makes the stored order nondeterministic — list(dict.fromkeys(existing_resource_ids + tool_ids)) would dedupe while preserving order
|
|
|
+
|
|
|
+ updates: dict = {
|
|
|
+ "status": "approved",
|
|
|
+ "resource_ids": updated_resource_ids,
|
|
|
+ "updated_at": now
|
|
|
+ }
|
|
|
+ # 有工具但无 tool tag → 添加 tag
|
|
|
+ if not has_tool_tag:
|
|
|
+ updated_tags = dict(existing_tags)
|
|
|
+ updated_tags["tool"] = True
|
|
|
+ updates["tags"] = updated_tags
|
|
|
+ print(f"[Tool Analysis] {kid} 添加 tool tag")
|
|
|
+
|
|
|
+ milvus_store.update(kid, updates)
|
|
|
+
|
|
|
+ for tool_id in tool_ids:
|
|
|
+ await self._update_tool_knowledge_index(tool_id, kid)
|
|
|
+
|
|
|
+ print(f"[Tool Analysis] {kid} 关联了 {len(tool_ids)} 个工具")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f"[Tool Analysis] {kid} 分析失败: {e},回退到 dedup_passed")
|
|
|
+ try:
|
|
|
+                milvus_store.update(kid, {"status": "dedup_passed", "updated_at": int(time.time())})  # NOTE(review): phase-2 loop in process_pending re-picks this item at once, so a persistent LLM failure becomes a tight retry loop — leaving status "analyzing" would defer retry to the 10-min periodic rollback
|
|
|
+ except Exception:
|
|
|
+ pass
|
|
|
+
|
|
|
|
|
|
async def _periodic_processor():
|
|
|
- """每60秒检测超时 processing 条目(>5分钟)并回滚到 pending"""
|
|
|
+ """每60秒检测超时条目并回滚:processing(>5min)→pending,analyzing(>10min)→dedup_passed"""
|
|
|
while True:
|
|
|
await asyncio.sleep(60)
|
|
|
try:
|
|
|
- timeout_threshold = int(time.time()) - 300
|
|
|
+ now = int(time.time())
|
|
|
+ # 回滚超时的 processing(5分钟 → pending)
|
|
|
+ timeout_5min = now - 300
|
|
|
processing = milvus_store.query('status == "processing"', limit=200)
|
|
|
for item in processing:
|
|
|
updated_at = item.get("updated_at", 0) or 0
|
|
|
- # query 返回的 updated_at 已乘以 1000(毫秒),转回秒
|
|
|
updated_at_sec = updated_at // 1000 if updated_at > 1_000_000_000_000 else updated_at
|
|
|
- if updated_at_sec < timeout_threshold:
|
|
|
- print(f"[Periodic] 回滚超时 processing: {item['id']}")
|
|
|
+ if updated_at_sec < timeout_5min:
|
|
|
+ print(f"[Periodic] 回滚超时 processing → pending: {item['id']}")
|
|
|
milvus_store.update(item["id"], {"status": "pending", "updated_at": int(time.time())})
|
|
|
+ # 回滚超时的 analyzing(10分钟 → dedup_passed)
|
|
|
+ timeout_10min = now - 600
|
|
|
+ analyzing = milvus_store.query('status == "analyzing"', limit=200)
|
|
|
+ for item in analyzing:
|
|
|
+ updated_at = item.get("updated_at", 0) or 0
|
|
|
+ updated_at_sec = updated_at // 1000 if updated_at > 1_000_000_000_000 else updated_at
|
|
|
+ if updated_at_sec < timeout_10min:
|
|
|
+ print(f"[Periodic] 回滚超时 analyzing → dedup_passed: {item['id']}")
|
|
|
+ milvus_store.update(item["id"], {"status": "dedup_passed", "updated_at": int(time.time())})
|
|
|
except Exception as e:
|
|
|
print(f"[Periodic] 定时任务错误: {e}")
|
|
|
|
|
|
@@ -901,9 +1120,8 @@ async def _llm_rerank(query: str, candidates: list[dict], top_k: int) -> list[st
|
|
|
只输出 ID,不要其他内容。"""
|
|
|
|
|
|
try:
|
|
|
- response = await openrouter_llm_call(
|
|
|
+ response = await _dedup_llm(
|
|
|
messages=[{"role": "user", "content": prompt}],
|
|
|
- model="google/gemini-2.5-flash-lite"
|
|
|
)
|
|
|
|
|
|
content = response.get("content", "").strip()
|
|
|
@@ -1171,9 +1389,12 @@ def get_all_tags():
|
|
|
|
|
|
@app.get("/api/knowledge/pending")
|
|
|
def get_pending_knowledge(limit: int = Query(default=50, ge=1, le=200)):
|
|
|
- """查询待处理队列(pending + processing)"""
|
|
|
+ """查询待处理队列(pending + processing + dedup_passed + analyzing)"""
|
|
|
try:
|
|
|
- pending = milvus_store.query('status == "pending" or status == "processing"', limit=limit)
|
|
|
+ pending = milvus_store.query(
|
|
|
+ 'status == "pending" or status == "processing" or status == "dedup_passed" or status == "analyzing"',
|
|
|
+ limit=limit
|
|
|
+ )
|
|
|
serialized = [serialize_milvus_result(r) for r in pending]
|
|
|
return {"results": serialized, "count": len(serialized)}
|
|
|
except Exception as e:
|
|
|
@@ -1183,13 +1404,17 @@ def get_pending_knowledge(limit: int = Query(default=50, ge=1, le=200)):
|
|
|
|
|
|
@app.post("/api/knowledge/process")
|
|
|
async def trigger_process(force: bool = Query(default=False)):
|
|
|
- """手动触发去重处理。force=true 时先回滚所有 processing → pending"""
|
|
|
+ """手动触发去重处理。force=true 时先回滚所有 processing → pending,analyzing → dedup_passed"""
|
|
|
try:
|
|
|
if force:
|
|
|
processing = milvus_store.query('status == "processing"', limit=200)
|
|
|
for item in processing:
|
|
|
milvus_store.update(item["id"], {"status": "pending", "updated_at": int(time.time())})
|
|
|
- print(f"[Manual Process] 回滚 {len(processing)} 条 processing")
|
|
|
+ print(f"[Manual Process] 回滚 {len(processing)} 条 processing → pending")
|
|
|
+ analyzing = milvus_store.query('status == "analyzing"', limit=200)
|
|
|
+ for item in analyzing:
|
|
|
+ milvus_store.update(item["id"], {"status": "dedup_passed", "updated_at": int(time.time())})
|
|
|
+ print(f"[Manual Process] 回滚 {len(analyzing)} 条 analyzing → dedup_passed")
|
|
|
asyncio.create_task(knowledge_processor.process_pending())
|
|
|
return {"status": "ok", "message": "处理任务已触发"}
|
|
|
except Exception as e:
|
|
|
@@ -1264,9 +1489,8 @@ async def _evolve_knowledge_with_llm(old_content: str, feedback: str) -> str:
|
|
|
4. 禁止:严禁输出任何开场白、解释语或额外的 Markdown 标题,直接返回重写后的正文。
|
|
|
"""
|
|
|
try:
|
|
|
- response = await openrouter_llm_call(
|
|
|
+ response = await _dedup_llm(
|
|
|
messages=[{"role": "user", "content": prompt}],
|
|
|
- model="google/gemini-2.5-flash-lite"
|
|
|
)
|
|
|
evolved = response.get("content", "").strip()
|
|
|
if len(evolved) < 5:
|
|
|
@@ -1624,9 +1848,9 @@ REPORT: 原有 X 条,合并后 Y 条,精简了 Z 条。
|
|
|
禁止输出任何开场白或解释。"""
|
|
|
|
|
|
print(f"\n[知识瘦身] 正在调用 {model} 分析 {len(all_knowledge)} 条知识...")
|
|
|
- response = await openrouter_llm_call(
|
|
|
+ slim_llm = create_openrouter_llm_call(model=model)
|
|
|
+ response = await slim_llm(
|
|
|
messages=[{"role": "user", "content": prompt}],
|
|
|
- model=model
|
|
|
)
|
|
|
content = response.get("content", "").strip()
|
|
|
if not content:
|
|
|
@@ -1822,9 +2046,8 @@ async def extract_knowledge_from_messages(extract_req: MessageExtractIn, backgro
|
|
|
try:
|
|
|
print(f"\n[Extract] 正在从 {len(messages)} 条消息中提取知识...")
|
|
|
|
|
|
- response = await openrouter_llm_call(
|
|
|
+ response = await _dedup_llm(
|
|
|
messages=[{"role": "user", "content": prompt}],
|
|
|
- model="google/gemini-2.5-flash-lite"
|
|
|
)
|
|
|
|
|
|
content = response.get("content", "").strip()
|