""" 飞书 Node HTTP 适配层(默认 FEISHU_HTTP_BASE_URL:4380)工具封装。 与适配服务对齐: GET /tools POST /tool-call body: { tool, params, context?, tool_call_id? } POST /tool-calls/batch body: { calls: [ 同上 ... ] } context 字段与 Node ``buildTicket`` 一致:message_id、chat_id、account_id、 sender_open_id、chat_type、thread_id。Gateway 写入 Trace.context[\"feishu_adapter\"], 本模块自动合并;也可用 context_patch 临时覆盖。 """ from __future__ import annotations import json import logging import os from typing import Any, Dict, List, Optional import httpx from agent.tools import tool from agent.tools.models import ToolResult logger = logging.getLogger(__name__) FEISHU_HTTP_TIMEOUT = float(os.getenv("FEISHU_HTTP_TIMEOUT", "120")) def _adapter_base_url() -> str: return os.getenv("FEISHU_HTTP_BASE_URL", "http://127.0.0.1:4380").rstrip("/") async def _load_feishu_adapter(context: Optional[Dict[str, Any]]) -> Dict[str, Any]: if not context: return {} trace_id = context.get("trace_id") store = context.get("store") if not trace_id or not store: return {} try: trace = await store.get_trace(trace_id) except Exception: logger.exception("feishu_adapter: get_trace failed trace_id=%s", trace_id) return {} if not trace or not trace.context: return {} raw = trace.context.get("feishu_adapter") return dict(raw) if isinstance(raw, dict) else {} async def _resolve_uid_for_adapter(context: Optional[Dict[str, Any]]) -> str: if not context: return "" u = context.get("uid") if u: return str(u) trace_id = context.get("trace_id") store = context.get("store") if not trace_id or not store: return "" try: trace = await store.get_trace(trace_id) except Exception: return "" if trace and trace.uid: return str(trace.uid) return "" def _merge_to_node_context( adapter: Dict[str, Any], patch: Optional[Dict[str, Any]], uid: str, ) -> Dict[str, Any]: out: Dict[str, Any] = {} for k, v in adapter.items(): if v is not None and v != "": out[k] = v if patch: for k, v in patch.items(): if v is not None and v != "": out[k] = v if "sender_open_id" not in out and out.get("open_id"): out["sender_open_id"] = out["open_id"] if uid and "sender_open_id" not in out: out["sender_open_id"] = uid out.pop("open_id", None) out.pop("app_id", None) return out def _coerce_tool_params( params: Any, *, tool_name: str, label: str = "params", ) -> tuple[Dict[str, Any], Optional[ToolResult]]: """ 模型常把嵌套 JSON 误传为字符串;若为 str 则尝试 json.loads 成 dict。 解析失败或类型不对时返回错误 ToolResult,避免静默发空 {} 到 Node。 """ if params is None: return {}, None if isinstance(params, dict): return params, None if isinstance(params, str): s = params.strip() if not s: return {}, None try: parsed = json.loads(s) except json.JSONDecodeError as e: return {}, ToolResult( title=f"{label} 不是合法 JSON", output=f"工具 `{tool_name}` 的 {label} 应为对象;收到字符串但解析失败:{e}\n" f"原始片段:{s[:400]!r}", error="invalid_params_json", ) if not isinstance(parsed, dict): return {}, ToolResult( title=f"{label} 解析后不是对象", output=f"工具 `{tool_name}` 的 {label} 应为 JSON 对象,解析得到:{type(parsed).__name__}", error="params_not_object", ) logger.info("feishu_adapter: coerced %s from JSON string for tool=%s", label, tool_name) return parsed, None return {}, ToolResult( title=f"{label} 类型无效", output=f"工具 `{tool_name}` 的 {label} 须为 object 或可解析为 object 的 JSON 字符串,收到:{type(params).__name__}", error="invalid_params_type", ) def _format_tool_list_body(data: dict[str, Any]) -> str: if not data.get("ok"): return json.dumps(data, ensure_ascii=False) tools = data.get("tools") or [] if not isinstance(tools, list): return json.dumps(data, ensure_ascii=False) lines: list[str] = [ "以下为 Node 适配层 GET /tools 返回的工具清单(名称须与 tool-call 的 tool 一致):", "", ] for t in tools[:200]: if not isinstance(t, dict): continue name = t.get("name") or "" desc = (t.get("description") or "")[:300] lines.append(f"- {name}: {desc}") if len(tools) > 200: lines.append(f"\n… 共 {len(tools)} 个,已截断显示前 200 个。") lines.append( "\n升级适配层后请以本列表为准;调用时使用 feishu_adapter_tool_call," "params 必须为 **JSON 对象**(不要传转义后的字符串);字段名以各工具的 parameters 为准" "(例如 feishu_bitable_app.create 通常用 name 而非 app_name)。" ) return "\n".join(lines) def _format_tool_call_response(data: dict[str, Any]) -> ToolResult: if data.get("ok"): # 始终回传完整 JSON,避免模型只看到 {"ok": true} 而缺少 result/app_token text = json.dumps(data, ensure_ascii=False, indent=2) if data.get("result") in (None, ""): text += ( "\n\n【提示】成功响应中无有效 result:请确认 params 是否为空、字段名是否与 GET /tools 的 schema 一致;" "必要时先 feishu_adapter_list_tools。" ) return ToolResult(title="飞书适配层 tool-call 成功", output=text) err = data.get("error") or "unknown_error" if err == "need_user_authorization": details = data.get("details") or {} hint = ( "需要用户授权:适配层可能已在会话中推送 OAuth / 授权卡片,请提示用户按卡片完成授权后重试。" f"\n详情:{json.dumps(details, ensure_ascii=False)}" ) return ToolResult( title="飞书适配层:需要用户授权", output=hint, long_term_memory="need_user_authorization", error="need_user_authorization", ) return ToolResult( title="飞书适配层 tool-call 失败", output=json.dumps(data, ensure_ascii=False), error=str(err), ) @tool( hidden_params=["context"], display={ "zh": { "name": "列出飞书 Node 适配层工具", "params": {}, }, "en": { "name": "List Feishu HTTP adapter tools", "params": {}, }, }, ) async def feishu_adapter_list_tools(context: Optional[Dict[str, Any]] = None) -> ToolResult: """ 调用适配层 ``GET /tools``,获取当前注册的 MCP/OAPI 工具名与 parameters 说明。 调用其他飞书相关能力前应先核对名称是否与文档一致。 """ url = f"{_adapter_base_url()}/tools" try: async with httpx.AsyncClient(timeout=FEISHU_HTTP_TIMEOUT) as client: resp = await client.get(url) try: data = resp.json() except Exception: return ToolResult( title="获取 /tools 失败", output=resp.text[:800], error=f"HTTP {resp.status_code}", ) if resp.status_code >= 400: return ToolResult( title="获取 /tools 失败", output=json.dumps(data, ensure_ascii=False) if isinstance(data, dict) else resp.text[:800], error=f"HTTP {resp.status_code}", ) except Exception as e: logger.exception("feishu_adapter_list_tools failed") return ToolResult( title="获取 /tools 失败", output=str(e), error=str(e), ) if not isinstance(data, dict): return ToolResult(title="/tools 返回异常", output=str(data), error="invalid_shape") return ToolResult( title="飞书适配层工具列表", output=_format_tool_list_body(data), metadata={"raw_tools_count": len(data.get("tools") or []) if isinstance(data.get("tools"), list) else 0}, ) @tool( hidden_params=["context"], display={ "zh": { "name": "调用飞书 Node 适配层 tool-call", "params": { "tool": "工具名(与 GET /tools 的 name 一致)", "params": "工具参数对象,对应 schema", "context_patch": "可选,覆盖 Trace 中的 feishu_adapter 字段(如 message_id)", }, }, "en": { "name": "Invoke Feishu HTTP adapter tool-call", "params": { "tool": "Tool name (same as GET /tools)", "params": "Arguments object per schema", "context_patch": "Optional overrides for adapter context", }, }, }, ) async def feishu_adapter_tool_call( tool: str, params: Optional[Dict[str, Any]] = None, context_patch: Optional[Dict[str, Any]] = None, context: Optional[Dict[str, Any]] = None, ) -> ToolResult: """ 调用适配层 ``POST /tool-call``:在 Node 侧执行已注册的飞书/OAPI 工具。 Trace 由 Gateway 写入的 ``feishu_adapter``(account_id、chat_id、message_id、 sender_open_id 等)会自动并入 ``context``;必要时用 context_patch 覆盖当前消息的 message_id。 **params**:须为对象;若上游误传 JSON 字符串,会尝试解析,失败则报错(不再静默发空 params)。 **context_patch**:须为对象或合法 JSON 对象字符串;误传字符串时同样会解析。 **tool**:与 ``GET /tools`` 的 ``name`` 完全一致;命名约定见注入的 feishu-bitable 等 SKILL,不确定时先 ``feishu_adapter_list_tools``。 """ name = (tool or "").strip() if not name: return ToolResult(title="参数错误", output="tool 不能为空", error="empty_tool") coerced, err = _coerce_tool_params(params, tool_name=name, label="params") if err is not None: return err patch_dict, patch_err = _coerce_tool_params(context_patch, tool_name=name, label="context_patch") if patch_err is not None: return patch_err adapter = await _load_feishu_adapter(context) uid = await _resolve_uid_for_adapter(context) node_ctx = _merge_to_node_context(adapter, patch_dict or None, uid) body: Dict[str, Any] = { "tool": name, "params": coerced, "context": node_ctx, } url = f"{_adapter_base_url()}/tool-call" try: async with httpx.AsyncClient(timeout=FEISHU_HTTP_TIMEOUT) as client: resp = await client.post(url, json=body) try: data = resp.json() except Exception: data = {"ok": False, "error": "invalid_json", "status_code": resp.status_code, "text": resp.text[:800]} except Exception as e: logger.exception("feishu_adapter_tool_call HTTP failed tool=%s", name) return ToolResult( title="tool-call 请求失败", output=str(e), error=str(e), ) if not isinstance(data, dict): return ToolResult(title="tool-call 响应异常", output=str(data), error="invalid_shape") return _format_tool_call_response(data) @tool( hidden_params=["context"], display={ "zh": { "name": "批量调用飞书 Node 适配层 tool-call", "params": { "calls": "数组,每项含 tool、params,可选 context 覆盖单条", "context_patch": "可选,合并到每条 call 的 context(后者优先)", }, }, "en": { "name": "Batch Feishu HTTP adapter tool-calls", "params": { "calls": "Array of {tool, params?, context?}", "context_patch": "Optional merged into each call context", }, }, }, ) async def feishu_adapter_tool_calls_batch( calls: List[Dict[str, Any]], context_patch: Optional[Dict[str, Any]] = None, context: Optional[Dict[str, Any]] = None, ) -> ToolResult: """ 调用适配层 ``POST /tool-calls/batch``。每项与单次 tool-call 请求体结构相同, 可单独带 ``context``;会与 Trace 中的 feishu_adapter 及 context_patch 合并。 """ if not isinstance(calls, list) or not calls: return ToolResult(title="参数错误", output="calls 必须为非空数组", error="empty_calls") adapter = await _load_feishu_adapter(context) uid = await _resolve_uid_for_adapter(context) batch_patch, batch_patch_err = _coerce_tool_params( context_patch, tool_name="feishu_adapter_tool_calls_batch", label="context_patch" ) if batch_patch_err is not None: return batch_patch_err base_ctx = _merge_to_node_context(adapter, batch_patch or None, uid) norm_calls: list[dict[str, Any]] = [] for i, raw in enumerate(calls): if not isinstance(raw, dict): return ToolResult(title="参数错误", output=f"calls[{i}] 必须为对象", error="invalid_call_item") tname = raw.get("tool") if not isinstance(tname, str) or not tname.strip(): return ToolResult(title="参数错误", output=f"calls[{i}].tool 无效", error="missing_tool") tname_stripped = tname.strip() p_raw = raw.get("params") p, p_err = _coerce_tool_params(p_raw, tool_name=tname_stripped, label=f"calls[{i}].params") if p_err is not None: return ToolResult( title="参数错误", output=f"calls[{i}]: {p_err.output}", error=p_err.error or "invalid_params", ) c_extra, c_err = _coerce_tool_params( raw.get("context"), tool_name=tname_stripped, label=f"calls[{i}].context" ) if c_err is not None: return ToolResult( title="参数错误", output=f"calls[{i}]: {c_err.output}", error=c_err.error or "invalid_context", ) merged = dict(base_ctx) for k, v in c_extra.items(): if v is not None and v != "": merged[k] = v norm_calls.append( { "tool": tname.strip(), "params": p, "context": merged, } ) url = f"{_adapter_base_url()}/tool-calls/batch" try: async with httpx.AsyncClient(timeout=FEISHU_HTTP_TIMEOUT) as client: resp = await client.post(url, json={"calls": norm_calls}) try: data = resp.json() except Exception: data = {"ok": False, "error": "invalid_json", "status_code": resp.status_code, "text": resp.text[:800]} except Exception as e: logger.exception("feishu_adapter_tool_calls_batch HTTP failed") return ToolResult( title="tool-calls/batch 请求失败", output=str(e), error=str(e), ) if not isinstance(data, dict): return ToolResult(title="batch 响应异常", output=str(data), error="invalid_shape") results = data.get("results") if data.get("ok") and isinstance(results, list): any_auth = any( isinstance(r, dict) and r.get("error") == "need_user_authorization" for r in results ) out_text = json.dumps(data, ensure_ascii=False, indent=2) if any_auth: return ToolResult( title="批量 tool-call 完成(含需授权项)", output=out_text + "\n\n若含 need_user_authorization:请提示用户查看会话中的授权卡片并完成 OAuth。", long_term_memory="need_user_authorization_in_batch", ) return ToolResult(title="批量 tool-call 完成", output=out_text) return ToolResult( title="批量 tool-call 未全部成功", output=json.dumps(data, ensure_ascii=False), error=str(data.get("error") or "batch_failed"), )