move_data.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. move_data脚本主要做以下的事情
  5. 1. 读取knowledge_search_content中request_id IS NOT NULL 并且 is_move = 0 的数据,将返回的多条数据每一条都执行以下步骤
  6. 提取出request_id,source_channel,query_word,formatted_content, multimodal_recognition,structured_data字段
  7. 2. 根据提取出的字段,写入到不同的表中
  8. 2.1 如果formatted_content不为空,则新增一条新记录到knowledge_crawl_content表中,新增时字段取值如下
  9. content_id字段取自formatted_content解析为json里面的channel_content_id字段;
  10. channel字段取自source_channel字段;
  11. request_id字段取自request_id字段;
  12. crawl_data字段取自formatted_content
  13. 2.2 如果multimodal_recognition不为空,且structured_data为空,则新增一条记录到knowledge_parsing_content表中,新增时字段取值如下
  14. content_id字段取自formatted_content解析为json里面的channel_content_id字段;
  15. request_id字段取自request_id字段;
  16. indentify_data字段取自multimodal_recognition
  17. status设置为2
  18. 2.3 如果multimodal_recognition不为空,且structured_data不为空,则新增一条记录到knowledge_parsing_content表中,新增时字段取值如下
  19. content_id字段取自formatted_content解析为json里面的channel_content_id字段;
  20. request_id字段取自request_id字段;
  21. indentify_data字段取自multimodal_recognition
  22. parsing_data字段取自structured_data
  23. status设置为5
  24. 另外如果本次查询所有的数据都满足2.3条件,则将knowledge_request表中request_id为request_id的parsing_status字段设置为2,否则设置为0
  25. 每条数据执行完毕后都更新knowledge_search_content表中is_move字段为1,标记为已处理
  26. """
  27. import json
  28. import sys
  29. import os
  30. import argparse
  31. from loguru import logger
  32. # 添加项目根目录到Python路径
  33. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  34. from utils.mysql_db import MysqlHelper
  35. class MoveDataProcessor:
  36. """数据迁移处理器"""
  37. def __init__(self):
  38. self.logger = logger
  39. def get_unprocessed_request_ids(self):
  40. """获取未处理的request_id列表"""
  41. sql = """
  42. SELECT DISTINCT request_id
  43. FROM knowledge_search_content
  44. WHERE request_id IS NOT NULL AND is_move = 0
  45. ORDER BY request_id ASC
  46. LIMIT 1
  47. """
  48. try:
  49. result = MysqlHelper.get_values(sql)
  50. if result:
  51. return result[0][0] # 返回第一个request_id
  52. return None
  53. except Exception as e:
  54. self.logger.error(f"查询未处理request_id失败: {e}")
  55. return None
  56. def get_data_by_request_id(self, request_id):
  57. """根据request_id获取数据"""
  58. sql = """
  59. SELECT id, request_id, source_channel, query_word, formatted_content,
  60. multimodal_recognition, structured_data
  61. FROM knowledge_search_content
  62. WHERE request_id = %s AND is_move = 0
  63. ORDER BY id ASC
  64. """
  65. try:
  66. result = MysqlHelper.get_values(sql, (request_id,))
  67. if result:
  68. # 将元组转换为字典列表
  69. columns = ['id', 'request_id', 'source_channel', 'query_word',
  70. 'formatted_content', 'multimodal_recognition', 'structured_data']
  71. data_list = []
  72. for row in result:
  73. data_dict = dict(zip(columns, row))
  74. data_list.append(data_dict)
  75. return data_list
  76. return []
  77. except Exception as e:
  78. self.logger.error(f"查询request_id数据失败: {e}")
  79. return []
  80. def extract_content_id(self, formatted_content):
  81. """从formatted_content中提取content_id"""
  82. try:
  83. if not formatted_content:
  84. return None
  85. # 解析JSON
  86. content_data = json.loads(formatted_content)
  87. return content_data.get('channel_content_id')
  88. except (json.JSONDecodeError, AttributeError) as e:
  89. self.logger.error(f"解析formatted_content失败: {e}")
  90. return None
  91. def insert_crawl_content(self, content_id, source_channel, request_id, formatted_content):
  92. """插入数据到knowledge_crawl_content表"""
  93. sql = """
  94. INSERT INTO knowledge_crawl_content
  95. (content_id, channel, request_id, crawl_data, create_time)
  96. VALUES (%s, %s, %s, %s, NOW())
  97. """
  98. params = (content_id, source_channel, request_id, formatted_content)
  99. try:
  100. result = MysqlHelper.insert_and_get_id(sql, params)
  101. if result:
  102. self.logger.info(f"插入crawl_content成功: content_id={content_id}, request_id={request_id}, insert_id={result}")
  103. return result
  104. return None
  105. except Exception as e:
  106. self.logger.error(f"插入crawl_content失败: {e}")
  107. return None
  108. def insert_parsing_content(self, content_id, request_id, multimodal_recognition, structured_data=None):
  109. """插入数据到knowledge_parsing_content表"""
  110. # 根据是否有structured_data确定status
  111. status = 5 if structured_data else 2
  112. sql = """
  113. INSERT INTO knowledge_parsing_content
  114. (content_id, request_id, task_id, indentify_data, parsing_data, create_time, status)
  115. VALUES (%s, %s, %s, %s, %s, NOW(), %s)
  116. """
  117. params = (content_id, request_id, 1, multimodal_recognition, structured_data, status)
  118. try:
  119. result = MysqlHelper.insert_and_get_id(sql, params)
  120. if result:
  121. self.logger.info(f"插入parsing_content成功: content_id={content_id}, request_id={request_id}, status={status}, insert_id={result}")
  122. return result
  123. return None
  124. except Exception as e:
  125. self.logger.error(f"插入parsing_content失败: {e}")
  126. return None
  127. def update_request_status(self, request_id, status):
  128. """更新knowledge_request表的parsing_status"""
  129. sql = "UPDATE knowledge_request SET parsing_status = %s WHERE request_id = %s"
  130. params = (status, request_id)
  131. try:
  132. result = MysqlHelper.update_values(sql, params)
  133. if result:
  134. self.logger.info(f"更新request状态成功: request_id={request_id}, parsing_status={status}")
  135. return True
  136. return False
  137. except Exception as e:
  138. self.logger.error(f"更新request状态失败: {e}")
  139. return False
  140. def mark_as_processed(self, record_id):
  141. """标记数据为已处理"""
  142. sql = "UPDATE knowledge_search_content SET is_move = 1 WHERE id = %s"
  143. params = (record_id,)
  144. try:
  145. result = MysqlHelper.update_values(sql, params)
  146. if result:
  147. self.logger.info(f"标记数据已处理成功: id={record_id}")
  148. return True
  149. return False
  150. except Exception as e:
  151. self.logger.error(f"标记数据已处理失败: {e}")
  152. return False
  153. def process_single_record(self, record):
  154. """处理单条记录"""
  155. record_id = record['id']
  156. request_id = record['request_id']
  157. source_channel = record['source_channel']
  158. formatted_content = record['formatted_content']
  159. multimodal_recognition = record['multimodal_recognition']
  160. structured_data = record['structured_data']
  161. self.logger.info(f"开始处理记录: id={record_id}, request_id={request_id}")
  162. # 提取content_id
  163. content_id = self.extract_content_id(formatted_content)
  164. if not content_id:
  165. self.logger.warning(f"无法提取content_id: id={record_id}")
  166. # 仍然标记为已处理,避免重复处理
  167. self.mark_as_processed(record_id)
  168. return False
  169. # 处理formatted_content
  170. if formatted_content:
  171. self.insert_crawl_content(content_id, source_channel, request_id, formatted_content)
  172. # 处理multimodal_recognition
  173. parsing_inserted = False
  174. if multimodal_recognition:
  175. if structured_data:
  176. # 情况2.3: multimodal_recognition不为空且structured_data不为空
  177. self.insert_parsing_content(content_id, request_id, multimodal_recognition, structured_data)
  178. parsing_inserted = True
  179. else:
  180. # 情况2.2: multimodal_recognition不为空但structured_data为空
  181. self.insert_parsing_content(content_id, request_id, multimodal_recognition)
  182. parsing_inserted = True
  183. # 标记为已处理
  184. self.mark_as_processed(record_id)
  185. return parsing_inserted
  186. def process_single_request_id(self, request_id=None):
  187. """处理单个request_id的数据"""
  188. # 如果没有指定request_id,则获取第一个未处理的request_id
  189. if request_id is None:
  190. request_id = self.get_unprocessed_request_ids()
  191. if not request_id:
  192. self.logger.info("没有需要处理的request_id")
  193. return False
  194. self.logger.info(f"开始处理request_id: {request_id}")
  195. # 获取该request_id的所有数据
  196. records = self.get_data_by_request_id(request_id)
  197. if not records:
  198. self.logger.info(f"request_id={request_id} 没有未处理的数据")
  199. return False
  200. self.logger.info(f"request_id={request_id} 找到 {len(records)} 条记录")
  201. all_satisfy_condition_23 = True
  202. # 处理该request_id的所有记录
  203. for record in records:
  204. parsing_inserted = self.process_single_record(record)
  205. # 检查是否满足条件2.3(multimodal_recognition不为空且structured_data不为空)
  206. multimodal_recognition = record['multimodal_recognition']
  207. structured_data = record['structured_data']
  208. if not (multimodal_recognition and structured_data):
  209. all_satisfy_condition_23 = False
  210. # 更新knowledge_request表的parsing_status
  211. if all_satisfy_condition_23:
  212. self.update_request_status(request_id, 2)
  213. self.logger.info(f"request_id={request_id} 所有数据都满足条件2.3,设置parsing_status=2")
  214. else:
  215. self.update_request_status(request_id, 0)
  216. self.logger.info(f"request_id={request_id} 部分数据不满足条件2.3,设置parsing_status=0")
  217. return True
  218. def process_all_data(self):
  219. """处理所有数据(循环处理每个request_id)"""
  220. processed_count = 0
  221. while True:
  222. # 获取下一个未处理的request_id
  223. request_id = self.get_unprocessed_request_ids()
  224. if not request_id:
  225. self.logger.info("所有数据已处理完成")
  226. break
  227. # 处理该request_id
  228. success = self.process_single_request_id(request_id)
  229. if success:
  230. processed_count += 1
  231. self.logger.info(f"已处理 {processed_count} 个request_id")
  232. else:
  233. self.logger.warning(f"处理request_id={request_id} 失败")
  234. break
  235. self.logger.info(f"总共处理了 {processed_count} 个request_id")
  236. def main():
  237. """主函数"""
  238. parser = argparse.ArgumentParser(description='数据迁移脚本')
  239. parser.add_argument('--request-id', type=str, help='指定要处理的request_id,如果不指定则处理所有未处理的数据')
  240. parser.add_argument('--single', action='store_true', help='只处理一个request_id(第一个未处理的)')
  241. args = parser.parse_args()
  242. processor = MoveDataProcessor()
  243. if args.request_id:
  244. # 处理指定的request_id
  245. success = processor.process_single_request_id(args.request_id)
  246. if success:
  247. logger.info(f"成功处理request_id: {args.request_id}")
  248. else:
  249. logger.error(f"处理request_id失败: {args.request_id}")
  250. sys.exit(1)
  251. elif args.single:
  252. # 只处理一个request_id
  253. success = processor.process_single_request_id()
  254. if success:
  255. logger.info("成功处理一个request_id")
  256. else:
  257. logger.info("没有需要处理的request_id")
  258. else:
  259. # 处理所有数据
  260. processor.process_all_data()
  261. if __name__ == "__main__":
  262. main()