123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- import asyncio
- import time
- import aiohttp
- import json
- from typing import List, Dict, Any, Optional, Union
- from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
- from config import settings
class FeishuDataAsync:
    """Async helper for reading and writing Feishu (Lark) spreadsheets.

    Intended to be used as an async context manager, which owns the
    underlying ``aiohttp`` session::

        async with FeishuDataAsync() as client:
            rows = await client.get_values(spreadsheet_token, sheet_id)

    The app credentials are read from ``settings.FEISHU_APPID`` and
    ``settings.FEISHU_APPSECRET``. Requests go to the ``sheets/v2`` endpoints.
    """

    def __init__(
        self,
        base_url: str = "https://open.feishu.cn",
        *,
        verify_ssl: bool = False,
    ):
        """Create a client; no network I/O happens until ``__aenter__``.

        :param base_url: Base URL of the Feishu open platform.
        :param verify_ssl: Whether to verify the server's TLS certificate.
            Defaults to ``False`` only to preserve this class's historical
            behaviour. SECURITY: with verification off the connection is
            open to man-in-the-middle attacks; pass ``True`` unless a
            TLS-intercepting proxy makes that impossible.
        """
        self.app_id = settings.FEISHU_APPID
        self.app_secret = settings.FEISHU_APPSECRET
        self.base_url = base_url
        self.access_token = ""
        # Unix timestamp (seconds) at which the cached token expires.
        self.token_expire_time = 0
        self.session: Optional[aiohttp.ClientSession] = None
        # Extra kwargs applied to every request. When verifying, the ``ssl``
        # argument is omitted so aiohttp applies its own default handling.
        self._ssl_kwargs: Dict[str, Any] = {} if verify_ssl else {"ssl": False}

    async def __aenter__(self):
        """Open the HTTP session and fetch an initial access token."""
        self.session = aiohttp.ClientSession()
        try:
            await self._get_access_token()
        except BaseException:
            # __aexit__ is not invoked when __aenter__ raises, so the session
            # must be closed here or it would leak.
            await self.session.close()
            raise
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the HTTP session if one was opened."""
        if self.session:
            await self.session.close()

    @staticmethod
    async def _read_result(response, failure_prefix: str) -> Dict[str, Any]:
        """Validate an API response and return its decoded JSON body.

        :param response: The aiohttp response to inspect.
        :param failure_prefix: Text placed before the error detail in the
            raised exception's message.
        :raises Exception: If the HTTP status is not 200 or the body's
            ``code`` field is not 0.
        """
        if response.status != 200:
            error_text = await response.text()
            raise Exception(f"{failure_prefix}: {error_text}")
        result = await response.json()
        if result.get("code") != 0:
            raise Exception(f"{failure_prefix}: {result.get('msg')}")
        return result

    async def _auth_headers(self) -> Dict[str, str]:
        """Return request headers carrying a currently valid bearer token."""
        access_token = await self._get_access_token()
        return {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json; charset=utf-8",
        }

    # Any exception triggers a retry: up to 3 attempts, 2 seconds apart.
    # ``reraise`` is not set, so once the attempts are exhausted tenacity
    # raises its own RetryError wrapping the last failure.
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_fixed(2),
        retry=retry_if_exception_type(Exception)
    )
    async def _get_access_token(self) -> str:
        """Return a tenant access token, requesting a new one when needed.

        The cached token is reused until 60 seconds before its recorded
        expiry; after that a fresh token is requested and cached.

        :return: The tenant access token.
        :raises Exception: If the HTTP status is not 200 or the API reports a
            non-zero ``code`` (subject to the retry policy above).
        """
        now = int(time.time())
        if self.access_token and now < self.token_expire_time - 60:
            return self.access_token

        url = f"{self.base_url}/open-apis/auth/v3/tenant_access_token/internal"
        payload = {"app_id": self.app_id, "app_secret": self.app_secret}
        async with self.session.post(url, json=payload, **self._ssl_kwargs) as response:
            result = await self._read_result(response, "获取访问令牌失败")

        self.access_token = result.get("tenant_access_token", "")
        # Expiry is measured from just before the request was sent, so the
        # cached lifetime errs on the short side. Falls back to 7200 seconds
        # when the response carries no "expire" field.
        self.token_expire_time = now + result.get("expire", 7200)
        return self.access_token

    async def get_values(
        self,
        spreadsheet_token: str,
        sheet_id: str,
    ) -> List[List[Any]]:
        """Fetch the cell values of one sheet via ``values_batch_get``.

        :param spreadsheet_token: Token identifying the spreadsheet.
        :param sheet_id: ID of the sheet; sent as the ``ranges`` query value.
        :return: The ``values`` of the first returned value range, or an
            empty list when the response contains no value ranges.
        :raises Exception: If the HTTP status is not 200 or the API reports a
            non-zero ``code``.
        """
        headers = await self._auth_headers()
        url = f"{self.base_url}/open-apis/sheets/v2/spreadsheets/{spreadsheet_token}/values_batch_get"
        params = {
            # Only the sheet id is passed, with no cell range appended.
            # NOTE(review): presumably this selects the whole sheet - confirm
            # against the Feishu API documentation.
            "ranges": sheet_id,
            "valueRenderOption": "ToString",
            "dateTimeRenderOption": "",
            "user_id_type": "open_id"
        }
        async with self.session.get(
            url, headers=headers, params=params, **self._ssl_kwargs
        ) as response:
            result = await self._read_result(response, "获取表格数据失败")

        # Guard against a missing/null "data" or an empty "valueRanges"
        # instead of failing on the [0] lookup.
        value_ranges = (result.get("data") or {}).get("valueRanges") or []
        if not value_ranges:
            return []
        return value_ranges[0].get("values", [])

    async def insert_values(
        self,
        spreadsheet_token: str,
        sheet_id: str,
        ranges: str,
        values: List[Any],
    ) -> Dict[str, Any]:
        """Write one row of values via the ``values_prepend`` endpoint.

        :param spreadsheet_token: Token identifying the spreadsheet.
        :param sheet_id: ID of the target sheet.
        :param ranges: Cell range within the sheet (e.g. ``"A1:C10"``); sent
            as ``"<sheet_id>!<ranges>"``.
        :param values: Cell values for a single row; wrapped in an outer list
            before being sent.
        :return: The ``data`` object of the API response (``{}`` if absent).
        :raises Exception: If the HTTP status is not 200 or the API reports a
            non-zero ``code``.
        """
        headers = await self._auth_headers()
        url = f"{self.base_url}/open-apis/sheets/v2/spreadsheets/{spreadsheet_token}/values_prepend"
        payload = {
            "valueRange": {
                "range": f"{sheet_id}!{ranges}",
                "values": [values]
            },
        }
        async with self.session.post(
            url, headers=headers, json=payload, **self._ssl_kwargs
        ) as response:
            result = await self._read_result(response, "更新表格数据失败")
        return result.get("data", {})
|