123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 内容结构化处理模块
- 主要功能:
- 1. 从数据库中拉取需要结构化的数据
- 2. 调用Gemini API进行内容结构化
- 3. 将结构化结果更新到数据库
- """
- import os
- import json
- import time
- import sys
- import threading
- from typing import Dict, Any, List, Optional, Tuple
- # 导入自定义模块
- sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- from utils.mysql_db import MysqlHelper
- from gemini import GeminiProcessor
- from utils.file import File
- from utils.logging_config import get_logger
class StructureProcessor:
    """Structure stored recognition text with Gemini and persist the result.

    Each worker repeatedly claims one ``knowledge_search_content`` row whose
    ``multimodal_recognition`` is set and whose ``structured_data`` is NULL,
    passes the text to ``GeminiProcessor.process`` together with the system
    prompt, and writes the returned value into ``structured_data``.
    """

    def __init__(self):
        """Create the logger, the Gemini processor, and the thread-control state."""
        self.logger = get_logger('StructureProcessor')

        # Model client and the system prompt sent with every request.
        self.processor = GeminiProcessor()
        self.system_prompt = File.read_file('../prompt/structure.md')
        self.logger.info("系统提示词加载完成")
        self.logger.debug(f"系统提示词: {self.system_prompt}")

        # Thread control: the lock serializes "select + mark" so two workers
        # in this process never claim the same row; the event requests shutdown.
        self.lock = threading.Lock()
        self.stop_event = threading.Event()
        self.threads = []

    def get_query_words(self) -> List[str]:
        """Return every ``query_word`` from ``knowledge_content_query`` with ``category_id = 0``.

        Returns:
            The list of query words, or an empty list when none exist or the
            lookup raises (the error is logged, not propagated).
        """
        try:
            sql = """
                SELECT query_word
                FROM knowledge_content_query
                WHERE category_id = 0
            """

            result = MysqlHelper.get_values(sql)
            if result:
                query_words = [row[0] for row in result]
                self.logger.info(f"找到 {len(query_words)} 个 category_id = 0 的 query_word")
                return query_words

            self.logger.warning("未找到 category_id = 0 的 query_word")
            return []

        except Exception as e:
            self.logger.error(f"获取 query_word 失败: {e}")
            return []

    @staticmethod
    def _quote_sql_literal(value: Any) -> str:
        """Render *value* as a single-quoted SQL string literal with escaping.

        Backslashes are doubled and single quotes are doubled so that a word
        containing either character can neither terminate the literal early
        nor change the statement.

        NOTE(review): this assumes the server uses MySQL's default backslash
        escaping (NO_BACKSLASH_ESCAPES off). If the read helper accepts bound
        parameters, prefer ``%s`` placeholders over literal rendering.
        """
        text = str(value)
        escaped = text.replace("\\", "\\\\").replace("'", "''")
        return f"'{escaped}'"

    def _claim_next_record(self) -> Optional[Tuple[Any, Any]]:
        """Select the next unprocessed row and mark it as in progress.

        The select and the mark run under ``self.lock`` so that worker threads
        of this process cannot pick the same row. The SQL itself takes no row
        lock, so this does not protect against other processes.

        Returns:
            ``(record_id, multimodal_recognition)`` for the claimed row, or
            ``None`` when there are no query words, no matching row, or the
            mark update reports failure.
        """
        with self.lock:
            # Step 1: all query words with category_id = 0.
            query_words = self.get_query_words()
            if not query_words:
                self.logger.warning("没有可用的 query_word")
                return None

            # Step 2: match those words against knowledge_search_content.
            # Each word is escaped before being embedded in the IN (...) list.
            in_list = ','.join(self._quote_sql_literal(word) for word in query_words)

            # Rows already processed or currently marked 'PROCESSING' have a
            # non-NULL structured_data and are therefore excluded.
            select_sql = f"""
                SELECT id, multimodal_recognition
                FROM knowledge_search_content
                WHERE multimodal_recognition IS NOT NULL
                AND structured_data IS NULL
                AND query_word IN ({in_list})
                ORDER BY id ASC
                LIMIT 1
            """

            self.logger.info(f"执行查询: {select_sql}")

            records = MysqlHelper.get_values(select_sql)
            if not records:
                self.logger.warning("没有找到需要处理的记录")
                return None

            row = records[0]
            self.logger.info(f"row: {row}")
            record_id = row[0]
            self.logger.info(f"record_id: {record_id}")

            # Mark immediately so no other worker thread picks the same row.
            mark_sql = """
                UPDATE knowledge_search_content
                SET structured_data = 'PROCESSING'
                WHERE id = %s
            """

            mark_result = MysqlHelper.update_values(mark_sql, (record_id,))
            if mark_result is None:
                self.logger.error(f"标记记录 {record_id} 为处理中失败")
                return None

            self.logger.info(f"记录 {record_id} 已标记为处理中")
            return record_id, row[1]

    def process_single_record(self) -> bool:
        """Claim one row, structure its content, and store the result.

        Returns:
            ``True`` when a row was processed and updated; ``False`` when no
            row was available or any step failed (failures are logged).
        """
        try:
            claimed = self._claim_next_record()
            if claimed is None:
                return False
            record_id, content = claimed

            # The model call runs outside the lock: the row is already marked,
            # so other workers can claim different rows in the meantime.
            # NOTE(review): if this call or the final UPDATE fails, the row
            # keeps the 'PROCESSING' marker and is not selected again; confirm
            # whether a reset/retry path is wanted.
            result = self.processor.process(content, self.system_prompt)
            self.logger.info(f"处理完成,结果长度: {len(str(result))}")
            self.logger.debug(f"处理结果: {result}")

            # Replace the marker with the actual result.
            update_sql = """
                UPDATE knowledge_search_content
                SET structured_data = %s
                WHERE id = %s
            """

            update_result = MysqlHelper.update_values(update_sql, (result, record_id))
            if update_result is None:
                self.logger.error(f"更新记录 {record_id} 失败")
                return False

            self.logger.info(f"记录 {record_id} 处理完成并更新数据库")
            return True

        except Exception as e:
            self.logger.error(f"处理记录失败: {str(e)}", exc_info=True)
            return False

    def worker_thread(self, thread_id: int):
        """Worker loop: process one record, pause 5 seconds, repeat until stopped.

        Args:
            thread_id: Number used in the logger name and log messages.
        """
        thread_logger = get_logger(f'WorkerThread-{thread_id}')
        thread_logger.info(f"线程 {thread_id} 启动")

        while not self.stop_event.is_set():
            try:
                success = self.process_single_record()

                if not success:
                    thread_logger.info("没有找到需要处理的记录,等待5秒后重试")
                    # Event.wait doubles as an interruptible sleep: it returns
                    # True as soon as a stop is requested.
                    if self.stop_event.wait(5):
                        break
                    continue

                thread_logger.info("处理完成,等待5秒后处理下一条")
                if self.stop_event.wait(5):
                    break

            except Exception as e:
                thread_logger.error(f"发生错误: {str(e)}", exc_info=True)
                if self.stop_event.wait(5):
                    break

        thread_logger.info(f"线程 {thread_id} 已停止")

    def start_multi_thread_processing(self):
        """Start five workers five seconds apart and block until they finish.

        A ``KeyboardInterrupt`` raised at any point, including during the
        staggered startup, stops every worker that was already started.
        """
        self.threads = []

        self.logger.info("启动多线程处理...")
        self.logger.info("查询条件: multimodal_recognition is not null AND structured_data is null AND query_word IN (category_id = 0 的 query_word)")

        try:
            # Five workers, started five seconds apart.
            for i in range(5):
                thread = threading.Thread(
                    target=self.worker_thread,
                    args=(i + 1,),
                )
                self.threads.append(thread)

                thread.start()
                self.logger.info(f"线程 {i + 1} 已启动")

                # No delay is needed after the last worker.
                if i < 4:
                    self.logger.info("等待5秒后启动下一个线程...")
                    time.sleep(5)

            self.logger.info("所有线程已启动,使用 ./start_structure.sh stop 停止")

            # Block until every worker exits.
            for thread in self.threads:
                thread.join()
        except KeyboardInterrupt:
            self.logger.info("收到停止信号,正在停止所有线程...")
            self.stop_all_threads()

    def stop_all_threads(self):
        """Signal every worker to stop and wait up to 10 seconds for each one."""
        self.logger.info("正在停止所有线程...")
        self.stop_event.set()

        for i, thread in enumerate(self.threads):
            if thread.is_alive():
                self.logger.info(f"等待线程 {i + 1} 结束...")
                thread.join(timeout=10)  # wait at most 10 seconds per thread
                if thread.is_alive():
                    self.logger.warning(f"线程 {i + 1} 未能正常结束")
                else:
                    self.logger.info(f"线程 {i + 1} 已正常结束")

        self.logger.info("所有线程已停止")
def main():
    """Run the multi-threaded structuring loop; print the error and exit with status 1 on failure."""
    try:
        StructureProcessor().start_multi_thread_processing()
    except Exception as exc:
        print(f"程序执行失败: {exc!s}")
        sys.exit(1)
if __name__ == "__main__":
    # Script entry: run a single-record pass as a smoke test.
    # NOTE(review): main() is defined in this file but is not invoked here;
    # confirm whether the multi-threaded loop is meant to be the entry point.
    StructureProcessor().process_single_record()
|