|
@@ -40,61 +40,79 @@ class StructureProcessor:
|
|
|
self.stop_event = threading.Event()
|
|
|
self.threads = []
|
|
|
|
|
|
- def build_query_conditions(self, query_word: Optional[str],
|
|
|
- source_type: Optional[str],
|
|
|
- source_channel: Optional[str]) -> Tuple[str, Tuple]:
|
|
|
- """构建查询条件和参数"""
|
|
|
- conditions = ["multimodal_recognition is not null", "structured_data is null"]
|
|
|
- params = []
|
|
|
-
|
|
|
- if query_word is not None:
|
|
|
- conditions.append("query_word = %s")
|
|
|
- params.append(query_word)
|
|
|
- if source_type is not None:
|
|
|
- conditions.append("source_type = %s")
|
|
|
- params.append(source_type)
|
|
|
- if source_channel is not None:
|
|
|
- conditions.append("source_channel = %s")
|
|
|
- params.append(source_channel)
|
|
|
+ def get_query_words(self) -> List[str]:
|
|
|
+ """从 knowledge_content_query 表中获取 category_id = 0 的所有 query_word"""
|
|
|
+ try:
|
|
|
+ sql = """
|
|
|
+ SELECT query_word
|
|
|
+ FROM knowledge_content_query
|
|
|
+ WHERE category_id = 0
|
|
|
+ """
|
|
|
|
|
|
- where_clause = " AND ".join(conditions)
|
|
|
- return where_clause, tuple(params)
|
|
|
+ result = MysqlHelper.get_values(sql)
|
|
|
+ if result:
|
|
|
+ query_words = [row[0] for row in result]
|
|
|
+ self.logger.info(f"找到 {len(query_words)} 个 category_id = 0 的 query_word")
|
|
|
+ return query_words
|
|
|
+ else:
|
|
|
+ self.logger.warning("未找到 category_id = 0 的 query_word")
|
|
|
+ return []
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ self.logger.error(f"获取 query_word 失败: {e}")
|
|
|
+ return []
|
|
|
|
|
|
- def process_single_record(self, query_word: Optional[str],
|
|
|
- source_type: Optional[str],
|
|
|
- source_channel: Optional[str]) -> bool:
|
|
|
+ def process_single_record(self) -> bool:
|
|
|
"""处理单条记录"""
|
|
|
try:
|
|
|
with self.lock:
|
|
|
- # 构建查询条件和参数
|
|
|
- where_clause, params = self.build_query_conditions(query_word, source_type, source_channel)
|
|
|
+ # 第一步:获取 category_id = 0 的所有 query_word
|
|
|
+ query_words = self.get_query_words()
|
|
|
+ if not query_words:
|
|
|
+ self.logger.warning("没有可用的 query_word")
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 第二步:用这些 query_word 去匹配 knowledge_search_content 表
|
|
|
+ # 构建带引号的查询条件
|
|
|
+ quoted_words = [f"'{word}'" for word in query_words]
|
|
|
+ placeholders = ','.join(quoted_words)
|
|
|
|
|
|
- # 先查询一条需要处理的记录
|
|
|
+ # 使用 FOR UPDATE 锁定记录,确保原子性操作
|
|
|
+ # 明确排除正在处理中和已处理的记录
|
|
|
select_sql = f"""
|
|
|
SELECT id, multimodal_recognition
|
|
|
FROM knowledge_search_content
|
|
|
- WHERE {where_clause}
|
|
|
+ WHERE multimodal_recognition IS NOT NULL
|
|
|
+ AND structured_data IS NULL
|
|
|
+ AND query_word IN ({placeholders})
|
|
|
LIMIT 1
|
|
|
"""
|
|
|
|
|
|
- records = MysqlHelper.get_values(select_sql, params)
|
|
|
+ self.logger.info(f"执行查询: {select_sql}")
|
|
|
+
|
|
|
+ records = MysqlHelper.get_values(select_sql)
|
|
|
if not records:
|
|
|
self.logger.warning("没有找到需要处理的记录")
|
|
|
return False
|
|
|
|
|
|
row = records[0]
|
|
|
+ self.logger.info(f"row: {row}")
|
|
|
record_id = row[0]
|
|
|
+ self.logger.info(f"record_id: {record_id}")
|
|
|
|
|
|
- # 标记为处理中,防止其他线程取到重复处理
|
|
|
+ # 立即标记为处理中,防止其他线程取到重复处理
|
|
|
mark_sql = """
|
|
|
UPDATE knowledge_search_content
|
|
|
- SET structured_data = '{}'
|
|
|
+ SET structured_data = 'PROCESSING'
|
|
|
WHERE id = %s
|
|
|
"""
|
|
|
|
|
|
- MysqlHelper.update_values(mark_sql, (record_id,))
|
|
|
+ mark_result = MysqlHelper.update_values(mark_sql, (record_id,))
|
|
|
+ if mark_result is None:
|
|
|
+ self.logger.error(f"标记记录 {record_id} 为处理中失败")
|
|
|
+ return False
|
|
|
|
|
|
- self.logger.info(f"开始处理记录 ID: {record_id}")
|
|
|
+ self.logger.info(f"记录 {record_id} 已标记为处理中")
|
|
|
|
|
|
# 处理内容
|
|
|
result = self.processor.process(row[1], self.system_prompt)
|
|
@@ -108,7 +126,11 @@ class StructureProcessor:
|
|
|
WHERE id = %s
|
|
|
"""
|
|
|
|
|
|
- MysqlHelper.update_values(update_sql, (result, record_id))
|
|
|
+ update_result = MysqlHelper.update_values(update_sql, (result, record_id))
|
|
|
+ if update_result is None:
|
|
|
+ self.logger.error(f"更新记录 {record_id} 失败")
|
|
|
+ return False
|
|
|
+
|
|
|
self.logger.info(f"记录 {record_id} 处理完成并更新数据库")
|
|
|
return True
|
|
|
|
|
@@ -116,8 +138,7 @@ class StructureProcessor:
|
|
|
self.logger.error(f"处理记录失败: {str(e)}", exc_info=True)
|
|
|
return False
|
|
|
|
|
|
- def worker_thread(self, thread_id: int, query_word: Optional[str],
|
|
|
- source_type: Optional[str], source_channel: Optional[str]):
|
|
|
+ def worker_thread(self, thread_id: int):
|
|
|
"""工作线程函数"""
|
|
|
thread_logger = get_logger(f'WorkerThread-{thread_id}')
|
|
|
thread_logger.info(f"线程 {thread_id} 启动")
|
|
@@ -125,7 +146,7 @@ class StructureProcessor:
|
|
|
while not self.stop_event.is_set():
|
|
|
try:
|
|
|
# 尝试处理一条记录
|
|
|
- success = self.process_single_record(query_word, source_type, source_channel)
|
|
|
+ success = self.process_single_record()
|
|
|
|
|
|
if not success:
|
|
|
thread_logger.info(f"没有找到需要处理的记录,等待5秒后重试")
|
|
@@ -148,20 +169,18 @@ class StructureProcessor:
|
|
|
|
|
|
thread_logger.info(f"线程 {thread_id} 已停止")
|
|
|
|
|
|
- def start_multi_thread_processing(self, query_word: Optional[str],
|
|
|
- source_type: Optional[str],
|
|
|
- source_channel: Optional[str]):
|
|
|
+ def start_multi_thread_processing(self):
|
|
|
"""启动多线程处理"""
|
|
|
self.threads = []
|
|
|
|
|
|
self.logger.info("启动多线程处理...")
|
|
|
- self.logger.info(f"查询条件: query_word={query_word}, source_type={source_type}, source_channel={source_channel}")
|
|
|
+ self.logger.info("查询条件: multimodal_recognition is not null AND structured_data is null AND query_word IN (category_id = 0 的 query_word)")
|
|
|
|
|
|
# 创建5个线程,间隔5秒启动
|
|
|
for i in range(5):
|
|
|
thread = threading.Thread(
|
|
|
target=self.worker_thread,
|
|
|
- args=(i + 1, query_word, source_type, source_channel)
|
|
|
+ args=(i + 1,)
|
|
|
)
|
|
|
self.threads.append(thread)
|
|
|
|
|
@@ -204,23 +223,9 @@ class StructureProcessor:
|
|
|
|
|
|
def main():
|
|
|
"""主函数"""
|
|
|
- import argparse
|
|
|
-
|
|
|
- parser = argparse.ArgumentParser(description='内容结构化处理脚本')
|
|
|
- parser.add_argument('--query_word', default=None, help='query词')
|
|
|
- parser.add_argument('--source_type', default=None, help='数据源类型')
|
|
|
- parser.add_argument('--source_channel', default=None, help='数据源渠道')
|
|
|
-
|
|
|
- args = parser.parse_args()
|
|
|
-
|
|
|
try:
|
|
|
processor = StructureProcessor()
|
|
|
-
|
|
|
- processor.start_multi_thread_processing(
|
|
|
- query_word=args.query_word,
|
|
|
- source_type=args.source_type,
|
|
|
- source_channel=args.source_channel
|
|
|
- )
|
|
|
+ processor.start_multi_thread_processing()
|
|
|
except Exception as e:
|
|
|
print(f"程序执行失败: {str(e)}")
|
|
|
sys.exit(1)
|
|
@@ -229,4 +234,4 @@ def main():
|
|
|
if __name__ == "__main__":
|
|
|
# 测试单条记录处理
|
|
|
processor = StructureProcessor()
|
|
|
- processor.process_single_record(query_word=None, source_type=None, source_channel=None)
|
|
|
+ processor.process_single_record()
|