123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
- import orjson
- from loguru import logger
- import ast
- def parse_general_json(response_text, key=None):
- """
- 通用的 JSON 解析方法,兼容单引号和双引号的 JSON 字符串。
- 优先尝试标准 JSON 解析,失败后尝试处理单引号情况。
- :param response_text: 待解析的 JSON 字符串
- :param key: 若指定,则从解析结果中提取该键对应的值,默认为 None
- :return: 解析结果,如果指定了 key 则返回对应的值,解析失败返回 None
- """
- # 处理字符串未闭合的情况,尝试修正
- response_text = fix_unclosed_string(response_text)
- try:
- # 优先尝试使用 orjson.loads 解析标准 JSON 格式
- data = orjson.loads(response_text.strip())
- except orjson.JSONDecodeError:
- try:
- # 若 orjson.loads 解析失败,尝试使用 ast.literal_eval 处理单引号情况
- data = ast.literal_eval(response_text.strip())
- except (SyntaxError, ValueError) as e:
- # 记录解析错误信息
- logger.error(f"JSON 解析错误, 响应内容: {response_text}, 错误信息: {e}")
- return None
- if key is not None:
- try:
- return data[key]
- except KeyError:
- logger.error(f"键 {key} 不存在于解析结果中,响应内容: {response_text}")
- return None
- return data
- def fix_unclosed_string(text):
- """
- 尝试修复未闭合的字符串
- :param text: 输入的文本
- :return: 修复后的文本
- """
- try:
- # 查找未闭合的字符串
- open_quotes = []
- for _, char in enumerate(text):
- if char in ['"', "'"]:
- if open_quotes and open_quotes[-1] == char:
- open_quotes.pop()
- else:
- open_quotes.append(char)
- if open_quotes:
- # 如果存在未闭合的字符串,尝试在末尾添加闭合引号
- closing_quote = open_quotes[-1]
- text = text.rstrip() + closing_quote
- return text
- except:
- return text
- if __name__ == '__main__':
- # 测试正常解析情况 - 标准 JSON(双引号)
- standard_json = '{"text": "这是标准 JSON 测试内容"}'
- result_standard = parse_general_json(standard_json, key='text')
- print("标准 JSON 解析结果:", result_standard)
- # 测试正常解析情况 - 非标准 JSON(单引号)
- non_standard_json = "{'text': '这是非标准 JSON 测试内容'}"
- result_non_standard = parse_general_json(non_standard_json, key='text')
- print("非标准 JSON 解析结果:", result_non_standard)
|