3_handle.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. import os
  2. import json
  3. import time
  4. import sys
  5. import argparse
  6. from typing import Dict, Any, List, Optional
  7. # 导入自定义模块
  8. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  9. from utils.mysql_db import MysqlHelper
  10. from gemini import GeminiProcessor
  11. from utils.file import File
  12. class Handler:
  13. def __init__(self, table_id: Optional[str] = None):
  14. # 初始化飞书客户端
  15. self.mysql = MysqlHelper()
  16. self.processor = GeminiProcessor()
  17. self.system_prompt = File.read_file('prompt/handle.md')
  18. # print(self.system_prompt)
  19. def extract_content_from_record(self, record) -> Dict[str, Any]:
  20. """从飞书记录中提取内容"""
  21. fields = record.fields
  22. # 提取识别结果
  23. result = fields.get(self.input_field, [])
  24. return ''.join([item['text'] for item in result])
  25. def update_feishu_record(self, record_id: str, content: str):
  26. """更新飞书表格中的记录"""
  27. try:
  28. import lark_oapi as lark
  29. # 创建更新记录
  30. update_record = (lark.bitable.v1.AppTableRecord.builder()
  31. .record_id(record_id)
  32. .fields({
  33. self.output_field: content
  34. })
  35. .build())
  36. # 执行更新
  37. self.feishu.update_record(self.table_id, update_record)
  38. print(f"已更新记录 {record_id}")
  39. except Exception as e:
  40. print(f"更新飞书记录失败: {e}")
  41. def process_single_record(self, record) -> bool:
  42. """处理单条记录"""
  43. try:
  44. # 提取内容
  45. content = self.extract_content_from_record(record)
  46. # 检查是否有输入内容
  47. if not content.strip() :
  48. print(f"记录 {record.record_id} 没有输入内容,跳过")
  49. return True
  50. result = self.processor.process(content, self.system_prompt)
  51. # 更新飞书表格
  52. self.update_feishu_record(record.record_id, result)
  53. # 添加延迟避免API限制
  54. time.sleep(1)
  55. return True
  56. except Exception as e:
  57. print(f"处理记录 {record.record_id} 失败: {e}")
  58. return False
  59. def process_all_records(self):
  60. """处理所有记录"""
  61. print(f"开始处理飞书表格 {self.table_id} 中的所有记录")
  62. page_token = None
  63. total_processed = 0
  64. total_success = 0
  65. while True:
  66. try:
  67. # 获取记录
  68. result = self.feishu.get_all_records(self.table_id, page_token)
  69. if not result.items:
  70. print("没有找到记录")
  71. break
  72. print(f"获取到 {len(result.items)} 条记录")
  73. # 处理每条记录
  74. for record in result.items:
  75. total_processed += 1
  76. if self.process_single_record(record):
  77. total_success += 1
  78. # 检查是否有下一页
  79. if not result.has_more:
  80. break
  81. page_token = result.page_token
  82. print(f"继续获取下一页,token: {page_token}")
  83. except Exception as e:
  84. print(f"获取记录失败: {e}")
  85. break
  86. print(f"处理完成!总共处理 {total_processed} 条记录,成功 {total_success} 条")
  87. def main():
  88. """主函数"""
  89. # 创建命令行参数解析器
  90. parser = argparse.ArgumentParser(description='内容识别脚本 - 处理飞书表格数据')
  91. parser.add_argument('--table_id', nargs='?', help='飞书表格ID (可选,也可通过环境变量 FEISHU_TABLE_ID 设置)')
  92. parser.add_argument('--page-token', help='分页token,用于从指定位置开始处理')
  93. parser.add_argument('--dry-run', action='store_true', help='试运行模式,只显示会处理哪些记录,不实际调用API')
  94. args = parser.parse_args()
  95. try:
  96. # 创建内容识别器实例
  97. hadnler = Handler(table_id=args.table_id)
  98. print(f"使用表格ID: {hadnler.table_id}")
  99. hadnler.process_all_records()
  100. except Exception as e:
  101. print(f"程序执行失败: {e}")
  102. sys.exit(1)
  103. if __name__ == "__main__":
  104. main()