update_parsing_status.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. # -*- coding: utf-8 -*-
  2. """
  3. 更新解析状态的工具
  4. 找出knowledge_parsing_content表中status=3或者=2的记录,依次进行以下判断设置
  5. 如果indentify_data为空,则设置status为3
  6. 如果indentify_data不为空,则设置status为2,
  7. 如果parsing_data为空且indentify_data为空,则设置status为3
  8. 如果parsing_data为空且indentify_data不为空,则设置status为2
  9. 如果parsing_data不为空,则设置status为5
  10. """
  11. import sys
  12. import os
  13. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  14. from utils.mysql_db import MysqlHelper
  15. from loguru import logger
  16. class UpdateParsingStatus:
  17. def __init__(self):
  18. self.logger = logger
  19. def find_status_records(self):
  20. """
  21. 查找status为3或2的记录
  22. """
  23. sql = """
  24. SELECT id, content_id, request_id, task_id, parsing_data, indentify_data, status
  25. FROM knowledge_parsing_content
  26. WHERE status = 3 OR status = 2
  27. """
  28. try:
  29. records = MysqlHelper.get_values(sql)
  30. self.logger.info(f"找到 {len(records)} 条status为3或2的记录")
  31. return records
  32. except Exception as e:
  33. self.logger.error(f"查询status为3或2的记录失败: {e}")
  34. return []
  35. def update_record_status(self, record_id, new_status):
  36. """
  37. 更新记录的status字段
  38. """
  39. sql = "UPDATE knowledge_parsing_content SET status = %s WHERE id = %s"
  40. params = (new_status, record_id)
  41. try:
  42. result = MysqlHelper.update_values(sql, params)
  43. if result:
  44. self.logger.info(f"更新记录状态成功: record_id={record_id}, status={new_status}")
  45. return True
  46. return False
  47. except Exception as e:
  48. self.logger.error(f"更新记录状态失败: {e}")
  49. return False
  50. def batch_update_status(self, record_ids, new_status):
  51. """
  52. 批量更新记录的status字段
  53. """
  54. if not record_ids:
  55. return 0
  56. # 构建批量更新的SQL
  57. placeholders = ','.join(['%s'] * len(record_ids))
  58. sql = f"UPDATE knowledge_parsing_content SET status = %s, update_time = NOW() WHERE id IN ({placeholders})"
  59. params = [new_status] + record_ids
  60. try:
  61. result = MysqlHelper.update_values(sql, params)
  62. if result:
  63. self.logger.info(f"批量更新status成功: 更新了 {result} 条记录,status={new_status}")
  64. return result
  65. return 0
  66. except Exception as e:
  67. self.logger.error(f"批量更新status失败: {e}")
  68. return 0
  69. def process_records(self, records):
  70. """
  71. 处理所有status为3或2的记录 - 使用批量操作提高性能
  72. """
  73. status_2_records = []
  74. status_3_records = []
  75. status_5_records = []
  76. # 分类记录
  77. for record in records:
  78. record_id, content_id, request_id, task_id, parsing_data, indentify_data, status = record
  79. self.logger.info(f"分析记录: id={record_id}, content_id={content_id}, request_id={request_id}")
  80. # 检查indentify_data是否为空
  81. indentify_data_empty = indentify_data is None or indentify_data.strip() == ""
  82. # 检查indentify_data是否不为空
  83. indentify_data_not_empty = indentify_data is not None and indentify_data.strip() != ""
  84. # 检查parsing_data是否为空
  85. parsing_data_empty = parsing_data is None or parsing_data.strip() == ""
  86. # 检查parsing_data是否不为空
  87. parsing_data_not_empty = parsing_data is not None and parsing_data.strip() != ""
  88. new_status = None
  89. # 如果parsing_data为空且indentify_data为空,则设置status为3
  90. if parsing_data_empty and indentify_data_empty:
  91. new_status = 3
  92. status_3_records.append(record_id)
  93. self.logger.info(f"记录 {record_id}: parsing_data为空且indentify_data为空,设置status为3")
  94. # 如果parsing_data为空且indentify_data不为空,则设置status为2
  95. if parsing_data_empty and indentify_data_not_empty:
  96. new_status = 2
  97. status_2_records.append(record_id)
  98. self.logger.info(f"记录 {record_id}: parsing_data为空且indentify_data不为空,设置status为2")
  99. # 如果parsing_data不为空,则设置status为5
  100. if parsing_data_not_empty:
  101. new_status = 5
  102. status_5_records.append(record_id)
  103. self.logger.info(f"记录 {record_id}: parsing_data不为空,设置status为5")
  104. success_count = 0
  105. # 批量更新status为2的记录
  106. if status_2_records:
  107. success_count += self.batch_update_status(status_2_records, 2)
  108. # 批量更新status为3的记录
  109. if status_3_records:
  110. success_count += self.batch_update_status(status_3_records, 3)
  111. # 批量更新status为5的记录
  112. if status_5_records:
  113. success_count += self.batch_update_status(status_5_records, 5)
  114. self.logger.info(f"处理完成: 成功 {success_count} 条")
  115. self.logger.info(f"状态更新统计: 更新为status=2的有 {len(status_2_records)} 条,更新为status=3的有 {len(status_3_records)} 条,更新为status=5的有 {len(status_5_records)} 条")
  116. return success_count, 0
  117. def run(self):
  118. """
  119. 执行状态更新的主流程
  120. """
  121. self.logger.info("开始更新解析状态...")
  122. # 查找status为3或2的记录
  123. records = self.find_status_records()
  124. if not records:
  125. self.logger.info("没有找到status为3或2的记录")
  126. return
  127. # 处理所有记录
  128. success_count, error_count = self.process_records(records)
  129. self.logger.info(f"解析状态更新完成: 成功 {success_count} 条,失败 {error_count} 条")
  130. if __name__ == "__main__":
  131. updater = UpdateParsingStatus()
  132. updater.run()