#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
move_data: migrate rows of knowledge_search_content into the crawl/parsing tables.

1. Read the rows of knowledge_search_content where ``request_id IS NOT NULL``
   and ``is_move = 0`` (one request_id at a time).  For every row, extract
   request_id, source_channel, query_word, formatted_content,
   multimodal_recognition and structured_data.
2. Write the extracted fields to different tables:

   2.1 If formatted_content is non-empty, insert a row into
       knowledge_crawl_content:
         content_id  <- content id parsed from the formatted_content JSON
         channel     <- source_channel
         request_id  <- request_id
         crawl_data  <- formatted_content
   2.2 If multimodal_recognition is non-empty and structured_data is empty,
       insert a row into knowledge_parsing_content:
         content_id     <- content id parsed from the formatted_content JSON
         request_id     <- request_id
         indentify_data <- multimodal_recognition
         status         <- 2
   2.3 If multimodal_recognition and structured_data are both non-empty,
       insert a row into knowledge_parsing_content:
         content_id     <- content id parsed from the formatted_content JSON
         request_id     <- request_id
         indentify_data <- multimodal_recognition
         parsing_data   <- structured_data
         status         <- 5

   If every row fetched for a request_id satisfies 2.3, set
   knowledge_request.parsing_status = 2 for that request_id; otherwise set
   it to 0.
3. After each row is handled, set knowledge_search_content.is_move = 1 so
   the row is not processed again.

Notes on the implementation:
  * The content id is read from the ``channel_content_id`` key of the
    formatted_content JSON (see ``extract_content_id``); earlier versions of
    this description referred to a ``content_id`` key.
  * Rows whose content id cannot be extracted are marked processed and
    skipped without any inserts.
  * Parsing rows are inserted with a hard-coded ``task_id`` of 1.
"""
import json
import sys
import os
import argparse
from loguru import logger

# Add the project root to the Python path so `utils` can be imported.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from utils.mysql_db import MysqlHelper


class MoveDataProcessor:
    """Move unprocessed knowledge_search_content rows into the crawl/parsing tables."""

    def __init__(self):
        self.logger = logger

    def get_unprocessed_request_ids(self):
        """Return the smallest request_id that still has rows with ``is_move = 0``.

        Despite the plural name, only a single request_id is returned
        (the query uses ``LIMIT 1``).

        Returns:
            The request_id, or None when nothing is pending or the query fails
            (the failure is logged).
        """
        sql = """
            SELECT DISTINCT request_id
            FROM knowledge_search_content
            WHERE request_id IS NOT NULL AND is_move = 0
            ORDER BY request_id ASC
            LIMIT 1
        """
        try:
            result = MysqlHelper.get_values(sql)
            if result:
                # Return the first (and only) request_id.
                return result[0][0]
            return None
        except Exception as e:
            self.logger.error(f"查询未处理request_id失败: {e}")
            return None

    def get_data_by_request_id(self, request_id):
        """Fetch the not-yet-moved rows of one request_id, ordered by id.

        Args:
            request_id: The request_id whose rows should be loaded.

        Returns:
            A list of dicts keyed by column name (``id``, ``request_id``,
            ``source_channel``, ``query_word``, ``formatted_content``,
            ``multimodal_recognition``, ``structured_data``). The list is
            empty when there are no rows or the query fails (logged).
        """
        sql = """
            SELECT id, request_id, source_channel, query_word, formatted_content,
                   multimodal_recognition, structured_data
            FROM knowledge_search_content
            WHERE request_id = %s AND is_move = 0
            ORDER BY id ASC
        """
        try:
            result = MysqlHelper.get_values(sql, (request_id,))
            if not result:
                return []
            # Convert the row tuples into dicts keyed by column name.
            columns = ['id', 'request_id', 'source_channel', 'query_word',
                       'formatted_content', 'multimodal_recognition', 'structured_data']
            return [dict(zip(columns, row)) for row in result]
        except Exception as e:
            self.logger.error(f"查询request_id数据失败: {e}")
            return []

    def extract_content_id(self, formatted_content):
        """Extract the content id from the formatted_content JSON text.

        Args:
            formatted_content: JSON text (str/bytes) describing the crawled content.

        Returns:
            The value of the ``channel_content_id`` key, or None when the
            input is empty, is not valid JSON, is not a JSON object, or is of
            a type ``json.loads`` cannot parse (all failures are logged).
        """
        try:
            if not formatted_content:
                return None
            content_data = json.loads(formatted_content)
            return content_data.get('channel_content_id')
        except (json.JSONDecodeError, AttributeError, TypeError) as e:
            # TypeError: json.loads was given a non str/bytes value (e.g. an
            # already-decoded object); treat it like any other parse failure
            # instead of aborting the whole run.
            self.logger.error(f"解析formatted_content失败: {e}")
            return None

    def insert_crawl_content(self, content_id, source_channel, request_id, formatted_content):
        """Insert one row into knowledge_crawl_content (case 2.1).

        Returns:
            The id returned by ``MysqlHelper.insert_and_get_id``, or None when
            the insert yields nothing or raises (logged).
        """
        sql = """
            INSERT INTO knowledge_crawl_content
            (content_id, channel, request_id, crawl_data, create_time)
            VALUES (%s, %s, %s, %s, NOW())
        """
        params = (content_id, source_channel, request_id, formatted_content)
        try:
            result = MysqlHelper.insert_and_get_id(sql, params)
            if result:
                self.logger.info(f"插入crawl_content成功: content_id={content_id}, request_id={request_id}, insert_id={result}")
                return result
            return None
        except Exception as e:
            self.logger.error(f"插入crawl_content失败: {e}")
            return None

    def insert_parsing_content(self, content_id, request_id, multimodal_recognition, structured_data=None):
        """Insert one row into knowledge_parsing_content (cases 2.2 / 2.3).

        ``status`` is 5 when ``structured_data`` is truthy (case 2.3) and 2
        otherwise (case 2.2). ``task_id`` is always written as 1.

        Returns:
            The id returned by ``MysqlHelper.insert_and_get_id``, or None when
            the insert yields nothing or raises (logged).
        """
        status = 5 if structured_data else 2
        sql = """
            INSERT INTO knowledge_parsing_content
            (content_id, request_id, task_id, indentify_data, parsing_data, create_time, status)
            VALUES (%s, %s, %s, %s, %s, NOW(), %s)
        """
        params = (content_id, request_id, 1, multimodal_recognition, structured_data, status)
        try:
            result = MysqlHelper.insert_and_get_id(sql, params)
            if result:
                self.logger.info(f"插入parsing_content成功: content_id={content_id}, request_id={request_id}, status={status}, insert_id={result}")
                return result
            return None
        except Exception as e:
            self.logger.error(f"插入parsing_content失败: {e}")
            return None

    def update_request_status(self, request_id, status):
        """Set knowledge_request.parsing_status for the given request_id.

        Returns:
            True when ``MysqlHelper.update_values`` returns a truthy result
            (presumably the affected-row count — confirm against MysqlHelper),
            False otherwise or on error (logged).
        """
        sql = "UPDATE knowledge_request SET parsing_status = %s WHERE request_id = %s"
        params = (status, request_id)
        try:
            result = MysqlHelper.update_values(sql, params)
            if result:
                self.logger.info(f"更新request状态成功: request_id={request_id}, parsing_status={status}")
                return True
            return False
        except Exception as e:
            self.logger.error(f"更新request状态失败: {e}")
            return False

    def mark_as_processed(self, record_id):
        """Set ``is_move = 1`` on one knowledge_search_content row.

        Returns:
            True when ``MysqlHelper.update_values`` returns a truthy result,
            False otherwise or on error (logged).
        """
        sql = "UPDATE knowledge_search_content SET is_move = 1 WHERE id = %s"
        params = (record_id,)
        try:
            result = MysqlHelper.update_values(sql, params)
            if result:
                self.logger.info(f"标记数据已处理成功: id={record_id}")
                return True
            return False
        except Exception as e:
            self.logger.error(f"标记数据已处理失败: {e}")
            return False

    def process_single_record(self, record):
        """Apply steps 2.1-2.3 to one row, then mark it processed.

        Args:
            record: One dict as produced by ``get_data_by_request_id``.

        Returns:
            True when a knowledge_parsing_content insert was attempted
            (case 2.2 or 2.3), False otherwise, including when no content id
            could be extracted. The flag does not reflect whether the insert
            itself succeeded.
        """
        record_id = record['id']
        request_id = record['request_id']
        source_channel = record['source_channel']
        formatted_content = record['formatted_content']
        multimodal_recognition = record['multimodal_recognition']
        structured_data = record['structured_data']

        self.logger.info(f"开始处理记录: id={record_id}, request_id={request_id}")

        content_id = self.extract_content_id(formatted_content)
        if not content_id:
            self.logger.warning(f"无法提取content_id: id={record_id}")
            # Still mark the row as processed so it is not picked up again.
            self.mark_as_processed(record_id)
            return False

        # Case 2.1: crawl content.
        if formatted_content:
            self.insert_crawl_content(content_id, source_channel, request_id, formatted_content)

        # Cases 2.2 / 2.3: parsing content.
        parsing_inserted = False
        if multimodal_recognition:
            if structured_data:
                # Case 2.3: both multimodal_recognition and structured_data present.
                self.insert_parsing_content(content_id, request_id, multimodal_recognition, structured_data)
            else:
                # Case 2.2: only multimodal_recognition present.
                self.insert_parsing_content(content_id, request_id, multimodal_recognition)
            parsing_inserted = True

        self.mark_as_processed(record_id)
        return parsing_inserted

    def process_single_request_id(self, request_id=None):
        """Process every pending row of one request_id and update its parsing_status.

        Args:
            request_id: The request_id to process. When None, the first pending
                request_id from ``get_unprocessed_request_ids`` is used.

        Returns:
            True when at least one row was processed, False when there was
            nothing to process.
        """
        if request_id is None:
            request_id = self.get_unprocessed_request_ids()
            if not request_id:
                self.logger.info("没有需要处理的request_id")
                return False

        self.logger.info(f"开始处理request_id: {request_id}")

        records = self.get_data_by_request_id(request_id)
        if not records:
            self.logger.info(f"request_id={request_id} 没有未处理的数据")
            return False

        self.logger.info(f"request_id={request_id} 找到 {len(records)} 条记录")

        for record in records:
            self.process_single_record(record)

        # Condition 2.3: every row has both multimodal_recognition and structured_data.
        all_satisfy_condition_23 = all(
            record['multimodal_recognition'] and record['structured_data']
            for record in records
        )

        if all_satisfy_condition_23:
            self.update_request_status(request_id, 2)
            self.logger.info(f"request_id={request_id} 所有数据都满足条件2.3,设置parsing_status=2")
        else:
            self.update_request_status(request_id, 0)
            self.logger.info(f"request_id={request_id} 部分数据不满足条件2.3,设置parsing_status=0")

        return True

    def process_all_data(self):
        """Process pending request_ids one by one until none remain.

        Stops early when a request_id fails to process, or when a request_id
        that was already handled in this run is returned again. The second
        case means some of its rows could not be marked ``is_move = 1``;
        continuing would loop forever and re-insert duplicate rows.
        """
        processed_count = 0
        seen_request_ids = set()
        while True:
            request_id = self.get_unprocessed_request_ids()
            if not request_id:
                self.logger.info("所有数据已处理完成")
                break

            if request_id in seen_request_ids:
                self.logger.error(f"request_id={request_id} 在本次运行中已处理过但仍有未标记为已处理的数据,停止处理以避免死循环")
                break
            seen_request_ids.add(request_id)

            success = self.process_single_request_id(request_id)
            if success:
                processed_count += 1
                self.logger.info(f"已处理 {processed_count} 个request_id")
            else:
                self.logger.warning(f"处理request_id={request_id} 失败")
                break

        self.logger.info(f"总共处理了 {processed_count} 个request_id")


def main():
    """Command-line entry point: process one given, one first-pending, or all request_ids."""
    parser = argparse.ArgumentParser(description='数据迁移脚本')
    parser.add_argument('--request-id', type=str, help='指定要处理的request_id,如果不指定则处理所有未处理的数据')
    parser.add_argument('--single', action='store_true', help='只处理一个request_id(第一个未处理的)')

    args = parser.parse_args()

    processor = MoveDataProcessor()

    if args.request_id:
        # Process the explicitly requested request_id; exit non-zero on failure.
        success = processor.process_single_request_id(args.request_id)
        if success:
            logger.info(f"成功处理request_id: {args.request_id}")
        else:
            logger.error(f"处理request_id失败: {args.request_id}")
            sys.exit(1)
    elif args.single:
        # Process only the first pending request_id.
        success = processor.process_single_request_id()
        if success:
            logger.info("成功处理一个request_id")
        else:
            logger.info("没有需要处理的request_id")
    else:
        # Process everything that is pending.
        processor.process_all_data()


if __name__ == "__main__":
    main()