123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 |
- # 读取 文件/视频分析报告.xlsx文件,逐行将第5列数据的JSON字符串读取出来,提取segments字段并处理
- import pandas as pd
- import json
- import logging
- import re
- # Set up logging
- logging.basicConfig(level=logging.INFO)
- logger = logging.getLogger(__name__)
- def clean_json_string(json_str):
- """清理和格式化JSON字符串"""
- # 移除BOM和空白字符
- json_str = json_str.strip().lstrip('\ufeff')
-
- # 处理Python风格的字符串(单引号转双引号)
- json_str = re.sub(r'(?<!\\)"([^"]*?)(?<!\\)"', r'__DOUBLE_QUOTED__\1__END__', json_str)
- json_str = json_str.replace("'", '"')
- json_str = re.sub(r'__DOUBLE_QUOTED__([^_]*?)__END__', r'"\1"', json_str)
-
- # 确保是有效的JSON数组或对象
- if not (json_str.startswith('[') or json_str.startswith('{')):
- json_str = '[' + json_str
- if not (json_str.endswith(']') or json_str.endswith('}')):
- json_str = json_str + ']'
-
- return json_str
- try:
- # 读取 Excel 文件
- input_file = '文件/视频分析报告.xlsx'
- output_file = '文件/视频分析报告_new.xlsx'
-
- logger.info(f"Reading file: {input_file}")
- df = pd.read_excel(input_file)
-
- # 获取第5列的列名
- column_5 = df.columns[4] # 0-based index for 5th column
-
- # 逐行处理数据
- for index, row in df.iterrows():
- try:
- json_str = str(row[column_5])
- if not json_str or json_str == 'nan':
- logger.warning(f"Empty or invalid JSON string at row {index + 1}")
- continue
-
- # 清理和格式化JSON字符串
- json_str = clean_json_string(json_str)
-
- # 解析JSON数据
- try:
- data = json.loads(json_str)
- # 如果数据是字典,尝试获取segments字段
- if isinstance(data, dict):
- segments = data.get('segments', [])
- remaining_data = {k: v for k, v in data.items() if k != 'segments'}
- # 如果数据是列表,直接使用
- elif isinstance(data, list):
- segments = data
- remaining_data = {}
- else:
- logger.warning(f"Unexpected data type at row {index + 1}: {type(data)}")
- continue
-
- # 更新数据
- df.loc[index, '第7列'] = json.dumps(segments, ensure_ascii=False)
- df.loc[index, '第6列'] = json.dumps(remaining_data, ensure_ascii=False)
-
- except json.JSONDecodeError as je:
- logger.error(f"JSON decode error at row {index + 1}: {str(je)}")
- logger.error(f"Error position: {je.pos}")
- logger.error(f"Character at error: {repr(json_str[je.pos:je.pos+10])}")
- continue
-
- except Exception as e:
- logger.error(f"Error processing row {index + 1}: {str(e)}")
- continue
-
- # 保存到新的Excel文件
- logger.info(f"Saving to: {output_file}")
- df.to_excel(output_file, index=False)
- logger.info("Processing completed successfully")
-
- except Exception as e:
- logger.error(f"An error occurred: {str(e)}")
|