#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Content structuring module.

Main responsibilities:
1. Fetch records that still need structuring from the database.
2. Call the Gemini API to structure their content.
3. Write the structured result back to the database.
"""

import os
import json
import time
import sys
import threading
from typing import Dict, Any, List, Optional, Tuple

# Make the parent of this file's directory importable so the local packages
# below (utils, gemini) resolve.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from utils.mysql_db import MysqlHelper
from gemini import GeminiProcessor
from utils.file import File
from utils.logging_config import get_logger


class StructureProcessor:
    """Structure ``knowledge_search_content`` rows with Gemini, using worker threads.

    Each worker repeatedly claims one row whose ``multimodal_recognition`` is
    not NULL and whose ``structured_data`` is NULL (restricted to the
    ``query_word`` values of ``category_id = 0``). It marks the row
    ``'PROCESSING'``, passes its ``multimodal_recognition`` to
    ``GeminiProcessor.process`` together with the system prompt, and stores
    the result in ``structured_data``.
    """

    def __init__(self):
        # Logging.
        self.logger = get_logger('StructureProcessor')

        # Processor and system prompt.
        self.processor = GeminiProcessor()
        # NOTE(review): this relative path is resolved against the current
        # working directory, not this file's location.
        self.system_prompt = File.read_file('../prompt/structure.md')
        self.logger.info("系统提示词加载完成")
        self.logger.debug(f"系统提示词: {self.system_prompt}")

        # Thread control.
        # `lock` serializes this process's database access (claiming a row and
        # writing its result). The Gemini call runs outside it so workers can
        # overlap.
        self.lock = threading.Lock()
        self.stop_event = threading.Event()
        self.threads = []

    def get_query_words(self) -> List[str]:
        """Return every ``query_word`` in ``knowledge_content_query`` with ``category_id = 0``.

        Returns:
            The list of query words; an empty list when none exist or the
            query fails (the error is logged, not raised).
        """
        try:
            sql = """
                SELECT query_word
                FROM knowledge_content_query
                WHERE category_id = 0
            """
            result = MysqlHelper.get_values(sql)
            if result:
                query_words = [row[0] for row in result]
                self.logger.info(f"找到 {len(query_words)} 个 category_id = 0 的 query_word")
                return query_words
            else:
                self.logger.warning("未找到 category_id = 0 的 query_word")
                return []
        except Exception as e:
            self.logger.error(f"获取 query_word 失败: {e}")
            return []

    @staticmethod
    def _quote_sql_literal(value: Any) -> str:
        """Render *value* as a single-quoted MySQL string literal.

        Single quotes are doubled and backslashes are escaped, so a query word
        containing either can neither break the statement nor inject SQL.
        Non-string values are converted with ``str()`` first, as before.
        """
        text = str(value).replace("\\", "\\\\").replace("'", "''")
        return f"'{text}'"

    def _claim_next_record(self) -> Optional[Tuple[Any, Any]]:
        """Select the next unprocessed row and mark it ``'PROCESSING'``.

        Callers must hold ``self.lock`` so that two workers in this process
        cannot claim the same row.

        Returns:
            ``(record_id, multimodal_recognition)`` for the claimed row, or
            ``None`` when there are no query words, no matching row, or the
            mark failed.
        """
        # Step 1: collect every query_word with category_id = 0.
        query_words = self.get_query_words()
        if not query_words:
            self.logger.warning("没有可用的 query_word")
            return None

        # Step 2: match those words against knowledge_search_content.
        # Each word is escaped before being embedded in the IN (...) list.
        placeholders = ','.join(self._quote_sql_literal(word) for word in query_words)

        # `structured_data IS NULL` skips rows already marked 'PROCESSING' and
        # rows that already hold a result. No row lock (FOR UPDATE) is taken,
        # so exclusivity only holds between threads of this process.
        select_sql = f"""
            SELECT id, multimodal_recognition
            FROM knowledge_search_content
            WHERE multimodal_recognition IS NOT NULL
                AND structured_data IS NULL
                AND query_word IN ({placeholders})
            ORDER BY id ASC
            LIMIT 1
        """
        self.logger.info(f"执行查询: {select_sql}")

        records = MysqlHelper.get_values(select_sql)
        if not records:
            self.logger.warning("没有找到需要处理的记录")
            return None

        row = records[0]
        self.logger.info(f"row: {row}")
        record_id = row[0]
        self.logger.info(f"record_id: {record_id}")

        # Mark the row immediately so no other worker picks it up once the
        # lock is released.
        mark_sql = """
            UPDATE knowledge_search_content
            SET structured_data = 'PROCESSING'
            WHERE id = %s
        """
        mark_result = MysqlHelper.update_values(mark_sql, (record_id,))
        if mark_result is None:
            self.logger.error(f"标记记录 {record_id} 为处理中失败")
            return None

        self.logger.info(f"记录 {record_id} 已标记为处理中")
        return record_id, row[1]

    def process_single_record(self) -> bool:
        """Claim one record, structure it with Gemini, and store the result.

        Returns:
            True if a record was processed and written back. False when there
            was nothing to process or any step failed; errors are logged, not
            raised.
        """
        try:
            # Only the claim (select + mark) runs under the lock.
            with self.lock:
                claimed = self._claim_next_record()
            if claimed is None:
                return False
            record_id, content = claimed

            # Runs outside the lock so several workers can wait on Gemini at
            # once; the 'PROCESSING' mark keeps other workers off this row.
            # NOTE(review): assumes GeminiProcessor.process is safe to call
            # concurrently on one instance — confirm.
            # NOTE(review): if this step fails, the row stays 'PROCESSING' and
            # is never picked up again automatically.
            result = self.processor.process(content, self.system_prompt)
            self.logger.info(f"处理完成,结果长度: {len(str(result))}")
            self.logger.debug(f"处理结果: {result}")

            # Replace the 'PROCESSING' marker with the actual result.
            update_sql = """
                UPDATE knowledge_search_content
                SET structured_data = %s
                WHERE id = %s
            """
            with self.lock:
                update_result = MysqlHelper.update_values(update_sql, (result, record_id))
            if update_result is None:
                self.logger.error(f"更新记录 {record_id} 失败")
                return False

            self.logger.info(f"记录 {record_id} 处理完成并更新数据库")
            return True

        except Exception as e:
            self.logger.error(f"处理记录失败: {str(e)}", exc_info=True)
            return False

    def worker_thread(self, thread_id: int):
        """Process records in a loop until ``stop_event`` is set.

        The worker waits 5 seconds after every attempt (success, nothing to do,
        or error). The wait is done on ``stop_event`` so a stop request ends it
        early.

        Args:
            thread_id: 1-based worker number, used in log names and messages.
        """
        thread_logger = get_logger(f'WorkerThread-{thread_id}')
        thread_logger.info(f"线程 {thread_id} 启动")

        while not self.stop_event.is_set():
            try:
                success = self.process_single_record()

                if not success:
                    thread_logger.info("没有找到需要处理的记录,等待5秒后重试")
                    # wait() returns True as soon as a stop is requested.
                    if self.stop_event.wait(5):
                        break
                    continue

                thread_logger.info("处理完成,等待5秒后处理下一条")
                if self.stop_event.wait(5):
                    break

            except Exception as e:
                thread_logger.error(f"发生错误: {str(e)}", exc_info=True)
                if self.stop_event.wait(5):
                    break

        thread_logger.info(f"线程 {thread_id} 已停止")

    def start_multi_thread_processing(self):
        """Start 5 worker threads, 5 seconds apart, and block until they finish.

        Ctrl-C (``KeyboardInterrupt``) stops all workers, both during the
        staggered startup and while waiting for them to finish.
        """
        self.threads = []

        self.logger.info("启动多线程处理...")
        self.logger.info("查询条件: multimodal_recognition is not null AND structured_data is null AND query_word IN (category_id = 0 的 query_word)")

        try:
            # Create 5 threads, starting them 5 seconds apart.
            for i in range(5):
                thread = threading.Thread(
                    target=self.worker_thread,
                    args=(i + 1,)
                )
                self.threads.append(thread)
                thread.start()
                self.logger.info(f"线程 {i + 1} 已启动")

                # No wait after the last thread.
                if i < 4:
                    self.logger.info("等待5秒后启动下一个线程...")
                    time.sleep(5)

            self.logger.info("所有线程已启动,使用 ./start_structure.sh stop 停止")

            # Block until every worker has exited.
            for thread in self.threads:
                thread.join()

        except KeyboardInterrupt:
            # Also reached when Ctrl-C arrives during the startup sleeps above;
            # without this the already-started workers would keep running.
            self.logger.info("收到停止信号,正在停止所有线程...")
            self.stop_all_threads()

    def stop_all_threads(self):
        """Signal every worker to stop and wait up to 10 seconds for each one."""
        self.logger.info("正在停止所有线程...")
        self.stop_event.set()

        for i, thread in enumerate(self.threads):
            if thread.is_alive():
                self.logger.info(f"等待线程 {i + 1} 结束...")
                thread.join(timeout=10)
                if thread.is_alive():
                    self.logger.warning(f"线程 {i + 1} 未能正常结束")
                else:
                    self.logger.info(f"线程 {i + 1} 已正常结束")

        self.logger.info("所有线程已停止")


def main():
    """Entry point: run multi-threaded processing and exit with status 1 on failure."""
    try:
        processor = StructureProcessor()
        processor.start_multi_thread_processing()
    except Exception as e:
        print(f"程序执行失败: {str(e)}")
        sys.exit(1)


if __name__ == "__main__":
    # Smoke test: process a single record.
    # NOTE(review): main() (multi-threaded mode) is not invoked here — confirm
    # which entry point the start script expects.
    processor = StructureProcessor()
    processor.process_single_record()