import os
import json
import time
import sys
import argparse
from typing import Dict, Any, List, Optional, Tuple

# Make the parent of this script's directory importable so the project-local
# packages below resolve; this must run before those imports.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from utils.mysql_db import MysqlHelper
from gemini import GeminiProcessor
from utils.file import File

# Pause between consecutive records, in seconds, to stay under the API limit.
REQUEST_DELAY_SECONDS = 5


class Handler:
    """Fill ``structured_data`` for rows of ``knowledge_search_content``.

    Rows that have ``multimodal_recognition`` but no ``structured_data`` are
    passed through ``GeminiProcessor.process`` together with the system
    prompt, and whatever the processor returns is written back to the row.
    """

    def __init__(self):
        """Create the processor and load the system prompt file."""
        self.processor = GeminiProcessor()
        # NOTE(review): relative path; presumably resolved against the current
        # working directory by File.read_file - confirm.
        self.system_prompt = File.read_file('prompt/handle.md')

    def build_query_conditions(self, query_word: Optional[str],
                               source_type: Optional[str],
                               source_channel: Optional[str]) -> Tuple[str, Tuple[Any, ...]]:
        """Build the WHERE clause and its bound parameters.

        Two fixed conditions are always present (recognition exists,
        structured data missing). Each argument that is not ``None`` adds an
        equality filter on the column of the same name.

        Args:
            query_word: Value for the ``query_word`` column, or ``None`` to skip.
            source_type: Value for the ``source_type`` column, or ``None`` to skip.
            source_channel: Value for the ``source_channel`` column, or ``None`` to skip.

        Returns:
            ``(where_clause, params)``: the conditions joined by ``" AND "``
            and a tuple of values in the same order as the ``%s`` placeholders.
        """
        conditions = ["multimodal_recognition is not null", "structured_data is null"]
        params = []

        # Column names are fixed here; only the values are bound as parameters.
        optional_filters = (
            ("query_word", query_word),
            ("source_type", source_type),
            ("source_channel", source_channel),
        )
        for column, value in optional_filters:
            if value is not None:
                conditions.append(f"{column} = %s")
                params.append(value)

        where_clause = " AND ".join(conditions)
        return where_clause, tuple(params)

    def process_all_records(self, query_word: Optional[str],
                            source_type: Optional[str],
                            source_channel: Optional[str]):
        """Process every matching record and store the processor output.

        Selects the matching rows, runs each row's ``multimodal_recognition``
        through the processor, and updates ``structured_data`` for that row.
        A failure on one record is printed and the loop continues; a failure
        outside the per-record handling (for example while querying) is
        printed and ends the run. Exceptions are not re-raised.

        Args:
            query_word: Optional filter on ``query_word``.
            source_type: Optional filter on ``source_type``.
            source_channel: Optional filter on ``source_channel``.
        """
        total_processed = 0
        total_success = 0

        # Loop-invariant statement, built once instead of once per record.
        update_sql = """
            UPDATE knowledge_search_content
            SET structured_data = %s
            WHERE id = %s
        """

        try:
            where_clause, params = self.build_query_conditions(
                query_word, source_type, source_channel
            )
            sql = f"""
                SELECT id, multimodal_recognition
                FROM knowledge_search_content
                WHERE {where_clause}
            """

            records = MysqlHelper.get_values(sql, params)
            print(f"获取到 {len(records)} 条记录")

            for row in records:
                total_processed += 1
                record_id = row[0]
                try:
                    result = self.processor.process(row[1], self.system_prompt)
                    print(result)

                    MysqlHelper.update_values(update_sql, (result, record_id))
                    total_success += 1
                except Exception as e:
                    print(f"处理记录 {record_id} 失败: {e}")
                finally:
                    # Running progress line, printed after every record.
                    print(f"处理完成!总数据量 {len(records)},已处理 {total_processed} ,成功 {total_success} ")

                # Throttle after failed records too: a failed request is
                # followed by another API call just like a successful one.
                # Kept outside ``finally`` so an interrupt is not delayed.
                time.sleep(REQUEST_DELAY_SECONDS)

        except Exception as e:
            print(f"处理过程中发生错误: {e}")


def main():
    """Parse the command-line filters and run the handler.

    Exits with status 1 if constructing the handler or the run raises.
    """
    parser = argparse.ArgumentParser(description='内容识别脚本')
    parser.add_argument('--query_word', default=None, help='query词')
    parser.add_argument('--source_type', default=None, help='数据源类型')
    parser.add_argument('--source_channel', default=None, help='数据源渠道')

    args = parser.parse_args()

    try:
        handler = Handler()
        handler.process_all_records(
            query_word=args.query_word,
            source_type=args.source_type,
            source_channel=args.source_channel
        )
    except Exception as e:
        print(f"程序执行失败: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()