feishu_data_async.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. import asyncio
  2. import time
  3. import aiohttp
  4. import json
  5. from typing import List, Dict, Any, Optional, Union
  6. from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
  7. from config import settings
  8. class FeishuDataAsync:
  9. """飞书电子表格 异步操作工具类"""
  10. def __init__(self, base_url: str = "https://open.feishu.cn"):
  11. """
  12. 初始化飞书电子表格 V3 客户端
  13. :param app_id: 飞书应用ID
  14. :param app_secret: 飞书应用Secret
  15. :param base_url: 飞书开放平台基础URL
  16. """
  17. self.app_id = settings.FEISHU_APPID
  18. self.app_secret = settings.FEISHU_APPSECRET
  19. self.base_url = base_url
  20. self.access_token = ""
  21. self.token_expire_time = 0
  22. self.session = None
  23. async def __aenter__(self):
  24. """异步上下文管理器入口"""
  25. self.session = aiohttp.ClientSession()
  26. await self._get_access_token()
  27. return self
  28. async def __aexit__(self, exc_type, exc_val, exc_tb):
  29. """异步上下文管理器出口"""
  30. if self.session:
  31. await self.session.close()
  32. @retry(
  33. stop=stop_after_attempt(3),
  34. wait=wait_fixed(2),
  35. retry=retry_if_exception_type(Exception)
  36. )
  37. async def _get_access_token(self) -> str:
  38. """异步获取访问令牌"""
  39. now = int(time.time())
  40. if self.access_token and now < self.token_expire_time - 60:
  41. return self.access_token
  42. url = f"{self.base_url}/open-apis/auth/v3/tenant_access_token/internal"
  43. payload = {"app_id": self.app_id, "app_secret": self.app_secret}
  44. async with self.session.post(url, json=payload,ssl=False) as response:
  45. if response.status != 200:
  46. error_text = await response.text()
  47. raise Exception(f"获取访问令牌失败: {error_text}")
  48. result = await response.json()
  49. if result.get("code") != 0:
  50. raise Exception(f"获取访问令牌失败: {result.get('msg')}")
  51. self.access_token = result.get("tenant_access_token", "")
  52. self.token_expire_time = now + result.get("expire", 7200)
  53. return self.access_token
  54. async def get_values(
  55. self,
  56. spreadsheet_token: str,
  57. sheet_id: str,
  58. ) -> List[List[Any]]:
  59. """
  60. 异步获取电子表格数据(V3 版本接口)
  61. :param spreadsheet_token: 电子表格token
  62. :param sheet_id: 工作表ID
  63. :param range_str: 数据范围(如"A1:C10")
  64. :param value_render_option: 值渲染选项
  65. :param date_time_render_option: 日期时间渲染选项
  66. :return: 表格数据二维列表
  67. """
  68. access_token = await self._get_access_token()
  69. url = f"{self.base_url}/open-apis/sheets/v2/spreadsheets/{spreadsheet_token}/values_batch_get"
  70. headers = {
  71. "Authorization": f"Bearer {access_token}",
  72. "Content-Type": "application/json; charset=utf-8"
  73. }
  74. params = {
  75. "ranges": sheet_id,
  76. "valueRenderOption": "ToString",
  77. "dateTimeRenderOption": "",
  78. "user_id_type": "open_id"
  79. }
  80. async with self.session.get(url, headers=headers, params=params,ssl=False) as response:
  81. if response.status != 200:
  82. error_text = await response.text()
  83. raise Exception(f"获取表格数据失败: {error_text}")
  84. result = await response.json()
  85. if result.get("code") != 0:
  86. raise Exception(f"获取表格数据失败: {result.get('msg')}")
  87. return result.get("data", {}).get("valueRanges", {})[0].get("values", [])
  88. async def get_values_v3(
  89. self,
  90. spreadsheet_token: str,
  91. sheet_id: str,
  92. range_str: str = None,
  93. ) -> List[List[Any]]:
  94. """
  95. 异步获取电子表格数据(V3 版本接口)
  96. :param spreadsheet_token: 电子表格token
  97. :param sheet_id: 工作表ID
  98. :param range_str: 数据范围(如"A1:C10"或"A:B"),如果不指定则获取整个工作表
  99. :param value_render_option: 值渲染选项
  100. :param date_time_render_option: 日期时间渲染选项
  101. :return: 表格数据二维列表
  102. """
  103. access_token = await self._get_access_token()
  104. url = f"{self.base_url}/open-apis/sheets/v2/spreadsheets/{spreadsheet_token}/values_batch_get"
  105. headers = {
  106. "Authorization": f"Bearer {access_token}",
  107. "Content-Type": "application/json; charset=utf-8"
  108. }
  109. full_range = f"{sheet_id}!{range_str}" if range_str else sheet_id
  110. params = {
  111. "ranges": full_range,
  112. "valueRenderOption": "ToString",
  113. "dateTimeRenderOption": "",
  114. "user_id_type": "open_id"
  115. }
  116. async with self.session.get(url, headers=headers, params=params, ssl=False) as response:
  117. if response.status != 200:
  118. error_text = await response.text()
  119. raise Exception(f"获取表格数据失败: {error_text}")
  120. result = await response.json()
  121. if result.get("code") != 0:
  122. raise Exception(f"获取表格数据失败: {result.get('msg')}")
  123. return result.get("data", {}).get("valueRanges", {})[0].get("values", [])
  124. async def insert_values(
  125. self,
  126. spreadsheet_token: str,
  127. sheet_id: str,
  128. ranges: str,
  129. values: List[Any],
  130. ) -> Dict[str, Any]:
  131. """
  132. 异步更新电子表格数据
  133. :param spreadsheet_token: 电子表格token
  134. :param sheet_id: 工作表ID
  135. :param ranges: 数据范围(如"A1:C10")
  136. :param values: 要更新的数据列表,必须是二维数组 [['祝福有福有财 ', 'https://54dc5260-2.xuweiying.cn'], ['祝福有福有财 ', 'https://54dc5260.xuweiying.cn']]
  137. :return: 更新结果
  138. """
  139. access_token = await self._get_access_token()
  140. url = f"{self.base_url}/open-apis/sheets/v2/spreadsheets/{spreadsheet_token}/values_prepend"
  141. headers = {
  142. "Authorization": f"Bearer {access_token}",
  143. "Content-Type": "application/json; charset=utf-8"
  144. }
  145. payload ={
  146. "valueRange":
  147. {
  148. "range": f"{sheet_id}!{ranges}",
  149. "values": values
  150. },
  151. }
  152. async with self.session.post(url, headers=headers, json=payload,ssl=False) as response:
  153. if response.status != 200:
  154. error_text = await response.text()
  155. raise Exception(f"更新表格数据失败: {error_text}")
  156. result = await response.json()
  157. if result.get("code") != 0:
  158. raise Exception(f"更新表格数据失败: {result.get('msg')}")
  159. return result.get("data", {})
  160. async def append_values(
  161. self,
  162. spreadsheet_token: str,
  163. sheet_id: str,
  164. ranges: str,
  165. values: List[Any],
  166. ) -> Dict[str, Any]:
  167. """
  168. 异步追加数据到电子表格末尾(正序写入)
  169. :param spreadsheet_token: 电子表格token
  170. :param sheet_id: 工作表ID
  171. :param ranges: 数据范围(如"A2:H")
  172. :param values: 要追加的数据列表,必须是二维数组 [['关键词', '日期', '总分', '公众号', '视频号', '搜一搜', '直播', '小程序']]
  173. :return: 更新结果
  174. """
  175. access_token = await self._get_access_token()
  176. url = f"{self.base_url}/open-apis/sheets/v2/spreadsheets/{spreadsheet_token}/values_append"
  177. headers = {
  178. "Authorization": f"Bearer {access_token}",
  179. "Content-Type": "application/json; charset=utf-8"
  180. }
  181. payload = {
  182. "valueRange": {
  183. "range": f"{sheet_id}!{ranges}",
  184. "values": values
  185. }
  186. }
  187. async with self.session.post(url, headers=headers, json=payload, ssl=False) as response:
  188. if response.status != 200:
  189. error_text = await response.text()
  190. raise Exception(f"追加表格数据失败: {error_text}")
  191. result = await response.json()
  192. if result.get("code") != 0:
  193. raise Exception(f"追加表格数据失败: {result.get('msg')}")
  194. return result.get("data", {})