structure_processor.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 内容结构化处理模块
  5. 主要功能:
  6. 1. 从数据库中拉取需要结构化的数据
  7. 2. 调用Gemini API进行内容结构化
  8. 3. 将结构化结果更新到数据库
  9. """
  10. import os
  11. import json
  12. import time
  13. import sys
  14. import threading
  15. from typing import Dict, Any, List, Optional, Tuple
  16. # 导入自定义模块
  17. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  18. from utils.mysql_db import MysqlHelper
  19. from gemini import GeminiProcessor
  20. from utils.file import File
  21. from utils.logging_config import get_logger
  22. class StructureProcessor:
  23. def __init__(self):
  24. # 设置日志
  25. self.logger = get_logger('StructureProcessor')
  26. # 初始化处理器
  27. self.processor = GeminiProcessor()
  28. self.system_prompt = File.read_file('../prompt/structure.md')
  29. self.logger.info("系统提示词加载完成")
  30. self.logger.debug(f"系统提示词: {self.system_prompt}")
  31. # 线程控制
  32. self.lock = threading.Lock()
  33. self.stop_event = threading.Event()
  34. self.threads = []
  35. def get_query_words(self) -> List[str]:
  36. """从 knowledge_content_query 表中获取 category_id = 0 的所有 query_word"""
  37. try:
  38. sql = """
  39. SELECT query_word
  40. FROM knowledge_content_query
  41. WHERE category_id = 0
  42. """
  43. result = MysqlHelper.get_values(sql)
  44. if result:
  45. query_words = [row[0] for row in result]
  46. self.logger.info(f"找到 {len(query_words)} 个 category_id = 0 的 query_word")
  47. return query_words
  48. else:
  49. self.logger.warning("未找到 category_id = 0 的 query_word")
  50. return []
  51. except Exception as e:
  52. self.logger.error(f"获取 query_word 失败: {e}")
  53. return []
  54. def process_single_record(self) -> bool:
  55. """处理单条记录"""
  56. try:
  57. with self.lock:
  58. # 第一步:获取 category_id = 0 的所有 query_word
  59. query_words = self.get_query_words()
  60. if not query_words:
  61. self.logger.warning("没有可用的 query_word")
  62. return False
  63. # 第二步:用这些 query_word 去匹配 knowledge_search_content 表
  64. # 构建带引号的查询条件
  65. quoted_words = [f"'{word}'" for word in query_words]
  66. placeholders = ','.join(quoted_words)
  67. # 使用 FOR UPDATE 锁定记录,确保原子性操作
  68. # 明确排除正在处理中和已处理的记录
  69. select_sql = f"""
  70. SELECT id, multimodal_recognition
  71. FROM knowledge_search_content
  72. WHERE multimodal_recognition IS NOT NULL
  73. AND structured_data IS NULL
  74. AND query_word IN ({placeholders})
  75. LIMIT 1
  76. """
  77. self.logger.info(f"执行查询: {select_sql}")
  78. records = MysqlHelper.get_values(select_sql)
  79. if not records:
  80. self.logger.warning("没有找到需要处理的记录")
  81. return False
  82. row = records[0]
  83. self.logger.info(f"row: {row}")
  84. record_id = row[0]
  85. self.logger.info(f"record_id: {record_id}")
  86. # 立即标记为处理中,防止其他线程取到重复处理
  87. mark_sql = """
  88. UPDATE knowledge_search_content
  89. SET structured_data = 'PROCESSING'
  90. WHERE id = %s
  91. """
  92. mark_result = MysqlHelper.update_values(mark_sql, (record_id,))
  93. if mark_result is None:
  94. self.logger.error(f"标记记录 {record_id} 为处理中失败")
  95. return False
  96. self.logger.info(f"记录 {record_id} 已标记为处理中")
  97. # 处理内容
  98. result = self.processor.process(row[1], self.system_prompt)
  99. self.logger.info(f"处理完成,结果长度: {len(str(result))}")
  100. self.logger.debug(f"处理结果: {result}")
  101. # 更新数据库为实际结果
  102. update_sql = """
  103. UPDATE knowledge_search_content
  104. SET structured_data = %s
  105. WHERE id = %s
  106. """
  107. update_result = MysqlHelper.update_values(update_sql, (result, record_id))
  108. if update_result is None:
  109. self.logger.error(f"更新记录 {record_id} 失败")
  110. return False
  111. self.logger.info(f"记录 {record_id} 处理完成并更新数据库")
  112. return True
  113. except Exception as e:
  114. self.logger.error(f"处理记录失败: {str(e)}", exc_info=True)
  115. return False
  116. def worker_thread(self, thread_id: int):
  117. """工作线程函数"""
  118. thread_logger = get_logger(f'WorkerThread-{thread_id}')
  119. thread_logger.info(f"线程 {thread_id} 启动")
  120. while not self.stop_event.is_set():
  121. try:
  122. # 尝试处理一条记录
  123. success = self.process_single_record()
  124. if not success:
  125. thread_logger.info(f"没有找到需要处理的记录,等待5秒后重试")
  126. # 等待时也要检查停止信号
  127. if self.stop_event.wait(5):
  128. break
  129. continue
  130. # 处理成功后等待5秒再处理下一条
  131. thread_logger.info(f"处理完成,等待5秒后处理下一条")
  132. # 等待时也要检查停止信号
  133. if self.stop_event.wait(5):
  134. break
  135. except Exception as e:
  136. thread_logger.error(f"发生错误: {str(e)}", exc_info=True)
  137. # 等待时也要检查停止信号
  138. if self.stop_event.wait(5):
  139. break
  140. thread_logger.info(f"线程 {thread_id} 已停止")
  141. def start_multi_thread_processing(self):
  142. """启动多线程处理"""
  143. self.threads = []
  144. self.logger.info("启动多线程处理...")
  145. self.logger.info("查询条件: multimodal_recognition is not null AND structured_data is null AND query_word IN (category_id = 0 的 query_word)")
  146. # 创建5个线程,间隔5秒启动
  147. for i in range(5):
  148. thread = threading.Thread(
  149. target=self.worker_thread,
  150. args=(i + 1,)
  151. )
  152. self.threads.append(thread)
  153. # 启动线程
  154. thread.start()
  155. self.logger.info(f"线程 {i + 1} 已启动")
  156. # 等待5秒后启动下一个线程
  157. if i < 4: # 最后一个线程不需要等待
  158. self.logger.info("等待5秒后启动下一个线程...")
  159. time.sleep(5)
  160. self.logger.info("所有线程已启动,使用 ./start_structure.sh stop 停止")
  161. try:
  162. # 等待所有线程完成
  163. for thread in self.threads:
  164. thread.join()
  165. except KeyboardInterrupt:
  166. self.logger.info("收到停止信号,正在停止所有线程...")
  167. self.stop_all_threads()
  168. def stop_all_threads(self):
  169. """停止所有线程"""
  170. self.logger.info("正在停止所有线程...")
  171. self.stop_event.set()
  172. # 等待所有线程结束
  173. for i, thread in enumerate(self.threads):
  174. if thread.is_alive():
  175. self.logger.info(f"等待线程 {i + 1} 结束...")
  176. thread.join(timeout=10) # 最多等待10秒
  177. if thread.is_alive():
  178. self.logger.warning(f"线程 {i + 1} 未能正常结束")
  179. else:
  180. self.logger.info(f"线程 {i + 1} 已正常结束")
  181. self.logger.info("所有线程已停止")
  182. def main():
  183. """主函数"""
  184. try:
  185. processor = StructureProcessor()
  186. processor.start_multi_thread_processing()
  187. except Exception as e:
  188. print(f"程序执行失败: {str(e)}")
  189. sys.exit(1)
  190. if __name__ == "__main__":
  191. # 测试单条记录处理
  192. processor = StructureProcessor()
  193. processor.process_single_record()