123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- # -*- coding: utf-8 -*-
- """
- 更新解析状态的工具
- 找出knowledge_parsing_content表中status=3或者=2的记录,依次进行以下判断设置
- 如果indentify_data为空,则设置status为3
- 如果indentify_data不为空,则设置status为2,
- 如果parsing_data为空且indentify_data为空,则设置status为3
- 如果parsing_data为空且indentify_data不为空,则设置status为2
- 如果parsing_data不为空,则设置status为5
- """
- import sys
- import os
- sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- from utils.mysql_db import MysqlHelper
- from loguru import logger
- class UpdateParsingStatus:
- def __init__(self):
- self.logger = logger
-
- def find_status_records(self):
- """
- 查找status为3或2的记录
- """
- sql = """
- SELECT id, content_id, request_id, task_id, parsing_data, indentify_data, status
- FROM knowledge_parsing_content
- WHERE status = 3 OR status = 2
- """
-
- try:
- records = MysqlHelper.get_values(sql)
- self.logger.info(f"找到 {len(records)} 条status为3或2的记录")
- return records
- except Exception as e:
- self.logger.error(f"查询status为3或2的记录失败: {e}")
- return []
-
- def update_record_status(self, record_id, new_status):
- """
- 更新记录的status字段
- """
- sql = "UPDATE knowledge_parsing_content SET status = %s WHERE id = %s"
- params = (new_status, record_id)
-
- try:
- result = MysqlHelper.update_values(sql, params)
- if result:
- self.logger.info(f"更新记录状态成功: record_id={record_id}, status={new_status}")
- return True
- return False
- except Exception as e:
- self.logger.error(f"更新记录状态失败: {e}")
- return False
-
- def batch_update_status(self, record_ids, new_status):
- """
- 批量更新记录的status字段
- """
- if not record_ids:
- return 0
-
- # 构建批量更新的SQL
- placeholders = ','.join(['%s'] * len(record_ids))
- sql = f"UPDATE knowledge_parsing_content SET status = %s, update_time = NOW() WHERE id IN ({placeholders})"
- params = [new_status] + record_ids
-
- try:
- result = MysqlHelper.update_values(sql, params)
- if result:
- self.logger.info(f"批量更新status成功: 更新了 {result} 条记录,status={new_status}")
- return result
- return 0
- except Exception as e:
- self.logger.error(f"批量更新status失败: {e}")
- return 0
-
- def process_records(self, records):
- """
- 处理所有status为3或2的记录 - 使用批量操作提高性能
- """
- status_2_records = []
- status_3_records = []
- status_5_records = []
-
- # 分类记录
- for record in records:
- record_id, content_id, request_id, task_id, parsing_data, indentify_data, status = record
-
- self.logger.info(f"分析记录: id={record_id}, content_id={content_id}, request_id={request_id}")
-
- # 检查indentify_data是否为空
- indentify_data_empty = indentify_data is None or indentify_data.strip() == ""
- # 检查indentify_data是否不为空
- indentify_data_not_empty = indentify_data is not None and indentify_data.strip() != ""
- # 检查parsing_data是否为空
- parsing_data_empty = parsing_data is None or parsing_data.strip() == ""
- # 检查parsing_data是否不为空
- parsing_data_not_empty = parsing_data is not None and parsing_data.strip() != ""
-
- new_status = None
-
- # 如果parsing_data为空且indentify_data为空,则设置status为3
- if parsing_data_empty and indentify_data_empty:
- new_status = 3
- status_3_records.append(record_id)
- self.logger.info(f"记录 {record_id}: parsing_data为空且indentify_data为空,设置status为3")
-
- # 如果parsing_data为空且indentify_data不为空,则设置status为2
- if parsing_data_empty and indentify_data_not_empty:
- new_status = 2
- status_2_records.append(record_id)
- self.logger.info(f"记录 {record_id}: parsing_data为空且indentify_data不为空,设置status为2")
-
- # 如果parsing_data不为空,则设置status为5
- if parsing_data_not_empty:
- new_status = 5
- status_5_records.append(record_id)
- self.logger.info(f"记录 {record_id}: parsing_data不为空,设置status为5")
-
- success_count = 0
-
- # 批量更新status为2的记录
- if status_2_records:
- success_count += self.batch_update_status(status_2_records, 2)
-
- # 批量更新status为3的记录
- if status_3_records:
- success_count += self.batch_update_status(status_3_records, 3)
-
- # 批量更新status为5的记录
- if status_5_records:
- success_count += self.batch_update_status(status_5_records, 5)
-
- self.logger.info(f"处理完成: 成功 {success_count} 条")
- self.logger.info(f"状态更新统计: 更新为status=2的有 {len(status_2_records)} 条,更新为status=3的有 {len(status_3_records)} 条,更新为status=5的有 {len(status_5_records)} 条")
- return success_count, 0
-
- def run(self):
- """
- 执行状态更新的主流程
- """
- self.logger.info("开始更新解析状态...")
-
- # 查找status为3或2的记录
- records = self.find_status_records()
-
- if not records:
- self.logger.info("没有找到status为3或2的记录")
- return
-
- # 处理所有记录
- success_count, error_count = self.process_records(records)
-
- self.logger.info(f"解析状态更新完成: 成功 {success_count} 条,失败 {error_count} 条")
- if __name__ == "__main__":
- updater = UpdateParsingStatus()
- updater.run()
|