# -*- coding: utf-8 -*- """ 更新解析状态的工具 找出knowledge_parsing_content表中status=3或者=2的记录,依次进行以下判断设置 如果indentify_data为空,则设置status为3 如果indentify_data不为空,则设置status为2, 如果parsing_data为空且indentify_data为空,则设置status为3 如果parsing_data为空且indentify_data不为空,则设置status为2 如果parsing_data不为空,则设置status为5 """ import sys import os sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from utils.mysql_db import MysqlHelper from loguru import logger class UpdateParsingStatus: def __init__(self): self.logger = logger def find_status_records(self): """ 查找status为3或2的记录 """ sql = """ SELECT id, content_id, request_id, task_id, parsing_data, indentify_data, status FROM knowledge_parsing_content WHERE status = 3 OR status = 2 """ try: records = MysqlHelper.get_values(sql) self.logger.info(f"找到 {len(records)} 条status为3或2的记录") return records except Exception as e: self.logger.error(f"查询status为3或2的记录失败: {e}") return [] def update_record_status(self, record_id, new_status): """ 更新记录的status字段 """ sql = "UPDATE knowledge_parsing_content SET status = %s WHERE id = %s" params = (new_status, record_id) try: result = MysqlHelper.update_values(sql, params) if result: self.logger.info(f"更新记录状态成功: record_id={record_id}, status={new_status}") return True return False except Exception as e: self.logger.error(f"更新记录状态失败: {e}") return False def batch_update_status(self, record_ids, new_status): """ 批量更新记录的status字段 """ if not record_ids: return 0 # 构建批量更新的SQL placeholders = ','.join(['%s'] * len(record_ids)) sql = f"UPDATE knowledge_parsing_content SET status = %s, update_time = NOW() WHERE id IN ({placeholders})" params = [new_status] + record_ids try: result = MysqlHelper.update_values(sql, params) if result: self.logger.info(f"批量更新status成功: 更新了 {result} 条记录,status={new_status}") return result return 0 except Exception as e: self.logger.error(f"批量更新status失败: {e}") return 0 def process_records(self, records): """ 处理所有status为3或2的记录 - 使用批量操作提高性能 """ status_2_records = [] status_3_records = [] status_5_records = [] # 分类记录 for record in records: record_id, content_id, request_id, task_id, parsing_data, indentify_data, status = record self.logger.info(f"分析记录: id={record_id}, content_id={content_id}, request_id={request_id}") # 检查indentify_data是否为空 indentify_data_empty = indentify_data is None or indentify_data.strip() == "" # 检查indentify_data是否不为空 indentify_data_not_empty = indentify_data is not None and indentify_data.strip() != "" # 检查parsing_data是否为空 parsing_data_empty = parsing_data is None or parsing_data.strip() == "" # 检查parsing_data是否不为空 parsing_data_not_empty = parsing_data is not None and parsing_data.strip() != "" new_status = None # 如果parsing_data为空且indentify_data为空,则设置status为3 if parsing_data_empty and indentify_data_empty: new_status = 3 status_3_records.append(record_id) self.logger.info(f"记录 {record_id}: parsing_data为空且indentify_data为空,设置status为3") # 如果parsing_data为空且indentify_data不为空,则设置status为2 if parsing_data_empty and indentify_data_not_empty: new_status = 2 status_2_records.append(record_id) self.logger.info(f"记录 {record_id}: parsing_data为空且indentify_data不为空,设置status为2") # 如果parsing_data不为空,则设置status为5 if parsing_data_not_empty: new_status = 5 status_5_records.append(record_id) self.logger.info(f"记录 {record_id}: parsing_data不为空,设置status为5") success_count = 0 # 批量更新status为2的记录 if status_2_records: success_count += self.batch_update_status(status_2_records, 2) # 批量更新status为3的记录 if status_3_records: success_count += self.batch_update_status(status_3_records, 3) # 批量更新status为5的记录 if status_5_records: success_count += self.batch_update_status(status_5_records, 5) self.logger.info(f"处理完成: 成功 {success_count} 条") self.logger.info(f"状态更新统计: 更新为status=2的有 {len(status_2_records)} 条,更新为status=3的有 {len(status_3_records)} 条,更新为status=5的有 {len(status_5_records)} 条") return success_count, 0 def run(self): """ 执行状态更新的主流程 """ self.logger.info("开始更新解析状态...") # 查找status为3或2的记录 records = self.find_status_records() if not records: self.logger.info("没有找到status为3或2的记录") return # 处理所有记录 success_count, error_count = self.process_records(records) self.logger.info(f"解析状态更新完成: 成功 {success_count} 条,失败 {error_count} 条") if __name__ == "__main__": updater = UpdateParsingStatus() updater.run()