Bladeren bron

完善了feishu插件server功能

kevin.yang 4 dagen geleden
bovenliggende
commit
49525b5c28
2 gewijzigde bestanden met toevoegingen van 301 en 51 verwijderingen
  1. 2 0
      docker-compose.yml
  2. 299 51
      gateway/core/channels/feishu/openclaw-lark-patch/src/http/server.ts

+ 2 - 0
docker-compose.yml

@@ -13,6 +13,8 @@ services:
     environment:
       - FEISHU_HTTP_PORT=4380
       - GATEWAY_FEISHU_WEBHOOK_URL=http://localhost:8000/api/channels/feishu/openclaw/webhook
+    ports:
+      - "4380:4380"
     networks:
       - agent
 

+ 299 - 51
gateway/core/channels/feishu/openclaw-lark-patch/src/http/server.ts

@@ -1,3 +1,22 @@
+/**
+ * Feishu 能力独立 HTTP 服务(与 OpenClaw 主进程解耦,复用 openclaw-lark 源码)
+ *
+ * 暴露给自建 Agent 的典型用法:
+ * - GET /healthz、GET /readyz、GET /tools、GET /accounts
+ * - POST /tool-call、POST /tool-calls/batch(OAPI + MCP 文档工具 + 可选 feishu_oauth)
+ * - POST /feishu/send-message、POST /feishu/react、POST /feishu/revoke-uat
+ * - WebSocket:将 im.message / 表情 / 卡片操作等事件 JSON 转发到 GATEWAY_FEISHU_WEBHOOK_URL
+ *
+ * 其它复用方式(不必跑本服务):
+ * 1) 在 Node 项目中依赖与本仓库相同的 openclaw-lark 源码包,构造最小 OpenClawPluginApi(见 createApi),
+ *    调用 registerOapiTools / registerFeishuMcpDocTools / registerFeishuOAuthTool,并用 withTicket 包裹每次工具执行。
+ * 2) 继续使用 OpenClaw 官方网关 + feishu 插件入站(能力最全,与自建 Agent 耦合最大)。
+ *
+ * 环境变量:FEISHU_HTTP_PORT、GATEWAY_FEISHU_WEBHOOK_URL、FEISHU_MAX_BODY_MB、FEISHU_WS_ENABLED、FEISHU_SKIP_OAUTH_TOOL
+ *
+ * FEISHU_MAX_BODY_MB 默认与飞书「单次直接上传文件」上限一致(20MB);IM 上传图片官方为单张 ≤10MB(见开放平台 im-v1/image/create)。
+ * 超过 20MB 的文件在飞书侧需走分片上传,本 HTTP 层只限制 JSON 体大小,不替代飞书上传流程。
+ */
 import http from 'node:http'
 import { randomUUID } from 'node:crypto'
 import fs from 'node:fs'
@@ -7,10 +26,11 @@ import { fileURLToPath } from 'node:url'
 import * as Lark from '@larksuiteoapi/node-sdk'
 import { registerOapiTools } from '../tools/oapi/index.js'
 import { registerFeishuMcpDocTools } from '../tools/mcp/doc/index.js'
+import { registerFeishuOAuthTool } from '../tools/oauth.js'
 import { withTicket, type LarkTicket } from '../core/lark-ticket.js'
 import { ToolClient } from '../core/tool-client.js'
 import { getStoredToken, setStoredToken, type StoredUAToken } from '../core/token-store.js'
-import { callWithUAT, NeedAuthorizationError } from '../core/uat-client.js'
+import { callWithUAT, NeedAuthorizationError, revokeUAT } from '../core/uat-client.js'
 import { UserAuthRequiredError } from '../core/auth-errors.js'
 import { getLarkAccount, getEnabledLarkAccounts } from '../core/accounts.js'
 import { LarkClient } from '../core/lark-client.js'
@@ -74,6 +94,19 @@ const GATEWAY_FEISHU_WEBHOOK_URL =
   process.env.GATEWAY_FEISHU_WEBHOOK_URL ??
   'http://localhost:8000/api/channels/feishu/openclaw/webhook'
 
+// 飞书官方(开放平台):IM 上传图片单文件 ≤10MB;云空间/文件「直接上传」单文件 ≤20MB,更大需分片上传。
+// 此处为自建服务接收 JSON 的上限,默认 20MB 与直传文件上限对齐;超大媒体应使用 media_url / 飞书分片 API,而非放大 JSON。
+const FEISHU_MAX_BODY_MB_PARSED = Number.parseInt(process.env.FEISHU_MAX_BODY_MB ?? '20', 10)
+const FEISHU_MAX_BODY_MB_EFFECTIVE =
+  Number.isFinite(FEISHU_MAX_BODY_MB_PARSED) && FEISHU_MAX_BODY_MB_PARSED > 0
+    ? FEISHU_MAX_BODY_MB_PARSED
+    : 20
+const FEISHU_MAX_BODY_BYTES = Math.max(4096, FEISHU_MAX_BODY_MB_EFFECTIVE * 1024 * 1024)
+
+const FEISHU_WS_ENABLED = !['0', 'false', 'no'].includes(
+  (process.env.FEISHU_WS_ENABLED ?? 'true').toLowerCase(),
+)
+
 function patchToolClientOwnerCheck() {
   const proto: any = (ToolClient as any).prototype
   const original = proto.invokeAsUser
@@ -192,22 +225,58 @@ function createApi(config: unknown, apiLogger: Logger) {
   } as any
 }
 
+function installStandaloneLarkRuntime() {
+  LarkClient.setGlobalConfig(globalConfigRef.cfg as any)
+  LarkClient.setRuntime({
+    config: {
+      loadConfig: () => globalConfigRef.cfg as any,
+    },
+    logging: {
+      getChildLogger: () => ({
+        info: (...args: unknown[]) => logger.info?.(args.map(String).join(' ')),
+        warn: (...args: unknown[]) => logger.warn?.(args.map(String).join(' ')),
+        error: (...args: unknown[]) => logger.error?.(args.map(String).join(' ')),
+        debug: (...args: unknown[]) => logger.debug?.(args.map(String).join(' ')),
+      }),
+    },
+  } as any)
+}
+
 async function initTools() {
   const config = createConfig()
   globalConfigRef.cfg = config
   const api = createApi(config, logger)
 
+  installStandaloneLarkRuntime()
   patchToolClientOwnerCheck()
 
   registerOapiTools(api)
   registerFeishuMcpDocTools(api)
+
+  const skipOAuthTool = ['1', 'true', 'yes'].includes(
+    (process.env.FEISHU_SKIP_OAUTH_TOOL ?? '').toLowerCase(),
+  )
+  if (!skipOAuthTool) {
+    registerFeishuOAuthTool(api)
+  }
 }
 
-function readJsonBody(req: http.IncomingMessage): Promise<unknown> {
+function readJsonBody(
+  req: http.IncomingMessage,
+  maxBytes: number = FEISHU_MAX_BODY_BYTES,
+): Promise<unknown> {
   return new Promise((resolve, reject) => {
     const chunks: Buffer[] = []
+    let total = 0
     req.on('data', (chunk: Buffer | string) => {
-      chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk)
+      const buf = typeof chunk === 'string' ? Buffer.from(chunk) : chunk
+      total += buf.length
+      if (total > maxBytes) {
+        reject(new Error(`Request body exceeds limit (${maxBytes} bytes)`))
+        req.destroy()
+        return
+      }
+      chunks.push(buf)
     })
     req.on('end', () => {
       if (chunks.length === 0) {
@@ -601,6 +670,103 @@ async function handleFeishuReaction(body: unknown): Promise<ToolCallResponse> {
   }
 }
 
+async function handleBatchToolCalls(body: unknown): Promise<{
+  ok: boolean
+  results?: ToolCallResponse[]
+  error?: string
+}> {
+  if (!body || typeof body !== 'object') {
+    return { ok: false, error: 'Request body must be an object' }
+  }
+  const calls = (body as { calls?: unknown }).calls
+  if (!Array.isArray(calls)) {
+    return { ok: false, error: 'Missing or invalid field: calls (array)' }
+  }
+  const results: ToolCallResponse[] = []
+  for (const item of calls) {
+    results.push(await handleToolCall(item))
+  }
+  return { ok: true, results }
+}
+
+function handleListAccounts(): ToolCallResponse {
+  const cfg = globalConfigRef.cfg
+  if (!cfg) {
+    return { ok: false, error: 'config_not_initialised' }
+  }
+  const accounts = getEnabledLarkAccounts(cfg as any)
+  return {
+    ok: true,
+    result: {
+      accounts: accounts.map((a) => ({
+        account_id: a.accountId,
+        enabled: a.enabled,
+        configured: a.configured,
+        name: a.name,
+        brand: a.brand,
+        app_id: a.appId,
+      })),
+    },
+  }
+}
+
+async function handleRevokeUat(body: unknown): Promise<ToolCallResponse> {
+  if (!body || typeof body !== 'object') {
+    return { ok: false, error: 'Request body must be an object' }
+  }
+  const cfg = globalConfigRef.cfg
+  if (!cfg) {
+    return { ok: false, error: 'config_not_initialised' }
+  }
+  const obj = body as Record<string, unknown>
+  const openId = typeof obj.open_id === 'string' ? obj.open_id : undefined
+  if (!openId) {
+    return { ok: false, error: 'missing_open_id' }
+  }
+  const accountId = typeof obj.account_id === 'string' ? obj.account_id : undefined
+  const acct = getLarkAccount(cfg as any, accountId)
+  if (!acct.configured || !acct.appId) {
+    return {
+      ok: false,
+      error: 'account_not_configured',
+      details: { account_id: acct.accountId },
+    }
+  }
+  try {
+    await revokeUAT(acct.appId, openId)
+    return {
+      ok: true,
+      result: { revoked: true, app_id: acct.appId, open_id: openId },
+    }
+  } catch (err) {
+    const msg = err instanceof Error ? err.message : String(err)
+    return { ok: false, error: msg }
+  }
+}
+
+function isEventAppIdValid(expectedAppId: string | undefined, data: unknown): boolean {
+  if (!expectedAppId) return true
+  const eventAppId = (data as Record<string, unknown>).app_id
+  if (eventAppId == null) return true
+  if (eventAppId !== expectedAppId) {
+    logger.warn?.(
+      `feishu: event app_id mismatch (expected ${expectedAppId}, got ${String(eventAppId)}), discard`,
+    )
+    return false
+  }
+  return true
+}
+
+function createMessageDedupForAccount(account: {
+  config?: { dedup?: { ttlMs?: number; maxEntries?: number } }
+}): MessageDedup {
+  const dedupCfg = account.config?.dedup
+  return new MessageDedup({
+    ttlMs: dedupCfg?.ttlMs,
+    maxEntries: dedupCfg?.maxEntries,
+  })
+}
+
 function forwardEventToGateway(normalized: unknown) {
   return new Promise<void>((resolve, reject) => {
     const req = http.request(
@@ -641,7 +807,7 @@ async function startFeishuLongConnections() {
       continue
     }
 
-    const dedup = new MessageDedup()
+    const dedup = createMessageDedupForAccount(account)
     const lark = LarkClient.fromAccount(account)
 
     void lark
@@ -649,9 +815,14 @@ async function startFeishuLongConnections() {
         handlers: {
           'im.message.receive_v1': async (data: unknown) => {
             try {
+              if (!isEventAppIdValid(account.appId, data)) return
               const event = data as FeishuMessageEvent
               const msgId = event.message?.message_id
               if (!msgId) return
+              if (isMessageExpired(event.message?.create_time)) {
+                logger.debug?.(`feishu[${account.accountId}]: message ${msgId} expired, skip`)
+                return
+              }
               if (!dedup.tryRecord(msgId, account.accountId)) {
                 logger.debug?.(`feishu[${account.accountId}]: duplicate message ${msgId}, skip`)
                 return
@@ -675,45 +846,49 @@ async function startFeishuLongConnections() {
               logger.error?.(`feishu[${account.accountId}]: ws handler error: ${msg}`)
             }
           },
-          // 'im.message.reaction.created_v1': async (data: unknown) => {
-          //   try {
-          //     const event = data as FeishuReactionCreatedEvent
-          //     const msgId = event.message_id
-          //     const emojiType = event.reaction_type?.emoji_type ?? ''
-          //     const operatorOpenId = event.user_id?.open_id ?? ''
-          //     if (!msgId || !emojiType || !operatorOpenId) return
-
-          //     const dedupKey = `${msgId}:reaction:${emojiType}:${operatorOpenId}`
-          //     if (!dedup.tryRecord(dedupKey, account.accountId)) {
-          //       logger.debug?.(`feishu[${account.accountId}]: duplicate reaction ${dedupKey}, skip`)
-          //       return
-          //     }
-
-          //     if (isMessageExpired(event.action_time)) {
-          //       logger.debug?.(`feishu[${account.accountId}]: reaction on ${msgId} expired, skip`)
-          //       return
-          //     }
-
-          //     const normalized = {
-          //       event_type: 'reaction',
-          //       app_id: account.appId,
-          //       account_id: account.accountId,
-          //       open_id: operatorOpenId,
-          //       chat_type: event.chat_type,
-          //       chat_id: event.chat_id,
-          //       message_id: msgId,
-          //       emoji: emojiType,
-          //       action_time: event.action_time,
-          //       raw: data,
-          //     }
-          //     await forwardEventToGateway(normalized)
-          //   } catch (err) {
-          //     const msg = err instanceof Error ? err.message : String(err)
-          //     logger.error?.(`feishu[${account.accountId}]: ws reaction event error: ${msg}`)
-          //   }
-          // },
+          'im.message.reaction.created_v1': async (data: unknown) => {
+            try {
+              if (!isEventAppIdValid(account.appId, data)) return
+              const event = data as FeishuReactionCreatedEvent
+              const msgId = event.message_id
+              const emojiType = event.reaction_type?.emoji_type ?? ''
+              const operatorOpenId = event.user_id?.open_id ?? ''
+              if (!msgId || !emojiType || !operatorOpenId) return
+
+              const dedupKey = `${msgId}:reaction:${emojiType}:${operatorOpenId}`
+              if (!dedup.tryRecord(dedupKey, account.accountId)) {
+                logger.debug?.(`feishu[${account.accountId}]: duplicate reaction ${dedupKey}, skip`)
+                return
+              }
+
+              if (isMessageExpired(event.action_time)) {
+                logger.debug?.(`feishu[${account.accountId}]: reaction on ${msgId} expired, skip`)
+                return
+              }
+
+              const normalized = {
+                event_type: 'reaction',
+                app_id: account.appId,
+                account_id: account.accountId,
+                open_id: operatorOpenId,
+                chat_type: event.chat_type,
+                chat_id: event.chat_id,
+                message_id: msgId,
+                emoji: emojiType,
+                action_time: event.action_time,
+                raw: data,
+              }
+              await forwardEventToGateway(normalized)
+            } catch (err) {
+              const msg = err instanceof Error ? err.message : String(err)
+              logger.error?.(`feishu[${account.accountId}]: ws reaction event error: ${msg}`)
+            }
+          },
+          'im.message.reaction.deleted_v1': async () => {},
+          'im.message.message_read_v1': async () => {},
           'card.action.trigger': async (data: unknown) => {
             try {
+              if (!isEventAppIdValid(account.appId, data)) return
               const event = data as {
                 operator?: { open_id?: string }
                 action?: { value?: { action?: string; operation_id?: string } }
@@ -747,21 +922,44 @@ async function startFeishuLongConnections() {
   }
 }
 
+function resolvePathname(req: http.IncomingMessage): string | null {
+  if (!req.url) return null
+  try {
+    return new URL(req.url, 'http://127.0.0.1').pathname
+  } catch {
+    return req.url.split('?')[0] || '/'
+  }
+}
+
 function createServer() {
   const port = Number.parseInt(process.env.FEISHU_HTTP_PORT ?? '4380', 10)
 
   const server = http.createServer(async (req: http.IncomingMessage, res: http.ServerResponse) => {
-    if (!req.url) {
+    const pathname = resolvePathname(req)
+    if (pathname == null) {
       sendJson(res, 400, { ok: false, error: 'Missing URL' })
       return
     }
 
-    if (req.method === 'GET' && req.url === '/healthz') {
+    if (req.method === 'GET' && pathname === '/healthz') {
       sendJson(res, 200, { ok: true })
       return
     }
 
-    if (req.method === 'GET' && req.url === '/tools') {
+    if (req.method === 'GET' && pathname === '/readyz') {
+      const cfg = globalConfigRef.cfg
+      const accounts = cfg ? getEnabledLarkAccounts(cfg as any) : []
+      const configured = accounts.filter((a) => a.configured).length
+      const ok = Boolean(cfg) && configured > 0
+      sendJson(res, ok ? 200 : 503, {
+        ok,
+        configured_accounts: configured,
+        tools_registered: tools.size,
+      })
+      return
+    }
+
+    if (req.method === 'GET' && pathname === '/tools') {
       const list = Array.from(tools.values()).map((t) => ({
         name: t.name,
         label: t.label,
@@ -772,7 +970,18 @@ function createServer() {
       return
     }
 
-    if (req.method === 'POST' && req.url === '/tool-call') {
+    if (req.method === 'GET' && pathname === '/accounts') {
+      const resp = handleListAccounts()
+      const status = resp.ok ? 200 : 503
+      const body =
+        resp.ok && resp.result && typeof resp.result === 'object'
+          ? { ok: true, ...(resp.result as Record<string, unknown>) }
+          : { ok: false, error: resp.error, details: resp.details }
+      sendJson(res, status, body)
+      return
+    }
+
+    if (req.method === 'POST' && pathname === '/tool-call') {
       try {
         const body = await readJsonBody(req)
         const resp = await handleToolCall(body)
@@ -780,12 +989,31 @@ function createServer() {
         sendJson(res, status, resp)
       } catch (err) {
         const msg = err instanceof Error ? err.message : String(err)
-        sendJson(res, 500, { ok: false, error: msg })
+        const tooBig = msg.includes('exceeds limit')
+        sendJson(res, tooBig ? 413 : 500, { ok: false, error: msg })
+      }
+      return
+    }
+
+    if (req.method === 'POST' && pathname === '/tool-calls/batch') {
+      try {
+        const body = await readJsonBody(req)
+        const resp = await handleBatchToolCalls(body)
+        if (!resp.ok) {
+          sendJson(res, 400, { ok: false, error: resp.error })
+          return
+        }
+        const anyFailed = resp.results?.some((r) => !r.ok)
+        sendJson(res, anyFailed ? 207 : 200, { ok: true, results: resp.results })
+      } catch (err) {
+        const msg = err instanceof Error ? err.message : String(err)
+        const tooBig = msg.includes('exceeds limit')
+        sendJson(res, tooBig ? 413 : 500, { ok: false, error: msg })
       }
       return
     }
 
-    if (req.method === 'POST' && req.url === '/feishu/send-message') {
+    if (req.method === 'POST' && pathname === '/feishu/send-message') {
       try {
         const body = await readJsonBody(req)
         const resp = await handleSendFeishuMessage(body)
@@ -793,12 +1021,13 @@ function createServer() {
         sendJson(res, status, resp)
       } catch (err) {
         const msg = err instanceof Error ? err.message : String(err)
-        sendJson(res, 500, { ok: false, error: msg })
+        const tooBig = msg.includes('exceeds limit')
+        sendJson(res, tooBig ? 413 : 500, { ok: false, error: msg })
       }
       return
     }
 
-    if (req.method === 'POST' && req.url === '/feishu/react') {
+    if (req.method === 'POST' && pathname === '/feishu/react') {
       try {
         const body = await readJsonBody(req)
         const resp = await handleFeishuReaction(body)
@@ -806,7 +1035,22 @@ function createServer() {
         sendJson(res, status, resp)
       } catch (err) {
         const msg = err instanceof Error ? err.message : String(err)
-        sendJson(res, 500, { ok: false, error: msg })
+        const tooBig = msg.includes('exceeds limit')
+        sendJson(res, tooBig ? 413 : 500, { ok: false, error: msg })
+      }
+      return
+    }
+
+    if (req.method === 'POST' && pathname === '/feishu/revoke-uat') {
+      try {
+        const body = await readJsonBody(req)
+        const resp = await handleRevokeUat(body)
+        const status = resp.ok ? 200 : 400
+        sendJson(res, status, resp)
+      } catch (err) {
+        const msg = err instanceof Error ? err.message : String(err)
+        const tooBig = msg.includes('exceeds limit')
+        sendJson(res, tooBig ? 413 : 500, { ok: false, error: msg })
       }
       return
     }
@@ -822,7 +1066,11 @@ function createServer() {
 initTools()
   .then(() => {
     createServer()
-    void startFeishuLongConnections()
+    if (FEISHU_WS_ENABLED) {
+      void startFeishuLongConnections()
+    } else {
+      logger.info?.('feishu ws: FEISHU_WS_ENABLED=false, WebSocket long connections disabled')
+    }
   })
   .catch((err) => {
     logger.error?.(