| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441 |
- """
- 飞书 Node HTTP 适配层(默认 FEISHU_HTTP_BASE_URL:4380)工具封装。
- 与适配服务对齐:
- GET /tools
- POST /tool-call body: { tool, params, context?, tool_call_id? }
- POST /tool-calls/batch body: { calls: [ 同上 ... ] }
- context 字段与 Node ``buildTicket`` 一致:message_id、chat_id、account_id、
- sender_open_id、chat_type、thread_id。Gateway 写入 Trace.context[\"feishu_adapter\"],
- 本模块自动合并;也可用 context_patch 临时覆盖。
- """
- from __future__ import annotations
- import json
- import logging
- import os
- from typing import Any, Dict, List, Optional
- import httpx
- from agent.tools import tool
- from agent.tools.models import ToolResult
logger = logging.getLogger(__name__)


def _read_timeout_seconds(default: float = 120.0) -> float:
    """Return the adapter HTTP timeout (seconds) from ``FEISHU_HTTP_TIMEOUT``.

    Falls back to *default* when the variable is unset, blank, not a number,
    or not a finite positive value, so that a bad environment value cannot
    make importing this module fail.
    """
    raw = os.getenv("FEISHU_HTTP_TIMEOUT", "").strip()
    if not raw:
        return default
    try:
        seconds = float(raw)
    except ValueError:
        logger.warning(
            "feishu_adapter: invalid FEISHU_HTTP_TIMEOUT=%r, using %s", raw, default
        )
        return default
    # The chained comparison is False for NaN, inf, zero and negatives alike.
    if not 0 < seconds < float("inf"):
        logger.warning(
            "feishu_adapter: out-of-range FEISHU_HTTP_TIMEOUT=%r, using %s", raw, default
        )
        return default
    return seconds


# Timeout (seconds) passed to every httpx.AsyncClient created in this module.
FEISHU_HTTP_TIMEOUT = _read_timeout_seconds()
def _adapter_base_url() -> str:
    """Return the adapter base URL without trailing slashes.

    Reads ``FEISHU_HTTP_BASE_URL`` on every call. An unset, empty or
    whitespace-only value falls back to ``http://127.0.0.1:4380``; without
    that fallback an exported-but-empty variable would produce relative
    request URLs such as ``/tools``.
    """
    configured = os.getenv("FEISHU_HTTP_BASE_URL", "").strip()
    return (configured or "http://127.0.0.1:4380").rstrip("/")
async def _load_feishu_adapter(context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Fetch the ``feishu_adapter`` mapping stored on the current trace.

    Args:
        context: Tool context; ``trace_id`` and ``store`` are read from it.

    Returns:
        A shallow copy of ``trace.context["feishu_adapter"]`` when it is a
        dict. An empty dict is returned when the context, trace id, store,
        trace or trace context is missing, when the stored value is not a
        dict, or when ``store.get_trace`` raises (the failure is logged).
    """
    ctx = context or {}
    trace_id, store = ctx.get("trace_id"), ctx.get("store")
    if not (trace_id and store):
        return {}

    try:
        trace = await store.get_trace(trace_id)
    except Exception:
        # Best effort: a store failure must not abort the tool call itself.
        logger.exception("feishu_adapter: get_trace failed trace_id=%s", trace_id)
        return {}

    trace_ctx = trace.context if trace else None
    if not trace_ctx:
        return {}
    payload = trace_ctx.get("feishu_adapter")
    if isinstance(payload, dict):
        return dict(payload)
    return {}
async def _resolve_uid_for_adapter(context: Optional[Dict[str, Any]]) -> str:
    """Resolve the user id used as a fallback ``sender_open_id``.

    Lookup order: ``context["uid"]`` first, then the ``uid`` attribute of
    the trace loaded through ``context["store"].get_trace(trace_id)``.

    Args:
        context: Tool context; ``uid``, ``trace_id`` and ``store`` are read.

    Returns:
        The uid converted to ``str``, or ``""`` when it cannot be
        determined, including when the trace lookup raises.
    """
    if not context:
        return ""
    uid = context.get("uid")
    if uid:
        return str(uid)

    trace_id = context.get("trace_id")
    store = context.get("store")
    if not trace_id or not store:
        return ""
    try:
        trace = await store.get_trace(trace_id)
    except Exception:
        # Still best effort, but record the failure rather than hiding it,
        # matching how _load_feishu_adapter reports the same store error.
        logger.exception(
            "feishu_adapter: get_trace failed while resolving uid trace_id=%s", trace_id
        )
        return ""
    if trace and trace.uid:
        return str(trace.uid)
    return ""
def _merge_to_node_context(
    adapter: Dict[str, Any],
    patch: Optional[Dict[str, Any]],
    uid: str,
) -> Dict[str, Any]:
    """Build the ``context`` object sent to the Node adapter.

    Args:
        adapter: Base values (the trace's ``feishu_adapter`` mapping).
        patch: Optional overrides; its non-empty values win over *adapter*.
        uid: Last-resort value for ``sender_open_id``.

    Returns:
        A new dict holding every entry whose value is neither ``None`` nor
        ``""``. ``sender_open_id`` is filled from ``open_id`` and then from
        *uid* when absent; ``open_id`` and ``app_id`` are never forwarded.
    """
    merged: Dict[str, Any] = {}
    for layer in (adapter, patch or {}):
        merged.update(
            {key: val for key, val in layer.items() if val is not None and val != ""}
        )

    if "sender_open_id" not in merged:
        # Prefer the adapter-provided open_id, then fall back to the uid.
        fallback = merged.get("open_id") or uid
        if fallback:
            merged["sender_open_id"] = fallback

    for dropped in ("open_id", "app_id"):
        merged.pop(dropped, None)
    return merged
def _coerce_tool_params(
    params: Any,
    *,
    tool_name: str,
    label: str = "params",
) -> tuple[Dict[str, Any], Optional[ToolResult]]:
    """Normalize a tool argument that should be a JSON object.

    Models often pass nested JSON as a string, so a ``str`` is parsed with
    ``json.loads``. Parse failures and wrong types are reported as an error
    ``ToolResult`` instead of silently sending ``{}`` to the Node adapter.

    Args:
        params: ``None``, a dict, or a JSON string encoding an object.
        tool_name: Tool name, used only in messages.
        label: Argument name, used only in messages.

    Returns:
        ``(mapping, None)`` on success: ``{}`` for ``None`` or a blank
        string, the same dict object for a dict, the decoded object for a
        JSON string. ``({}, error_result)`` on failure.
    """
    if params is None:
        return {}, None
    if isinstance(params, dict):
        return params, None

    if not isinstance(params, str):
        return {}, ToolResult(
            title=f"{label} 类型无效",
            output=f"工具 `{tool_name}` 的 {label} 须为 object 或可解析为 object 的 JSON 字符串,收到:{type(params).__name__}",
            error="invalid_params_type",
        )

    text = params.strip()
    if not text:
        return {}, None

    try:
        decoded = json.loads(text)
    except json.JSONDecodeError as exc:
        return {}, ToolResult(
            title=f"{label} 不是合法 JSON",
            output=f"工具 `{tool_name}` 的 {label} 应为对象;收到字符串但解析失败:{exc}\n"
            f"原始片段:{text[:400]!r}",
            error="invalid_params_json",
        )

    if isinstance(decoded, dict):
        logger.info("feishu_adapter: coerced %s from JSON string for tool=%s", label, tool_name)
        return decoded, None
    return {}, ToolResult(
        title=f"{label} 解析后不是对象",
        output=f"工具 `{tool_name}` 的 {label} 应为 JSON 对象,解析得到:{type(decoded).__name__}",
        error="params_not_object",
    )
def _format_tool_list_body(data: dict[str, Any]) -> str:
    """Render the ``GET /tools`` response as a text list for the model.

    Args:
        data: Decoded JSON body of the adapter response.

    Returns:
        The raw JSON dump when ``ok`` is falsy or ``tools`` is not a list.
        Otherwise a header, one ``- name: description`` line per dict entry
        (first 200 entries, description cut to 300 characters), a
        truncation note when more than 200 tools exist, and a usage hint.
    """
    tools = data.get("tools") or []
    if not data.get("ok") or not isinstance(tools, list):
        return json.dumps(data, ensure_ascii=False)

    lines: list[str] = [
        "以下为 Node 适配层 GET /tools 返回的工具清单(名称须与 tool-call 的 tool 一致):",
        "",
    ]
    for entry in tools[:200]:
        if not isinstance(entry, dict):
            continue
        name = entry.get("name") or ""
        # The adapter payload is untrusted: a non-string description (number,
        # object, list) must not break slicing, so coerce it to text first.
        desc = str(entry.get("description") or "")[:300]
        lines.append(f"- {name}: {desc}")
    if len(tools) > 200:
        lines.append(f"\n… 共 {len(tools)} 个,已截断显示前 200 个。")
    lines.append(
        "\n升级适配层后请以本列表为准;调用时使用 feishu_adapter_tool_call,"
        "params 必须为 **JSON 对象**(不要传转义后的字符串);字段名以各工具的 parameters 为准"
        "(例如 feishu_bitable_app.create 通常用 name 而非 app_name)。"
    )
    return "\n".join(lines)
def _format_tool_call_response(data: dict[str, Any]) -> ToolResult:
    """Convert a decoded ``POST /tool-call`` response into a ``ToolResult``.

    Args:
        data: Decoded JSON body of the adapter response.

    Returns:
        * ``ok`` truthy: the full JSON (indented), plus a hint when
          ``result`` is ``None`` or ``""``.
        * ``error == "need_user_authorization"``: an authorization prompt
          including the ``details`` payload.
        * anything else: the JSON dump with ``error`` set to the adapter's
          error value (``"unknown_error"`` when absent).
    """
    if not data.get("ok"):
        code = data.get("error") or "unknown_error"
        if code != "need_user_authorization":
            return ToolResult(
                title="飞书适配层 tool-call 失败",
                output=json.dumps(data, ensure_ascii=False),
                error=str(code),
            )
        details_json = json.dumps(data.get("details") or {}, ensure_ascii=False)
        return ToolResult(
            title="飞书适配层:需要用户授权",
            output=(
                "需要用户授权:适配层可能已在会话中推送 OAuth / 授权卡片,请提示用户按卡片完成授权后重试。"
                "\n详情:" + details_json
            ),
            long_term_memory="need_user_authorization",
            error="need_user_authorization",
        )

    # Always return the complete JSON so the model never sees only
    # {"ok": true} without the result / app_token fields.
    rendered = json.dumps(data, ensure_ascii=False, indent=2)
    result_value = data.get("result")
    if result_value in (None, ""):
        rendered = rendered + (
            "\n\n【提示】成功响应中无有效 result:请确认 params 是否为空、字段名是否与 GET /tools 的 schema 一致;"
            "必要时先 feishu_adapter_list_tools。"
        )
    return ToolResult(title="飞书适配层 tool-call 成功", output=rendered)
@tool(
    hidden_params=["context"],
    display={
        "zh": {
            "name": "列出飞书 Node 适配层工具",
            "params": {},
        },
        "en": {
            "name": "List Feishu HTTP adapter tools",
            "params": {},
        },
    },
)
async def feishu_adapter_list_tools(context: Optional[Dict[str, Any]] = None) -> ToolResult:
    """
    Call the adapter's ``GET /tools`` to fetch the currently registered MCP/OAPI
    tool names and their parameters descriptions.
    Check that a tool name matches this list before invoking other Feishu
    capabilities.

    Args:
        context: Tool context (hidden parameter); not read by this function.

    Returns:
        A ``ToolResult`` with the formatted tool list, or an error result when
        the request fails, the body is not JSON, the status is >= 400, or the
        body is not a JSON object.
    """
    # NOTE(review): docstring translated from Chinese; if the @tool decorator
    # exposes it as the model-facing description, confirm English is acceptable.
    url = f"{_adapter_base_url()}/tools"
    try:
        async with httpx.AsyncClient(timeout=FEISHU_HTTP_TIMEOUT) as client:
            resp = await client.get(url)
            try:
                data = resp.json()
            except Exception:
                return ToolResult(
                    title="获取 /tools 失败",
                    output=resp.text[:800],
                    error=f"HTTP {resp.status_code}",
                )
            if resp.status_code >= 400:
                return ToolResult(
                    title="获取 /tools 失败",
                    output=json.dumps(data, ensure_ascii=False) if isinstance(data, dict) else resp.text[:800],
                    error=f"HTTP {resp.status_code}",
                )
    except Exception as e:
        logger.exception("feishu_adapter_list_tools failed")
        # An exception may carry an empty message (assumed to include some
        # httpx timeouts); fall back to the class name so the error is never
        # an empty, falsy string.
        detail = str(e) or type(e).__name__
        return ToolResult(
            title="获取 /tools 失败",
            output=detail,
            error=detail,
        )
    if not isinstance(data, dict):
        return ToolResult(title="/tools 返回异常", output=str(data), error="invalid_shape")
    tools = data.get("tools")
    return ToolResult(
        title="飞书适配层工具列表",
        output=_format_tool_list_body(data),
        metadata={"raw_tools_count": len(tools) if isinstance(tools, list) else 0},
    )
@tool(
    hidden_params=["context"],
    display={
        "zh": {
            "name": "调用飞书 Node 适配层 tool-call",
            "params": {
                "tool": "工具名(与 GET /tools 的 name 一致)",
                "params": "工具参数对象,对应 schema",
                "context_patch": "可选,覆盖 Trace 中的 feishu_adapter 字段(如 message_id)",
            },
        },
        "en": {
            "name": "Invoke Feishu HTTP adapter tool-call",
            "params": {
                "tool": "Tool name (same as GET /tools)",
                "params": "Arguments object per schema",
                "context_patch": "Optional overrides for adapter context",
            },
        },
    },
)
async def feishu_adapter_tool_call(
    tool: str,
    params: Optional[Dict[str, Any]] = None,
    context_patch: Optional[Dict[str, Any]] = None,
    context: Optional[Dict[str, Any]] = None,
) -> ToolResult:
    """
    Call the adapter's ``POST /tool-call`` to run a registered Feishu/OAPI tool
    on the Node side.

    The ``feishu_adapter`` values the Gateway wrote to the Trace (account_id,
    chat_id, message_id, sender_open_id, ...) are merged into ``context``
    automatically; use context_patch to override e.g. the current message's
    message_id when needed.

    **params**: must be an object; if a JSON string is passed by mistake it is
    parsed, and a parse failure is reported as an error (an empty params object
    is never sent silently).
    **context_patch**: must be an object or a valid JSON-object string; a string
    passed by mistake is parsed the same way.
    **tool**: must match the ``name`` from ``GET /tools`` exactly; naming
    conventions are described in the injected SKILLs such as feishu-bitable.
    When unsure, call ``feishu_adapter_list_tools`` first.
    """
    # NOTE(review): docstring translated from Chinese; if the @tool decorator
    # exposes it as the model-facing description, confirm English is acceptable.
    # A non-string tool name is treated as missing instead of raising on .strip().
    name = tool.strip() if isinstance(tool, str) else ""
    if not name:
        return ToolResult(title="参数错误", output="tool 不能为空", error="empty_tool")

    coerced, err = _coerce_tool_params(params, tool_name=name, label="params")
    if err is not None:
        return err
    patch_dict, patch_err = _coerce_tool_params(context_patch, tool_name=name, label="context_patch")
    if patch_err is not None:
        return patch_err

    adapter = await _load_feishu_adapter(context)
    uid = await _resolve_uid_for_adapter(context)
    node_ctx = _merge_to_node_context(adapter, patch_dict or None, uid)

    body: Dict[str, Any] = {
        "tool": name,
        "params": coerced,
        "context": node_ctx,
    }
    url = f"{_adapter_base_url()}/tool-call"
    try:
        async with httpx.AsyncClient(timeout=FEISHU_HTTP_TIMEOUT) as client:
            resp = await client.post(url, json=body)
            try:
                data = resp.json()
            except Exception:
                # Non-JSON body: wrap it so the failure path below reports it.
                data = {"ok": False, "error": "invalid_json", "status_code": resp.status_code, "text": resp.text[:800]}
    except Exception as e:
        logger.exception("feishu_adapter_tool_call HTTP failed tool=%s", name)
        # An exception may carry an empty message (assumed to include some
        # httpx timeouts); fall back to the class name so the error is never
        # an empty, falsy string.
        detail = str(e) or type(e).__name__
        return ToolResult(
            title="tool-call 请求失败",
            output=detail,
            error=detail,
        )
    if not isinstance(data, dict):
        return ToolResult(title="tool-call 响应异常", output=str(data), error="invalid_shape")
    return _format_tool_call_response(data)
@tool(
    hidden_params=["context"],
    display={
        "zh": {
            "name": "批量调用飞书 Node 适配层 tool-call",
            "params": {
                "calls": "数组,每项含 tool、params,可选 context 覆盖单条",
                "context_patch": "可选,合并到每条 call 的 context(后者优先)",
            },
        },
        "en": {
            "name": "Batch Feishu HTTP adapter tool-calls",
            "params": {
                "calls": "Array of {tool, params?, context?}",
                "context_patch": "Optional merged into each call context",
            },
        },
    },
)
async def feishu_adapter_tool_calls_batch(
    calls: List[Dict[str, Any]],
    context_patch: Optional[Dict[str, Any]] = None,
    context: Optional[Dict[str, Any]] = None,
) -> ToolResult:
    """
    Call the adapter's ``POST /tool-calls/batch``. Each item has the same shape
    as a single tool-call request body and may carry its own ``context``, which
    is merged with the Trace's feishu_adapter values and context_patch.

    Args:
        calls: Non-empty list of ``{tool, params?, context?}`` objects.
        context_patch: Optional overrides applied to every call's context;
            an object or a JSON-object string.
        context: Tool context (hidden parameter) used to load the trace.

    Returns:
        A ``ToolResult`` with the full JSON response when ``ok`` is truthy and
        ``results`` is a list (flagged when any item needs user authorization),
        otherwise an error result. The first invalid input aborts the whole
        batch before any HTTP request is made.
    """
    # NOTE(review): docstring translated from Chinese; if the @tool decorator
    # exposes it as the model-facing description, confirm English is acceptable.
    if not isinstance(calls, list) or not calls:
        return ToolResult(title="参数错误", output="calls 必须为非空数组", error="empty_calls")

    # Validate the purely local input before awaiting the store, as the
    # single-call tool does, so a malformed patch costs no trace lookups.
    batch_patch, batch_patch_err = _coerce_tool_params(
        context_patch, tool_name="feishu_adapter_tool_calls_batch", label="context_patch"
    )
    if batch_patch_err is not None:
        return batch_patch_err

    adapter = await _load_feishu_adapter(context)
    uid = await _resolve_uid_for_adapter(context)
    base_ctx = _merge_to_node_context(adapter, batch_patch or None, uid)

    norm_calls: list[dict[str, Any]] = []
    for i, raw in enumerate(calls):
        if not isinstance(raw, dict):
            return ToolResult(title="参数错误", output=f"calls[{i}] 必须为对象", error="invalid_call_item")
        tname = raw.get("tool")
        if not isinstance(tname, str) or not tname.strip():
            return ToolResult(title="参数错误", output=f"calls[{i}].tool 无效", error="missing_tool")
        tname_stripped = tname.strip()

        p, p_err = _coerce_tool_params(
            raw.get("params"), tool_name=tname_stripped, label=f"calls[{i}].params"
        )
        if p_err is not None:
            return ToolResult(
                title="参数错误",
                output=f"calls[{i}]: {p_err.output}",
                error=p_err.error or "invalid_params",
            )
        c_extra, c_err = _coerce_tool_params(
            raw.get("context"), tool_name=tname_stripped, label=f"calls[{i}].context"
        )
        if c_err is not None:
            return ToolResult(
                title="参数错误",
                output=f"calls[{i}]: {c_err.output}",
                error=c_err.error or "invalid_context",
            )

        # Per-call context wins over the shared base; empty values never
        # overwrite an existing entry.
        merged = dict(base_ctx)
        merged.update({k: v for k, v in c_extra.items() if v is not None and v != ""})
        norm_calls.append({"tool": tname_stripped, "params": p, "context": merged})

    url = f"{_adapter_base_url()}/tool-calls/batch"
    try:
        async with httpx.AsyncClient(timeout=FEISHU_HTTP_TIMEOUT) as client:
            resp = await client.post(url, json={"calls": norm_calls})
            try:
                data = resp.json()
            except Exception:
                # Non-JSON body: wrap it so the failure path below reports it.
                data = {"ok": False, "error": "invalid_json", "status_code": resp.status_code, "text": resp.text[:800]}
    except Exception as e:
        logger.exception("feishu_adapter_tool_calls_batch HTTP failed")
        # An exception may carry an empty message (assumed to include some
        # httpx timeouts); fall back to the class name so the error is never
        # an empty, falsy string.
        detail = str(e) or type(e).__name__
        return ToolResult(
            title="tool-calls/batch 请求失败",
            output=detail,
            error=detail,
        )
    if not isinstance(data, dict):
        return ToolResult(title="batch 响应异常", output=str(data), error="invalid_shape")

    results = data.get("results")
    if data.get("ok") and isinstance(results, list):
        any_auth = any(
            isinstance(r, dict) and r.get("error") == "need_user_authorization" for r in results
        )
        out_text = json.dumps(data, ensure_ascii=False, indent=2)
        if any_auth:
            return ToolResult(
                title="批量 tool-call 完成(含需授权项)",
                output=out_text
                + "\n\n若含 need_user_authorization:请提示用户查看会话中的授权卡片并完成 OAuth。",
                long_term_memory="need_user_authorization_in_batch",
            )
        return ToolResult(title="批量 tool-call 完成", output=out_text)
    return ToolResult(
        title="批量 tool-call 未全部成功",
        output=json.dumps(data, ensure_ascii=False),
        error=str(data.get("error") or "batch_failed"),
    )
|