import os
import json
import time
import sys
import argparse
from typing import Dict, Any, List, Optional

# Put the parent of this script's directory on sys.path so the project-local
# modules imported below can be resolved when the file is run as a script.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from utils.mysql_db import MysqlHelper
from gemini import GeminiProcessor
from utils.file import File


class Handler:
    """Run the Gemini processor over pending rows of knowledge_search_content.

    A row is pending when ``formatted_content`` is set and
    ``multimodal_recognition`` is still NULL. The processor output is written
    back into ``multimodal_recognition``.
    """

    # Base query selecting the rows that still need processing.
    _SELECT_PENDING_SQL = """
            select id, formatted_content
            from knowledge_search_content
            where formatted_content is not null
              and multimodal_recognition is null
        """

    # Statement storing the processor output for one row.
    _UPDATE_SQL = """
            update knowledge_search_content
            set multimodal_recognition = %s
            where id = %s
        """

    def __init__(self):
        # Processor used for every record, plus the system prompt passed
        # along with each record's content.
        self.processor = GeminiProcessor()
        self.system_prompt = File.read_file('prompt/handle.md')

    @staticmethod
    def _build_select(query_word: Optional[str],
                      source_type: Optional[str],
                      source_channel: Optional[str]):
        """Return ``(sql, params)`` for the pending-rows query.

        Each filter that is not None adds an ``and <column> = %s`` clause and
        its value to ``params``. Values are bound as parameters instead of
        being interpolated, so quotes inside a value cannot alter the SQL.
        """
        sql = Handler._SELECT_PENDING_SQL
        params: List[Any] = []
        filters = (
            ("query_word", query_word),
            ("source_type", source_type),
            ("source_channel", source_channel),
        )
        for column, value in filters:
            if value is not None:
                sql += f" and {column} = %s"
                params.append(value)
        return sql, tuple(params)

    def process_all_records(self, query_word, source_type, source_channel):
        """Process every pending record matching the optional filters.

        Args:
            query_word: Only process rows with this ``query_word``; None
                disables the filter.
            source_type: Only process rows with this ``source_type``; None
                disables the filter.
            source_channel: Only process rows with this ``source_channel``;
                None disables the filter.

        The rows are fetched once and handled one by one. A failure on a
        single row is reported and does not stop the remaining rows. A
        failure while fetching is reported and ends the run. A summary line
        is printed in every case.
        """
        total_processed = 0
        total_success = 0

        sql, params = self._build_select(query_word, source_type, source_channel)
        try:
            if params:
                # NOTE(review): assumes MysqlHelper.get_values accepts a params
                # argument the same way update_values does - confirm.
                records = MysqlHelper.get_values(sql, params)
            else:
                records = MysqlHelper.get_values(sql)
        except Exception as e:
            print(f"获取记录失败: {e}")
            records = None

        # Treat a missing result the same as an empty one.
        records = records or []
        if records:
            print(f"获取到 {len(records)} 条记录")

        for row in records:
            total_processed += 1
            record_id, content = row[0], row[1]
            try:
                result = self.processor.process(content, self.system_prompt)
                MysqlHelper.update_values(self._UPDATE_SQL, (result, record_id))
                # Pause between records to stay under the API rate limit.
                time.sleep(1)
                total_success += 1
            except Exception as e:
                # Best effort: report this record and carry on with the rest.
                print(f"处理记录 {record_id} 失败: {e}")

        print(f"处理完成!总共处理 {total_processed} 条记录,成功 {total_success} 条")


def main():
    """Parse the command-line filters and run the handler.

    Exits with status 1 if creating the handler or processing raises.
    """
    parser = argparse.ArgumentParser(description='内容识别脚本')
    parser.add_argument('--query_word', default=None, help='query词')
    parser.add_argument('--source_type', default=None, help='数据源类型')
    parser.add_argument('--source_channel', default=None, help='数据源渠道')

    args = parser.parse_args()

    try:
        handler = Handler()
        handler.process_all_records(
            query_word=args.query_word,
            source_type=args.source_type,
            source_channel=args.source_channel,
        )
    except Exception as e:
        print(f"程序执行失败: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()