123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- import os
- import json
- import time
- import sys
- import argparse
- from typing import Dict, Any, List, Optional
- # 导入自定义模块
- sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- from utils.fei_shu import FeiShu
- from gemini import GeminiProcessor
- from utils.file import File
class Handler:
    """Run Feishu bitable records through Gemini and write the answers back.

    For each record, the text stored in ``input_field`` is passed to
    ``GeminiProcessor.process`` together with the system prompt read from
    ``prompt/handle.md``; the returned text is written to ``output_field``
    of the same record.
    """

    def __init__(
        self,
        table_id: Optional[str] = None,
        request_delay: float = 1.0,
        dry_run: bool = False,
    ):
        """Create the Feishu / Gemini clients and load the system prompt.

        Args:
            table_id: Feishu table ID. When omitted, the ``FEISHU_TABLE_ID``
                environment variable is used instead.
            request_delay: Seconds to sleep after each processed record to
                stay below API rate limits. ``0`` disables the pause.
            dry_run: When true, records are only listed; Gemini is not
                called and nothing is written back to the table.
        """
        # NOTE(review): the file token is hard-coded here; consider moving it
        # to configuration or an environment variable.
        self.feishu = FeiShu(file_token='VEBsbCfaWa3gF3slQILc6Rybnde')
        self.processor = GeminiProcessor()

        # Explicit argument wins; otherwise fall back to the environment.
        self.table_id = table_id or os.environ.get('FEISHU_TABLE_ID')
        self.request_delay = request_delay
        self.dry_run = dry_run

        # Column names in the Feishu table.
        self.input_field = '识别结果'
        self.output_field = '初步理解'

        self.system_prompt = File.read_file('prompt/handle.md')
        print(self.system_prompt)

    def extract_content_from_record(self, record) -> str:
        """Return the input-field text of *record* as one string.

        The field is normally a list of segments shaped like
        ``{'text': ...}``. A missing or empty field yields ``''``, a plain
        string is returned unchanged, and segments without a string
        ``'text'`` entry are skipped instead of raising.
        """
        fields = getattr(record, 'fields', None) or {}
        value = fields.get(self.input_field)

        if not value:
            return ''
        if isinstance(value, str):
            return value
        if not isinstance(value, (list, tuple)):
            # Scalar field types (numbers, etc.): use their text form.
            return str(value)

        pieces = []
        for segment in value:
            if isinstance(segment, str):
                pieces.append(segment)
            elif isinstance(segment, dict):
                text = segment.get('text')
                if isinstance(text, str):
                    pieces.append(text)
        return ''.join(pieces)

    def update_feishu_record(self, record_id: str, content: str) -> bool:
        """Write *content* into the output field of the given record.

        Returns:
            ``True`` when the update call completed, ``False`` when it (or
            the import of the Feishu SDK) raised. Failures are printed, not
            propagated, so one bad record does not abort the whole run.
        """
        try:
            # Imported lazily so the module can be loaded without the SDK.
            import lark_oapi as lark

            update_record = (
                lark.bitable.v1.AppTableRecord.builder()
                .record_id(record_id)
                .fields({self.output_field: content})
                .build()
            )
            self.feishu.update_record(self.table_id, update_record)
        except Exception as e:
            print(f"更新飞书记录失败: {e}")
            return False

        print(f"已更新记录 {record_id}")
        return True

    def process_single_record(self, record) -> bool:
        """Process one record and report whether it succeeded.

        Records without input text are skipped and count as success. In
        dry-run mode the record is only announced. Otherwise the text is
        sent to the processor and the answer is written back; the return
        value then reflects whether the write-back succeeded.
        """
        record_id = getattr(record, 'record_id', None)
        try:
            content = self.extract_content_from_record(record)
            if not content.strip():
                print(f"记录 {record_id} 没有输入内容,跳过")
                return True

            if self.dry_run:
                print(f"[试运行] 将处理记录 {record_id},输入长度 {len(content)}")
                return True

            result = self.processor.process(content, self.system_prompt)
            updated = self.update_feishu_record(record_id, result)

            # Pause between records to avoid hitting API rate limits.
            if self.request_delay > 0:
                time.sleep(self.request_delay)

            return updated

        except Exception as e:
            print(f"处理记录 {record_id} 失败: {e}")
            return False

    def process_all_records(self, page_token: Optional[str] = None) -> tuple:
        """Process every record of the table, page by page.

        Args:
            page_token: Optional pagination token to resume from; ``None``
                starts at the first page.

        Returns:
            A ``(total_processed, total_success)`` tuple.
        """
        print(f"开始处理飞书表格 {self.table_id} 中的所有记录")

        total_processed = 0
        total_success = 0

        while True:
            try:
                result = self.feishu.get_all_records(self.table_id, page_token)

                if not result.items:
                    print("没有找到记录")
                    break

                print(f"获取到 {len(result.items)} 条记录")

                for record in result.items:
                    total_processed += 1
                    if self.process_single_record(record):
                        total_success += 1

                if not result.has_more:
                    break

                next_token = result.page_token
                # A missing or unchanged token would fetch the same page
                # again forever, so stop instead of looping.
                if not next_token or next_token == page_token:
                    print("分页token无效或未变化,停止翻页")
                    break

                page_token = next_token
                print(f"继续获取下一页,token: {page_token}")

            except Exception as e:
                print(f"获取记录失败: {e}")
                break

        print(f"处理完成!总共处理 {total_processed} 条记录,成功 {total_success} 条")
        return total_processed, total_success


def main():
    """Parse command-line options and run the handler over the table.

    Exits with status 1 if constructing or running the handler raises.
    """
    parser = argparse.ArgumentParser(description='内容识别脚本 - 处理飞书表格数据')
    parser.add_argument(
        '--table_id', '--table-id', dest='table_id', nargs='?',
        help='飞书表格ID (可选,也可通过环境变量 FEISHU_TABLE_ID 设置)',
    )
    parser.add_argument('--page-token', help='分页token,用于从指定位置开始处理')
    parser.add_argument(
        '--dry-run', action='store_true',
        help='试运行模式,只显示会处理哪些记录,不实际调用API',
    )

    args = parser.parse_args()

    try:
        handler = Handler(table_id=args.table_id, dry_run=args.dry_run)

        print(f"使用表格ID: {handler.table_id}")

        handler.process_all_records(page_token=args.page_token)

    except Exception as e:
        print(f"程序执行失败: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
|