feishu_data_async.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. import asyncio
  2. import time
  3. import aiohttp
  4. import json
  5. from typing import List, Dict, Any, Optional, Union
  6. from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
  7. from config import settings
  8. class FeishuDataAsync:
  9. """飞书电子表格 异步操作工具类"""
  def __init__(
      self,
      base_url: str = "https://open.feishu.cn",
      app_id: Optional[str] = None,
      app_secret: Optional[str] = None,
  ):
      """
      Store the client configuration. No network request is made here.

      :param base_url: Base URL of the Feishu open platform.
      :param app_id: Feishu application ID. When omitted,
          ``settings.FEISHU_APPID`` is used.
      :param app_secret: Feishu application secret. When omitted,
          ``settings.FEISHU_APPSECRET`` is used.
      """
      # Explicit credentials win; otherwise fall back to project settings,
      # which was the only source before these parameters existed.
      self.app_id = settings.FEISHU_APPID if app_id is None else app_id
      self.app_secret = (
          settings.FEISHU_APPSECRET if app_secret is None else app_secret
      )
      self.base_url = base_url
      # Token cache: empty string / 0 mean "no token fetched yet".
      self.access_token = ""
      self.token_expire_time = 0
      # The HTTP session is created in __aenter__, not here.
      self.session = None
  async def __aenter__(self):
      """
      Enter the async context: open the HTTP session and fetch a token.

      :return: This client instance.
      :raises Exception: Whatever the token request raises. The session
          opened here is closed before the error propagates.
      """
      self.session = aiohttp.ClientSession()
      try:
          await self._get_access_token()
      except BaseException:
          # __aexit__ is not called when __aenter__ itself raises, so the
          # session has to be closed here or it would never be released.
          await self.session.close()
          self.session = None
          raise
      return self
  async def __aexit__(self, exc_type, exc_val, exc_tb):
      """
      Leave the async context, closing the HTTP session if there is one.

      Returns ``None``, so an exception raised inside the ``async with``
      body is not suppressed.
      """
      active_session = self.session
      if not active_session:
          # Nothing was opened, so there is nothing to release.
          return
      await active_session.close()
  @retry(
      stop=stop_after_attempt(3),
      wait=wait_fixed(2),
      retry=retry_if_exception_type(Exception)
  )
  async def _get_access_token(self) -> str:
      """
      Return a tenant access token, requesting a new one when needed.

      The cached token is reused until 60 seconds before its recorded
      expiry. Otherwise the app credentials are posted to the token
      endpoint and the cache is refreshed from the response.

      Every failure below is raised as ``Exception``, which the ``retry``
      decorator is configured to retry (3 attempts, 2 seconds apart).

      :return: The access token string.
      :raises Exception: On a non-200 HTTP status or a non-zero ``code``
          in the response body.
      """
      current_ts = int(time.time())
      cached_token = self.access_token
      # The 60-second margin avoids handing out a token about to expire.
      if cached_token and current_ts < self.token_expire_time - 60:
          return cached_token

      endpoint = f"{self.base_url}/open-apis/auth/v3/tenant_access_token/internal"
      credentials = {"app_id": self.app_id, "app_secret": self.app_secret}

      # NOTE(review): ssl=False disables TLS certificate verification on
      # the request that carries app_secret - confirm this is intentional.
      async with self.session.post(endpoint, json=credentials, ssl=False) as resp:
          if resp.status != 200:
              failure_body = await resp.text()
              raise Exception(f"获取访问令牌失败: {failure_body}")

          body = await resp.json()
          if body.get("code") != 0:
              raise Exception(f"获取访问令牌失败: {body.get('msg')}")

          # Expiry is measured from before the request was sent; a missing
          # "expire" field falls back to 7200 seconds.
          self.access_token = body.get("tenant_access_token", "")
          self.token_expire_time = current_ts + body.get("expire", 7200)
          return self.access_token
  async def get_values(
      self,
      spreadsheet_token: str,
      sheet_id: str,
      range_str: Optional[str] = None,
      value_render_option: str = "ToString",
      date_time_render_option: str = "",
  ) -> List[List[Any]]:
      """
      Read cell values from a sheet via the v2 ``values_batch_get`` endpoint.

      :param spreadsheet_token: Token of the spreadsheet.
      :param sheet_id: ID of the sheet to read.
      :param range_str: Optional cell range such as ``"A1:C10"``. When
          omitted, ``sheet_id`` alone is sent as the range.
      :param value_render_option: Sent as ``valueRenderOption``.
      :param date_time_render_option: Sent as ``dateTimeRenderOption``.
      :return: The ``values`` of the first returned range as a list of
          rows, or an empty list when the response contains no ranges.
      :raises Exception: On a non-200 HTTP status or a non-zero ``code``
          in the response body.
      """
      access_token = await self._get_access_token()
      url = f"{self.base_url}/open-apis/sheets/v2/spreadsheets/{spreadsheet_token}/values_batch_get"
      headers = {
          "Authorization": f"Bearer {access_token}",
          "Content-Type": "application/json; charset=utf-8",
      }
      # Same "<sheet_id>!<range>" form that insert_values sends.
      query_range = f"{sheet_id}!{range_str}" if range_str else sheet_id
      params = {
          "ranges": query_range,
          "valueRenderOption": value_render_option,
          "dateTimeRenderOption": date_time_render_option,
          "user_id_type": "open_id",
      }

      # NOTE(review): ssl=False disables TLS certificate verification -
      # confirm this is intentional.
      async with self.session.get(
          url, headers=headers, params=params, ssl=False
      ) as response:
          if response.status != 200:
              error_text = await response.text()
              raise Exception(f"获取表格数据失败: {error_text}")
          result = await response.json()

      if result.get("code") != 0:
          raise Exception(f"获取表格数据失败: {result.get('msg')}")

      # A missing or empty "valueRanges" used to raise KeyError/IndexError
      # from the unconditional [0]; treat it as "no data" instead.
      value_ranges = (result.get("data") or {}).get("valueRanges") or []
      if not value_ranges:
          return []
      return value_ranges[0].get("values") or []
  async def insert_values(
      self,
      spreadsheet_token: str,
      sheet_id: str,
      ranges: str,
      values: List[Any],
  ) -> Dict[str, Any]:
      """
      Send one row of values to the v2 ``values_prepend`` endpoint.

      :param spreadsheet_token: Token of the spreadsheet.
      :param sheet_id: ID of the target sheet.
      :param ranges: Cell range such as ``"A1:C10"``; sent to the API as
          ``"<sheet_id>!<ranges>"``.
      :param values: Cell values of a single row. The list is wrapped in
          an outer list before sending, so the request carries one row.
      :return: The ``data`` object of the API response, or an empty dict
          when the response has none.
      :raises Exception: On a non-200 HTTP status or a non-zero ``code``
          in the response body.
      """
      access_token = await self._get_access_token()
      url = f"{self.base_url}/open-apis/sheets/v2/spreadsheets/{spreadsheet_token}/values_prepend"
      headers = {
          "Authorization": f"Bearer {access_token}",
          "Content-Type": "application/json; charset=utf-8",
      }
      payload = {
          "valueRange": {
              "range": f"{sheet_id}!{ranges}",
              "values": [values],
          },
      }

      # NOTE(review): ssl=False disables TLS certificate verification -
      # confirm this is intentional.
      async with self.session.post(
          url, headers=headers, json=payload, ssl=False
      ) as response:
          if response.status != 200:
              error_text = await response.text()
              raise Exception(f"更新表格数据失败: {error_text}")
          result = await response.json()

      if result.get("code") != 0:
          raise Exception(f"更新表格数据失败: {result.get('msg')}")
      return result.get("data") or {}