#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
move_data脚本主要做以下的事情
1. 读取knowledge_search_content中request_id IS NOT NULL 并且 is_move = 0 的数据,将返回的多条数据每一条都执行以下步骤
提取出request_id,source_channel,query_word,formatted_content, multimodal_recognition,structured_data字段
2. 根据提取出的字段,写入到不同的表中
2.1 如果formatted_content不为空,则新增一条新纪录到knowledge_crawl_content表中,新增时字段取值如下
content_id字段取自formatted_content解析为json里面的content_id字段;
channel字段取自source_channel字段;
request_id字段取自request_id字段;
crawl_data字段取自formatted_content
2.2 如果multimodal_recognition不为空,且structured_data为空,则新增一条记录到knowledge_parsing_content表中,新增时字段取值如下
content_id字段取自formatted_content解析为json里面的content_id字段;
request_id字段取自request_id字段;
indentify_data字段取自multimodal_recognition
status设置为2
2.3 如果multimodal_recognition不为空,且structured_data不为空,则新增一条记录到knowledge_parsing_content表中,新增时字段取值如下
content_id字段取自formatted_content解析为json里面的content_id字段;
request_id字段取自request_id字段;
indentify_data字段取自multimodal_recognition
parsing_data字段取自structured_data
status设置为5
另外如果本次查询所有的数据都满足2.3条件,则将knowledge_request表中request_id为request_id的parsing_status字段设置为2,否则设置为0
每条数据执行完毕后都更新knowledge_search_content表中is_move字段为1,标记为已处理
"""
import json
import sys
import os
import argparse
from loguru import logger
# 添加项目根目录到Python路径
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils.mysql_db import MysqlHelper
class MoveDataProcessor:
    """Migrate rows out of ``knowledge_search_content`` into downstream tables.

    For every row where ``request_id IS NOT NULL`` and ``is_move = 0``:

    * 2.1 ``formatted_content`` present -> insert into ``knowledge_crawl_content``.
    * 2.2 only ``multimodal_recognition`` present -> insert into
      ``knowledge_parsing_content`` with status 2.
    * 2.3 both ``multimodal_recognition`` and ``structured_data`` present ->
      insert into ``knowledge_parsing_content`` with status 5.

    If every row of a request satisfies 2.3 (and was actually inserted), the
    matching ``knowledge_request.parsing_status`` is set to 2, otherwise to 0.
    Each handled row is marked ``is_move = 1`` so it is never picked up again.
    """

    def __init__(self):
        # Reuse the module-level loguru logger so all methods log uniformly.
        self.logger = logger

    def get_unprocessed_request_ids(self):
        """Return the smallest unprocessed request_id, or None when nothing is pending.

        NOTE(review): despite the plural name this returns a single value
        (the query uses LIMIT 1); the name is kept for backward compatibility.
        """
        sql = """
        SELECT DISTINCT request_id
        FROM knowledge_search_content
        WHERE request_id IS NOT NULL AND is_move = 0
        ORDER BY request_id ASC
        LIMIT 1
        """

        try:
            result = MysqlHelper.get_values(sql)
            if result:
                return result[0][0]  # first row, first column
            return None
        except Exception as e:
            self.logger.error(f"查询未处理request_id失败: {e}")
            return None

    def get_data_by_request_id(self, request_id):
        """Fetch all unprocessed rows for *request_id* as a list of dicts.

        Returns [] on error or when nothing matches.
        """
        sql = """
        SELECT id, request_id, source_channel, query_word, formatted_content,
               multimodal_recognition, structured_data
        FROM knowledge_search_content
        WHERE request_id = %s AND is_move = 0
        ORDER BY id ASC
        """

        try:
            result = MysqlHelper.get_values(sql, (request_id,))
            if result:
                # Zip each result tuple with the column names so downstream
                # code can address fields by name instead of index.
                columns = ['id', 'request_id', 'source_channel', 'query_word',
                           'formatted_content', 'multimodal_recognition', 'structured_data']
                return [dict(zip(columns, row)) for row in result]
            return []
        except Exception as e:
            self.logger.error(f"查询request_id数据失败: {e}")
            return []

    def extract_content_id(self, formatted_content):
        """Parse *formatted_content* as JSON and return its ``channel_content_id``.

        Returns None for empty input, on JSON parse failure, or when the key
        is absent.

        NOTE(review): the module docstring says the id comes from a
        ``content_id`` field, but this code reads ``channel_content_id`` --
        confirm which key the upstream JSON actually carries.
        """
        try:
            if not formatted_content:
                return None
            content_data = json.loads(formatted_content)
            return content_data.get('channel_content_id')
        except (json.JSONDecodeError, AttributeError) as e:
            self.logger.error(f"解析formatted_content失败: {e}")
            return None

    def insert_crawl_content(self, content_id, source_channel, request_id, formatted_content):
        """Insert one row into knowledge_crawl_content (case 2.1).

        Returns the new row id, or None on failure.
        """
        sql = """
        INSERT INTO knowledge_crawl_content
        (content_id, channel, request_id, crawl_data, create_time)
        VALUES (%s, %s, %s, %s, NOW())
        """
        params = (content_id, source_channel, request_id, formatted_content)

        try:
            result = MysqlHelper.insert_and_get_id(sql, params)
            if result:
                self.logger.info(f"插入crawl_content成功: content_id={content_id}, request_id={request_id}, insert_id={result}")
                return result
            return None
        except Exception as e:
            self.logger.error(f"插入crawl_content失败: {e}")
            return None

    def insert_parsing_content(self, content_id, request_id, multimodal_recognition, structured_data=None):
        """Insert one row into knowledge_parsing_content (cases 2.2 / 2.3).

        status is 5 when *structured_data* is present (case 2.3), else 2
        (case 2.2).  Returns the new row id, or None on failure.
        """
        status = 5 if structured_data else 2

        # NOTE(review): "indentify_data" (sic) is the actual column name in
        # the schema; task_id is hard-coded to 1 -- confirm intent.
        sql = """
        INSERT INTO knowledge_parsing_content
        (content_id, request_id, task_id, indentify_data, parsing_data, create_time, status)
        VALUES (%s, %s, %s, %s, %s, NOW(), %s)
        """
        params = (content_id, request_id, 1, multimodal_recognition, structured_data, status)

        try:
            result = MysqlHelper.insert_and_get_id(sql, params)
            if result:
                self.logger.info(f"插入parsing_content成功: content_id={content_id}, request_id={request_id}, status={status}, insert_id={result}")
                return result
            return None
        except Exception as e:
            self.logger.error(f"插入parsing_content失败: {e}")
            return None

    def update_request_status(self, request_id, status):
        """Set knowledge_request.parsing_status for *request_id*.

        Returns True on success, False otherwise.
        """
        sql = "UPDATE knowledge_request SET parsing_status = %s WHERE request_id = %s"
        params = (status, request_id)

        try:
            result = MysqlHelper.update_values(sql, params)
            if result:
                self.logger.info(f"更新request状态成功: request_id={request_id}, parsing_status={status}")
                return True
            return False
        except Exception as e:
            self.logger.error(f"更新request状态失败: {e}")
            return False

    def mark_as_processed(self, record_id):
        """Flag one knowledge_search_content row as migrated (is_move = 1)."""
        sql = "UPDATE knowledge_search_content SET is_move = 1 WHERE id = %s"
        params = (record_id,)

        try:
            result = MysqlHelper.update_values(sql, params)
            if result:
                self.logger.info(f"标记数据已处理成功: id={record_id}")
                return True
            return False
        except Exception as e:
            self.logger.error(f"标记数据已处理失败: {e}")
            return False

    def process_single_record(self, record):
        """Migrate one row and mark it processed.

        Returns True only when a parsing row was actually written to
        knowledge_parsing_content (cases 2.2 / 2.3); False otherwise.
        """
        record_id = record['id']
        request_id = record['request_id']
        source_channel = record['source_channel']
        formatted_content = record['formatted_content']
        multimodal_recognition = record['multimodal_recognition']
        structured_data = record['structured_data']

        self.logger.info(f"开始处理记录: id={record_id}, request_id={request_id}")

        # content_id is required by every target table; without it the row
        # cannot be migrated at all.
        content_id = self.extract_content_id(formatted_content)
        if not content_id:
            self.logger.warning(f"无法提取content_id: id={record_id}")
            # Still mark as processed so the row is not retried forever.
            self.mark_as_processed(record_id)
            return False

        # Case 2.1: mirror the raw crawl payload.
        if formatted_content:
            self.insert_crawl_content(content_id, source_channel, request_id, formatted_content)

        # Cases 2.2 / 2.3: write recognition (and optional parsing) data.
        parsing_inserted = False
        if multimodal_recognition:
            if structured_data:
                # Case 2.3: recognition + structured data -> status 5.
                insert_id = self.insert_parsing_content(content_id, request_id, multimodal_recognition, structured_data)
            else:
                # Case 2.2: recognition only -> status 2.
                insert_id = self.insert_parsing_content(content_id, request_id, multimodal_recognition)
            # BUGFIX: previously this flag was set True unconditionally even
            # when the INSERT failed; now it reflects actual success.
            parsing_inserted = insert_id is not None

        # Mark as processed regardless of the outcome above.
        self.mark_as_processed(record_id)

        return parsing_inserted

    def process_single_request_id(self, request_id=None):
        """Process every unprocessed row of one request_id.

        When *request_id* is None the oldest pending one is picked.  Returns
        True when rows were processed, False when there was nothing to do.
        """
        if request_id is None:
            request_id = self.get_unprocessed_request_ids()
            if not request_id:
                self.logger.info("没有需要处理的request_id")
                return False

        self.logger.info(f"开始处理request_id: {request_id}")

        records = self.get_data_by_request_id(request_id)
        if not records:
            self.logger.info(f"request_id={request_id} 没有未处理的数据")
            return False

        self.logger.info(f"request_id={request_id} 找到 {len(records)} 条记录")

        all_satisfy_condition_23 = True

        for record in records:
            parsing_inserted = self.process_single_record(record)

            # BUGFIX: condition 2.3 now also requires that the parsing row was
            # actually inserted.  Previously the flag returned above was
            # ignored, so a row whose content_id could not be extracted (or
            # whose INSERT failed) still counted toward "all satisfied" and
            # could wrongly set parsing_status=2.
            multimodal_recognition = record['multimodal_recognition']
            structured_data = record['structured_data']
            if not (parsing_inserted and multimodal_recognition and structured_data):
                all_satisfy_condition_23 = False

        # parsing_status: 2 when every row fulfilled case 2.3, else 0.
        if all_satisfy_condition_23:
            self.update_request_status(request_id, 2)
            self.logger.info(f"request_id={request_id} 所有数据都满足条件2.3,设置parsing_status=2")
        else:
            self.update_request_status(request_id, 0)
            self.logger.info(f"request_id={request_id} 部分数据不满足条件2.3,设置parsing_status=0")

        return True

    def process_all_data(self):
        """Loop over pending request_ids until none are left.

        Stops early if processing a request_id reports failure, to avoid
        spinning on the same id forever.
        """
        processed_count = 0

        while True:
            request_id = self.get_unprocessed_request_ids()
            if not request_id:
                self.logger.info("所有数据已处理完成")
                break

            success = self.process_single_request_id(request_id)
            if success:
                processed_count += 1
                self.logger.info(f"已处理 {processed_count} 个request_id")
            else:
                self.logger.warning(f"处理request_id={request_id} 失败")
                break

        self.logger.info(f"总共处理了 {processed_count} 个request_id")
def main():
    """Command-line entry point.

    Dispatch order:
      --request-id ID : process exactly that request (exit 1 on failure);
      --single        : process the first pending request_id, if any;
      (no flags)      : process everything that is pending.
    """
    arg_parser = argparse.ArgumentParser(description='数据迁移脚本')
    arg_parser.add_argument('--request-id', type=str, help='指定要处理的request_id,如果不指定则处理所有未处理的数据')
    arg_parser.add_argument('--single', action='store_true', help='只处理一个request_id(第一个未处理的)')
    opts = arg_parser.parse_args()

    worker = MoveDataProcessor()

    if opts.request_id:
        # Explicit request: failure is an error exit so callers can detect it.
        if worker.process_single_request_id(opts.request_id):
            logger.info(f"成功处理request_id: {opts.request_id}")
            return
        logger.error(f"处理request_id失败: {opts.request_id}")
        sys.exit(1)

    if opts.single:
        # Best-effort single pass; an empty queue is not an error.
        if worker.process_single_request_id():
            logger.info("成功处理一个request_id")
        else:
            logger.info("没有需要处理的request_id")
        return

    worker.process_all_data()


if __name__ == "__main__":
    main()