import os import json import time import sys import argparse from typing import Dict, Any, List, Optional # 导入自定义模块 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from utils.fei_shu import FeiShu from gemini import GeminiProcessor from utils.file import File class Handler: def __init__(self, table_id: Optional[str] = None): # 初始化飞书客户端 self.feishu = FeiShu(file_token='VEBsbCfaWa3gF3slQILc6Rybnde') self.processor = GeminiProcessor() # 获取表格ID:优先使用传入的参数,其次使用环境变量 self.table_id = table_id # 字段名称配置 self.input_field = '识别结果' self.output_field = '初步理解' self.system_prompt = File.read_file('prompt/handle.md') print(self.system_prompt) def extract_content_from_record(self, record) -> Dict[str, Any]: """从飞书记录中提取内容""" fields = record.fields # 提取识别结果 result = fields.get(self.input_field, []) return ''.join([item['text'] for item in result]) def update_feishu_record(self, record_id: str, content: str): """更新飞书表格中的记录""" try: import lark_oapi as lark # 创建更新记录 update_record = (lark.bitable.v1.AppTableRecord.builder() .record_id(record_id) .fields({ self.output_field: content }) .build()) # 执行更新 self.feishu.update_record(self.table_id, update_record) print(f"已更新记录 {record_id}") except Exception as e: print(f"更新飞书记录失败: {e}") def process_single_record(self, record) -> bool: """处理单条记录""" try: # 提取内容 content = self.extract_content_from_record(record) # 检查是否有输入内容 if not content.strip() : print(f"记录 {record.record_id} 没有输入内容,跳过") return True result = self.processor.process(content, self.system_prompt) # 更新飞书表格 self.update_feishu_record(record.record_id, result) # 添加延迟避免API限制 time.sleep(1) return True except Exception as e: print(f"处理记录 {record.record_id} 失败: {e}") return False def process_all_records(self): """处理所有记录""" print(f"开始处理飞书表格 {self.table_id} 中的所有记录") page_token = None total_processed = 0 total_success = 0 while True: try: # 获取记录 result = self.feishu.get_all_records(self.table_id, page_token) if not result.items: print("没有找到记录") break print(f"获取到 {len(result.items)} 条记录") # 处理每条记录 for record in result.items: total_processed += 1 if self.process_single_record(record): total_success += 1 # 检查是否有下一页 if not result.has_more: break page_token = result.page_token print(f"继续获取下一页,token: {page_token}") except Exception as e: print(f"获取记录失败: {e}") break print(f"处理完成!总共处理 {total_processed} 条记录,成功 {total_success} 条") def main(): """主函数""" # 创建命令行参数解析器 parser = argparse.ArgumentParser(description='内容识别脚本 - 处理飞书表格数据') parser.add_argument('--table_id', nargs='?', help='飞书表格ID (可选,也可通过环境变量 FEISHU_TABLE_ID 设置)') parser.add_argument('--page-token', help='分页token,用于从指定位置开始处理') parser.add_argument('--dry-run', action='store_true', help='试运行模式,只显示会处理哪些记录,不实际调用API') args = parser.parse_args() try: # 创建内容识别器实例 hadnler = Handler(table_id=args.table_id) print(f"使用表格ID: {hadnler.table_id}") hadnler.process_all_records() except Exception as e: print(f"程序执行失败: {e}") sys.exit(1) if __name__ == "__main__": main()