丁云鹏 před 1 měsícem
rodič
revize
01b0aa1015
6 změnil soubory, kde provedl 154 přidání a 350 odebrání
  1. 30 163
      3_handle.py
  2. 5 0
      README.md
  3. 2 11
      config.py
  4. 39 165
      gemini.py
  5. 6 11
      utils/fei_shu.py
  6. 72 0
      utils/file.py

+ 30 - 163
3_handle.py

@@ -4,136 +4,44 @@ import time
 import sys
 import argparse
 from typing import Dict, Any, List, Optional
-from dotenv import load_dotenv
 
 # 导入自定义模块
 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 
 from utils.fei_shu import FeiShu
-from coze.coze_hook import CozeHook
+from gemini import GeminiProcessor
+from utils.file import File
 
 
-class ContentIdentifier:
+class Handler:
     def __init__(self, table_id: Optional[str] = None):
-        # 加载环境变量
-        load_dotenv()
         
         # 初始化飞书客户端
-        self.feishu = FeiShu()
+        self.feishu = FeiShu(file_token='VEBsbCfaWa3gF3slQILc6Rybnde')
+        self.processor = GeminiProcessor()
         
-        # 初始化Coze客户端
-        self.coze = CozeHook()
         
         # 获取表格ID:优先使用传入的参数,其次使用环境变量
-        self.table_id = table_id or os.getenv('FEISHU_TABLE_ID')
-        if not self.table_id:
-            raise ValueError("请设置环境变量 FEISHU_TABLE_ID 或在运行时传入 table_id 参数")
-        
+        self.table_id = table_id
         # 字段名称配置
-        self.input_field = os.getenv('FEISHU_INPUT_FIELD', '抓取结果')
-        self.output_field = os.getenv('FEISHU_OUTPUT_FIELD', '识别结果')
-        
+        self.input_field = '识别结果'
+        self.output_field = '初步理解'
+
+        self.system_prompt = File.read_file('prompt/handle.md')
+
+        print(self.system_prompt)
+
     def extract_content_from_record(self, record) -> Dict[str, Any]:
         """从飞书记录中提取内容"""
         fields = record.fields
         
-        # 提取抓取结果
-        crawl_result = fields.get(self.input_field, '')
-        title = ''
-        body_text = ''
-        image_url_list = []
-        
-        # 解析抓取结果
-        if crawl_result:
-            if isinstance(crawl_result, list) and len(crawl_result) > 0:
-                # 如果是数组格式,取第一个元素
-                crawl_data = crawl_result[0]
-                if isinstance(crawl_data, dict) and 'text' in crawl_data:
-                    try:
-                        # 解析JSON字符串
-                        json_data = json.loads(crawl_data['text'])
-                        
-                        # 提取标题
-                        title = json_data.get('title', '')
-                        
-                        # 提取正文内容
-                        body_text = json_data.get('body_text', '')
-                        
-                        # 提取图片链接
-                        image_data_list = json_data.get('image_url_list', [])
-                        for img_data in image_data_list:
-                            if isinstance(img_data, dict) and 'image_url' in img_data:
-                                image_url_list.append(img_data['image_url'])
-                        
-                    except json.JSONDecodeError as e:
-                        print(f"解析抓取结果JSON失败: {e}")
-                        # 如果解析失败,尝试直接使用文本内容
-                        if isinstance(crawl_data, dict) and 'text' in crawl_data:
-                            body_text = crawl_data['text']
-            elif isinstance(crawl_result, str):
-                # 如果是字符串格式,尝试直接解析
-                try:
-                    json_data = json.loads(crawl_result)
-                    title = json_data.get('title', '')
-                    body_text = json_data.get('body_text', '')
-                    image_data_list = json_data.get('image_url_list', [])
-                    for img_data in image_data_list:
-                        if isinstance(img_data, dict) and 'image_url' in img_data:
-                            image_url_list.append(img_data['image_url'])
-                except json.JSONDecodeError:
-                    body_text = crawl_result
+        # 提取识别结果
+        result = fields.get(self.input_field, [])
+
+        return ''.join([item['text'] for item in result])
         
-        return {
-            'title': title,
-            'body_text': body_text,
-            'image_url_list': image_url_list,
-            'record_id': record.record_id
-        }
     
-    def call_coze_workflow(self, title: str, body_text: str, image_url_list: List[str]) -> Dict[str, Any]:
-        """调用Coze工作流"""
-        try:
-            print(f"正在调用Coze工作流,标题: {title[:50]}...")
-            response = self.coze.run(title, body_text, image_url_list)
-            print("Coze工作流调用成功")
-            return response
-        except Exception as e:
-            print(f"调用Coze工作流失败: {e}")
-            return {"data": "{}"}
-    
-    def extract_coze_result(self, coze_response: Dict[str, Any]) -> Dict[str, str]:
-        """
-        从API响应中提取images_comprehension、title、body_text字段
-        """
-        try:
-            # 获取data字段
-            data = coze_response.get("data")
-            if not data:
-                print("响应中没有data字段")
-                return {"images_comprehension": "", "title": "", "body_text": ""}
-            
-            # 解析data字段(它是JSON字符串)
-            if isinstance(data, str):
-                try:
-                    data = json.loads(data)
-                except json.JSONDecodeError as e:
-                    print(f"data字段JSON解析失败: {e}")
-                    return {"images_comprehension": "", "title": "", "body_text": ""}
-            
-            # 从解析后的data中提取字段
-            extracted_fields = {
-                "images_comprehension": data.get("images_comprehension", ""),
-                "title": data.get("title", ""),
-                "body_text": data.get("body_text", "")
-            }
-            
-            return extracted_fields
-            
-        except Exception as e:
-            print(f"提取Coze结果失败: {e}")
-            return {"images_comprehension": "", "title": "", "body_text": ""}
-    
-    def update_feishu_record(self, record_id: str, result_dict: Dict[str, Any]):
+    def update_feishu_record(self, record_id: str, content: str):
         """更新飞书表格中的记录"""
         try:
             import lark_oapi as lark
@@ -141,11 +49,7 @@ class ContentIdentifier:
             update_record = (lark.bitable.v1.AppTableRecord.builder()
                            .record_id(record_id)
                            .fields({
-                               self.output_field: json.dumps({
-                                    'images_comprehension': result_dict.get('images_comprehension', ''),
-                                    'title': result_dict.get('title', ''),
-                                    'body_text': result_dict.get('body_text', '')
-                                }, ensure_ascii=False)
+                               self.output_field: content
                            })
                            .build())
             
@@ -161,44 +65,18 @@ class ContentIdentifier:
         try:
             # 提取内容
             content = self.extract_content_from_record(record)
-            
-            # 检查是否已经有识别结果
-            fields = record.fields
-            existing_result = fields.get(self.output_field, '')
-            
-            # 如果已有识别结果,则跳过
-            if existing_result and existing_result.strip():
-                try:
-                    # 尝试解析JSON,如果成功说明已有有效结果
-                    json.loads(existing_result)
-                    print(f"记录 {record.record_id} 已有识别结果,跳过")
-                    return True
-                except json.JSONDecodeError:
-                    # 如果JSON解析失败,说明可能是旧格式,继续处理
-                    pass
-            
+
             # 检查是否有输入内容
-            if not content['body_text'] or not content['body_text'].strip():
+            if not content.strip() :
                 print(f"记录 {record.record_id} 没有输入内容,跳过")
                 return True
-            
-            print(f"处理记录 {record.record_id}")
-            print(f"标题: {content['title'][:50]}...")
-            print(f"内容长度: {len(content['body_text'])} 字符")
-            print(f"图片数量: {len(content['image_url_list'])}")
-            
-            # 调用Coze工作流
-            coze_response = self.call_coze_workflow(
-                content['title'],
-                content['body_text'],
-                content['image_url_list']
-            )
-            
-            # 提取结果
-            result_dict = self.extract_coze_result(coze_response)
+
+            result = self.processor.process(content, self.system_prompt)
+
             
             # 更新飞书表格
-            self.update_feishu_record(record.record_id, result_dict)
+            self.update_feishu_record(record.record_id, result)
+
             
             # 添加延迟避免API限制
             time.sleep(1)
@@ -252,7 +130,7 @@ def main():
     """主函数"""
     # 创建命令行参数解析器
     parser = argparse.ArgumentParser(description='内容识别脚本 - 处理飞书表格数据')
-    parser.add_argument('table_id', nargs='?', help='飞书表格ID (可选,也可通过环境变量 FEISHU_TABLE_ID 设置)')
+    parser.add_argument('--table_id', nargs='?', help='飞书表格ID (可选,也可通过环境变量 FEISHU_TABLE_ID 设置)')
     parser.add_argument('--page-token', help='分页token,用于从指定位置开始处理')
     parser.add_argument('--dry-run', action='store_true', help='试运行模式,只显示会处理哪些记录,不实际调用API')
     
@@ -260,22 +138,11 @@ def main():
     
     try:
         # 创建内容识别器实例
-        identifier = ContentIdentifier(table_id=args.table_id)
+        hadnler = Handler(table_id=args.table_id)
         
-        print(f"使用表格ID: {identifier.table_id}")
+        print(f"使用表格ID: {hadnler.table_id}")
         
-        if args.dry_run:
-            print("试运行模式:只显示会处理的记录,不实际调用API")
-            # TODO: 实现试运行模式
-            identifier.process_all_records()
-        else:
-            # 正常处理模式
-            if args.page_token:
-                print(f"从分页token开始处理: {args.page_token}")
-                # TODO: 支持从指定分页token开始处理
-                identifier.process_all_records()
-            else:
-                identifier.process_all_records()
+        hadnler.process_all_records()
                 
     except Exception as e:
         print(f"程序执行失败: {e}")

+ 5 - 0
README.md

@@ -0,0 +1,5 @@
+python3 -m venv myenv
+
+source myenv/bin/activate
+
+pip install -r requirements.txt

+ 2 - 11
config.py

@@ -1,18 +1,9 @@
 # 飞书配置
 # 推荐通过环境变量设置这些敏感信息,以增强安全性,避免硬编码。
 # 如果环境变量未设置,则使用默认的 "YOUR_..." 占位符。
-FEISHU_APP_ID = os.getenv("FEISHU_APP_ID", "YOUR_FEISHU_APP_ID")
-FEISHU_APP_SECRET = os.getenv("FEISHU_APP_SECRET", "YOUR_FEISHU_APP_SECRET")
-FEISHU_BASE_ID = os.getenv("FEISHU_BASE_ID", "YOUR_FEISHU_BASE_ID")
-FEISHU_TABLE_ID = os.getenv("FEISHU_TABLE_ID", "YOUR_FEISHU_TABLE_ID")
+FEISHU_APP_ID="cli_a76c35b8fa7d500c"
+FEISHU_APP_SECRET="xHpF7H9nBwXeCH2Z1YRDScWSXzyktq36"
 
-FEISHU_INPUT_FIELD = os.getenv("FEISHU_INPUT_FIELD", "文本内容") # 你的飞书表格中用于输入的列名
-FEISHU_OUTPUT_FIELD = os.getenv("FEISHU_OUTPUT_FIELD", "Coze总结") # 你的飞书表格中用于输出的列名
-
-
-# Coze 配置
-COZE_API_KEY = os.getenv("COZE_API_KEY", "YOUR_COZE_API_KEY")
-COZE_BOT_ID = os.getenv("COZE_BOT_ID", "YOUR_COZE_BOT_ID") # 例如: "7343685511394590740"
 
 # Gemini 配置
 GEMINI_API_KEY = "AIzaSyC0J8gtl5I6-nu6fgvQrfnWkw0QIzfXEWE"

+ 39 - 165
gemini.py

@@ -1,176 +1,50 @@
 import os
-import time
-import json
-import re
 from google import genai
 from google.genai import types
 import config
 
 
-# --- 正确的初始化流程 ---
+class GeminiProcessor:
+    def __init__(self, api_key: str = None):
+        """
+        初始化Gemini客户端
+        :param api_key: Gemini API密钥,如果为None则使用config中的密钥
+        """
+        self.api_key = api_key if api_key else config.GEMINI_API_KEY
+        self.client = genai.Client(api_key=self.api_key)
+
+    def process(self, content: str, system_prompt: str, model_name: str = "gemini-2.5-flash"):
+        """
+        处理文本内容
+        :param content: 要处理的文本内容
+        :param system_prompt: 系统提示语
+        :param model_name: 使用的模型名称,默认为gemini-2.5-flash
+        :return: 处理后的文本结果
+        """
+        input_text = ({content})
 
-
-client = genai.Client(api_key=config.GEMINI_API_KEY)
-
-
-# 系统提示词和COT配置
-DEFAULT_SYSTEM_PROMPT = """
-"""
-
-SYSTEM_PROMPT_FILE="system_prompt/v37"
-
-def load_system_prompt(prompt_file_path: str) -> str:
-    """
-    从指定文件加载系统提示词
-    :param prompt_file_path: 系统提示词文件路径
-    :return: 系统提示词内容
-    """
-    try:
-        with open(prompt_file_path, 'r', encoding='utf-8') as f:
-            system_prompt = f.read().strip()
-        print(f"成功从 {prompt_file_path} 加载系统提示词")
-        return system_prompt
-    except Exception as e:
-        print(f"读取系统提示词文件 {prompt_file_path} 出错: {str(e)}")
-        return DEFAULT_SYSTEM_PROMPT
-
-SYSTEM_PROMPT = load_system_prompt(SYSTEM_PROMPT_FILE)
-# 再次提醒:SYSTEM_PROMPT 的内容必须与您期望的输入/输出格式严格匹配。
-# 它应该明确说明模型将收到的是 "目标对话\n[目标对话JSON字符串]\n上下文对话\n[上下文JSON数组字符串]" 这种格式,
-# 并期望输出为您提供的 { "对话整体解构": {...}, "详细解构": [...] } JSON 对象结构。
-
-
-def process_files_sequentially(input_dir: str, output_dir: str, num_context_files: int = 4, delay_seconds: float = 2.0):
-    """
-    逐个处理文件夹中的文本文件,每个目标文件带上下文
-    :param input_dir: 输入文件夹路径
-    :param output_dir: 输出文件夹路径
-    :param num_context_files: 每个目标文件附带的上下文文件数量
-    :param delay_seconds: 每个文件处理之间的延迟(秒)
-    """
-
-    # 确保输出目录存在
-    os.makedirs(output_dir, exist_ok=True)
-    
-    # 获取所有txt文件
-    # 注意: f.endswith('') 会匹配所有文件,如果只想处理txt,应改为 f.endswith('.txt')
-    input_files_names = sorted([f for f in os.listdir(input_dir) if f.endswith('')]) 
-    total_files = len(input_files_names)
-    
-    print(f"找到 {total_files} 个文件。将逐个处理(每个目标文件附带 {num_context_files} 个上下文文件)")
-    
-    # 预先读取所有文件内容,以便高效构建上下文
-    all_file_contents = []
-    for filename in input_files_names:
-        input_path = os.path.join(input_dir, filename)
         try:
-            with open(input_path, 'r', encoding='utf-8') as f:
-                all_file_contents.append(f.read().strip())
-        except Exception as e:
-            print(f"  ✕ 预读取文件 {filename} 出错: {str(e)}")
-            all_file_contents.append(f"错误: 无法读取文件 '{filename}' - {str(e)}")
-
-    # 逐个处理文件
-    # i 现在直接代表当前处理文件的索引
-    for i in range(1):
-    # for i in range(total_files):
-        target_filename = input_files_names[i]
-        target_content = all_file_contents[i]
-        
-        # 收集上下文文件内容
-        context_contents = []
-        for k in range(1, num_context_files + 1):
-            context_idx = i + k
-            if context_idx < total_files:
-                context_contents.append(all_file_contents[context_idx])
-            # 如果没有足够的上下文文件,就按实际数量提供,不会填充空字符串
-        
-        print(f"\n处理文件 {i+1}/{total_files}: '{target_filename}' (目标 + {len(context_contents)} 个上下文文件)")
-        
-        output_path = os.path.join(output_dir, f"{os.path.splitext(target_filename)[0]}.json")
-        
-        target_content_json_str = json.dumps(target_content, ensure_ascii=False)
-        context_contents_json_str = json.dumps(context_contents, ensure_ascii=False)
-
-        # 构建符合 SYSTEM_PROMPT 期望的单个文本字符串,包含Markdown标题和JSON内容
-        combined_input_text = (
-            f"## 目标对话\n"
-            f"{target_content_json_str}\n" # 使用json.dumps后的字符串
-            f"## 上下文对话\n"
-            f"{context_contents_json_str}" # 使用json.dumps后的字符串
-        )
-
-
-        try:
-            contents = [
-                {"text": combined_input_text}
-            ]
-            
-            # 调用Gemini模型处理单个目标文件
-            response = client.models.generate_content(
-                model="gemini-2.5-pro", # 或者您需要的其他模型
+            # 调用Gemini模型处理内容
+            response = self.client.models.generate_content(
+                model=model_name,
                 config=types.GenerateContentConfig(
-                    system_instruction=SYSTEM_PROMPT),
-                contents=contents
+                    system_instruction=system_prompt
+                ),
+                contents=input_text
             )
-
-            result = response.text
-
-            # 移除Markdown代码块的围栏
-            result = re.sub(r'^\s*```json\s*|\s*```\s*$', '', result, flags=re.MULTILINE)
-            result = result.strip()  # 去除多余的空行
-            
-            # 尝试解析JSON响应
-            try:
-                # 此时 result 应该就是单个文件的 JSON 结果,即您提供的 { "对话整体解构": {...}, "详细解构": [...] } 结构
-                dialogue_report = json.loads(result)
-                print(f"  成功获取并解析API响应 '{target_filename}'")
-                
-                # 保存处理结果
-                # dialogue_report 现在是一个字典,可以直接保存
-                try:
-                    with open(output_path, 'w', encoding='utf-8') as f:
-                        json.dump(dialogue_report, f, indent=2, ensure_ascii=False)
-                    print(f"  ✓ 保存: {os.path.basename(output_path)}")
-                except Exception as e:
-                    error_msg = f"保存错误: {str(e)}"
-                    with open(output_path, 'w', encoding='utf-8') as f:
-                        f.write(error_msg)
-                    print(f"  ⚠ 保存失败: {error_msg}")
-                    
-            except json.JSONDecodeError as e:
-                print(f"  ⚠ API返回非JSON格式,尝试提取有效部分... 错误: {e}")
-                # ****** 重点修改:寻找 '{' 和 '}' 来提取JSON对象 ******
-                json_start = result.find('{')
-                json_end = result.rfind('}') + 1 # +1 to include the closing brace
-                if json_end > json_start > -1: # 检查是否找到了有效的括号对
-                    try:
-                        extracted_report = json.loads(result[json_start:json_end])
-                        print(f"  成功提取JSON数据 for '{target_filename}'")
-                        with open(output_path, 'w', encoding='utf-8') as f:
-                            json.dump(extracted_report, f, indent=2, ensure_ascii=False)
-                        print(f"  ✓ 保存 (提取成功): {os.path.basename(output_path)}")
-                    except Exception as extract_e:
-                        error_msg = f"无法提取有效JSON数据,使用原始响应。提取错误: {extract_e}\n原始响应:\n{result}"
-                        with open(output_path, 'w', encoding='utf-8') as f:
-                            f.write(error_msg)
-                        print(f"  ⚠ 保存失败 (提取错误): {error_msg}")
-                else:
-                    error_msg = f"无法定位JSON内容,使用原始响应。\n原始响应:\n{result}"
-                    with open(output_path, 'w', encoding='utf-8') as f:
-                        f.write(error_msg)
-                    print(f"  ⚠ 保存失败 (未找到JSON): {error_msg}")
-                
+            return response.text
         except Exception as e:
-            error_msg = f"处理 '{target_filename}' 时API错误: {str(e)}"
-            print(f"  ✕ {error_msg}")
-            # API调用失败,为当前文件生成错误文件
-            with open(output_path, 'w', encoding='utf-8') as f:
-                f.write(error_msg)
-        
-        # 延迟 (在处理完当前文件后,如果不是最后一个文件)
-        if i < total_files - 1:
-            print(f"等待 {delay_seconds} 秒...")
-            time.sleep(delay_seconds)
-    
-    print("\n所有文件处理完成")
+            print(f"处理内容时出错: {e}")
+            return None
+
+
+# 使用示例
+if __name__ == "__main__":
+    processor = GeminiProcessor()
+    result = processor.process(
+        content="你好,请介绍一下你自己",
+        system_prompt="你是一个有帮助的AI助手"
+    )
+    if result:
+        print(result)
+    print("\n处理完成")

+ 6 - 11
utils/fei_shu.py

@@ -4,20 +4,15 @@ from typing import Any, Dict, Optional
 
 import lark_oapi as lark
 from lark_oapi.core.enum import LogLevel
+import config
 
 
-class FeiShu(object):
-    _instance = None
+class FeiShu:
 
-    def __new__(cls, *args, **kwargs):
-        if cls._instance is None:
-            cls._instance = super().__new__(cls, *args, **kwargs)
-        return cls._instance
-
-    def __init__(self):
-        app_id = os.getenv('FEISHU_APP_ID')
-        app_secret = os.getenv('FEISHU_APP_SECRET')
-        self.file_token = os.getenv('FEISHU_FILE_TOKEN')
+    def __init__(self, file_token:str = None):
+        app_id = os.getenv('FEISHU_APP_ID', config.FEISHU_APP_ID)
+        app_secret = os.getenv('FEISHU_APP_SECRET', config.FEISHU_APP_SECRET)
+        self.file_token = file_token if file_token is not None else os.getenv('FEISHU_FILE_TOKEN')
         self.client = (lark.client.Client().builder()
                        .app_id(app_id)
                        .app_secret(app_secret)

+ 72 - 0
utils/file.py

@@ -0,0 +1,72 @@
+import os
+from typing import Optional
+
+class File:
+
+    @staticmethod
+    def read_file(file_path: str) -> Optional[str]:
+        """
+        读取文件内容
+        
+        参数:
+            file_path: 文件路径
+            
+        返回:
+            文件内容字符串,如果出错返回None
+        """
+        try:
+            with open(file_path, 'r', encoding='utf-8') as f:
+                return f.read()
+        except Exception as e:
+            print(f"读取文件 {file_path} 出错: {str(e)}")
+            return None
+
+    @staticmethod
+    def write_file(file_path: str, content: str) -> bool:
+        """
+        写入内容到文件
+        
+        参数:
+            file_path: 文件路径
+            content: 要写入的内容
+            
+        返回:
+            是否成功写入
+        """
+        try:
+            with open(file_path, 'w', encoding='utf-8') as f:
+                f.write(content)
+            return True
+        except Exception as e:
+            print(f"写入文件 {file_path} 出错: {str(e)}")
+            return False
+
+    @staticmethod
+    def ensure_directory_exists(dir_path: str) -> bool:
+        """
+        确保目录存在,如果不存在则创建
+        
+        参数:
+            dir_path: 目录路径
+            
+        返回:
+            是否成功确保目录存在
+        """
+        try:
+            os.makedirs(dir_path, exist_ok=True)
+            return True
+        except Exception as e:
+            print(f"创建目录 {dir_path} 出错: {str(e)}")
+            return False
+
+
+# 使用示例
+if __name__ == "__main__":
+    # 测试load_system_prompt
+    test_content = "这是一个测试文件内容"
+    File.write_file("test.txt", test_content)
+    read_content = File.read_file("test.txt")
+    print(f"读取的文件内容: {read_content}")
+    
+    # 测试ensure_directory_exists
+    File.ensure_directory_exists("test_dir")