import asyncio
import json
import logging
import time
from typing import Any, Dict, List, Optional, Union

import aiohttp
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed

from config import settings

logger = logging.getLogger(__name__)


class FeishuAPIError(Exception):
    """Raised when a Feishu Open Platform request fails.

    Subclasses ``Exception`` so existing ``except Exception`` handlers and the
    retry predicate on ``_get_access_token`` keep working unchanged.
    """


class FeishuDataAsync:
    """Asynchronous helper for reading and writing Feishu spreadsheets.

    Intended to be used as an async context manager, which creates the
    underlying ``aiohttp.ClientSession`` and fetches the first access token::

        async with FeishuDataAsync() as client:
            rows = await client.get_values(spreadsheet_token, sheet_id)

    NOTE(review): every request is sent with ``ssl=False``, which makes
    aiohttp skip TLS certificate validation (including for the request that
    carries the app secret). This is kept as-is to preserve behaviour;
    confirm whether it is still required in the deployment environment.
    """

    def __init__(self, base_url: str = "https://open.feishu.cn"):
        """Initialise the client.

        The application credentials are read from ``settings.FEISHU_APPID``
        and ``settings.FEISHU_APPSECRET``; they are not constructor arguments.

        :param base_url: Base URL of the Feishu Open Platform.
        """
        self.app_id = settings.FEISHU_APPID
        self.app_secret = settings.FEISHU_APPSECRET
        self.base_url = base_url
        self.access_token = ""
        # Unix timestamp (seconds) at which the cached token expires.
        self.token_expire_time = 0
        # Created in __aenter__ and closed in __aexit__.
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        """Open the HTTP session and fetch an access token.

        :return: This client instance.
        """
        self.session = aiohttp.ClientSession()
        try:
            await self._get_access_token()
        except BaseException:
            # __aexit__ is not called when __aenter__ raises, so the session
            # has to be closed here or it would leak.
            await self.session.close()
            self.session = None
            raise
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the HTTP session, if one is open."""
        if self.session:
            await self.session.close()
            self.session = None

    @staticmethod
    async def _parse_response(response, error_prefix: str) -> Dict[str, Any]:
        """Validate a Feishu response and return its decoded JSON body.

        :param response: The aiohttp response object to inspect.
        :param error_prefix: Text placed before the error detail in the
            raised exception's message.
        :return: The decoded JSON body.
        :raises FeishuAPIError: If the HTTP status is not 200 or the body's
            ``code`` field is not 0.
        """
        if response.status != 200:
            error_text = await response.text()
            raise FeishuAPIError(f"{error_prefix}: {error_text}")

        result = await response.json()
        if result.get("code") != 0:
            raise FeishuAPIError(f"{error_prefix}: {result.get('msg')}")
        return result

    async def _auth_headers(self) -> Dict[str, str]:
        """Return request headers carrying a currently valid access token."""
        access_token = await self._get_access_token()
        return {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json; charset=utf-8",
        }

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_fixed(2),
        retry=retry_if_exception_type(Exception),
    )
    async def _get_access_token(self) -> str:
        """Return the tenant access token, requesting a new one when needed.

        The cached token is reused until 60 seconds before its recorded
        expiry. The call is attempted up to 3 times, 2 seconds apart, on any
        ``Exception``.

        :return: The tenant access token.
        :raises FeishuAPIError: If the token request fails (subject to the
            retry policy above).
        """
        now = int(time.time())
        if self.access_token and now < self.token_expire_time - 60:
            return self.access_token

        url = f"{self.base_url}/open-apis/auth/v3/tenant_access_token/internal"
        payload = {"app_id": self.app_id, "app_secret": self.app_secret}

        async with self.session.post(url, json=payload, ssl=False) as response:
            result = await self._parse_response(response, "获取访问令牌失败")

        self.access_token = result.get("tenant_access_token", "")
        # Falls back to 7200 seconds when the response carries no "expire".
        self.token_expire_time = now + result.get("expire", 7200)
        return self.access_token

    async def get_values(
        self,
        spreadsheet_token: str,
        sheet_id: str,
    ) -> List[List[Any]]:
        """Fetch cell values through the ``sheets/v2`` ``values_batch_get`` endpoint.

        ``sheet_id`` is sent as the ``ranges`` query parameter, and only the
        first returned value range is used.

        :param spreadsheet_token: Token identifying the spreadsheet.
        :param sheet_id: Sheet ID, passed as the range to read.
        :return: The ``values`` of the first returned range, or an empty list
            when the response contains no value ranges.
        :raises FeishuAPIError: If the request fails or the API reports a
            non-zero ``code``.
        """
        url = (
            f"{self.base_url}/open-apis/sheets/v2/spreadsheets/"
            f"{spreadsheet_token}/values_batch_get"
        )
        headers = await self._auth_headers()
        params = {
            "ranges": sheet_id,
            "valueRenderOption": "ToString",
            "dateTimeRenderOption": "",
            "user_id_type": "open_id",
        }

        async with self.session.get(
            url, headers=headers, params=params, ssl=False
        ) as response:
            result = await self._parse_response(response, "获取表格数据失败")

        # "data" or "valueRanges" may be missing, null or empty; treat all of
        # these as "no data" instead of failing on the [0] lookup.
        value_ranges = (result.get("data") or {}).get("valueRanges") or []
        if not value_ranges:
            return []
        return value_ranges[0].get("values") or []

    async def insert_values(
        self,
        spreadsheet_token: str,
        sheet_id: str,
        ranges: str,
        values: List[Any],
    ) -> Dict[str, Any]:
        """Send one row of values to the ``sheets/v2`` ``values_prepend`` endpoint.

        :param spreadsheet_token: Token identifying the spreadsheet.
        :param sheet_id: Sheet ID; combined with ``ranges`` as
            ``"{sheet_id}!{ranges}"``.
        :param ranges: Cell range within the sheet (e.g. ``"A1:C10"``).
        :param values: Cell values for a single row; wrapped in an outer list
            before being sent.
        :return: The ``data`` field of the API response (``{}`` if absent).
        :raises FeishuAPIError: If the request fails or the API reports a
            non-zero ``code``.
        """
        url = (
            f"{self.base_url}/open-apis/sheets/v2/spreadsheets/"
            f"{spreadsheet_token}/values_prepend"
        )
        headers = await self._auth_headers()
        payload = {
            "valueRange": {
                "range": f"{sheet_id}!{ranges}",
                "values": [values],
            },
        }
        logger.debug("values_prepend payload: %s", payload)

        async with self.session.post(
            url, headers=headers, json=payload, ssl=False
        ) as response:
            logger.debug("values_prepend response status: %s", response.status)
            result = await self._parse_response(response, "更新表格数据失败")

        return result.get("data", {})