# -*- coding: utf-8 -*-
"""
Cleanup tool for records polluted by HTTP 429 ("quota exceeded") errors.

Finds rows in ``knowledge_parsing_content`` whose ``parsing_data`` or
``indentify_data`` column contains the text
"429 You exceeded your current quota", roughly::

    SELECT ... FROM knowledge_parsing_content
    WHERE parsing_data LIKE '%429 You exceeded your current quota%'
       OR indentify_data LIKE '%429 You exceeded your current quota%'

and resets them:

* ``indentify_data`` contains the error: ``indentify_data`` and
  ``parsing_data`` are set to NULL and ``status`` is set to 3.
* only ``parsing_data`` contains the error: ``parsing_data`` is set to NULL
  and ``status`` is set to 2.

For every successfully reset row, the ``knowledge_request`` row with the same
``request_id`` gets ``parsing_status``, ``extraction_status`` and
``expansion_status`` set to 3.
"""
import sys
import os

# Make the project root importable so ``utils`` resolves when run as a script.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from utils.mysql_db import MysqlHelper
from loguru import logger

# Error text that marks a record as polluted by a 429 quota error.
_QUOTA_ERROR_MARKER = "429 You exceeded your current quota"


class Handle429Error:
    """Find and reset knowledge records that failed with a 429 quota error."""

    def __init__(self):
        self.logger = logger

    def find_429_error_records(self):
        """
        Query all knowledge_parsing_content rows containing the 429 error.

        Returns:
            The rows from ``MysqlHelper.get_values`` as
            (id, content_id, request_id, task_id, parsing_data,
            indentify_data, status) sequences, or an empty list when the
            query fails or yields nothing.
        """
        sql = """
            SELECT id, content_id, request_id, task_id, parsing_data, indentify_data, status
            FROM knowledge_parsing_content
            WHERE parsing_data LIKE %s OR indentify_data LIKE %s
        """
        like_pattern = f"%{_QUOTA_ERROR_MARKER}%"
        params = (like_pattern, like_pattern)
        try:
            # Treat a None/empty result as "no records" rather than an error.
            records = MysqlHelper.get_values(sql, params) or []
        except Exception as e:
            self.logger.error(f"查询429错误记录失败: {e}")
            return []
        self.logger.info(f"找到 {len(records)} 条包含429错误的记录")
        return records

    def process_parsing_data_error(self, record_id, request_id):
        """
        Reset a single row whose parsing_data contains the 429 error.

        Sets parsing_data to NULL, status to 2 and update_time to NOW(); on
        success also sets the request's three status columns to 3.

        Returns:
            True if the update reported a truthy result, otherwise False.
        """
        sql = (
            "UPDATE knowledge_parsing_content "
            "SET parsing_data = NULL, status = 2, update_time = NOW() WHERE id = %s"
        )
        try:
            result = MysqlHelper.update_values(sql, (record_id,))
        except Exception as e:
            self.logger.error(f"处理parsing_data错误失败: {e}")
            return False
        if not result:
            return False
        self.logger.info(f"处理parsing_data错误成功: record_id={record_id}")
        self.update_request_status(request_id, 3)
        return True

    def process_indentify_data_error(self, record_id, request_id):
        """
        Reset a single row whose indentify_data contains the 429 error.

        Sets indentify_data and parsing_data to NULL, status to 3 and
        update_time to NOW(); on success also sets the request's three status
        columns to 3.

        Returns:
            True if the update reported a truthy result, otherwise False.
        """
        sql = (
            "UPDATE knowledge_parsing_content "
            "SET indentify_data = NULL, parsing_data = NULL, status = 3, update_time = NOW() "
            "WHERE id = %s"
        )
        try:
            result = MysqlHelper.update_values(sql, (record_id,))
        except Exception as e:
            self.logger.error(f"处理indentify_data错误失败: {e}")
            return False
        if not result:
            return False
        self.logger.info(f"处理indentify_data错误成功: record_id={record_id}")
        self.update_request_status(request_id, 3)
        return True

    def update_request_status(self, request_id, status):
        """
        Set parsing_status, extraction_status and expansion_status of one
        knowledge_request row to ``status``.

        Returns:
            True if the update reported a truthy result, otherwise False.
        """
        sql = (
            "UPDATE knowledge_request "
            "SET parsing_status = %s, extraction_status = %s, expansion_status = %s "
            "WHERE request_id = %s"
        )
        params = (status, status, status, request_id)
        try:
            result = MysqlHelper.update_values(sql, params)
        except Exception as e:
            self.logger.error(f"更新request状态失败: {e}")
            return False
        if not result:
            return False
        self.logger.info(
            f"更新request状态成功: request_id={request_id}, parsing_status={status}, "
            f"extraction_status={status}, expansion_status={status}"
        )
        return True

    def batch_process_parsing_data_error(self, record_ids):
        """
        Reset parsing_data for many rows with a single UPDATE.

        Sets parsing_data to NULL, status to 2 and update_time to NOW() for
        every id in ``record_ids``.

        Args:
            record_ids: Iterable of knowledge_parsing_content ids (None or
                empty means nothing to do).

        Returns:
            The result of ``MysqlHelper.update_values`` (logged as the number
            of updated rows), or 0 if nothing was updated or the update failed.
        """
        record_ids = list(record_ids or [])
        if not record_ids:
            return 0
        placeholders = ','.join(['%s'] * len(record_ids))
        sql = (
            "UPDATE knowledge_parsing_content "
            "SET parsing_data = NULL, status = 2, update_time = NOW() "
            f"WHERE id IN ({placeholders})"
        )
        try:
            result = MysqlHelper.update_values(sql, record_ids)
        except Exception as e:
            self.logger.error(f"批量处理parsing_data错误失败: {e}")
            return 0
        if not result:
            return 0
        self.logger.info(f"批量处理parsing_data错误成功: 更新了 {result} 条记录")
        return result

    def batch_process_indentify_data_error(self, record_ids):
        """
        Reset indentify_data and parsing_data for many rows with a single UPDATE.

        Sets indentify_data and parsing_data to NULL, status to 3 and
        update_time to NOW() for every id in ``record_ids``.

        Args:
            record_ids: Iterable of knowledge_parsing_content ids (None or
                empty means nothing to do).

        Returns:
            The result of ``MysqlHelper.update_values`` (logged as the number
            of updated rows), or 0 if nothing was updated or the update failed.
        """
        record_ids = list(record_ids or [])
        if not record_ids:
            return 0
        placeholders = ','.join(['%s'] * len(record_ids))
        sql = (
            "UPDATE knowledge_parsing_content "
            "SET indentify_data = NULL, parsing_data = NULL, status = 3, update_time = NOW() "
            f"WHERE id IN ({placeholders})"
        )
        try:
            result = MysqlHelper.update_values(sql, record_ids)
        except Exception as e:
            self.logger.error(f"批量处理indentify_data错误失败: {e}")
            return 0
        if not result:
            return 0
        self.logger.info(f"批量处理indentify_data错误成功: 更新了 {result} 条记录")
        return result

    def batch_update_request_status(self, request_ids, status):
        """
        Set the three status columns of many knowledge_request rows to ``status``.

        Args:
            request_ids: Iterable of request ids (None or empty means nothing
                to do).
            status: Value written to parsing_status, extraction_status and
                expansion_status.

        Returns:
            The result of ``MysqlHelper.update_values``, or 0 if nothing was
            updated or the update failed.
        """
        request_ids = list(request_ids or [])
        if not request_ids:
            return 0
        placeholders = ','.join(['%s'] * len(request_ids))
        sql = (
            "UPDATE knowledge_request "
            "SET parsing_status = %s, extraction_status = %s, expansion_status = %s "
            f"WHERE request_id IN ({placeholders})"
        )
        params = [status, status, status] + request_ids
        try:
            result = MysqlHelper.update_values(sql, params)
        except Exception as e:
            self.logger.error(f"批量更新request状态失败: {e}")
            return 0
        if not result:
            return 0
        self.logger.info(f"批量更新request状态成功: 更新了 {result} 条记录,status={status}")
        return result

    def process_records(self, records):
        """
        Reset every record returned by find_429_error_records via batch updates.

        Each record is classified exactly once: if indentify_data contains the
        429 error it goes to the indentify_data batch (which also clears
        parsing_data); otherwise, if parsing_data contains it, it goes to the
        parsing_data batch. Afterwards the request statuses of the records in
        every batch that succeeded are set to 3.

        Args:
            records: Rows shaped like the result of find_429_error_records.

        Returns:
            (success_count, error_count): success_count sums what the batch
            updates returned; error_count is the number of records whose batch
            update failed or returned 0.
        """
        parsing_data_records = []
        parsing_request_ids = set()
        indentify_data_records = []
        indentify_request_ids = set()

        for record in records:
            record_id, content_id, request_id, _task_id, parsing_data, indentify_data, _status = record
            self.logger.info(f"分析记录: id={record_id}, content_id={content_id}, request_id={request_id}")

            # indentify_data takes precedence: its reset already clears
            # parsing_data, so queueing the record in both batches would update
            # it twice, double-count it, and make the final status depend on
            # the order in which the batches run.
            if indentify_data and _QUOTA_ERROR_MARKER in indentify_data:
                indentify_data_records.append(record_id)
                if request_id is not None:
                    indentify_request_ids.add(request_id)
                self.logger.info(f"记录 {record_id}: indentify_data包含429错误")
            elif parsing_data and _QUOTA_ERROR_MARKER in parsing_data:
                parsing_data_records.append(record_id)
                if request_id is not None:
                    parsing_request_ids.add(request_id)
                self.logger.info(f"记录 {record_id}: parsing_data包含429错误")

        success_count = 0
        error_count = 0
        request_ids_to_update = set()

        batches = (
            (parsing_data_records, parsing_request_ids, self.batch_process_parsing_data_error),
            (indentify_data_records, indentify_request_ids, self.batch_process_indentify_data_error),
        )
        for record_ids, request_ids, batch_update in batches:
            if not record_ids:
                continue
            updated = batch_update(record_ids)
            if updated:
                success_count += updated
                # Only reset requests whose content rows were actually reset,
                # mirroring process_*_error, which update the request on success only.
                request_ids_to_update |= request_ids
            else:
                error_count += len(record_ids)

        if request_ids_to_update:
            self.batch_update_request_status(list(request_ids_to_update), 3)

        self.logger.info(f"处理完成: 成功 {success_count} 条,失败 {error_count} 条")
        return success_count, error_count

    def run(self):
        """Main entry point: find all 429-polluted records and reset them."""
        self.logger.info("开始处理429错误...")

        records = self.find_429_error_records()
        if not records:
            self.logger.info("没有找到包含429错误的记录")
            return

        success_count, error_count = self.process_records(records)
        self.logger.info(f"429错误处理完成: 成功 {success_count} 条,失败 {error_count} 条")


if __name__ == "__main__":
    handler = Handle429Error()
    handler.run()