""" 知识管理工具 - KnowHub API 封装 所有工具通过 HTTP API 调用 KnowHub Server。 """ import os import logging import httpx from typing import List, Dict, Optional, Any from agent.tools import tool, ToolResult, ToolContext logger = logging.getLogger(__name__) # KnowHub Server API 地址 KNOWHUB_API = os.getenv("KNOWHUB_API", "http://localhost:8000") @tool(hidden_params=["context"]) async def knowledge_search( query: str, top_k: int = 5, min_score: int = 3, types: Optional[List[str]] = None, owner: Optional[str] = None, context: Optional[ToolContext] = None, ) -> ToolResult: """ 检索知识(两阶段:语义路由 + 质量精排) Args: query: 搜索查询(任务描述) top_k: 返回数量(默认 5) min_score: 最低评分过滤(默认 3) types: 按类型过滤(user_profile/strategy/tool/usecase/definition/plan) owner: 按所有者过滤(可选) context: 工具上下文 Returns: 相关知识列表 """ try: params = { "q": query, "top_k": top_k, "min_score": min_score, } if types: params["types"] = ",".join(types) if owner: params["owner"] = owner async with httpx.AsyncClient(timeout=60.0) as client: response = await client.get(f"{KNOWHUB_API}/api/knowledge/search", params=params) response.raise_for_status() data = response.json() results = data.get("results", []) count = data.get("count", 0) if not results: return ToolResult( title="🔍 未找到相关知识", output=f"查询: {query}\n\n知识库中暂无相关的高质量知识。", long_term_memory=f"知识检索: 未找到相关知识 - {query[:50]}" ) # 格式化输出 output_lines = [f"查询: {query}\n", f"找到 {count} 条相关知识:\n"] for idx, item in enumerate(results, 1): eval_data = item.get("eval", {}) score = eval_data.get("score", 3) output_lines.append(f"\n### {idx}. [{item['id']}] (⭐ {score})") output_lines.append(f"**任务**: {item['task'][:150]}...") output_lines.append(f"**内容**: {item['content'][:200]}...") return ToolResult( title="✅ 知识检索成功", output="\n".join(output_lines), long_term_memory=f"知识检索: 找到 {count} 条相关知识 - {query[:50]}", metadata={ "count": count, "knowledge_ids": [item["id"] for item in results], "items": results } ) except Exception as e: logger.error(f"知识检索失败: {e}") return ToolResult( title="❌ 检索失败", output=f"错误: {str(e)}", error=str(e) ) @tool( hidden_params=["context", "owner"], inject_params={ "owner": lambda ctx: ctx.get("knowledge_config", {}).get("owner") if ctx else None, "tags": lambda ctx, args: { **ctx.get("knowledge_config", {}).get("default_tags", {}), **(args.get("tags") or {}) } if ctx else args.get("tags"), "scopes": lambda ctx, args: (args.get("scopes") or []) + (ctx.get("knowledge_config", {}).get("default_scopes") or []) if ctx else args.get("scopes"), } ) async def knowledge_save( task: str, content: str, types: List[str], tags: Optional[Dict[str, str]] = None, scopes: Optional[List[str]] = None, owner: Optional[str] = None, resource_ids: Optional[List[str]] = None, source_name: str = "", source_category: str = "exp", urls: List[str] = None, agent_id: str = "research_agent", submitted_by: str = "", score: int = 3, message_id: str = "", context: Optional[ToolContext] = None, ) -> ToolResult: """ 保存新知识 Args: task: 任务描述(在什么情景下 + 要完成什么目标) content: 核心内容 types: 知识类型标签,可选:user_profile, strategy, tool, usecase, definition, plan tags: 业务标签(JSON 对象) scopes: 可见范围(默认 ["org:cybertogether"]) owner: 所有者(默认 agent:{agent_id}) resource_ids: 关联的资源 ID 列表(可选) source_name: 来源名称 source_category: 来源类别(paper/exp/skill/book) urls: 参考来源链接列表 agent_id: 执行此调研的 agent ID submitted_by: 提交者 score: 初始评分 1-5(默认 3) message_id: 来源 Message ID context: 工具上下文 Returns: 保存结果 """ try: # 设置默认值(在 agent 代码中,不是服务器端) if scopes is None: scopes = ["org:cybertogether"] if owner is None: owner = f"agent:{agent_id}" payload = { "message_id": message_id, "types": types, "task": task, "tags": tags or {}, "scopes": scopes, "owner": owner, "content": content, "resource_ids": resource_ids or [], "source": { "name": source_name, "category": source_category, "urls": urls or [], "agent_id": agent_id, "submitted_by": submitted_by, }, "eval": { "score": score, "helpful": 1, "harmful": 0, "confidence": 0.5, } } async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post(f"{KNOWHUB_API}/api/knowledge", json=payload) response.raise_for_status() data = response.json() knowledge_id = data.get("knowledge_id", "unknown") return ToolResult( title="✅ 知识已保存", output=f"知识 ID: {knowledge_id}\n\n任务:\n{task[:100]}...", long_term_memory=f"保存知识: {knowledge_id} - {task[:50]}", metadata={"knowledge_id": knowledge_id} ) except Exception as e: logger.error(f"保存知识失败: {e}") return ToolResult( title="❌ 保存失败", output=f"错误: {str(e)}", error=str(e) ) @tool(hidden_params=["context"]) async def knowledge_update( knowledge_id: str, add_helpful_case: Optional[Dict] = None, add_harmful_case: Optional[Dict] = None, update_score: Optional[int] = None, evolve_feedback: Optional[str] = None, context: Optional[ToolContext] = None, ) -> ToolResult: """ 更新已有知识的评估反馈 Args: knowledge_id: 知识 ID add_helpful_case: 添加好用的案例 add_harmful_case: 添加不好用的案例 update_score: 更新评分(1-5) evolve_feedback: 经验进化反馈(触发 LLM 重写) context: 工具上下文 Returns: 更新结果 """ try: payload = {} if add_helpful_case: payload["add_helpful_case"] = add_helpful_case if add_harmful_case: payload["add_harmful_case"] = add_harmful_case if update_score is not None: payload["update_score"] = update_score if evolve_feedback: payload["evolve_feedback"] = evolve_feedback if not payload: return ToolResult( title="⚠️ 无更新", output="未指定任何更新内容", long_term_memory="尝试更新知识但未指定更新内容" ) async with httpx.AsyncClient(timeout=60.0) as client: response = await client.put(f"{KNOWHUB_API}/api/knowledge/{knowledge_id}", json=payload) response.raise_for_status() summary = [] if add_helpful_case: summary.append("添加 helpful 案例") if add_harmful_case: summary.append("添加 harmful 案例") if update_score is not None: summary.append(f"更新评分: {update_score}") if evolve_feedback: summary.append("知识进化: 基于反馈重写内容") return ToolResult( title="✅ 知识已更新", output=f"知识 ID: {knowledge_id}\n\n更新内容:\n" + "\n".join(f"- {s}" for s in summary), long_term_memory=f"更新知识: {knowledge_id}" ) except Exception as e: logger.error(f"更新知识失败: {e}") return ToolResult( title="❌ 更新失败", output=f"错误: {str(e)}", error=str(e) ) @tool(hidden_params=["context"]) async def knowledge_batch_update( feedback_list: List[Dict[str, Any]], context: Optional[ToolContext] = None, ) -> ToolResult: """ 批量反馈知识的有效性 Args: feedback_list: 评价列表,每个元素包含: - knowledge_id: (str) 知识 ID - is_effective: (bool) 是否有效 - feedback: (str, optional) 改进建议,若有效且有建议则触发知识进化 context: 工具上下文 Returns: 批量更新结果 """ try: if not feedback_list: return ToolResult( title="⚠️ 反馈列表为空", output="未提供任何反馈", long_term_memory="批量更新知识: 反馈列表为空" ) payload = {"feedback_list": feedback_list} async with httpx.AsyncClient(timeout=120.0) as client: response = await client.post(f"{KNOWHUB_API}/api/knowledge/batch_update", json=payload) response.raise_for_status() data = response.json() updated = data.get("updated", 0) return ToolResult( title="✅ 批量更新完成", output=f"成功更新 {updated} 条知识", long_term_memory=f"批量更新知识: 成功 {updated} 条" ) except Exception as e: logger.error(f"列出知识失败: {e}") return ToolResult( title="❌ 列表失败", output=f"错误: {str(e)}", error=str(e) ) @tool(hidden_params=["context"]) async def knowledge_list( limit: int = 10, types: Optional[List[str]] = None, scopes: Optional[List[str]] = None, context: Optional[ToolContext] = None, ) -> ToolResult: """ 列出已保存的知识 Args: limit: 返回数量限制(默认 10) types: 按类型过滤(可选) scopes: 按范围过滤(可选) context: 工具上下文 Returns: 知识列表 """ try: params = {"limit": limit} if types: params["types"] = ",".join(types) if scopes: params["scopes"] = ",".join(scopes) async with httpx.AsyncClient(timeout=30.0) as client: response = await client.get(f"{KNOWHUB_API}/api/knowledge", params=params) response.raise_for_status() data = response.json() results = data.get("results", []) count = data.get("count", 0) if not results: return ToolResult( title="📂 知识库为空", output="还没有保存任何知识", long_term_memory="知识库为空" ) output_lines = [f"共找到 {count} 条知识:\n"] for item in results: eval_data = item.get("eval", {}) score = eval_data.get("score", 3) output_lines.append(f"- [{item['id']}] (⭐{score}) {item['task'][:60]}...") return ToolResult( title="📚 知识列表", output="\n".join(output_lines), long_term_memory=f"列出 {count} 条知识" ) except Exception as e: logger.error(f"列出知识失败: {e}") return ToolResult( title="❌ 列表失败", output=f"错误: {str(e)}", error=str(e) ) @tool(hidden_params=["context"]) async def knowledge_slim( model: str = "google/gemini-2.0-flash-001", context: Optional[ToolContext] = None, ) -> ToolResult: """ 知识库瘦身:调用顶级大模型,将知识库中语义相似的知识合并精简 Args: model: 使用的模型(默认 gemini-2.0-flash-001) context: 工具上下文 Returns: 瘦身结果报告 """ try: async with httpx.AsyncClient(timeout=300.0) as client: response = await client.post(f"{KNOWHUB_API}/api/knowledge/slim", params={"model": model}) response.raise_for_status() data = response.json() before = data.get("before", 0) after = data.get("after", 0) report = data.get("report", "") result = f"瘦身完成:{before} → {after} 条知识" if report: result += f"\n{report}" return ToolResult( title="✅ 知识库瘦身完成", output=result, long_term_memory=f"知识库瘦身: {before} → {after} 条" ) except Exception as e: logger.error(f"知识库瘦身失败: {e}") return ToolResult( title="❌ 瘦身失败", output=f"错误: {str(e)}", error=str(e) ) # ==================== Resource 资源管理工具 ==================== @tool(hidden_params=["context"]) async def resource_save( resource_id: str, title: str, body: str, content_type: str = "text", secure_body: str = "", metadata: Optional[Dict[str, Any]] = None, submitted_by: str = "", context: Optional[ToolContext] = None, ) -> ToolResult: """ 保存资源(代码片段、凭证、Cookie 等) Args: resource_id: 资源 ID(层级路径,如 "code/selenium/login" 或 "credentials/website_a") title: 资源标题 body: 公开内容(明文存储,可搜索) content_type: 内容类型(text/code/credential/cookie) secure_body: 敏感内容(加密存储,需要组织密钥访问) metadata: 元数据(如 {"language": "python", "acquired_at": "2026-03-06T10:00:00Z"}) submitted_by: 提交者 context: 工具上下文 Returns: 保存结果 """ try: payload = { "id": resource_id, "title": title, "body": body, "secure_body": secure_body, "content_type": content_type, "metadata": metadata or {}, "submitted_by": submitted_by, } async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post(f"{KNOWHUB_API}/api/resource", json=payload) response.raise_for_status() data = response.json() return ToolResult( title="✅ 资源已保存", output=f"资源 ID: {resource_id}\n类型: {content_type}\n标题: {title}", long_term_memory=f"保存资源: {resource_id} ({content_type})", metadata={"resource_id": resource_id} ) except Exception as e: logger.error(f"保存资源失败: {e}") return ToolResult( title="❌ 保存失败", output=f"错误: {str(e)}", error=str(e) ) @tool(hidden_params=["context"]) async def resource_get( resource_id: str, org_key: Optional[str] = None, context: Optional[ToolContext] = None, ) -> ToolResult: """ 获取资源内容 Args: resource_id: 资源 ID(层级路径) org_key: 组织密钥(用于解密敏感内容,可选) context: 工具上下文 Returns: 资源内容 """ try: headers = {} if org_key: headers["X-Org-Key"] = org_key async with httpx.AsyncClient(timeout=30.0) as client: response = await client.get( f"{KNOWHUB_API}/api/resource/{resource_id}", headers=headers ) response.raise_for_status() data = response.json() output = f"资源 ID: {data['id']}\n" output += f"标题: {data['title']}\n" output += f"类型: {data['content_type']}\n" output += f"\n公开内容:\n{data['body']}\n" if data.get('secure_body'): output += f"\n敏感内容:\n{data['secure_body']}\n" return ToolResult( title=f"📦 {data['title']}", output=output, metadata=data ) except Exception as e: logger.error(f"获取资源失败: {e}") return ToolResult( title="❌ 获取失败", output=f"错误: {str(e)}", error=str(e) )