Browse Source

statistics

丁云鹏 1 week ago
parent
commit
24f099817d
4 changed files with 472 additions and 0 deletions
  1. 96 0
      coze_client.py
  2. 204 0
      feishu_client.py
  3. 0 0
      fetch.py
  4. 172 0
      main.py

+ 96 - 0
coze_client.py

@@ -0,0 +1,96 @@
+# coze_client.py
+import requests
+import json
+import logging
+
+class CozeClient:
+    """
+    封装Coze API的聊天完成操作。
+    """
+    def __init__(self, api_key: str):
+        self.api_key = api_key
+        self.api_url = "https://api.coze.com/v1/chat/completions" # Coze API 聊天完成端点
+
+    def send_message(self, bot_id: str, prompt_template: str, input_data: str) -> str | None:
+        """
+        向Coze机器人发送消息并获取回复。
+        Args:
+            bot_id (str): Coze 机器人的 Bot ID。
+            prompt_template (str): Coze API的提示模板字符串,需要包含 '{input_data}' 占位符。
+            input_data (str): 实际要填充到模板中的输入数据。
+        Returns:
+            str | None: Coze的回复内容,如果失败则返回 None。
+        Raises:
+            Exception: 如果Coze API调用失败或返回非预期格式。
+        """
+        headers = {
+            "Authorization": f"Bearer {self.api_key}",
+            "Content-Type": "application/json",
+            "Accept": "application/json"
+        }
+        
+        # 填充提示模板
+        user_message_content = prompt_template.format(input_data=input_data)
+
+        payload = {
+            "model": bot_id, # 在Coze API中,bot_id 通常作为 model 参数传入
+            "messages": [
+                {"role": "user", "content": user_message_content}
+            ],
+            "stream": False # 非流式响应
+        }
+
+        try:
+            logging.info(f"正在调用 Coze Bot '{bot_id}' (输入内容前50字: '{user_message_content[:50]}...')")
+            response = requests.post(self.api_url, headers=headers, json=payload, timeout=60)
+            response.raise_for_status() # 检查HTTP响应状态码
+            result = response.json()
+
+            # 检查Coze响应结构,提取内容
+            if result.get("choices") and result["choices"][0].get("message") and result["choices"][0]["message"].get("content"):
+                coze_output_content = result["choices"][0]["message"]["content"]
+                logging.info(f"Coze API 调用成功,返回内容长度: {len(coze_output_content)}")
+                return coze_output_content
+            else:
+                logging.warning(f"Coze API 未返回有效内容。原始响应: {json.dumps(result, ensure_ascii=False, indent=2)}")
+                raise Exception(f"Coze API returned no valid content: {result}")
+
+        except requests.exceptions.RequestException as e:
+            logging.error(f"请求Coze API发生网络错误: {e}")
+            raise Exception(f"Coze API Request Failed: {e}")
+        except (IndexError, KeyError) as e:
+            logging.error(f"Coze API 响应格式错误: {e}. 原始响应: {json.dumps(result, ensure_ascii=False, indent=2) if 'result' in locals() else 'N/A'}")
+            raise Exception(f"Coze API Response Format Error: {e}")
+        except Exception as e:
+            logging.error(f"处理Coze API响应失败: {e}")
+            raise Exception(f"Coze API Response Handling Failed: {e}")
+
+# 可以在这里添加一些简单的测试代码,但通常在main.py中进行集成测试
+if __name__ == '__main__':
+    # 仅作示例,请勿在生产环境直接硬编码敏感信息
+    # 配置日志,用于独立测试
+    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+
+    print("--- Coze客户端独立测试 (请确保替换YOUR_...) ---")
+    TEST_COZE_API_KEY = "YOUR_COZE_API_KEY"
+    TEST_COZE_BOT_ID = "YOUR_COZE_BOT_ID"
+    TEST_PROMPT_TEMPLATE = "请用一句话总结以下内容: {input_data}"
+    TEST_INPUT_TEXT = "Python是一种高级编程语言,由Guido van Rossum创建,并于1991年首次发布。它以其清晰的语法和广泛的库而闻名,适用于Web开发、数据分析、人工智能等多个领域。"
+
+    if "YOUR_" in TEST_COZE_API_KEY:
+        logging.warning("请替换 coze_client.py 中的 YOUR_ 占位符为您的实际Coze API信息以运行测试。")
+    else:
+        try:
+            coze_client = CozeClient(TEST_COZE_API_KEY)
+            coze_output = coze_client.send_message(
+                TEST_COZE_BOT_ID,
+                TEST_PROMPT_TEMPLATE,
+                TEST_INPUT_TEXT
+            )
+            if coze_output:
+                print(f"\nCoze API 成功响应:\n{coze_output}")
+            else:
+                print("\nCoze API 未返回有效内容。")
+        except Exception as e:
+            logging.error(f"Coze客户端独立测试失败: {e}")
+    print("--- Coze客户端独立测试结束 ---")

+ 204 - 0
feishu_client.py

@@ -0,0 +1,204 @@
+# feishu_client.py
+import requests
+import json
+import logging
+import time
+
+class FeishuClient:
+    """
+    封装飞书多维表格的API操作。
+    负责获取Access Token,以及对多维表格的读写操作。
+    """
+    def __init__(self, app_id: str, app_secret: str):
+        self.app_id = app_id
+        self.app_secret = app_secret
+        self._access_token = None
+        self._token_expires_at = 0 # Unix timestamp, 用于缓存Token过期时间
+
+    def _get_access_token(self) -> str:
+        """
+        获取飞书应用的 Access Token。
+        如果当前token未过期,则直接返回;否则,重新请求获取。
+        Token有效期通常为2小时,这里会提前120秒刷新。
+        """
+        # 检查缓存的token是否仍然有效
+        if self._access_token and self._token_expires_at > time.time():
+            return self._access_token
+
+        url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
+        headers = {"Content-Type": "application/json; charset=utf-8"}
+        payload = {
+            "app_id": self.app_id,
+            "app_secret": self.app_secret
+        }
+        try:
+            logging.info("正在请求飞书 Access Token...")
+            response = requests.post(url, headers=headers, json=payload, timeout=10)
+            response.raise_for_status() # 检查HTTP响应状态码,如果不是2xx则抛出HTTPError
+            data = response.json()
+            if data.get("code") == 0:
+                self._access_token = data.get("app_access_token")
+                # 记录过期时间,提前2分钟刷新
+                self._token_expires_at = time.time() + data.get("expire", 7200) - 120
+                logging.info("飞书 Access Token 获取成功。")
+                return self._access_token
+            else:
+                logging.error(f"获取飞书 Access Token 失败: {data.get('msg')} (Code: {data.get('code')})")
+                raise Exception(f"Feishu Token Error: {data.get('msg')}")
+        except requests.exceptions.RequestException as e:
+            logging.error(f"请求飞书 Access Token 发生网络错误: {e}")
+            raise Exception(f"Feishu Token Request Failed: {e}")
+        except Exception as e:
+            logging.error(f"处理飞书 Access Token 响应失败: {e}")
+            raise Exception(f"Feishu Token Response Handling Failed: {e}")
+
+    def read_records(self, base_id: str, table_id: str, field_names: list, page_size: int = 100) -> list:
+        """
+        从飞书多维表格读取记录。
+        Args:
+            base_id (str): 多维表格的 Base ID (应用 Token)。
+            table_id (str): 表的 Table ID。
+            field_names (list): 要读取的字段名称列表。
+            page_size (int): 每次请求的记录数 (最大500)。
+        Returns:
+            list: 包含记录字典的列表。
+        Raises:
+            Exception: 如果读取失败。
+        """
+        access_token = self._get_access_token()
+        url = (
+            f"https://open.feishu.cn/open-apis/bitable/v1/apps/{base_id}/tables/{table_id}/records"
+        )
+        headers = {
+            "Authorization": f"Bearer {access_token}",
+            "Content-Type": "application/json"
+        }
+        params = {
+            "page_size": min(page_size, 500), # 飞书API page_size 最大500
+            "field_names": json.dumps(field_names)
+        }
+        records = []
+        try:
+            logging.info(f"正在从飞书多维表格读取数据 (表: {table_id}, 请求字段: {field_names}, 每页: {params['page_size']})...")
+            response = requests.get(url, headers=headers, params=params, timeout=30)
+            response.raise_for_status()
+            data = response.json()
+            if data.get("code") == 0:
+                records.extend(data.get("data", {}).get("items", []))
+                # 注意:这里只处理了第一页数据。如果需要读取所有数据,您需要实现分页循环逻辑
+                if data.get("data", {}).get("has_more"):
+                    logging.warning(f"飞书多维表格 (表: {table_id}) 存在更多数据未读取。当前实现仅获取了第一页。")
+                logging.info(f"成功读取 {len(records)} 条记录。")
+                return records
+            else:
+                logging.error(f"从飞书多维表格读取数据失败: {data.get('msg')} (Code: {data.get('code')})")
+                raise Exception(f"Feishu Read Error: {data.get('msg')}")
+        except requests.exceptions.RequestException as e:
+            logging.error(f"请求飞书多维表格读取数据发生网络错误: {e}")
+            raise Exception(f"Feishu Read Request Failed: {e}")
+        except Exception as e:
+            logging.error(f"处理飞书多维表格读取响应失败: {e}")
+            raise Exception(f"Feishu Read Response Handling Failed: {e}")
+
+    def update_records(self, base_id: str, table_id: str, records_data: list) -> bool:
+        """
+        更新飞书多维表格中的记录。
+        Args:
+            base_id (str): 多维表格的 Base ID。
+            table_id (str): 表的 Table ID。
+            records_data (list): 包含要更新的记录字典的列表,每个字典需包含 record_id 和 fields。
+                                 例如: [{"record_id": "recxxxxxxxx", "fields": {"字段名": "新内容"}}]
+                                 每次批量更新最多支持500条记录。
+        Returns:
+            bool: True 如果更新成功,False 如果失败。
+        Raises:
+            Exception: 如果更新失败。
+        """
+        if not records_data:
+            logging.info("没有要更新的记录,跳过写入飞书操作。")
+            return True
+
+        access_token = self._get_access_token()
+        url = (
+            f"https://open.feishu.cn/open-apis/bitable/v1/apps/{base_id}/tables/{table_id}/records"
+        )
+        headers = {
+            "Authorization": f"Bearer {access_token}",
+            "Content-Type": "application/json"
+        }
+        
+        # 飞书批量更新API限制每批最多500条
+        chunk_size = 500
+        all_success = True
+
+        for i in range(0, len(records_data), chunk_size):
+            batch_records = records_data[i:i + chunk_size]
+            payload = {
+                "records": batch_records
+            }
+            try:
+                logging.info(f"正在将 {len(batch_records)} 条记录分批写回飞书多维表格 (表: {table_id}, 批次: {i//chunk_size + 1})...")
+                response = requests.post(url, headers=headers, json=payload, timeout=30)
+                response.raise_for_status()
+                data = response.json()
+                if data.get("code") == 0:
+                    logging.info(f"批次 {i//chunk_size + 1} 成功写入/更新 {len(batch_records)} 条记录。")
+                else:
+                    logging.error(f"将数据写入飞书多维表格失败 (批次 {i//chunk_size + 1}): {data.get('msg')} (Code: {data.get('code')})")
+                    all_success = False
+                    # 可以在这里选择抛出异常中断,或者继续处理下一批
+                    # raise Exception(f"Feishu Write Error in batch {i//chunk_size + 1}: {data.get('msg')}")
+            except requests.exceptions.RequestException as e:
+                logging.error(f"请求飞书多维表格写入数据发生网络错误 (批次 {i//chunk_size + 1}): {e}")
+                all_success = False
+                # raise Exception(f"Feishu Write Request Failed in batch {i//chunk_size + 1}: {e}")
+            except Exception as e:
+                logging.error(f"处理飞书多维表格写入响应失败 (批次 {i//chunk_size + 1}): {e}")
+                all_success = False
+                # raise Exception(f"Feishu Write Response Handling Failed in batch {i//chunk_size + 1}: {e}")
+        
+        return all_success
+
+# 可以在这里添加一些简单的测试代码,但通常在main.py中进行集成测试
+if __name__ == '__main__':
+    # 仅作示例,请勿在生产环境直接硬编码敏感信息
+    # 请替换为您的实际飞书应用信息
+    # 配置日志,用于独立测试
+    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+
+    print("--- 飞书客户端独立测试 (请确保替换YOUR_...) ---")
+    TEST_APP_ID = "YOUR_FEISHU_APP_ID"
+    TEST_APP_SECRET = "YOUR_FEISHU_APP_SECRET"
+    TEST_BASE_ID = "YOUR_FEISHU_BASE_ID"
+    TEST_TABLE_ID = "YOUR_FEISHU_TABLE_ID"
+    TEST_INPUT_FIELD = "测试文本" # 确保您的表中有此字段
+    TEST_OUTPUT_FIELD = "测试输出" # 确保您的表中有此字段
+
+    if "YOUR_" in TEST_APP_ID:
+        logging.warning("请替换 feishu_client.py 中的 YOUR_ 占位符为您的实际飞书应用信息以运行测试。")
+    else:
+        try:
+            feishu_client = FeishuClient(TEST_APP_ID, TEST_APP_SECRET)
+            
+            # 测试读取
+            print("\n--- 测试读取记录 ---")
+            read_records_data = feishu_client.read_records(TEST_BASE_ID, TEST_TABLE_ID, [TEST_INPUT_FIELD, TEST_OUTPUT_FIELD], page_size=2)
+            for record in read_records_data:
+                print(f"Record ID: {record.get('record_id')}, Fields: {record.get('fields')}")
+
+            # 测试写入/更新 (仅更新第一条记录的某个字段)
+            if read_records_data:
+                print("\n--- 测试更新记录 ---")
+                first_record_id = read_records_data[0]['record_id']
+                update_payload = [{
+                    "record_id": first_record_id,
+                    "fields": {TEST_OUTPUT_FIELD: f"Updated by test at {time.strftime('%Y-%m-%d %H:%M:%S')}"}
+                }]
+                update_success = feishu_client.update_records(TEST_BASE_ID, TEST_TABLE_ID, update_payload)
+                print(f"更新操作是否成功: {update_success}")
+            else:
+                print("\n没有可读取的记录,跳过更新测试。")
+
+        except Exception as e:
+            logging.error(f"飞书客户端独立测试失败: {e}")
+    print("--- 飞书客户端独立测试结束 ---")

+ 0 - 0
fetch.py


+ 172 - 0
main.py

@@ -0,0 +1,172 @@
+# main.py
+import logging
+import os
+from feishu_client import FeishuClient
+from coze_client import CozeClient
+
+# 配置日志
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+
+def process_feishu_data_with_coze_flow(
+    feishu_app_id: str,
+    feishu_app_secret: str,
+    feishu_base_id: str,
+    feishu_table_id: str,
+    feishu_input_field_name: str,
+    feishu_output_field_name: str,
+    coze_api_key: str,
+    coze_bot_id: str,
+    coze_prompt_template: str,
+    max_records_to_process: int = 50, # 每次处理的记录数
+    overwrite_existing_output: bool = True # 是否覆盖已有的输出字段内容
+) -> None:
+    """
+    从飞书多维表格读取数据,调用Coze API进行处理,并将结果写入多维表格。
+    这是整个业务流程的协调函数。
+
+    Args:
+        feishu_app_id (str): 飞书应用的 App ID。
+        feishu_app_secret (str): 飞书应用的 App Secret。
+        feishu_base_id (str): 飞书多维表格的 Base ID (应用 Token)。
+        feishu_table_id (str): 多维表格中要操作的表的 Table ID。
+        feishu_input_field_name (str): 多维表格中用于输入给Coze的字段名称。
+        feishu_output_field_name (str): 多维表格中用于存储Coze返回结果的字段名称。
+        coze_api_key (str): Coze API 密钥。
+        coze_bot_id (str): Coze 机器人的 Bot ID。
+        coze_prompt_template (str): Coze API的提示模板字符串,需要包含 '{input_data}' 占位符。
+                                   例如: "请总结以下文本的关键信息: {input_data}"
+        max_records_to_process (int): 每次函数调用最多从飞书读取并处理的记录数。
+        overwrite_existing_output (bool): 如果输出字段已有内容,是否覆盖。True为覆盖,False为跳过。
+    """
+    logging.info("--- 🚀 开始执行飞书数据与Coze交互流程 🚀 ---")
+
+    try:
+        # 初始化客户端
+        feishu_client = FeishuClient(feishu_app_id, feishu_app_secret)
+        coze_client = CozeClient(coze_api_key)
+
+        # 1. 从飞书多维表格读取数据
+        logging.info("阶段 1/3: 从飞书多维表格读取数据...")
+        read_field_names = [feishu_input_field_name, feishu_output_field_name]
+        raw_records = feishu_client.read_records(
+            feishu_base_id,
+            feishu_table_id,
+            read_field_names,
+            page_size=max_records_to_process # 限制读取数量,防止一次性处理过多
+        )
+
+        records_to_process = []
+        for record in raw_records:
+            record_id = record.get("record_id")
+            fields = record.get("fields", {})
+            input_value = fields.get(feishu_input_field_name)
+            output_value_existing = fields.get(feishu_output_field_name)
+
+            # 筛选需要处理的记录
+            if input_value is None or str(input_value).strip() == "":
+                logging.debug(f"记录 {record_id} 的输入字段 '{feishu_input_field_name}' 为空,跳过。")
+                continue
+            if not overwrite_existing_output and output_value_existing is not None and str(output_value_existing).strip() != "":
+                logging.info(f"记录 {record_id} 的输出字段 '{feishu_output_field_name}' 已有内容且设置为不覆盖,跳过。")
+                continue
+            
+            records_to_process.append({
+                "record_id": record_id,
+                "input_data": input_value
+            })
+        
+        if not records_to_process:
+            logging.info("没有符合条件的记录需要处理,流程结束。")
+            return
+
+        logging.info(f"共筛选出 {len(records_to_process)} 条记录待Coze处理。")
+
+        # 2. 调用 Coze API 处理数据
+        logging.info("阶段 2/3: 调用Coze API处理数据...")
+        updated_feishu_records = []
+        for i, record_info in enumerate(records_to_process):
+            record_id = record_info["record_id"]
+            input_data = record_info["input_data"]
+            
+            logging.info(f"正在处理第 {i+1}/{len(records_to_process)} 条记录 (ID: {record_id})...")
+            try:
+                coze_output = coze_client.send_message(
+                    coze_bot_id,
+                    coze_prompt_template,
+                    str(input_data) # 确保输入是字符串
+                )
+                
+                if coze_output:
+                    updated_feishu_records.append({
+                        "record_id": record_id,
+                        "fields": {
+                            feishu_output_field_name: coze_output
+                        }
+                    })
+                else:
+                    logging.warning(f"Coze API 返回空结果给记录 {record_id},不更新此记录。")
+
+            except Exception as e:
+                logging.error(f"处理记录 {record_id} 时调用Coze API失败: {e}。该记录将被跳过。")
+                # 可以在这里记录到单独的错误日志或错误字段
+
+        if not updated_feishu_records:
+            logging.info("没有记录成功通过 Coze API 处理并准备更新,无需写入飞书。流程结束。")
+            return
+
+        # 3. 将结果写入飞书多维表格
+        logging.info("阶段 3/3: 将处理结果写回飞书多维表格...")
+        feishu_client.update_records(
+            feishu_base_id,
+            feishu_table_id,
+            updated_feishu_records
+        )
+        
+        logging.info("--- ✅ 流程执行完毕 ✅ ---")
+
+    except Exception as e:
+        logging.critical(f"主流程执行过程中发生致命错误: {e}")
+        logging.critical("请检查配置信息、网络连接、API权限以及日志中的详细错误信息。")
+
+
+if __name__ == "__main__":
+    # --- 环境变量/配置信息加载 ---
+    # 推荐使用环境变量加载敏感信息,而不是硬编码。
+    # 例如:export FEISHU_APP_ID="your_id"
+    # 或者从配置文件 (如 config.ini, .env 文件) 中加载
+
+    # 飞书配置
+    FEISHU_APP_ID = os.getenv("FEISHU_APP_ID", "YOUR_FEISHU_APP_ID")
+    FEISHU_APP_SECRET = os.getenv("FEISHU_APP_SECRET", "YOUR_FEISHU_APP_SECRET")
+    FEISHU_BASE_ID = os.getenv("FEISHU_BASE_ID", "YOUR_FEISHU_BASE_ID")
+    FEISHU_TABLE_ID = os.getenv("FEISHU_TABLE_ID", "YOUR_FEISHU_TABLE_ID")
+
+    FEISHU_INPUT_FIELD = os.getenv("FEISHU_INPUT_FIELD", "文本内容") # 你的飞书表格中用于输入的列名
+    FEISHU_OUTPUT_FIELD = os.getenv("FEISHU_OUTPUT_FIELD", "Coze总结") # 你的飞书表格中用于输出的列名
+
+    # Coze 配置
+    COZE_API_KEY = os.getenv("COZE_API_KEY", "YOUR_COZE_API_KEY")
+    COZE_BOT_ID = os.getenv("COZE_BOT_ID", "YOUR_COZE_BOT_ID") # 例如: "7343685511394590740"
+
+    # Coze 提示模板,请确保包含 {input_data} 占位符
+    # 这是一个示例,你可以根据你的机器人功能设计更复杂的提示
+    COZE_PROMPT_TEMPLATE = os.getenv("COZE_PROMPT_TEMPLATE", "请作为一位专业的编辑,总结以下文章的核心内容,要求言简意赅,200字以内: {input_data}")
+
+    # --- 执行流程 ---
+    if "YOUR_" in FEISHU_APP_ID or "YOUR_" in COZE_API_KEY:
+        logging.error("⛔️ 请检查 main.py 或环境变量,确保所有 'YOUR_' 占位符都已替换为您的实际配置信息!⛔️")
+        logging.error("流程未执行。")
+    else:
+        process_feishu_data_with_coze_flow(
+            feishu_app_id=FEISHU_APP_ID,
+            feishu_app_secret=FEISHU_APP_SECRET,
+            feishu_base_id=FEISHU_BASE_ID,
+            feishu_table_id=FEISHU_TABLE_ID,
+            feishu_input_field_name=FEISHU_INPUT_FIELD,
+            feishu_output_field_name=FEISHU_OUTPUT_FIELD,
+            coze_api_key=COZE_API_KEY,
+            coze_bot_id=COZE_BOT_ID,
+            coze_prompt_template=COZE_PROMPT_TEMPLATE,
+            max_records_to_process=10, # 每次运行最多处理10条记录
+            overwrite_existing_output=True # 总是覆盖输出字段
+        )