123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227 |
- # -*- coding: utf-8 -*-
- """
- 处理429错误的数据清理工具
- 执行 SELECT * FROM `knowledge_parsing_content` WHERE `parsing_data` LIKE "%429 You exceeded your current quota%" OR `indentify_data` LIKE "%429 You exceeded your current quota%"
- 查找出结果之后,将结果逐条进行处理:如果只有parsing_data包含429错误,则将parsing_data字段设置为空,并将status字段设置为2;
- 如果indentify_data包含429错误,则将indentify_data字段和parsing_data字段都设置为空,并将status字段设置为3
- 再取出该条中的request_id字段,在knowledge_request中将该request_id的parsing_status、extraction_status和expansion_status字段都设置为3
- """
- import sys
- import os
- sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- from utils.mysql_db import MysqlHelper
- from loguru import logger
class Handle429Error:
    """Reset knowledge_parsing_content rows whose stored output is a 429 quota error.

    Rows whose ``indentify_data`` contains the marker get ``indentify_data`` and
    ``parsing_data`` set to NULL and ``status`` set to 3. Rows where only
    ``parsing_data`` contains it get ``parsing_data`` set to NULL and ``status``
    set to 2. The owning ``knowledge_request`` rows then get ``parsing_status``,
    ``extraction_status`` and ``expansion_status`` set to 3.
    """

    # Substring that identifies a quota-exhausted (HTTP 429) response stored in a row.
    ERROR_MARKER = "429 You exceeded your current quota"

    def __init__(self):
        self.logger = logger

    @staticmethod
    def _placeholders(count):
        """Return ``count`` comma-separated ``%s`` placeholders for an SQL IN list."""
        return ",".join(["%s"] * count)

    def find_429_error_records(self):
        """Return rows whose parsing_data or indentify_data contains the 429 marker.

        Each row is ``(id, content_id, request_id, task_id, parsing_data,
        indentify_data, status)``. Returns an empty list when the query fails.
        """
        sql = """
            SELECT id, content_id, request_id, task_id, parsing_data, indentify_data, status
            FROM knowledge_parsing_content
            WHERE parsing_data LIKE %s OR indentify_data LIKE %s
        """
        pattern = f"%{self.ERROR_MARKER}%"
        params = (pattern, pattern)

        try:
            # NOTE(review): assumes get_values may yield None/empty when nothing
            # matches; normalize so len() below cannot fail on a falsy result.
            records = MysqlHelper.get_values(sql, params) or []
        except Exception as e:
            self.logger.error(f"查询429错误记录失败: {e}")
            return []
        self.logger.info(f"找到 {len(records)} 条包含429错误的记录")
        return records

    def _reset_single_record(self, sql, record_id, request_id, field_label):
        """Run a one-row reset UPDATE and, on success, reset the owning request.

        Args:
            sql: UPDATE statement taking the row id as its only parameter.
            record_id: knowledge_parsing_content.id to reset.
            request_id: knowledge_request.request_id to reset afterwards.
            field_label: column name used only in log messages.

        Returns:
            True if the content-row update reported success, otherwise False.
        """
        try:
            result = MysqlHelper.update_values(sql, (record_id,))
            if not result:
                return False
            self.logger.info(f"处理{field_label}错误成功: record_id={record_id}")
            # Reset parsing/extraction/expansion status of the owning request as well.
            self.update_request_status(request_id, 3)
            return True
        except Exception as e:
            self.logger.error(f"处理{field_label}错误失败: {e}")
            return False

    def process_parsing_data_error(self, record_id, request_id):
        """Handle one row whose parsing_data holds the 429 error.

        Sets parsing_data to NULL and status to 2, then resets the request's
        statuses to 3. Returns True on success, False otherwise.
        """
        sql = (
            "UPDATE knowledge_parsing_content "
            "SET parsing_data = NULL, status = 2, update_time = NOW() WHERE id = %s"
        )
        return self._reset_single_record(sql, record_id, request_id, "parsing_data")

    def process_indentify_data_error(self, record_id, request_id):
        """Handle one row whose indentify_data holds the 429 error.

        Sets indentify_data and parsing_data to NULL and status to 3, then
        resets the request's statuses to 3. Returns True on success, False otherwise.
        """
        sql = (
            "UPDATE knowledge_parsing_content "
            "SET indentify_data = NULL, parsing_data = NULL, status = 3, update_time = NOW() "
            "WHERE id = %s"
        )
        return self._reset_single_record(sql, record_id, request_id, "indentify_data")

    def update_request_status(self, request_id, status):
        """Set parsing_status, extraction_status and expansion_status of one request.

        Returns True if the update reported success, otherwise False.
        """
        sql = (
            "UPDATE knowledge_request SET parsing_status = %s, extraction_status = %s, "
            "expansion_status = %s WHERE request_id = %s"
        )
        params = (status, status, status, request_id)

        try:
            result = MysqlHelper.update_values(sql, params)
        except Exception as e:
            self.logger.error(f"更新request状态失败: {e}")
            return False
        if not result:
            return False
        self.logger.info(f"更新request状态成功: request_id={request_id}, parsing_status={status}, extraction_status={status}, expansion_status={status}")
        return True

    def _batch_reset_records(self, set_clause, record_ids, field_label):
        """Apply ``set_clause`` (plus update_time = NOW()) to every id in ``record_ids``.

        ``set_clause`` is an internal constant, never user input.

        Returns:
            Whatever update_values reports on success (logged as the number of
            updated rows), or 0 on empty input, a falsy result, or an exception.
        """
        if not record_ids:
            return 0

        ids = list(record_ids)
        sql = (
            f"UPDATE knowledge_parsing_content SET {set_clause}, update_time = NOW() "
            f"WHERE id IN ({self._placeholders(len(ids))})"
        )
        try:
            result = MysqlHelper.update_values(sql, ids)
        except Exception as e:
            self.logger.error(f"批量处理{field_label}错误失败: {e}")
            return 0
        if not result:
            return 0
        self.logger.info(f"批量处理{field_label}错误成功: 更新了 {result} 条记录")
        return result

    def batch_process_parsing_data_error(self, record_ids):
        """Batch version of process_parsing_data_error (parsing_data NULL, status 2)."""
        return self._batch_reset_records(
            "parsing_data = NULL, status = 2", record_ids, "parsing_data"
        )

    def batch_process_indentify_data_error(self, record_ids):
        """Batch version of process_indentify_data_error (both fields NULL, status 3)."""
        return self._batch_reset_records(
            "indentify_data = NULL, parsing_data = NULL, status = 3",
            record_ids,
            "indentify_data",
        )

    def batch_update_request_status(self, request_ids, status):
        """Set all three status columns of knowledge_request for every id given.

        Returns what update_values reports on success, or 0 on empty input,
        a falsy result, or an exception.
        """
        if not request_ids:
            return 0

        ids = list(request_ids)
        sql = (
            "UPDATE knowledge_request SET parsing_status = %s, extraction_status = %s, "
            f"expansion_status = %s WHERE request_id IN ({self._placeholders(len(ids))})"
        )
        params = [status, status, status] + ids

        try:
            result = MysqlHelper.update_values(sql, params)
        except Exception as e:
            self.logger.error(f"批量更新request状态失败: {e}")
            return 0
        if not result:
            return 0
        self.logger.info(f"批量更新request状态成功: 更新了 {result} 条记录,status={status}")
        return result

    def _classify_records(self, records):
        """Split rows into ``(parsing_pairs, indentify_pairs)`` of ``(record_id, request_id)``.

        A row whose indentify_data contains the marker goes only to
        ``indentify_pairs``: that reset also clears parsing_data, so keeping
        each row in exactly one batch avoids a redundant second UPDATE and
        double-counting it as a success.
        """
        parsing_pairs = []
        indentify_pairs = []
        marker = self.ERROR_MARKER

        for record in records:
            record_id, content_id, request_id, _task_id, parsing_data, indentify_data, _status = record
            self.logger.info(f"分析记录: id={record_id}, content_id={content_id}, request_id={request_id}")

            indentify_hit = bool(indentify_data) and marker in indentify_data
            parsing_hit = bool(parsing_data) and marker in parsing_data
            if indentify_hit:
                self.logger.info(f"记录 {record_id}: indentify_data包含429错误")
            if parsing_hit:
                self.logger.info(f"记录 {record_id}: parsing_data包含429错误")

            if indentify_hit:
                indentify_pairs.append((record_id, request_id))
            elif parsing_hit:
                parsing_pairs.append((record_id, request_id))
            else:
                # SQL LIKE matched but the exact substring did not (e.g. a
                # case-insensitive collation); surface it instead of dropping silently.
                self.logger.warning(f"记录 {record_id}: 查询命中但未找到精确的429错误标记,跳过处理")

        return parsing_pairs, indentify_pairs

    def process_records(self, records):
        """Reset every 429-error row in ``records`` using batched UPDATEs.

        Args:
            records: rows in the shape returned by find_429_error_records.

        Returns:
            ``(success_count, error_count)``. success_count sums what the batch
            updates report (presumably affected-row counts, per their log
            messages). error_count is the number of rows in batches whose
            update failed or reported nothing.
        """
        parsing_pairs, indentify_pairs = self._classify_records(records)

        success_count = 0
        error_count = 0
        request_ids_to_update = set()

        batches = (
            (parsing_pairs, self.batch_process_parsing_data_error),
            (indentify_pairs, self.batch_process_indentify_data_error),
        )
        for pairs, reset_batch in batches:
            if not pairs:
                continue
            updated = reset_batch([record_id for record_id, _ in pairs])
            if updated:
                success_count += updated
                # Only reset requests whose content rows were actually reset, so the
                # two tables stay consistent if a batch fails (a re-run retries them).
                request_ids_to_update.update(
                    request_id for _, request_id in pairs if request_id is not None
                )
            else:
                error_count += len(pairs)

        if request_ids_to_update:
            self.batch_update_request_status(list(request_ids_to_update), 3)

        self.logger.info(f"处理完成: 成功 {success_count} 条,失败 {error_count} 条")
        return success_count, error_count

    def run(self):
        """Find all 429-error rows, reset them, and log a summary."""
        self.logger.info("开始处理429错误...")

        records = self.find_429_error_records()
        if not records:
            self.logger.info("没有找到包含429错误的记录")
            return

        success_count, error_count = self.process_records(records)
        self.logger.info(f"429错误处理完成: 成功 {success_count} 条,失败 {error_count} 条")
if __name__ == "__main__":
    # Script entry point: run the full 429-cleanup pass once.
    Handle429Error().run()
|