# handle_429_error.py (9.2 KB)


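# -*- coding: utf-8 -*-
"""
Data clean-up tool for records that contain a 429 (quota exceeded) error.

The tool runs a query equivalent to

    SELECT * FROM `knowledge_parsing_content`
    WHERE `parsing_data` LIKE "%429 You exceeded your current quota%"
       OR `indentify_data` LIKE "%429 You exceeded your current quota%"

and then processes every matching row:

* If the error text is in `parsing_data`, `parsing_data` is set to NULL and
  `status` is set to 2.
* If the error text is in `indentify_data`, both `indentify_data` and
  `parsing_data` are set to NULL and `status` is set to 3.

Finally the `request_id` of each processed row is used to set
`parsing_status`, `extraction_status` and `expansion_status` to 3 in the
`knowledge_request` table.
"""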
  9. import sys
  10. import os
  11. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  12. from utils.mysql_db import MysqlHelper
  13. from loguru import logger
  14. class Handle429Error:
  15. def __init__(self):
  16. self.logger = logger
  17. def find_429_error_records(self):
  18. """
  19. 查找包含429错误的记录
  20. """
  21. sql = """
  22. SELECT id, content_id, request_id, task_id, parsing_data, indentify_data, status
  23. FROM knowledge_parsing_content
  24. WHERE parsing_data LIKE %s OR indentify_data LIKE %s
  25. """
  26. params = ("%429 You exceeded your current quota%", "%429 You exceeded your current quota%")
  27. try:
  28. records = MysqlHelper.get_values(sql, params)
  29. self.logger.info(f"找到 {len(records)} 条包含429错误的记录")
  30. return records
  31. except Exception as e:
  32. self.logger.error(f"查询429错误记录失败: {e}")
  33. return []
  34. def process_parsing_data_error(self, record_id, request_id):
  35. """
  36. 处理parsing_data字段包含429错误的情况
  37. 将parsing_data字段设置为空,status设置为2
  38. """
  39. sql = "UPDATE knowledge_parsing_content SET parsing_data = NULL, status = 2 WHERE id = %s"
  40. params = (record_id,)
  41. try:
  42. result = MysqlHelper.update_values(sql, params)
  43. if result:
  44. self.logger.info(f"处理parsing_data错误成功: record_id={record_id}")
  45. # 更新knowledge_request表的parsing_status
  46. self.update_request_status(request_id, 3)
  47. return True
  48. return False
  49. except Exception as e:
  50. self.logger.error(f"处理parsing_data错误失败: {e}")
  51. return False
  52. def process_indentify_data_error(self, record_id, request_id):
  53. """
  54. 处理indentify_data字段包含429错误的情况
  55. 将indentify_data字段和parsing_data字段设置为空,status设置为3
  56. """
  57. sql = "UPDATE knowledge_parsing_content SET indentify_data = NULL, parsing_data = NULL, status = 3 WHERE id = %s"
  58. params = (record_id,)
  59. try:
  60. result = MysqlHelper.update_values(sql, params)
  61. if result:
  62. self.logger.info(f"处理indentify_data错误成功: record_id={record_id}")
  63. # 更新knowledge_request表的parsing_status
  64. self.update_request_status(request_id, 3)
  65. return True
  66. return False
  67. except Exception as e:
  68. self.logger.error(f"处理indentify_data错误失败: {e}")
  69. return False
  70. def update_request_status(self, request_id, status):
  71. """
  72. 更新knowledge_request表的parsing_status、extraction_status和expansion_status字段
  73. """
  74. sql = "UPDATE knowledge_request SET parsing_status = %s, extraction_status = %s, expansion_status = %s WHERE request_id = %s"
  75. params = (status, status, status, request_id)
  76. try:
  77. result = MysqlHelper.update_values(sql, params)
  78. if result:
  79. self.logger.info(f"更新request状态成功: request_id={request_id}, parsing_status={status}, extraction_status={status}, expansion_status={status}")
  80. return True
  81. return False
  82. except Exception as e:
  83. self.logger.error(f"更新request状态失败: {e}")
  84. return False
  85. def batch_process_parsing_data_error(self, record_ids):
  86. """
  87. 批量处理parsing_data字段包含429错误的情况
  88. """
  89. if not record_ids:
  90. return 0
  91. # 构建批量更新的SQL
  92. placeholders = ','.join(['%s'] * len(record_ids))
  93. sql = f"UPDATE knowledge_parsing_content SET parsing_data = NULL, status = 2, update_time = NOW() WHERE id IN ({placeholders})"
  94. try:
  95. result = MysqlHelper.update_values(sql, record_ids)
  96. if result:
  97. self.logger.info(f"批量处理parsing_data错误成功: 更新了 {result} 条记录")
  98. return result
  99. return 0
  100. except Exception as e:
  101. self.logger.error(f"批量处理parsing_data错误失败: {e}")
  102. return 0
  103. def batch_process_indentify_data_error(self, record_ids):
  104. """
  105. 批量处理indentify_data字段包含429错误的情况
  106. """
  107. if not record_ids:
  108. return 0
  109. # 构建批量更新的SQL
  110. placeholders = ','.join(['%s'] * len(record_ids))
  111. sql = f"UPDATE knowledge_parsing_content SET indentify_data = NULL, parsing_data = NULL, status = 3, update_time = NOW() WHERE id IN ({placeholders})"
  112. try:
  113. result = MysqlHelper.update_values(sql, record_ids)
  114. if result:
  115. self.logger.info(f"批量处理indentify_data错误成功: 更新了 {result} 条记录")
  116. return result
  117. return 0
  118. except Exception as e:
  119. self.logger.error(f"批量处理indentify_data错误失败: {e}")
  120. return 0
  121. def batch_update_request_status(self, request_ids, status):
  122. """
  123. 批量更新knowledge_request表的状态字段
  124. """
  125. if not request_ids:
  126. return 0
  127. # 构建批量更新的SQL
  128. placeholders = ','.join(['%s'] * len(request_ids))
  129. sql = f"UPDATE knowledge_request SET parsing_status = %s, extraction_status = %s, expansion_status = %s WHERE request_id IN ({placeholders})"
  130. params = [status, status, status] + request_ids
  131. try:
  132. result = MysqlHelper.update_values(sql, params)
  133. if result:
  134. self.logger.info(f"批量更新request状态成功: 更新了 {result} 条记录,status={status}")
  135. return result
  136. return 0
  137. except Exception as e:
  138. self.logger.error(f"批量更新request状态失败: {e}")
  139. return 0
  140. def process_records(self, records):
  141. """
  142. 处理所有包含429错误的记录 - 使用批量操作提高性能
  143. """
  144. parsing_data_records = []
  145. indentify_data_records = []
  146. request_ids_to_update = set()
  147. # 分类记录
  148. for record in records:
  149. record_id, content_id, request_id, task_id, parsing_data, indentify_data, status = record
  150. self.logger.info(f"分析记录: id={record_id}, content_id={content_id}, request_id={request_id}")
  151. # 检查indentify_data是否包含429错误
  152. if indentify_data and "429 You exceeded your current quota" in indentify_data:
  153. indentify_data_records.append(record_id)
  154. request_ids_to_update.add(request_id)
  155. self.logger.info(f"记录 {record_id}: indentify_data包含429错误")
  156. # 检查parsing_data是否包含429错误
  157. if parsing_data and "429 You exceeded your current quota" in parsing_data:
  158. parsing_data_records.append(record_id)
  159. request_ids_to_update.add(request_id)
  160. self.logger.info(f"记录 {record_id}: parsing_data包含429错误")
  161. success_count = 0
  162. error_count = 0
  163. # 批量处理parsing_data错误
  164. if parsing_data_records:
  165. success_count += self.batch_process_parsing_data_error(parsing_data_records)
  166. # 批量处理indentify_data错误
  167. if indentify_data_records:
  168. success_count += self.batch_process_indentify_data_error(indentify_data_records)
  169. # 批量更新request状态
  170. if request_ids_to_update:
  171. self.batch_update_request_status(list(request_ids_to_update), 3)
  172. self.logger.info(f"处理完成: 成功 {success_count} 条,失败 {error_count} 条")
  173. return success_count, error_count
  174. def run(self):
  175. """
  176. 执行429错误处理的主流程
  177. """
  178. self.logger.info("开始处理429错误...")
  179. # 查找包含429错误的记录
  180. records = self.find_429_error_records()
  181. if not records:
  182. self.logger.info("没有找到包含429错误的记录")
  183. return
  184. # 处理所有记录
  185. success_count, error_count = self.process_records(records)
  186. self.logger.info(f"429错误处理完成: 成功 {success_count} 条,失败 {error_count} 条")
  187. if __name__ == "__main__":
  188. handler = Handle429Error()
  189. handler.run()