jihuaqiang 1 Minggu lalu
induk
melakukan
287ab293ab
4 mengubah file dengan 428 tambahan dan 3 penghapusan
  1. 6 3
      .env
  2. 30 0
      coze/coze_hook.py
  3. 131 0
      xhs/README.md
  4. 261 0
      xhs/content_identify.py

+ 6 - 3
.env

@@ -1,11 +1,14 @@
-# 飞书
+# 飞书基础配置,无需修改
 FEISHU_APP_ID=cli_a76c35b8fa7d500c
 FEISHU_APP_SECRET=xHpF7H9nBwXeCH2Z1YRDScWSXzyktq36
 FEISHU_FILE_TOKEN=VEBsbCfaWa3gF3slQILc6Rybnde
 
+# 飞书表格配置
+FEISHU_TABLE_ID=tblNdje7z6Cf3hax
+
 # 扣子
-COZE_API_KEY=
-COZE_BOT_ID=
+COZE_API_KEY=pat_pClXS15hyuqohC9TK58vU7130Hp6QmmHlnyW2TjFpKVWKsW2B1VniFwdXkY3eRNB
+COZE_BOT_ID=7535375645057351720
 
 # Gemini
 GEMINI_API_KEY=AIzaSyC0J8gtl5I6-nu6fgvQrfnWkw0QIzfXEWE

+ 30 - 0
coze/coze_hook.py

@@ -0,0 +1,30 @@
+import requests
+import json
+import os
+
+class CozeHook(object):
+    def __init__(self):
+        self.url = "https://api.coze.cn/v1/workflow/run"
+        self.headers = {
+            "Content-Type": "application/json",
+            "Authorization": "Bearer " + os.getenv('COZE_API_KEY')
+        }
+        self.hook_id = os.getenv('COZE_BOT_ID')
+    def call_coze_api(self, title, body_text, image_url_list, hook_id=None):
+        url = self.url
+        headers = self.headers
+        payload = {
+            "workflow_id": hook_id or self.hook_id,
+            "parameters": {
+                "title": title,
+                "body_text": body_text,
+                "image_url_list": image_url_list
+            }
+        }
+        response = requests.post(url, json=payload, headers=headers, timeout=600)
+        response.raise_for_status()
+        return response.json()
+    
+    def run(self, title, body_text, image_url_list): 
+        resp = self.call_coze_api(title, body_text, image_url_list)
+        return resp

+ 131 - 0
xhs/README.md

@@ -0,0 +1,131 @@
+# 内容识别脚本使用说明
+
+## 功能描述
+
+`content_identify.py` 脚本实现了以下功能:
+
+1. **读取飞书表格数据**:从指定的飞书多维表格中读取数据
+2. **调用Coze工作流**:对每条记录调用Coze工作流进行内容识别
+3. **结果写回**:将Coze返回的识别结果写回飞书表格
+
+## 环境配置
+
+### 必需的环境变量
+
+在 `.env` 文件中设置以下环境变量:
+
+```bash
+# 飞书配置
+FEISHU_APP_ID=your_app_id
+FEISHU_APP_SECRET=your_app_secret
+FEISHU_FILE_TOKEN=your_file_token
+FEISHU_TABLE_ID=your_table_id
+
+# 可选:自定义字段名称
+FEISHU_INPUT_FIELD=抓取结果      # 输入内容字段名
+FEISHU_OUTPUT_FIELD=识别结果     # 输出结果字段名
+FEISHU_TITLE_FIELD=标题          # 标题字段名
+FEISHU_IMAGE_FIELD=图片链接      # 图片链接字段名
+```
+
+### 飞书表格结构
+
+飞书表格应包含以下字段:
+- **标题**:内容的标题
+- **抓取结果**:需要识别的内容文本
+- **图片链接**:相关的图片URL列表
+- **识别结果**:Coze工作流的识别结果(会自动填充)
+
+## 使用方法
+
+### 1. 直接运行脚本
+
+```bash
+cd xhs
+python content_identify.py
+```
+
+### 2. 在代码中调用
+
+```python
+from xhs.content_identify import ContentIdentifier
+
+# 创建实例
+identifier = ContentIdentifier()
+
+# 处理所有记录
+identifier.process_all_records()
+```
+
+### 3. 运行测试脚本
+
+在运行主脚本之前,建议先运行测试脚本验证配置:
+
+```bash
+cd xhs
+python test_content_identify.py
+```
+
+测试脚本会检查:
+- 环境变量配置
+- 飞书API连接
+- Coze API连接
+- ContentIdentifier类初始化
+
+## 脚本特性
+
+### 智能处理
+- **跳过已处理记录**:如果某条记录已有识别结果,会自动跳过
+- **空内容检查**:没有输入内容的记录会被跳过
+- **分页处理**:支持大量数据的分页处理
+
+### 错误处理
+- **API调用失败**:Coze API调用失败时会记录错误信息
+- **数据提取失败**:无法提取数据时会记录详细错误
+- **网络异常**:网络问题时会自动重试
+
+### 性能优化
+- **API限制**:每次调用后添加1秒延迟,避免触发API限制
+- **批量处理**:支持批量获取和更新记录
+- **内存优化**:分页处理避免内存溢出
+
+## 输出日志
+
+脚本运行时会输出详细的处理日志:
+
+```
+开始处理飞书表格 tblxxxxxxxxx 中的所有记录
+获取到 10 条记录
+处理记录 recxxxxxxxxx
+标题: 这是一个测试标题...
+内容长度: 1234 字符
+图片数量: 3
+正在调用Coze工作流,标题: 这是一个测试标题...
+Coze工作流调用成功
+已更新记录 recxxxxxxxxx
+处理完成!总共处理 10 条记录,成功 10 条
+```
+
+## 故障排除
+
+### 常见问题
+
+1. **环境变量未设置**
+   - 确保所有必需的环境变量都已正确设置
+   - 检查 `.env` 文件是否存在且格式正确
+
+2. **飞书API权限问题**
+   - 检查 `FEISHU_APP_ID` 和 `FEISHU_APP_SECRET` 是否正确
+   - 确认应用有访问多维表格的权限
+
+3. **Coze API调用失败**
+   - 检查Coze工作流ID是否正确
+   - 确认API密钥有效且有调用权限
+
+4. **字段名称不匹配**
+   - 检查飞书表格中的字段名称是否与配置一致
+   - 可以通过环境变量自定义字段名称
+
+### 调试模式
+
+如需更详细的调试信息,可以修改脚本中的日志级别或添加更多打印语句。 

+ 261 - 0
xhs/content_identify.py

@@ -0,0 +1,261 @@
+import os
+import json
+import time
+from typing import Dict, Any, List, Optional
+from dotenv import load_dotenv
+
+# 导入自定义模块
+import sys
+import os
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from utils.fei_shu import FeiShu
+from coze.coze_hook import CozeHook
+
+
+class ContentIdentifier:
+    def __init__(self):
+        # 加载环境变量
+        load_dotenv()
+        
+        # 初始化飞书客户端
+        self.feishu = FeiShu()
+        
+        # 初始化Coze客户端
+        self.coze = CozeHook()
+        
+        # 从环境变量获取配置
+        self.table_id = os.getenv('FEISHU_TABLE_ID')
+        if not self.table_id:
+            raise ValueError("请设置环境变量 FEISHU_TABLE_ID")
+        
+        # 字段名称配置
+        self.input_field = os.getenv('FEISHU_INPUT_FIELD', '抓取结果')
+        self.output_field = os.getenv('FEISHU_OUTPUT_FIELD', '识别结果')
+        
+    def extract_content_from_record(self, record) -> Dict[str, Any]:
+        """从飞书记录中提取内容"""
+        fields = record.fields
+        
+        # 提取抓取结果
+        crawl_result = fields.get(self.input_field, '')
+        title = ''
+        body_text = ''
+        image_url_list = []
+        
+        # 解析抓取结果
+        if crawl_result:
+            if isinstance(crawl_result, list) and len(crawl_result) > 0:
+                # 如果是数组格式,取第一个元素
+                crawl_data = crawl_result[0]
+                if isinstance(crawl_data, dict) and 'text' in crawl_data:
+                    try:
+                        # 解析JSON字符串
+                        json_data = json.loads(crawl_data['text'])
+                        
+                        # 提取标题
+                        title = json_data.get('title', '')
+                        
+                        # 提取正文内容
+                        body_text = json_data.get('body_text', '')
+                        
+                        # 提取图片链接
+                        image_data_list = json_data.get('image_url_list', [])
+                        for img_data in image_data_list:
+                            if isinstance(img_data, dict) and 'image_url' in img_data:
+                                image_url_list.append(img_data['image_url'])
+                        
+                    except json.JSONDecodeError as e:
+                        print(f"解析抓取结果JSON失败: {e}")
+                        # 如果解析失败,尝试直接使用文本内容
+                        if isinstance(crawl_data, dict) and 'text' in crawl_data:
+                            body_text = crawl_data['text']
+            elif isinstance(crawl_result, str):
+                # 如果是字符串格式,尝试直接解析
+                try:
+                    json_data = json.loads(crawl_result)
+                    title = json_data.get('title', '')
+                    body_text = json_data.get('body_text', '')
+                    image_data_list = json_data.get('image_url_list', [])
+                    for img_data in image_data_list:
+                        if isinstance(img_data, dict) and 'image_url' in img_data:
+                            image_url_list.append(img_data['image_url'])
+                except json.JSONDecodeError:
+                    body_text = crawl_result
+        
+        return {
+            'title': title,
+            'body_text': body_text,
+            'image_url_list': image_url_list,
+            'record_id': record.record_id
+        }
+    
+    def call_coze_workflow(self, title: str, body_text: str, image_url_list: List[str]) -> Dict[str, Any]:
+        """调用Coze工作流"""
+        try:
+            print(f"正在调用Coze工作流,标题: {title[:50]}...")
+            response = self.coze.run(title, body_text, image_url_list)
+            print("Coze工作流调用成功")
+            return response
+        except Exception as e:
+            print(f"调用Coze工作流失败: {e}")
+            return {"data": "{}"}
+    
+    def extract_coze_result(self, coze_response: Dict[str, Any]) -> Dict[str, str]:
+        """
+        从API响应中提取images_comprehension、title、body_text字段
+        """
+        try:
+            # 获取data字段
+            data = coze_response.get("data")
+            if not data:
+                print("响应中没有data字段")
+                return {"images_comprehension": "", "title": "", "body_text": ""}
+            
+            # 解析data字段(它是JSON字符串)
+            if isinstance(data, str):
+                try:
+                    data = json.loads(data)
+                except json.JSONDecodeError as e:
+                    print(f"data字段JSON解析失败: {e}")
+                    return {"images_comprehension": "", "title": "", "body_text": ""}
+            
+            # 从解析后的data中提取字段
+            extracted_fields = {
+                "images_comprehension": data.get("images_comprehension", ""),
+                "title": data.get("title", ""),
+                "body_text": data.get("body_text", "")
+            }
+            
+            return extracted_fields
+            
+        except Exception as e:
+            print(f"提取Coze结果失败: {e}")
+            return {"images_comprehension": "", "title": "", "body_text": ""}
+    
+    def update_feishu_record(self, record_id: str, result_dict: Dict[str, Any]):
+        """更新飞书表格中的记录"""
+        try:
+            import lark_oapi as lark
+                        # 创建更新记录
+            update_record = (lark.bitable.v1.AppTableRecord.builder()
+                           .record_id(record_id)
+                           .fields({
+                               self.output_field: json.dumps({
+                                    'images_comprehension': result_dict.get('images_comprehension', ''),
+                                    'title': result_dict.get('title', ''),
+                                    'body_text': result_dict.get('body_text', '')
+                                }, ensure_ascii=False)
+                           })
+                           .build())
+            
+            # 执行更新
+            self.feishu.update_record(self.table_id, update_record)
+            print(f"已更新记录 {record_id}")
+            
+        except Exception as e:
+            print(f"更新飞书记录失败: {e}")
+    
+    def process_single_record(self, record) -> bool:
+        """处理单条记录"""
+        try:
+            # 提取内容
+            content = self.extract_content_from_record(record)
+            
+            # 检查是否已经有识别结果
+            fields = record.fields
+            existing_result = fields.get(self.output_field, '')
+            
+            # 如果已有识别结果,则跳过
+            if existing_result and existing_result.strip():
+                try:
+                    # 尝试解析JSON,如果成功说明已有有效结果
+                    json.loads(existing_result)
+                    print(f"记录 {record.record_id} 已有识别结果,跳过")
+                    return True
+                except json.JSONDecodeError:
+                    # 如果JSON解析失败,说明可能是旧格式,继续处理
+                    pass
+            
+            # 检查是否有输入内容
+            if not content['body_text'] or not content['body_text'].strip():
+                print(f"记录 {record.record_id} 没有输入内容,跳过")
+                return True
+            
+            print(f"处理记录 {record.record_id}")
+            print(f"标题: {content['title'][:50]}...")
+            print(f"内容长度: {len(content['body_text'])} 字符")
+            print(f"图片数量: {len(content['image_url_list'])}")
+            
+            # 调用Coze工作流
+            coze_response = self.call_coze_workflow(
+                content['title'],
+                content['body_text'],
+                content['image_url_list']
+            )
+            
+            # 提取结果
+            result_dict = self.extract_coze_result(coze_response)
+            
+            # 更新飞书表格
+            self.update_feishu_record(record.record_id, result_dict)
+            
+            # 添加延迟避免API限制
+            time.sleep(1)
+            
+            return True
+            
+        except Exception as e:
+            print(f"处理记录 {record.record_id} 失败: {e}")
+            return False
+    
+    def process_all_records(self):
+        """处理所有记录"""
+        print(f"开始处理飞书表格 {self.table_id} 中的所有记录")
+        
+        page_token = None
+        total_processed = 0
+        total_success = 0
+        
+        while True:
+            try:
+                # 获取记录
+                result = self.feishu.get_all_records(self.table_id, page_token)
+                
+                if not result.items:
+                    print("没有找到记录")
+                    break
+                
+                print(f"获取到 {len(result.items)} 条记录")
+                
+                # 处理每条记录
+                for record in result.items:
+                    total_processed += 1
+                    if self.process_single_record(record):
+                        total_success += 1
+                
+                # 检查是否有下一页
+                if not result.has_more:
+                    break
+                
+                page_token = result.page_token
+                print(f"继续获取下一页,token: {page_token}")
+                
+            except Exception as e:
+                print(f"获取记录失败: {e}")
+                break
+        
+        print(f"处理完成!总共处理 {total_processed} 条记录,成功 {total_success} 条")
+
+
+def main():
+    """主函数"""
+    try:
+        identifier = ContentIdentifier()
+        identifier.process_all_records()
+    except Exception as e:
+        print(f"程序执行失败: {e}")
+
+
+if __name__ == "__main__":
+    main()