3_handle.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. import os
  2. import json
  3. import time
  4. import sys
  5. import argparse
  6. from typing import Dict, Any, List, Optional
  7. # 导入自定义模块
  8. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  9. from utils.mysql_db import MysqlHelper
  10. from gemini import GeminiProcessor
  11. from utils.file import File
  12. class Handler:
  13. def __init__(self):
  14. # 初始化飞书客户端
  15. self.mysql = MysqlHelper()
  16. self.processor = GeminiProcessor()
  17. self.system_prompt = File.read_file('prompt/handle.md')
  18. # print(self.system_prompt)
  19. def process_single_record(self, record) -> bool:
  20. """处理单条记录"""
  21. try:
  22. # 提取内容
  23. content = self.extract_content_from_record(record)
  24. # 检查是否有输入内容
  25. if not content.strip() :
  26. print(f"记录 {record.record_id} 没有输入内容,跳过")
  27. return True
  28. result = self.processor.process(content, self.system_prompt)
  29. # 更新飞书表格
  30. self.update_feishu_record(record.record_id, result)
  31. # 添加延迟避免API限制
  32. time.sleep(1)
  33. return True
  34. except Exception as e:
  35. print(f"处理记录 {record.record_id} 失败: {e}")
  36. return False
  37. def process_all_records(self):
  38. """处理所有记录"""
  39. print(f"开始处理飞书表格 {self.table_id} 中的所有记录")
  40. page_token = None
  41. total_processed = 0
  42. total_success = 0
  43. while True:
  44. try:
  45. # 获取记录
  46. result = self.feishu.get_all_records(self.table_id, page_token)
  47. if not result.items:
  48. print("没有找到记录")
  49. break
  50. print(f"获取到 {len(result.items)} 条记录")
  51. # 处理每条记录
  52. for record in result.items:
  53. total_processed += 1
  54. if self.process_single_record(record):
  55. total_success += 1
  56. # 检查是否有下一页
  57. if not result.has_more:
  58. break
  59. page_token = result.page_token
  60. print(f"继续获取下一页,token: {page_token}")
  61. except Exception as e:
  62. print(f"获取记录失败: {e}")
  63. break
  64. print(f"处理完成!总共处理 {total_processed} 条记录,成功 {total_success} 条")
  65. def main():
  66. """主函数"""
  67. # 创建命令行参数解析器
  68. parser = argparse.ArgumentParser(description='内容识别脚本')
  69. parser.add_argument('--query_word', nargs='?', help='query词')
  70. parser.add_argument('--source_type', nargs='?', help='数据源类型')
  71. parser.add_argument('--source_channel', nargs='?', help='数据源渠道')
  72. args = parser.parse_args()
  73. try:
  74. # 创建内容识别器实例
  75. hadnler = Handler()
  76. hadnler.process_all_records(query_word=args.query_word, source_type=source_type, source_channel=source_channel)
  77. except Exception as e:
  78. print(f"程序执行失败: {e}")
  79. sys.exit(1)
  80. if __name__ == "__main__":
  81. main()