"""One-off ETL: explode tag columns from ``video_content`` into ``video_content_mapping``.

Reads every row of ``video_content``, splits the JSON-array tag columns into
individual (video_id, tag, tag_type) tuples, passes the scalar tag columns
through as-is, and bulk-inserts the result in batches of 1000 with a single
commit at the end.
"""
import json

import mysql.connector

# NOTE(review): credentials are hard-coded in source control — move them to
# environment variables or a secrets store before sharing this script.
db_config = {
    'host': 'rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com',
    'database': 'incentive',
    'port': 3306,
    'user': 'wx2016_longvideo',
    'password': 'wx2016_longvideoP@assword1234',
}

# Column layout of video_content as read by SELECT *: row[1] is video_id and
# the tag columns start at row[2], in exactly this order.
all_field_names = ['key_words', 'search_keys', 'extra_keys', 'category_list',
                   'tone', 'target_audience', 'target_age', 'target_gender',
                   'address', 'theme']
# Columns whose values are JSON arrays of tags.
json_field_names = ['key_words', 'search_keys', 'extra_keys', 'category_list']
# Columns whose values are plain scalars, copied through unparsed.
normal_field_names = ['tone', 'target_audience', 'target_age', 'target_gender',
                      'address', 'theme']

# Hoisted out of the row loop: list.index() is O(n) per call, and the original
# code recomputed it for every field of every row.
_FIELD_COLUMN = {name: idx + 2 for idx, name in enumerate(all_field_names)}

BATCH_SIZE = 1000
INSERT_SQL = """
        INSERT INTO video_content_mapping (video_id, tag, tag_type)
        VALUES (%s, %s, %s)
    """


def _mapping_tuples(row):
    """Yield (video_id, tag, tag_type) tuples extracted from one source row."""
    video_id = row[1]
    for field_name in json_field_names:
        json_data = row[_FIELD_COLUMN[field_name]]
        # Only parse values that look like JSON arrays; NULL/empty values and
        # scalar strings are skipped (same '[' prefix heuristic as before).
        if not json_data or json_data[0] != '[':
            continue
        for tag in json.loads(json_data):
            yield (video_id, tag, field_name)
    for field_name in normal_field_names:
        yield (video_id, row[_FIELD_COLUMN[field_name]], field_name)


def main():
    """Run the full copy. Connection and cursor are closed even on failure."""
    cnx = mysql.connector.connect(**db_config)
    try:
        cursor = cnx.cursor()
        try:
            cursor.execute("SELECT * FROM video_content;")
            # Whole table in memory — acceptable for a one-off migration job;
            # a second cursor would be needed to stream and insert concurrently.
            rows = cursor.fetchall()
            print("Reading data from video_content table...")
            print("row count: ", len(rows))

            insert_batch = []
            for row in rows:
                insert_batch.extend(_mapping_tuples(row))
                # Flush every BATCH_SIZE records to bound memory and round-trips.
                if len(insert_batch) >= BATCH_SIZE:
                    cursor.executemany(INSERT_SQL, insert_batch)
                    print(f"Inserting records {len(insert_batch)} rows...")
                    insert_batch.clear()

            # Flush the final partial batch, if any.
            if insert_batch:
                cursor.executemany(INSERT_SQL, insert_batch)
                print(f"Inserting records {len(insert_batch)} rows...")

            # Single commit: the whole load is one transaction, as in the
            # original script, so a crash mid-run leaves the target untouched.
            cnx.commit()
        finally:
            cursor.close()
    finally:
        cnx.close()


if __name__ == "__main__":
    main()