| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541 |
- """
- 知识管理工具 - KnowHub API 封装
- 所有工具通过 HTTP API 调用 KnowHub Server。
- """
- import os
- import logging
- import httpx
- from typing import List, Dict, Optional, Any
- from agent.tools import tool, ToolResult, ToolContext
logger = logging.getLogger(__name__)

# KnowHub Server API base URL. An unset *or empty* KNOWHUB_API falls back to the
# local default, and a trailing slash is stripped because the endpoints in this
# module are built as f"{KNOWHUB_API}/api/..." (a trailing slash would give "//api").
KNOWHUB_API = (os.getenv("KNOWHUB_API") or "http://localhost:8000").rstrip("/")
@tool(hidden_params=["context"])
async def knowledge_search(
    query: str,
    top_k: int = 5,
    min_score: int = 3,
    types: Optional[List[str]] = None,
    owner: Optional[str] = None,
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """
    Search knowledge (two stages: semantic routing + quality re-ranking).

    Args:
        query: Search query (a description of the task at hand)
        top_k: Number of results to return (default 5)
        min_score: Minimum score filter (default 3)
        types: Filter by type (user_profile/strategy/tool/usecase/definition/plan)
        owner: Filter by owner (optional)
        context: Tool context

    Returns:
        A list of relevant knowledge entries
    """

    def _preview(text: Any, max_len: int) -> str:
        # Truncate to max_len characters; add "..." only when text was actually cut.
        text = "" if text is None else str(text)
        return text if len(text) <= max_len else f"{text[:max_len]}..."

    try:
        params: Dict[str, Any] = {
            "q": query,
            "top_k": top_k,
            "min_score": min_score,
        }
        # List filters are sent as a single comma-separated query parameter.
        if types:
            params["types"] = ",".join(types)
        if owner:
            params["owner"] = owner

        async with httpx.AsyncClient(timeout=60.0) as client:
            response = await client.get(f"{KNOWHUB_API}/api/knowledge/search", params=params)
            response.raise_for_status()
            data = response.json()

        results = data.get("results") or []
        # Fall back to the number of returned items when "count" is absent.
        count = data.get("count", len(results))

        if not results:
            return ToolResult(
                title="🔍 未找到相关知识",
                output=f"查询: {query}\n\n知识库中暂无相关的高质量知识。",
                long_term_memory=f"知识检索: 未找到相关知识 - {query[:50]}",
            )

        # One section per hit: a header with id and score, then short previews.
        output_lines = [f"查询: {query}\n", f"找到 {count} 条相关知识:\n"]
        for idx, item in enumerate(results, 1):
            # Tolerate a missing or null "eval" object; the default score is 3.
            score = (item.get("eval") or {}).get("score", 3)
            output_lines.append(f"\n### {idx}. [{item.get('id')}] (⭐ {score})")
            output_lines.append(f"**任务**: {_preview(item.get('task'), 150)}")
            output_lines.append(f"**内容**: {_preview(item.get('content'), 200)}")

        return ToolResult(
            title="✅ 知识检索成功",
            output="\n".join(output_lines),
            long_term_memory=f"知识检索: 找到 {count} 条相关知识 - {query[:50]}",
            metadata={
                "count": count,
                "knowledge_ids": [item.get("id") for item in results],
                "items": results,
            },
        )
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.exception("知识检索失败: %s", e)
        return ToolResult(
            title="❌ 检索失败",
            output=f"错误: {str(e)}",
            error=str(e),
        )
@tool(
    hidden_params=["context", "owner"],
    inject_params={
        # `or {}` guards against knowledge_config / default_tags being present but null.
        "owner": lambda ctx: (ctx.get("knowledge_config") or {}).get("owner") if ctx else None,
        "tags": lambda ctx, args: {
            **((ctx.get("knowledge_config") or {}).get("default_tags") or {}),
            **(args.get("tags") or {}),
        } if ctx else args.get("tags"),
        "scopes": lambda ctx, args: (
            (args.get("scopes") or [])
            + ((ctx.get("knowledge_config") or {}).get("default_scopes") or [])
        ) if ctx else args.get("scopes"),
    },
)
async def knowledge_save(
    task: str,
    content: str,
    types: List[str],
    tags: Optional[Dict[str, str]] = None,
    scopes: Optional[List[str]] = None,
    owner: Optional[str] = None,
    resource_ids: Optional[List[str]] = None,
    source_name: str = "",
    source_category: str = "exp",
    urls: Optional[List[str]] = None,
    agent_id: str = "research_agent",
    submitted_by: str = "",
    score: int = 3,
    message_id: str = "",
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """
    Save a new knowledge entry.

    Args:
        task: Task description (the situation + the goal to accomplish)
        content: Core content
        types: Knowledge type labels; any of: user_profile, strategy, tool, usecase, definition, plan
        tags: Business tags (JSON object)
        scopes: Visibility scopes (default ["org:cybertogether"]; an empty list also gets the default)
        owner: Owner (default agent:{agent_id})
        resource_ids: IDs of related resources (optional)
        source_name: Source name
        source_category: Source category (paper/exp/skill/book)
        urls: List of reference URLs
        agent_id: ID of the agent that performed this research
        submitted_by: Submitter
        score: Initial score 1-5 (default 3)
        message_id: ID of the originating Message
        context: Tool context

    Returns:
        Save result
    """
    try:
        # Defaults are applied here in the agent code, not on the server.
        # `not scopes` (rather than `is None`) also covers the empty list the
        # "scopes" inject lambda above returns when ctx is set but neither the
        # call nor knowledge_config provides any scope.
        if not scopes:
            scopes = ["org:cybertogether"]
        else:
            # The inject lambda concatenates caller and default scopes; drop
            # duplicates while keeping first-seen order.
            scopes = list(dict.fromkeys(scopes))
        if not owner:
            owner = f"agent:{agent_id}"

        payload = {
            "message_id": message_id,
            "types": types,
            "task": task,
            "tags": tags or {},
            "scopes": scopes,
            "owner": owner,
            "content": content,
            "resource_ids": resource_ids or [],
            "source": {
                "name": source_name,
                "category": source_category,
                "urls": urls or [],
                "agent_id": agent_id,
                "submitted_by": submitted_by,
            },
            # Initial evaluation counters for a freshly saved entry.
            "eval": {
                "score": score,
                "helpful": 1,
                "harmful": 0,
                "confidence": 0.5,
            },
        }

        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(f"{KNOWHUB_API}/api/knowledge", json=payload)
            response.raise_for_status()
            data = response.json()

        knowledge_id = data.get("knowledge_id", "unknown")
        # Only mark the preview as truncated when the task is actually longer.
        task_preview = task if len(task) <= 100 else f"{task[:100]}..."
        return ToolResult(
            title="✅ 知识已保存",
            output=f"知识 ID: {knowledge_id}\n\n任务:\n{task_preview}",
            long_term_memory=f"保存知识: {knowledge_id} - {task[:50]}",
            metadata={"knowledge_id": knowledge_id},
        )
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.exception("保存知识失败: %s", e)
        return ToolResult(
            title="❌ 保存失败",
            output=f"错误: {str(e)}",
            error=str(e),
        )
@tool(hidden_params=["context"])
async def knowledge_update(
    knowledge_id: str,
    add_helpful_case: Optional[Dict] = None,
    add_harmful_case: Optional[Dict] = None,
    update_score: Optional[int] = None,
    evolve_feedback: Optional[str] = None,
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """
    Update the evaluation feedback of an existing knowledge entry.

    Args:
        knowledge_id: Knowledge ID
        add_helpful_case: A case in which the knowledge proved helpful, to append
        add_harmful_case: A case in which the knowledge proved unhelpful, to append
        update_score: New score (1-5)
        evolve_feedback: Evolution feedback (triggers an LLM rewrite)
        context: Tool context

    Returns:
        Update result
    """
    try:
        # Collect the request fields and the human-readable change list together;
        # falsy inputs (except an explicit score) are skipped.
        changes: Dict[str, Any] = {}
        notes: List[str] = []
        if add_helpful_case:
            changes["add_helpful_case"] = add_helpful_case
            notes.append("添加 helpful 案例")
        if add_harmful_case:
            changes["add_harmful_case"] = add_harmful_case
            notes.append("添加 harmful 案例")
        if update_score is not None:
            changes["update_score"] = update_score
            notes.append(f"更新评分: {update_score}")
        if evolve_feedback:
            changes["evolve_feedback"] = evolve_feedback
            notes.append("知识进化: 基于反馈重写内容")

        # Nothing to send: answer locally without contacting the server.
        if not changes:
            return ToolResult(
                title="⚠️ 无更新",
                output="未指定任何更新内容",
                long_term_memory="尝试更新知识但未指定更新内容",
            )

        endpoint = f"{KNOWHUB_API}/api/knowledge/{knowledge_id}"
        async with httpx.AsyncClient(timeout=60.0) as client:
            resp = await client.put(endpoint, json=changes)
            resp.raise_for_status()

        bullets = "\n".join(f"- {note}" for note in notes)
        return ToolResult(
            title="✅ 知识已更新",
            output=f"知识 ID: {knowledge_id}\n\n更新内容:\n{bullets}",
            long_term_memory=f"更新知识: {knowledge_id}",
        )
    except Exception as exc:
        logger.error(f"更新知识失败: {exc}")
        return ToolResult(
            title="❌ 更新失败",
            output=f"错误: {str(exc)}",
            error=str(exc),
        )
@tool(hidden_params=["context"])
async def knowledge_batch_update(
    feedback_list: List[Dict[str, Any]],
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """
    Report the effectiveness of several knowledge entries in one call.

    Args:
        feedback_list: List of evaluations; each element contains:
            - knowledge_id: (str) Knowledge ID
            - is_effective: (bool) Whether the knowledge was effective
            - feedback: (str, optional) Improvement suggestion; if the knowledge
              was effective and a suggestion is given, knowledge evolution is triggered
        context: Tool context

    Returns:
        Batch update result
    """
    try:
        # Nothing to send: answer locally without contacting the server.
        if not feedback_list:
            return ToolResult(
                title="⚠️ 反馈列表为空",
                output="未提供任何反馈",
                long_term_memory="批量更新知识: 反馈列表为空",
            )

        payload = {"feedback_list": feedback_list}
        # Longer timeout than single-entry updates: one request carries the whole batch.
        async with httpx.AsyncClient(timeout=120.0) as client:
            response = await client.post(f"{KNOWHUB_API}/api/knowledge/batch_update", json=payload)
            response.raise_for_status()
            data = response.json()

        updated = data.get("updated", 0)
        return ToolResult(
            title="✅ 批量更新完成",
            output=f"成功更新 {updated} 条知识",
            long_term_memory=f"批量更新知识: 成功 {updated} 条",
        )
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.exception("批量更新知识失败: %s", e)
        return ToolResult(
            title="❌ 批量更新失败",
            output=f"错误: {str(e)}",
            error=str(e),
        )
@tool(hidden_params=["context"])
async def knowledge_list(
    limit: int = 10,
    types: Optional[List[str]] = None,
    scopes: Optional[List[str]] = None,
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """
    List saved knowledge entries.

    Args:
        limit: Maximum number of entries to return (default 10)
        types: Filter by type (optional)
        scopes: Filter by scope (optional)
        context: Tool context

    Returns:
        A list of knowledge entries
    """

    def _preview(text: Any, max_len: int) -> str:
        # Truncate to max_len characters; add "..." only when text was actually cut.
        text = "" if text is None else str(text)
        return text if len(text) <= max_len else f"{text[:max_len]}..."

    try:
        params: Dict[str, Any] = {"limit": limit}
        # List filters are sent as comma-separated query parameters.
        if types:
            params["types"] = ",".join(types)
        if scopes:
            params["scopes"] = ",".join(scopes)

        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(f"{KNOWHUB_API}/api/knowledge", params=params)
            response.raise_for_status()
            data = response.json()

        results = data.get("results") or []
        # Fall back to the number of returned items when "count" is absent.
        count = data.get("count", len(results))

        if not results:
            if types or scopes:
                # With filters applied, an empty result does not mean the store is empty.
                return ToolResult(
                    title="📂 未找到匹配的知识",
                    output="没有符合筛选条件的知识",
                    long_term_memory="列出知识: 无匹配结果",
                )
            return ToolResult(
                title="📂 知识库为空",
                output="还没有保存任何知识",
                long_term_memory="知识库为空",
            )

        output_lines = [f"共找到 {count} 条知识:\n"]
        for item in results:
            # Tolerate a missing or null "eval" object; the default score is 3.
            score = (item.get("eval") or {}).get("score", 3)
            output_lines.append(f"- [{item.get('id')}] (⭐{score}) {_preview(item.get('task'), 60)}")

        return ToolResult(
            title="📚 知识列表",
            output="\n".join(output_lines),
            long_term_memory=f"列出 {count} 条知识",
        )
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.exception("列出知识失败: %s", e)
        return ToolResult(
            title="❌ 列表失败",
            output=f"错误: {str(e)}",
            error=str(e),
        )
@tool(hidden_params=["context"])
async def knowledge_slim(
    model: str = "google/gemini-2.0-flash-001",
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """
    Slim down the knowledge base by having an LLM merge semantically similar entries.

    Args:
        model: Model to use (default google/gemini-2.0-flash-001)
        context: Tool context

    Returns:
        A report of the slimming run
    """
    try:
        # The model name travels as a query parameter; the long timeout presumably
        # leaves room for the server-side merge.
        async with httpx.AsyncClient(timeout=300.0) as client:
            resp = await client.post(
                f"{KNOWHUB_API}/api/knowledge/slim",
                params={"model": model},
            )
            resp.raise_for_status()
            body = resp.json()

        count_before = body.get("before", 0)
        count_after = body.get("after", 0)
        report = body.get("report", "")

        headline = f"瘦身完成:{count_before} → {count_after} 条知识"
        summary = f"{headline}\n{report}" if report else headline

        return ToolResult(
            title="✅ 知识库瘦身完成",
            output=summary,
            long_term_memory=f"知识库瘦身: {count_before} → {count_after} 条",
        )
    except Exception as exc:
        logger.error(f"知识库瘦身失败: {exc}")
        return ToolResult(
            title="❌ 瘦身失败",
            output=f"错误: {str(exc)}",
            error=str(exc),
        )
- # ==================== Resource 资源管理工具 ====================
@tool(hidden_params=["context"])
async def resource_save(
    resource_id: str,
    title: str,
    body: str,
    content_type: str = "text",
    secure_body: str = "",
    metadata: Optional[Dict[str, Any]] = None,
    submitted_by: str = "",
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """
    Store a resource such as a code snippet, credential or cookie.

    Args:
        resource_id: Resource ID, a hierarchical path such as "code/selenium/login" or "credentials/website_a"
        title: Resource title
        body: Public content (stored in plain text, searchable)
        content_type: Content type (text/code/credential/cookie)
        secure_body: Sensitive content (stored encrypted; reading it requires the organization key)
        metadata: Metadata, e.g. {"language": "python", "acquired_at": "2026-03-06T10:00:00Z"}
        submitted_by: Submitter
        context: Tool context

    Returns:
        Save result
    """
    try:
        record = dict(
            id=resource_id,
            title=title,
            body=body,
            secure_body=secure_body,
            content_type=content_type,
            metadata=metadata or {},
            submitted_by=submitted_by,
        )

        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.post(f"{KNOWHUB_API}/api/resource", json=record)
            resp.raise_for_status()
            # The decoded body is unused, but decoding is kept: a non-JSON reply
            # raises here and is reported as a failure below.
            resp.json()

        return ToolResult(
            title="✅ 资源已保存",
            output=f"资源 ID: {resource_id}\n类型: {content_type}\n标题: {title}",
            long_term_memory=f"保存资源: {resource_id} ({content_type})",
            metadata={"resource_id": resource_id},
        )
    except Exception as exc:
        logger.error(f"保存资源失败: {exc}")
        return ToolResult(
            title="❌ 保存失败",
            output=f"错误: {str(exc)}",
            error=str(exc),
        )
@tool(hidden_params=["context"])
async def resource_get(
    resource_id: str,
    org_key: Optional[str] = None,
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """
    Fetch the content of a stored resource.

    Args:
        resource_id: Resource ID (hierarchical path)
        org_key: Organization key, sent as the X-Org-Key header to decrypt sensitive content (optional)
        context: Tool context

    Returns:
        The resource content
    """
    try:
        headers = {"X-Org-Key": org_key} if org_key else {}

        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.get(
                f"{KNOWHUB_API}/api/resource/{resource_id}",
                headers=headers,
            )
            resp.raise_for_status()
            resource = resp.json()

        sections = [
            f"资源 ID: {resource['id']}\n",
            f"标题: {resource['title']}\n",
            f"类型: {resource['content_type']}\n",
            f"\n公开内容:\n{resource['body']}\n",
        ]
        # NOTE(review): any returned secure_body is echoed in plain text into the
        # output and is also part of `metadata` below.
        if resource.get('secure_body'):
            sections.append(f"\n敏感内容:\n{resource['secure_body']}\n")

        return ToolResult(
            title=f"📦 {resource['title']}",
            output="".join(sections),
            metadata=resource,
        )
    except Exception as exc:
        logger.error(f"获取资源失败: {exc}")
        return ToolResult(
            title="❌ 获取失败",
            output=f"错误: {str(exc)}",
            error=str(exc),
        )
|