| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366 |
- #!/usr/bin/env python
- """
- KnowHub 自包含 CLI
- ===================
- 三种查询/写入 KnowHub 的方式,都是直接 HTTP 调用,不依赖 cyber-agent 包。
- 1. `ask` —— 深度回顾:POST /api/agent,agent_type=remote_librarian(默认)或 remote_research
- 远端 Librarian Agent 会规划、检索、整合 → 带引用的自然语言回答
- 2. `search` —— 快速检索:GET /api/knowledge/search
- 语义搜索 + LLM 精排,返回结构化知识条目
- 3. `save` —— 保存知识:POST /api/knowledge
- 把单条知识写库(异步校验入库)
- ## 用法
- python knowhub.py ask --query="..."
- python knowhub.py ask --query="..." --deep # 走 remote_research
- python knowhub.py ask --query="..." --continue_from=SUB_TRACE_ID
- python knowhub.py search --query="..." [--top_k=5] [--min_score=3]
- [--types=strategy,tool] [--owner=...]
- [--capability_id=CAP-001]
- [--tool_id=...] [--requirement_id=...]
- python knowhub.py save --task="..." --content="..." --types=strategy
- [--score=4] [--source_name=...] [--source_urls=u1,u2]
- [--tags='{"project":"xyz"}']
- [--capability_ids=CAP-001,CAP-002]
- ## 环境变量(可选)
- KNOWHUB_API KnowHub 服务器地址 默认 http://43.106.118.91:9999
- KNOWHUB_OWNER 默认所有者(save 默认使用;search 默认不按 owner 过滤,需显式传 --owner) 默认 sunlit.howard@gmail.com
- .env 文件:在本 skill 目录下放 `.env`,本脚本会自动读取(纯文本解析 KEY=VALUE;文件中的每个键都会写入环境变量,已存在的环境变量优先)。
- ## 返回
- stdout 输出 JSON:
- - ask: {"mode":"remote", "agent_type":..., "sub_trace_id":..., "status":..., "summary":..., "stats":..., "error":?}
- - search: {"query":..., "count":N, "results":[...], "error":?}
- - save: {"knowledge_id":..., "status":"...", "owner":?, "error":?}
- 退出码:成功 0,失败 1。
- """
- import argparse
- import asyncio
- import json
- import os
- import sys
- from pathlib import Path
- from typing import Any, Dict, List, Optional
- import httpx
# ── Default configuration ───────────────────────────
# Built-in fallbacks for the server address and the knowledge owner.
DEFAULT_KNOWHUB_API: str = "http://43.106.118.91:9999"
DEFAULT_OWNER: str = "sunlit.howard@gmail.com"
# Scopes attached to a saved entry when the caller supplies none.
DEFAULT_SCOPES: List[str] = ["org:cybertogether"]

# Timeouts handed to the HTTP client for each mode.
# The Librarian agent's planning plus multi-round retrieval may take
# several minutes, hence the generous `ask` timeout.
ASK_TIMEOUT: float = 600.0
SEARCH_TIMEOUT: float = 60.0
SAVE_TIMEOUT: float = 30.0
- # ── .env 读取(超简版,仅认 KEY=VALUE 格式,不依赖 python-dotenv) ──
def _load_local_env() -> None:
    """Export KEY=VALUE pairs from the ``.env`` file next to this script.

    Every well-formed ``KEY=VALUE`` line is copied into ``os.environ``
    unless the key is already present there, so the real environment
    always wins. Blank lines, ``#`` comment lines and lines without ``=``
    are ignored. Surrounding whitespace and quote characters are stripped
    from the value.

    Loading is best-effort: a missing file is silently skipped, and a file
    that cannot be read or decoded only produces a warning on stderr, so a
    broken ``.env`` never prevents the CLI from starting.
    """
    env_file = Path(__file__).resolve().parent / ".env"
    try:
        # "utf-8-sig" drops a leading byte-order mark; with plain "utf-8"
        # the BOM would become part of the first key and that variable
        # would never be recognised.
        text = env_file.read_text(encoding="utf-8-sig")
    except FileNotFoundError:
        return
    except (OSError, UnicodeDecodeError) as exc:
        # e.g. ".env" is a directory, is not readable, or is not UTF-8.
        print(f"warning: ignoring unreadable {env_file}: {exc}", file=sys.stderr)
        return

    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, val = line.partition("=")
        if not sep:
            continue
        key = key.strip()
        val = val.strip().strip('"').strip("'")
        # Variables already present in the environment take precedence.
        if key and key not in os.environ:
            os.environ[key] = val
# Fill os.environ from the local .env before the settings are resolved.
_load_local_env()

# A variable that is unset *or set to an empty/blank string* falls back to
# the built-in default; an empty KNOWHUB_API would otherwise produce
# request URLs without a host.
KNOWHUB_API = (os.getenv("KNOWHUB_API", "").strip() or DEFAULT_KNOWHUB_API).rstrip("/")
KNOWHUB_OWNER = os.getenv("KNOWHUB_OWNER", "").strip() or DEFAULT_OWNER
- # ── 模式实现 ────────────────────────────────────────
async def ask(
    query: str,
    deep: bool = False,
    continue_from: Optional[str] = None,
    skills: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Deep review: call the remote Librarian / Research agent.

    Sends one POST to ``{KNOWHUB_API}/api/agent`` and condenses the JSON
    reply into a flat result dict.

    Args:
        query: Sent as the ``task`` field of the request.
        deep: ``False`` selects ``remote_librarian`` (answer integrated from
            the existing knowledge base); ``True`` selects
            ``remote_research`` (web-wide research plus ingestion, slower).
        continue_from: Forwarded unchanged as ``continue_from``.
        skills: Forwarded unchanged as ``skills``.

    Returns:
        On success a dict with ``mode``, ``agent_type``, ``sub_trace_id``,
        ``status``, ``summary``, ``stats`` and ``error``. On any failure a
        dict with ``mode``, ``agent_type``, ``status == "failed"`` and an
        ``error`` message; exceptions are not propagated.
    """
    agent_type = "remote_librarian" if not deep else "remote_research"
    request_body = {
        "agent_type": agent_type,
        "task": query,
        "messages": None,
        "continue_from": continue_from,
        "skills": skills,
    }

    def _failed(message: str) -> Dict[str, Any]:
        # Uniform shape for every failure path.
        return {
            "mode": "remote",
            "agent_type": agent_type,
            "status": "failed",
            "error": message,
        }

    try:
        async with httpx.AsyncClient(timeout=ASK_TIMEOUT) as client:
            response = await client.post(f"{KNOWHUB_API}/api/agent", json=request_body)
            response.raise_for_status()
            reply = response.json()
            outcome: Dict[str, Any] = {"mode": "remote", "agent_type": agent_type}
            outcome["sub_trace_id"] = reply.get("sub_trace_id")
            outcome["status"] = reply.get("status", "completed")
            outcome["summary"] = reply.get("summary", "")
            outcome["stats"] = reply.get("stats", {})
            outcome["error"] = reply.get("error")
            return outcome
    except Exception as exc:
        if isinstance(exc, httpx.HTTPStatusError):
            # Non-2xx reply: report the status code and the start of the body.
            return _failed(f"HTTP {exc.response.status_code}: {exc.response.text[:200]}")
        return _failed(f"{type(exc).__name__}: {exc}")
async def search(
    query: str,
    top_k: int = 5,
    min_score: int = 3,
    types: Optional[List[str]] = None,
    owner: Optional[str] = None,
    requirement_id: Optional[str] = None,
    capability_id: Optional[str] = None,
    tool_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Quick retrieval: GET ``{KNOWHUB_API}/api/knowledge/search``.

    Args:
        query: Sent as the ``q`` query parameter.
        top_k: Sent as ``top_k``.
        min_score: Sent as ``min_score``.
        types: Joined with commas into the ``types`` parameter when non-empty.
        owner: Sent only when explicitly given; when omitted no owner filter
            is applied (whole-library search).
        requirement_id: Optional filter, sent only when truthy.
        capability_id: Optional filter, sent only when truthy.
        tool_id: Optional filter, sent only when truthy.

    Returns:
        ``{"query", "count", "results"}`` on success. On failure the same
        keys with ``count == 0``, empty ``results`` and an extra ``error``
        message; exceptions are not propagated.
    """
    params: Dict[str, Any] = {"q": query, "top_k": top_k, "min_score": min_score}
    optional_filters = (
        ("types", ",".join(types) if types else None),
        ("owner", owner),
        ("requirement_id", requirement_id),
        ("capability_id", capability_id),
        ("tool_id", tool_id),
    )
    # Only filters that carry a value become query parameters.
    params.update({name: value for name, value in optional_filters if value})

    def _failed(message: str) -> Dict[str, Any]:
        return {"query": query, "count": 0, "results": [], "error": message}

    try:
        async with httpx.AsyncClient(timeout=SEARCH_TIMEOUT) as client:
            response = await client.get(f"{KNOWHUB_API}/api/knowledge/search", params=params)
            response.raise_for_status()
            body = response.json()
            found = body.get("count", 0)
            entries = body.get("results", [])
            return {"query": query, "count": found, "results": entries}
    except Exception as exc:
        if isinstance(exc, httpx.HTTPStatusError):
            return _failed(f"HTTP {exc.response.status_code}: {exc.response.text[:200]}")
        return _failed(f"{type(exc).__name__}: {exc}")
async def save(
    task: str,
    content: str,
    types: List[str],
    tags: Optional[Dict[str, str]] = None,
    scopes: Optional[List[str]] = None,
    owner: Optional[str] = None,
    resource_ids: Optional[List[str]] = None,
    source_name: str = "",
    source_category: str = "exp",
    source_urls: Optional[List[str]] = None,
    agent_id: str = "knowhub_cli",
    submitted_by: str = "",
    score: int = 3,
    message_id: str = "",
    capability_ids: Optional[List[str]] = None,
    tool_ids: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Save one knowledge entry: POST ``{KNOWHUB_API}/api/knowledge``.

    Falsy optional arguments are replaced before sending: ``tags`` by an
    empty dict, the list arguments by empty lists, ``scopes`` by
    ``DEFAULT_SCOPES``, and ``owner`` / ``submitted_by`` by
    ``KNOWHUB_OWNER``. The ``eval`` section carries ``score`` together
    with fixed ``helpful=1``, ``harmful=0`` and ``confidence=0.5``.

    Returns:
        ``{"knowledge_id", "status", "owner"}`` on success, where ``owner``
        is the owner actually sent. On failure
        ``{"knowledge_id": None, "status": "failed", "error": ...}``;
        exceptions are not propagated.
    """
    effective_owner = owner or KNOWHUB_OWNER
    source_section = {
        "name": source_name,
        "category": source_category,
        "urls": source_urls or [],
        "agent_id": agent_id,
        "submitted_by": submitted_by or KNOWHUB_OWNER,
    }
    eval_section = {"score": score, "helpful": 1, "harmful": 0, "confidence": 0.5}
    request_body = {
        "message_id": message_id,
        "types": types,
        "task": task,
        "tags": tags or {},
        "scopes": scopes or DEFAULT_SCOPES,
        "owner": effective_owner,
        "content": content,
        "resource_ids": resource_ids or [],
        "source": source_section,
        "eval": eval_section,
        "capability_ids": capability_ids or [],
        "tool_ids": tool_ids or [],
    }

    def _failed(message: str) -> Dict[str, Any]:
        return {"knowledge_id": None, "status": "failed", "error": message}

    try:
        async with httpx.AsyncClient(timeout=SAVE_TIMEOUT) as client:
            response = await client.post(f"{KNOWHUB_API}/api/knowledge", json=request_body)
            response.raise_for_status()
            reply = response.json()
            return {
                "knowledge_id": reply.get("knowledge_id"),
                "status": reply.get("status", "submitted"),
                "owner": effective_owner,
            }
    except Exception as exc:
        if isinstance(exc, httpx.HTTPStatusError):
            return _failed(f"HTTP {exc.response.status_code}: {exc.response.text[:200]}")
        return _failed(f"{type(exc).__name__}: {exc}")
- # ── 参数工具 ────────────────────────────────────────
def _split_csv(val: Optional[str]) -> Optional[List[str]]:
    """Split a comma-separated string into its trimmed, non-empty items.

    ``'a,b,c'`` gives ``['a', 'b', 'c']``. ``None``, the empty string and
    input that contains no non-blank item all give ``None``.
    """
    items: List[str] = []
    for piece in (val or "").split(","):
        piece = piece.strip()
        if piece:
            items.append(piece)
    return items if items else None
def _parse_json_maybe(val: Optional[str]) -> Optional[Any]:
    """Decode *val* as JSON, falling back to the raw string.

    ``None`` is passed through as ``None``. Text that is not valid JSON is
    returned unchanged instead of raising.
    """
    if val is not None:
        try:
            return json.loads(val)
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass, so this single
            # clause covers every decoding failure.
            return val
    return None
- # ── CLI ───────────────────────────────────────────
def _build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with the ask / search / save sub-commands.

    The chosen sub-command is stored in ``args.cmd`` and is mandatory.
    Comma-separated and JSON-valued options are declared as plain strings
    here; this function does not split or decode them.
    """
    root = argparse.ArgumentParser(
        description="KnowHub CLI - ask / search / save 知识库",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=f"默认 API: {KNOWHUB_API} 默认 owner: {KNOWHUB_OWNER}",
    )
    commands = root.add_subparsers(dest="cmd", required=True, metavar="{ask,search,save}")

    # ── ask ──
    ask_cmd = commands.add_parser("ask", help="深度回顾(远端 Librarian Agent)")
    ask_cmd.add_argument("--query", required=True)
    ask_cmd.add_argument("--deep", action="store_true", help="改走 remote_research,全网调研 + 入库")
    ask_cmd.add_argument("--continue_from", help="已有 sub_trace_id,传入则复用 Librarian 上下文")
    ask_cmd.add_argument("--skills", help="逗号分隔的 skill 名单(可选,由服务器白名单过滤)")

    # ── search ──
    search_cmd = commands.add_parser("search", help="快速检索(语义搜索 + 精排)")
    search_cmd.add_argument("--query", required=True)
    for flag, fallback in (("--top_k", 5), ("--min_score", 3)):
        search_cmd.add_argument(flag, type=int, default=fallback)
    search_cmd.add_argument("--types", help="逗号分隔(user_profile/strategy/tool/usecase/definition/plan)")
    search_cmd.add_argument(
        "--owner",
        help=f"覆盖默认 owner(默认不过滤,用 --owner={KNOWHUB_OWNER} 限定自己的)",
    )
    for flag in ("--requirement_id", "--capability_id", "--tool_id"):
        search_cmd.add_argument(flag)

    # ── save ──
    save_cmd = commands.add_parser("save", help="保存知识到 KnowHub")
    for flag, text in (
        ("--task", "任务描述:在什么情景下 + 要完成什么目标"),
        ("--content", "知识的核心内容"),
        ("--types", "逗号分隔的类型"),
    ):
        save_cmd.add_argument(flag, required=True, help=text)
    save_cmd.add_argument("--tags", help="JSON 字符串,如 '{\"project\":\"xyz\"}'")
    scopes_default = ",".join(DEFAULT_SCOPES)
    save_cmd.add_argument("--scopes", help=f"逗号分隔(默认 {scopes_default})")
    save_cmd.add_argument("--owner", help=f"覆盖默认 owner(默认 {KNOWHUB_OWNER})")
    save_cmd.add_argument("--score", type=int, default=3, help="1-5,默认 3")
    save_cmd.add_argument("--source_name", default="")
    save_cmd.add_argument("--source_category", default="exp", help="paper/exp/skill/book")
    save_cmd.add_argument("--source_urls", help="逗号分隔 URL 列表")
    save_cmd.add_argument("--agent_id", default="knowhub_cli")
    save_cmd.add_argument("--submitted_by", default="")
    for flag, text in (
        ("--capability_ids", "逗号分隔的能力 ID"),
        ("--tool_ids", "逗号分隔的工具 ID"),
        ("--resource_ids", "逗号分隔的资源 ID"),
    ):
        save_cmd.add_argument(flag, help=text)
    save_cmd.add_argument("--message_id", default="")

    return root
def _parse_tags(raw: Optional[str]) -> Optional[Dict[str, Any]]:
    """Decode the ``--tags`` option; raise ``ValueError`` unless it is a JSON object."""
    if not raw:
        return None
    try:
        parsed = json.loads(raw)
    except ValueError as exc:
        raise ValueError(f"--tags is not valid JSON: {exc}") from exc
    if not isinstance(parsed, dict):
        raise ValueError(f"--tags must be a JSON object, got {type(parsed).__name__}")
    return parsed


async def _dispatch(args) -> Dict[str, Any]:
    """Run the sub-command selected in ``args.cmd`` and return its result dict.

    Comma-separated options are split with ``_split_csv`` before being
    handed to ``ask`` / ``search`` / ``save``.

    For ``save``, a ``--tags`` value that is not a JSON object is reported
    as a failed result (``status == "failed"`` plus ``error``) without
    contacting the server. Dropping it silently would store the entry
    without the tags the user asked for and still report success.

    Raises:
        ValueError: If ``args.cmd`` is not one of ask / search / save.
    """
    if args.cmd == "ask":
        return await ask(
            query=args.query,
            deep=args.deep,
            continue_from=args.continue_from,
            skills=_split_csv(args.skills),
        )
    if args.cmd == "search":
        return await search(
            query=args.query,
            top_k=args.top_k,
            min_score=args.min_score,
            types=_split_csv(args.types),
            owner=args.owner,
            requirement_id=args.requirement_id,
            capability_id=args.capability_id,
            tool_id=args.tool_id,
        )
    if args.cmd == "save":
        # Validate the tags first so that bad input never reaches the server.
        try:
            tags = _parse_tags(args.tags)
        except ValueError as exc:
            # Same shape as the failure results produced by save().
            return {
                "knowledge_id": None,
                "status": "failed",
                "error": f"{type(exc).__name__}: {exc}",
            }
        return await save(
            task=args.task,
            content=args.content,
            types=_split_csv(args.types) or [],
            tags=tags,
            scopes=_split_csv(args.scopes),
            owner=args.owner,
            resource_ids=_split_csv(args.resource_ids),
            source_name=args.source_name,
            source_category=args.source_category,
            source_urls=_split_csv(args.source_urls),
            agent_id=args.agent_id,
            submitted_by=args.submitted_by,
            score=args.score,
            message_id=args.message_id,
            capability_ids=_split_csv(args.capability_ids),
            tool_ids=_split_csv(args.tool_ids),
        )
    raise ValueError(f"未知命令: {args.cmd}")
def main() -> int:
    """Parse the command line, run the chosen command and print its result.

    The result dict is written to stdout as indented JSON with non-ASCII
    characters kept as-is.

    Returns:
        ``1`` when the result has ``status == "failed"`` or a non-empty
        ``error`` field, otherwise ``0``.
    """
    parsed = _build_parser().parse_args()
    outcome = asyncio.run(_dispatch(parsed))
    print(json.dumps(outcome, ensure_ascii=False, indent=2))
    # Exit code: any status=failed or non-empty error field means failure.
    failed = outcome.get("status") == "failed" or bool(outcome.get("error"))
    return 1 if failed else 0
# Script entry point: the return value of main() becomes the process exit code.
if __name__ == "__main__":
    raise SystemExit(main())
|