# feishu_client.py import requests import json import logging import time class FeishuClient: """ 封装飞书多维表格的API操作。 负责获取Access Token,以及对多维表格的读写操作。 """ def __init__(self, app_id: str, app_secret: str): self.app_id = app_id self.app_secret = app_secret self._access_token = None self._token_expires_at = 0 # Unix timestamp, 用于缓存Token过期时间 def _get_access_token(self) -> str: """ 获取飞书应用的 Access Token。 如果当前token未过期,则直接返回;否则,重新请求获取。 Token有效期通常为2小时,这里会提前120秒刷新。 """ # 检查缓存的token是否仍然有效 if self._access_token and self._token_expires_at > time.time(): return self._access_token url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal" headers = {"Content-Type": "application/json; charset=utf-8"} payload = { "app_id": self.app_id, "app_secret": self.app_secret } try: logging.info("正在请求飞书 Access Token...") response = requests.post(url, headers=headers, json=payload, timeout=10) response.raise_for_status() # 检查HTTP响应状态码,如果不是2xx则抛出HTTPError data = response.json() if data.get("code") == 0: self._access_token = data.get("app_access_token") # 记录过期时间,提前2分钟刷新 self._token_expires_at = time.time() + data.get("expire", 7200) - 120 logging.info("飞书 Access Token 获取成功。") return self._access_token else: logging.error(f"获取飞书 Access Token 失败: {data.get('msg')} (Code: {data.get('code')})") raise Exception(f"Feishu Token Error: {data.get('msg')}") except requests.exceptions.RequestException as e: logging.error(f"请求飞书 Access Token 发生网络错误: {e}") raise Exception(f"Feishu Token Request Failed: {e}") except Exception as e: logging.error(f"处理飞书 Access Token 响应失败: {e}") raise Exception(f"Feishu Token Response Handling Failed: {e}") def read_records(self, base_id: str, table_id: str, field_names: list, page_size: int = 100) -> list: """ 从飞书多维表格读取记录。 Args: base_id (str): 多维表格的 Base ID (应用 Token)。 table_id (str): 表的 Table ID。 field_names (list): 要读取的字段名称列表。 page_size (int): 每次请求的记录数 (最大500)。 Returns: list: 包含记录字典的列表。 Raises: Exception: 如果读取失败。 """ access_token = self._get_access_token() url = ( f"https://open.feishu.cn/open-apis/bitable/v1/apps/{base_id}/tables/{table_id}/records" ) headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json" } params = { "page_size": min(page_size, 500), # 飞书API page_size 最大500 "field_names": json.dumps(field_names) } records = [] try: logging.info(f"正在从飞书多维表格读取数据 (表: {table_id}, 请求字段: {field_names}, 每页: {params['page_size']})...") response = requests.get(url, headers=headers, params=params, timeout=30) response.raise_for_status() data = response.json() if data.get("code") == 0: records.extend(data.get("data", {}).get("items", [])) # 注意:这里只处理了第一页数据。如果需要读取所有数据,您需要实现分页循环逻辑 if data.get("data", {}).get("has_more"): logging.warning(f"飞书多维表格 (表: {table_id}) 存在更多数据未读取。当前实现仅获取了第一页。") logging.info(f"成功读取 {len(records)} 条记录。") return records else: logging.error(f"从飞书多维表格读取数据失败: {data.get('msg')} (Code: {data.get('code')})") raise Exception(f"Feishu Read Error: {data.get('msg')}") except requests.exceptions.RequestException as e: logging.error(f"请求飞书多维表格读取数据发生网络错误: {e}") raise Exception(f"Feishu Read Request Failed: {e}") except Exception as e: logging.error(f"处理飞书多维表格读取响应失败: {e}") raise Exception(f"Feishu Read Response Handling Failed: {e}") def update_records(self, base_id: str, table_id: str, records_data: list) -> bool: """ 更新飞书多维表格中的记录。 Args: base_id (str): 多维表格的 Base ID。 table_id (str): 表的 Table ID。 records_data (list): 包含要更新的记录字典的列表,每个字典需包含 record_id 和 fields。 例如: [{"record_id": "recxxxxxxxx", "fields": {"字段名": "新内容"}}] 每次批量更新最多支持500条记录。 Returns: bool: True 如果更新成功,False 如果失败。 Raises: Exception: 如果更新失败。 """ if not records_data: logging.info("没有要更新的记录,跳过写入飞书操作。") return True access_token = self._get_access_token() url = ( f"https://open.feishu.cn/open-apis/bitable/v1/apps/{base_id}/tables/{table_id}/records" ) headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json" } # 飞书批量更新API限制每批最多500条 chunk_size = 500 all_success = True for i in range(0, len(records_data), chunk_size): batch_records = records_data[i:i + chunk_size] payload = { "records": batch_records } try: logging.info(f"正在将 {len(batch_records)} 条记录分批写回飞书多维表格 (表: {table_id}, 批次: {i//chunk_size + 1})...") response = requests.post(url, headers=headers, json=payload, timeout=30) response.raise_for_status() data = response.json() if data.get("code") == 0: logging.info(f"批次 {i//chunk_size + 1} 成功写入/更新 {len(batch_records)} 条记录。") else: logging.error(f"将数据写入飞书多维表格失败 (批次 {i//chunk_size + 1}): {data.get('msg')} (Code: {data.get('code')})") all_success = False # 可以在这里选择抛出异常中断,或者继续处理下一批 # raise Exception(f"Feishu Write Error in batch {i//chunk_size + 1}: {data.get('msg')}") except requests.exceptions.RequestException as e: logging.error(f"请求飞书多维表格写入数据发生网络错误 (批次 {i//chunk_size + 1}): {e}") all_success = False # raise Exception(f"Feishu Write Request Failed in batch {i//chunk_size + 1}: {e}") except Exception as e: logging.error(f"处理飞书多维表格写入响应失败 (批次 {i//chunk_size + 1}): {e}") all_success = False # raise Exception(f"Feishu Write Response Handling Failed in batch {i//chunk_size + 1}: {e}") return all_success # 可以在这里添加一些简单的测试代码,但通常在main.py中进行集成测试 if __name__ == '__main__': # 仅作示例,请勿在生产环境直接硬编码敏感信息 # 请替换为您的实际飞书应用信息 # 配置日志,用于独立测试 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') print("--- 飞书客户端独立测试 (请确保替换YOUR_...) ---") TEST_APP_ID = "YOUR_FEISHU_APP_ID" TEST_APP_SECRET = "YOUR_FEISHU_APP_SECRET" TEST_BASE_ID = "YOUR_FEISHU_BASE_ID" TEST_TABLE_ID = "YOUR_FEISHU_TABLE_ID" TEST_INPUT_FIELD = "测试文本" # 确保您的表中有此字段 TEST_OUTPUT_FIELD = "测试输出" # 确保您的表中有此字段 if "YOUR_" in TEST_APP_ID: logging.warning("请替换 feishu_client.py 中的 YOUR_ 占位符为您的实际飞书应用信息以运行测试。") else: try: feishu_client = FeishuClient(TEST_APP_ID, TEST_APP_SECRET) # 测试读取 print("\n--- 测试读取记录 ---") read_records_data = feishu_client.read_records(TEST_BASE_ID, TEST_TABLE_ID, [TEST_INPUT_FIELD, TEST_OUTPUT_FIELD], page_size=2) for record in read_records_data: print(f"Record ID: {record.get('record_id')}, Fields: {record.get('fields')}") # 测试写入/更新 (仅更新第一条记录的某个字段) if read_records_data: print("\n--- 测试更新记录 ---") first_record_id = read_records_data[0]['record_id'] update_payload = [{ "record_id": first_record_id, "fields": {TEST_OUTPUT_FIELD: f"Updated by test at {time.strftime('%Y-%m-%d %H:%M:%S')}"} }] update_success = feishu_client.update_records(TEST_BASE_ID, TEST_TABLE_ID, update_payload) print(f"更新操作是否成功: {update_success}") else: print("\n没有可读取的记录,跳过更新测试。") except Exception as e: logging.error(f"飞书客户端独立测试失败: {e}") print("--- 飞书客户端独立测试结束 ---")