#!/usr/bin/env python
"""
Self-contained KnowHub CLI
==========================

Three ways to query / write KnowHub. All of them are direct HTTP calls and do
not depend on the cyber-agent package.

1. `ask`    -- deep review: POST /api/agent with agent_type=remote_librarian
               (default) or remote_research. The remote Librarian agent plans,
               retrieves and consolidates, then returns a natural-language
               answer with citations.
2. `search` -- quick retrieval: GET /api/knowledge/search. Semantic search plus
               LLM re-ranking; returns structured knowledge entries.
3. `save`   -- store knowledge: POST /api/knowledge. Writes one knowledge entry
               (validated and ingested asynchronously).

## Usage

    python knowhub.py ask --query="..."
    python knowhub.py ask --query="..." --deep          # use remote_research
    python knowhub.py ask --query="..." --continue_from=SUB_TRACE_ID

    python knowhub.py search --query="..." [--top_k=5] [--min_score=3]
                             [--types=strategy,tool] [--owner=...]
                             [--capability_id=CAP-001] [--tool_id=...]
                             [--requirement_id=...]

    python knowhub.py save --task="..." --content="..." --types=strategy
                           [--score=4] [--source_name=...]
                           [--source_urls=u1,u2]
                           [--tags='{"project":"xyz"}']
                           [--capability_ids=CAP-001,CAP-002]

## Environment variables (optional)

    KNOWHUB_API     KnowHub server address   default http://43.106.118.91:9999
    KNOWHUB_OWNER   default owner            default sunlit.howard@gmail.com
                    (used by `save`, and shown in help text; `search` only
                    filters by owner when --owner is passed explicitly)

.env file: put a `.env` next to this script and it is read automatically at
import time (plain-text KEY=VALUE parsing). Every KEY=VALUE line is exported to
the process environment unless that key is already set.

## Output

JSON on stdout:
- ask:    {"mode":"remote", "agent_type":..., "sub_trace_id":..., "status":...,
           "summary":..., "stats":..., "error":?}
- search: {"query":..., "count":N, "results":[...]}  (plus "error" on failure)
- save:   {"knowledge_id":..., "status":"...", "owner":...}
           (on failure: "error" instead of "owner")

Exit code: 0 on success, 1 on failure.
"""

import argparse
import asyncio
import json
import os
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional

try:
    import httpx
except ImportError:
    # Defer the failure: `--help` keeps working and network commands report
    # the problem as a normal JSON error instead of an import-time traceback.
    httpx = None  # type: ignore[assignment]

# ── Default configuration ───────────────────────────
DEFAULT_KNOWHUB_API = "http://43.106.118.91:9999"
DEFAULT_OWNER = "sunlit.howard@gmail.com"
DEFAULT_SCOPES = ["org:cybertogether"]

ASK_TIMEOUT = 600.0  # Librarian agent planning + multi-round retrieval can take minutes
SEARCH_TIMEOUT = 60.0
SAVE_TIMEOUT = 30.0


# ── .env loading (minimal: KEY=VALUE lines only, no python-dotenv) ──
def _load_local_env() -> None:
    """Export KEY=VALUE pairs from the `.env` file next to this script.

    Intended for KNOWHUB_API / KNOWHUB_OWNER, but every KEY=VALUE line is
    exported. Variables already present in the environment take precedence.
    Blank lines, lines starting with `#`, and lines without `=` are skipped.
    Surrounding double and single quotes are stripped from values.

    A missing file is ignored silently; any other read failure (directory,
    permissions, non-UTF-8 content) is reported on stderr and ignored so that
    importing the module never crashes because of an optional config file.
    """
    env_file = Path(__file__).resolve().parent / ".env"
    try:
        text = env_file.read_text(encoding="utf-8")
    except FileNotFoundError:
        return
    except (OSError, UnicodeDecodeError) as exc:
        print(
            f"warning: ignoring {env_file}: {type(exc).__name__}: {exc}",
            file=sys.stderr,
        )
        return

    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, val = line.partition("=")
        key = key.strip()
        val = val.strip().strip('"').strip("'")
        if key and key not in os.environ:
            os.environ[key] = val


_load_local_env()

KNOWHUB_API = os.getenv("KNOWHUB_API", DEFAULT_KNOWHUB_API).rstrip("/")
KNOWHUB_OWNER = os.getenv("KNOWHUB_OWNER", DEFAULT_OWNER)


# ── HTTP helpers ────────────────────────────────────
def _client(timeout: float) -> "httpx.AsyncClient":
    """Return a new `httpx.AsyncClient` with the given timeout.

    Raises:
        RuntimeError: if httpx could not be imported.
    """
    if httpx is None:
        raise RuntimeError("httpx is not installed; run `pip install httpx`")
    return httpx.AsyncClient(timeout=timeout)


def _error_text(exc: Exception) -> str:
    """Format an exception for the `error` field of a result dict.

    HTTP status errors become `HTTP <code>: <first 200 chars of body>`;
    everything else becomes `<ExceptionType>: <message>`.
    """
    if httpx is not None and isinstance(exc, httpx.HTTPStatusError):
        return f"HTTP {exc.response.status_code}: {exc.response.text[:200]}"
    return f"{type(exc).__name__}: {exc}"


# ── Command implementations ─────────────────────────
async def ask(
    query: str,
    deep: bool = False,
    continue_from: Optional[str] = None,
    skills: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Deep review: call the remote Librarian / Research agent.

    deep=False -> remote_librarian (answer consolidated from the knowledge base)
    deep=True  -> remote_research (web research + ingestion, slower)

    Args:
        query: Sent as the `task` field of the request.
        deep: Selects the agent type as described above.
        continue_from: Sent as `continue_from` (an existing sub_trace_id).
        skills: Sent as `skills`.

    Returns:
        On success, a dict with mode, agent_type, sub_trace_id, status,
        summary, stats and error taken from the response. On any failure, a
        dict with mode, agent_type, status="failed" and an error string.
        Exceptions are never propagated.
    """
    agent_type = "remote_research" if deep else "remote_librarian"
    payload = {
        "agent_type": agent_type,
        "task": query,
        "messages": None,
        "continue_from": continue_from,
        "skills": skills,
    }
    try:
        async with _client(ASK_TIMEOUT) as client:
            r = await client.post(f"{KNOWHUB_API}/api/agent", json=payload)
            r.raise_for_status()
            result = r.json()
        return {
            "mode": "remote",
            "agent_type": agent_type,
            "sub_trace_id": result.get("sub_trace_id"),
            "status": result.get("status", "completed"),
            "summary": result.get("summary", ""),
            "stats": result.get("stats", {}),
            "error": result.get("error"),
        }
    except Exception as e:
        return {
            "mode": "remote",
            "agent_type": agent_type,
            "status": "failed",
            "error": _error_text(e),
        }


async def search(
    query: str,
    top_k: int = 5,
    min_score: int = 3,
    types: Optional[List[str]] = None,
    owner: Optional[str] = None,
    requirement_id: Optional[str] = None,
    capability_id: Optional[str] = None,
    tool_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Quick retrieval: GET /api/knowledge/search.

    Optional filters are only sent when they are truthy; in particular no
    owner filter is applied unless `owner` is given explicitly.

    Returns:
        {"query", "count", "results"} taken from the response, or the same
        shape with count=0, results=[] and an "error" string on any failure.
        Exceptions are never propagated.
    """
    params: Dict[str, Any] = {"q": query, "top_k": top_k, "min_score": min_score}
    if types:
        params["types"] = ",".join(types)
    if owner:  # only when explicitly overridden; None means search everything
        params["owner"] = owner
    if requirement_id:
        params["requirement_id"] = requirement_id
    if capability_id:
        params["capability_id"] = capability_id
    if tool_id:
        params["tool_id"] = tool_id

    try:
        async with _client(SEARCH_TIMEOUT) as client:
            r = await client.get(f"{KNOWHUB_API}/api/knowledge/search", params=params)
            r.raise_for_status()
            data = r.json()
        return {
            "query": query,
            "count": data.get("count", 0),
            "results": data.get("results", []),
        }
    except Exception as e:
        return {"query": query, "count": 0, "results": [], "error": _error_text(e)}


async def save(
    task: str,
    content: str,
    types: List[str],
    tags: Optional[Dict[str, str]] = None,
    scopes: Optional[List[str]] = None,
    owner: Optional[str] = None,
    resource_ids: Optional[List[str]] = None,
    source_name: str = "",
    source_category: str = "exp",
    source_urls: Optional[List[str]] = None,
    agent_id: str = "knowhub_cli",
    submitted_by: str = "",
    score: int = 3,
    message_id: str = "",
    capability_ids: Optional[List[str]] = None,
    tool_ids: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Store one knowledge entry: POST /api/knowledge.

    Falsy optional arguments fall back to defaults: `scopes` to
    DEFAULT_SCOPES, `owner` and `submitted_by` to KNOWHUB_OWNER, and the
    list / dict arguments to empty containers. The `eval` block is sent with
    the given `score` and fixed helpful=1, harmful=0, confidence=0.5.

    Returns:
        {"knowledge_id", "status", "owner"} on success, or
        {"knowledge_id": None, "status": "failed", "error"} on any failure.
        Exceptions are never propagated.
    """
    payload = {
        "message_id": message_id,
        "types": types,
        "task": task,
        "tags": tags or {},
        "scopes": scopes or DEFAULT_SCOPES,
        "owner": owner or KNOWHUB_OWNER,
        "content": content,
        "resource_ids": resource_ids or [],
        "source": {
            "name": source_name,
            "category": source_category,
            "urls": source_urls or [],
            "agent_id": agent_id,
            "submitted_by": submitted_by or KNOWHUB_OWNER,
        },
        "eval": {"score": score, "helpful": 1, "harmful": 0, "confidence": 0.5},
        "capability_ids": capability_ids or [],
        "tool_ids": tool_ids or [],
    }
    try:
        async with _client(SAVE_TIMEOUT) as client:
            r = await client.post(f"{KNOWHUB_API}/api/knowledge", json=payload)
            r.raise_for_status()
            data = r.json()
        return {
            "knowledge_id": data.get("knowledge_id"),
            "status": data.get("status", "submitted"),
            "owner": payload["owner"],
        }
    except Exception as e:
        return {"knowledge_id": None, "status": "failed", "error": _error_text(e)}


# ── Argument helpers ────────────────────────────────
def _split_csv(val: Optional[str]) -> Optional[List[str]]:
    """'a,b,c' -> ['a', 'b', 'c']; None, '' or only-separators -> None.

    Items are whitespace-stripped and empty items are dropped.
    """
    if not val:
        return None
    parts = [x.strip() for x in val.split(",") if x.strip()]
    return parts or None


def _parse_json_maybe(val: Optional[str]) -> Optional[Any]:
    """Parse a string as JSON; return the string unchanged if parsing fails.

    None is passed through as None.
    """
    if val is None:
        return None
    try:
        return json.loads(val)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return val


def _parse_tags(raw: Optional[str]) -> Optional[Dict[str, Any]]:
    """Parse the --tags option into a dict.

    Returns None when the option is absent or empty.

    Raises:
        ValueError: if a non-empty value is not a JSON object. Silently
            dropping it would store the entry without the requested tags.
    """
    if not raw:
        return None
    parsed = _parse_json_maybe(raw)
    if not isinstance(parsed, dict):
        raise ValueError("--tags must be a JSON object, e.g. '{\"project\":\"xyz\"}'")
    return parsed


# ── CLI ─────────────────────────────────────────────
def _build_parser() -> argparse.ArgumentParser:
    """Build the argument parser with the `ask`, `search` and `save` subcommands."""
    parser = argparse.ArgumentParser(
        description="KnowHub CLI - ask / search / save 知识库",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=f"默认 API: {KNOWHUB_API} 默认 owner: {KNOWHUB_OWNER}",
    )
    sub = parser.add_subparsers(dest="cmd", required=True, metavar="{ask,search,save}")

    # ask
    p_ask = sub.add_parser("ask", help="深度回顾(远端 Librarian Agent)")
    p_ask.add_argument("--query", required=True)
    p_ask.add_argument("--deep", action="store_true",
                       help="改走 remote_research,全网调研 + 入库")
    p_ask.add_argument("--continue_from",
                       help="已有 sub_trace_id,传入则复用 Librarian 上下文")
    p_ask.add_argument("--skills",
                       help="逗号分隔的 skill 名单(可选,由服务器白名单过滤)")

    # search
    p_s = sub.add_parser("search", help="快速检索(语义搜索 + 精排)")
    p_s.add_argument("--query", required=True)
    p_s.add_argument("--top_k", type=int, default=5)
    p_s.add_argument("--min_score", type=int, default=3)
    p_s.add_argument("--types",
                     help="逗号分隔(user_profile/strategy/tool/usecase/definition/plan)")
    p_s.add_argument("--owner",
                     help=f"覆盖默认 owner(默认不过滤,用 --owner={KNOWHUB_OWNER} 限定自己的)")
    p_s.add_argument("--requirement_id")
    p_s.add_argument("--capability_id")
    p_s.add_argument("--tool_id")

    # save
    p_sv = sub.add_parser("save", help="保存知识到 KnowHub")
    p_sv.add_argument("--task", required=True,
                      help="任务描述:在什么情景下 + 要完成什么目标")
    p_sv.add_argument("--content", required=True, help="知识的核心内容")
    p_sv.add_argument("--types", required=True, help="逗号分隔的类型")
    p_sv.add_argument("--tags", help="JSON 字符串,如 '{\"project\":\"xyz\"}'")
    p_sv.add_argument("--scopes", help=f"逗号分隔(默认 {','.join(DEFAULT_SCOPES)})")
    p_sv.add_argument("--owner", help=f"覆盖默认 owner(默认 {KNOWHUB_OWNER})")
    p_sv.add_argument("--score", type=int, default=3, help="1-5,默认 3")
    p_sv.add_argument("--source_name", default="")
    p_sv.add_argument("--source_category", default="exp", help="paper/exp/skill/book")
    p_sv.add_argument("--source_urls", help="逗号分隔 URL 列表")
    p_sv.add_argument("--agent_id", default="knowhub_cli")
    p_sv.add_argument("--submitted_by", default="")
    p_sv.add_argument("--capability_ids", help="逗号分隔的能力 ID")
    p_sv.add_argument("--tool_ids", help="逗号分隔的工具 ID")
    p_sv.add_argument("--resource_ids", help="逗号分隔的资源 ID")
    p_sv.add_argument("--message_id", default="")

    return parser


async def _dispatch(args: argparse.Namespace) -> Dict[str, Any]:
    """Run the subcommand selected in `args.cmd` and return its result dict.

    For `save`, an invalid --tags value is reported as a failed result before
    any network request is made.

    Raises:
        ValueError: if `args.cmd` is not one of ask / search / save.
    """
    if args.cmd == "ask":
        return await ask(
            query=args.query,
            deep=args.deep,
            continue_from=args.continue_from,
            skills=_split_csv(args.skills),
        )
    if args.cmd == "search":
        return await search(
            query=args.query,
            top_k=args.top_k,
            min_score=args.min_score,
            types=_split_csv(args.types),
            owner=args.owner,
            requirement_id=args.requirement_id,
            capability_id=args.capability_id,
            tool_id=args.tool_id,
        )
    if args.cmd == "save":
        try:
            tags = _parse_tags(args.tags)
        except ValueError as exc:
            return {
                "knowledge_id": None,
                "status": "failed",
                "error": f"{type(exc).__name__}: {exc}",
            }
        return await save(
            task=args.task,
            content=args.content,
            types=_split_csv(args.types) or [],
            tags=tags,
            scopes=_split_csv(args.scopes),
            owner=args.owner,
            resource_ids=_split_csv(args.resource_ids),
            source_name=args.source_name,
            source_category=args.source_category,
            source_urls=_split_csv(args.source_urls),
            agent_id=args.agent_id,
            submitted_by=args.submitted_by,
            score=args.score,
            message_id=args.message_id,
            capability_ids=_split_csv(args.capability_ids),
            tool_ids=_split_csv(args.tool_ids),
        )
    raise ValueError(f"未知命令: {args.cmd}")


def main() -> int:
    """Parse sys.argv, run the command, print the result as JSON on stdout.

    Returns:
        1 if the result has status == "failed" or a non-empty "error" field,
        otherwise 0.
    """
    args = _build_parser().parse_args()
    result = asyncio.run(_dispatch(args))
    print(json.dumps(result, ensure_ascii=False, indent=2))
    failed = result.get("status") == "failed" or bool(result.get("error"))
    return 1 if failed else 0


if __name__ == "__main__":
    sys.exit(main())