from typing import List, Dict, Any
import json
from .my_trace import get_current_time
import re
import uuid
import datetime
def parse_json_from_text(text: str) -> dict:
"""
从文本中解析JSON,支持多种格式的JSON代码块
Args:
text (str): 包含JSON的文本
Returns:
dict: 解析后的JSON数据,解析失败返回空字典
"""
if not text or not isinstance(text, str):
return {}
# 去除首尾空白字符
text = text.strip()
# 定义可能的JSON代码块标记
json_markers = [
("'''json", "'''"),
('"""json', '"""'),
("```json", "```"),
("```", "```")
]
# 尝试提取JSON代码块
json_content = text
for start_marker, end_marker in json_markers:
if text.startswith(start_marker):
# 找到开始标记,查找结束标记
start_pos = len(start_marker)
end_pos = text.find(end_marker, start_pos)
if end_pos != -1:
json_content = text[start_pos:end_pos].strip()
break
# 如果没有找到代码块标记,检查是否以结束标记结尾并移除
if json_content == text:
for _, end_marker in json_markers:
if text.endswith(end_marker):
json_content = text[:-len(end_marker)].strip()
break
# 尝试解析JSON
try:
return json.loads(json_content)
except json.JSONDecodeError as e:
# 打印详细的解析失败信息
print(f"JSON解析失败: {e}")
print(f"原始文本长度: {len(text)}")
print(f"提取的JSON内容长度: {len(json_content)}")
print(f"原始文本内容预览 (前500字符):\n{text[:500]}")
print(f"提取的JSON内容预览 (前500字符):\n{json_content[:500]}")
print("-" * 80)
# 如果直接解析失败,尝试查找第一个{到最后一个}的内容
try:
first_brace = json_content.find('{')
last_brace = json_content.rfind('}')
if first_brace != -1 and last_brace != -1 and first_brace < last_brace:
json_part = json_content[first_brace:last_brace + 1]
return json.loads(json_part)
except json.JSONDecodeError as e2:
print(f"二次解析也失败: {e2}")
if first_brace != -1 and last_brace != -1:
print(f"尝试解析的内容:\n{json_part[:500]}")
return {}
def get_safe_filename(filename: str) -> str:
"""
生成安全的文件名,移除不安全字符
Args:
filename: 原始文件名
Returns:
str: 安全的文件名
"""
# 移除不安全的字符,只保留字母、数字、下划线、连字符和点
return re.sub(r'[^\w\-\./]', '_', filename)
def generate_image_filename(mime_type: str, prefix: str = "gemini_img") -> str:
"""
生成合理的图片文件名
Args:
mime_type: 文件MIME类型
prefix: 文件名前缀
Returns:
str: 生成的文件名
"""
# 获取当前时间戳
timestamp = datetime.datetime.now().strftime("%Y%m%d/%H%M%S")
# 获取文件扩展名
extension = mime_type.split('/')[-1]
if extension == "jpeg":
extension = "jpg"
# 生成唯一ID (短UUID)
unique_id = str(uuid.uuid4())[:4]
# 组合文件名
filename = f"{prefix}/{timestamp}_{unique_id}.{extension}"
# 确保文件名安全
return get_safe_filename(filename)
def parse_multimodal_content(content: str) -> List[Dict[str, Any]]:
"""解析多模态内容,保持上下文顺序,适用于AI参数传递 """
result = []
lines = content.split('\n')
role = ''
for line in lines:
line = line.strip()
if not line:
continue
# 分割前缀和内容
if ':' in line:
prefix, content = line.split(':', 1)
prefix = prefix.strip().lower()
content = content.strip()
row = {}
if prefix == 'image':
row = {
"type": "image_url",
"image_url": {
"url": content
}
}
elif prefix == 'text':
row = {
"type": "text",
"text": content
}
elif prefix == 'role':
role = content
if row:
if role:
row['role'] = role
role = ''
result.append(row)
return result
def read_json(file_path):
"""
读取JSON文件并返回解析后的数据
Args:
file_path: JSON文件路径
Returns:
解析后的JSON数据
"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"读取JSON文件时出错: {e}")
return None
def save_json(data, file_path):
"""
保存数据到JSON文件
Args:
data: 要保存的数据
file_path: 保存路径
"""
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def get_script_data(file_path):
"""
读取JSON文件并返回解析后的数据
Args:
file_path: JSON文件路径
"""
return read_json(file_path)['脚本']
import os
import xml.etree.ElementTree as ET
from typing import Dict, List, Any
import re
import unicodedata
def get_model(model_name):
# return 'gemini/gemini-2.5-flash'
# return 'litellm/gemini/gemini-2.5-flash'
if model_name.startswith('litellm'):
return model_name
else:
from openai import AsyncOpenAI
from agents import OpenAIChatCompletionsModel
BASE_URL = os.getenv("EXAMPLE_BASE_URL") or "https://openrouter.ai/api/v1"
API_KEY = os.getenv("OPENROUTER_API_KEY") or ""
client = AsyncOpenAI(
base_url=BASE_URL,
api_key=API_KEY,
)
return OpenAIChatCompletionsModel(
# model='google/gemini-2.5-pro-preview',
# model='google/gemini-2.5-flash-preview-05-20',
# model='google/gemini-2.5-flash-preview-05-20',
# model='google/gemini-2.5-flash',
# model='google/gemini-2.5-flash',
# model='google/gemini-2.5-flash-preview-05-20:thinking',
# model='google/gemini-2.0-flash-001',
model=model_name,
openai_client=client,
)
def read_file_as_string(file_path):
"""读取文件内容并返回字符串"""
try:
with open(file_path, 'r', encoding='utf-8') as file:
content = file.read().strip()
return content
except Exception as e:
print(f"读取文件时出错: {e}")
return None
def save_file_as_string(file_path, content):
"""将字符串内容写入文件"""
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
def extract_html_from_markdown(text):
"""
从可能包含markdown或其他代码块的文本中提取HTML内容
参数:
text: 可能包含各种格式的文本
返回:
提取出的纯HTML内容
"""
# 处理```html```格式(反引号)
backtick_pattern = r"```(?:html)?\s*([\s\S]*?)```"
backtick_matches = re.findall(backtick_pattern, text)
# 处理'''html'''格式(单引号)
single_quote_pattern = r"'''(?:html)?\s*([\s\S]*?)'''"
single_quote_matches = re.findall(single_quote_pattern, text)
# 处理"""html"""格式(双引号)
double_quote_pattern = r'"""(?:html)?\s*([\s\S]*?)"""'
double_quote_matches = re.findall(double_quote_pattern, text)
if backtick_matches:
# 优先使用反引号格式
return backtick_matches[0].strip()
elif single_quote_matches:
# 其次使用单引号格式
return single_quote_matches[0].strip()
elif double_quote_matches:
# 再次使用双引号格式
return double_quote_matches[0].strip()
else:
# 如果没有代码块格式,直接返回原get_current_time始文本
return text
def create_workspace_dir(current_time=None, make_dir=True):
if not current_time:
current_time = get_current_time()
task_dir = f"result/{current_time}"
if make_dir:
os.makedirs(task_dir, exist_ok=True)
task_dir_absolute = os.path.abspath(task_dir)
# print(f"任务目录的绝对路径: {task_dir_absolute}")
return task_dir_absolute, str(current_time)
def extract_tag_content(text, tag_name):
"""
从文本中提取指定标签内的内容
参数:
text (str): 要处理的文本
tag_name (str): 要提取的标签名称
返回:
str: 标签内的内容,如果未找到则返回空字符串
"""
import re
pattern = f"<{tag_name}>(.*?){tag_name}>"
match = re.search(pattern, text, re.DOTALL)
if match:
return match.group(1).strip()
return ""
from typing import Dict, List, Optional
def parse_tasks(tasks_xml: str) -> List[Dict]:
"""Parse XML tasks into a list of task dictionaries."""
tasks = []
current_task = {}
for line in tasks_xml.split('\n'):
line = line.strip()
if not line:
continue
if line.startswith(""):
current_task = {}
elif line.startswith(""):
current_task["name"] = line[6:-7].strip()
elif line.startswith(""):
if "description" in current_task:
if "type" not in current_task:
current_task["type"] = "default"
tasks.append(current_task)
return tasks
def parse_xml_content(xml_string: str) -> Dict[str, Any]:
"""
将XML字符串解析成字典,提取main_task、thoughts、tasks和resources
参数:
xml_string: 包含任务信息的XML字符串
返回:
包含所有解析信息的字典
"""
# 创建结果字典
result = {
"main_task": {},
"thoughts": "",
"tasks": [],
"resources": []
}
try:
# 提取thoughts内容
thoughts_match = re.search(r'(.*?)', xml_string, re.DOTALL)
if thoughts_match:
result["thoughts"] = thoughts_match.group(1).strip()
# 提取main_task内容
main_task_match = re.search(r'(.*?)', xml_string, re.DOTALL)
if main_task_match:
main_task_content = main_task_match.group(1)
main_task = {}
# 获取主任务名称
name_match = re.search(r'(.*?)', main_task_content, re.DOTALL)
if name_match:
main_task['name'] = name_match.group(1).strip()
# 获取主任务输出
output_match = re.search(r'', main_task_content, re.DOTALL)
if output_match:
main_task['output'] = output_match.group(1).strip()
# 获取主任务描述
description_match = re.search(r'(.*?)', main_task_content, re.DOTALL)
if description_match:
main_task['description'] = description_match.group(1).strip()
result["main_task"] = main_task
# 提取...部分
tasks_pattern = re.compile(r'(.*?)', re.DOTALL)
tasks_match = tasks_pattern.search(xml_string)
if tasks_match:
tasks_content = tasks_match.group(1)
# 提取每个task块
task_pattern = re.compile(r'(.*?)', re.DOTALL)
task_matches = task_pattern.finditer(tasks_content)
for task_match in task_matches:
task_content = task_match.group(1)
task_dict = {}
# 获取任务名称
name_match = re.search(r'(.*?)', task_content, re.DOTALL)
if not name_match:
continue # 跳过没有名称的任务
name = name_match.group(1).strip()
task_dict['name'] = name
# 获取输出信息
output_match = re.search(r'', task_content, re.DOTALL)
task_dict['output'] = output_match.group(1).strip() if output_match else ""
# 获取描述信息
description_match = re.search(r'(.*?)', task_content, re.DOTALL)
task_dict['description'] = description_match.group(1).strip() if description_match else ""
# 获取依赖任务列表
depend_tasks = []
depend_tasks_section = re.search(r'(.*?)', task_content, re.DOTALL)
if depend_tasks_section:
depend_task_matches = re.finditer(r'(.*?)',
depend_tasks_section.group(1), re.DOTALL)
for dt_match in depend_task_matches:
if dt_match.group(1).strip():
depend_tasks.append(dt_match.group(1).strip())
task_dict['depend_tasks'] = depend_tasks
# 获取依赖资源列表
depend_resources = []
resources_match = re.search(r'(.*?)', task_content, re.DOTALL)
if resources_match and resources_match.group(1).strip():
resources_text = resources_match.group(1).strip()
depend_resources = [res.strip() for res in resources_text.split(',') if res.strip()]
task_dict['depend_resources'] = depend_resources
# 将任务添加到结果字典
result["tasks"].append(task_dict)
# 提取resources内容
resources_pattern = re.compile(r'(.*?)', re.DOTALL)
resources_match = resources_pattern.search(xml_string)
if resources_match:
resources_content = resources_match.group(1).strip()
result["resources"] = resources_content
return result
except Exception as e:
raise ValueError(f"处理XML数据时发生错误: {e}")
def parse_planner_result(result):
"""
解析规划结果,并为每个任务添加任务目录名
参数:
result: 包含thoughts、main_task、tasks和resources的规划结果字符串
返回:
解析后的完整规划信息字典
"""
# 使用parse_xml_content解析完整内容
parsed_result = parse_xml_content(result)
task_name_to_index = {}
task_dict = {
'tasks': {},
'max_index': 1,
}
# 为每个任务添加task_dir字段
for i, task_info in enumerate(parsed_result["tasks"]):
# 使用sanitize_filename生成目录名
task_name = task_info.get("name", "task")
depend_tasks_dir = []
task_info['task_dir'] = get_task_dir(task_name, task_dict)
for depend_task in task_info.get("depend_tasks", []):
depend_tasks_dir.append(get_task_dir(depend_task, task_dict))
task_info['depend_tasks_dir'] = depend_tasks_dir
task_info['status'] = 'todo' # 任务状态,todo: 未开始,doing: 进行中,success: 已完成,fail: 失败
task_name_to_index[task_name] = i
# 为主任务也添加task_dir字段
if parsed_result["main_task"]:
main_task_name = parsed_result["main_task"].get("name", "main_task")
parsed_result["main_task"]["task_dir"] = sanitize_filename(main_task_name)
return parsed_result, task_name_to_index
def get_task_dir(task_name, task_dict, append_index=True):
max_index = task_dict.get('max_index', 1)
if task_name in task_dict['tasks']:
return task_dict['tasks'][task_name]
max_index_str = f"{max_index:02d}"
task_dir_raw = sanitize_filename(task_name)
if append_index:
task_dir = f"{max_index_str}_{task_dir_raw}"
else:
task_dir = task_dir_raw
task_dict['tasks'][task_name] = task_dir
task_dict['max_index'] = max_index + 1
return task_dir
def sanitize_filename(task_name: str, max_length: int = 20) -> str:
"""
将任务名称转换为适合作为文件夹名称的字符串
参数:
task_name: 需要转换的任务名称
max_length: 文件名最大长度限制,默认80个字符
返回:
处理后适合作为文件名/文件夹名的字符串
"""
# 替换Windows和Unix系统中不允许的文件名字符
# 替换 / \ : * ? " < > | 等字符为下划线
sanitized = re.sub(r'[\\/*?:"<>|]', '_', task_name)
# 替换连续的空白字符为单个下划线
sanitized = re.sub(r'\s+', '_', sanitized)
# 移除开头和结尾的点和空格
sanitized = sanitized.strip('. ')
# 如果名称过长,截断它
if len(sanitized) > max_length:
# 保留前面的部分和后面的部分,中间用...连接
half_length = (max_length - 3) // 2
sanitized = sanitized[:half_length] + '...' + sanitized[-half_length:]
# 确保名称不为空
if not sanitized:
sanitized = "unnamed_task"
return sanitized
def write_json(data, file_path: str) -> None:
"""
将数据写入JSON文件
参数:
data: 要写入的数据对象
file_path: 目标文件路径
返回:
无
"""
import json
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def write_string_to_file(content: str, file_path: str) -> None:
"""
将字符串内容写入文件
参数:
content: 要写入的字符串内容
file_path: 目标文件路径
返回:
无
"""
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
def pretty_process(result):
def format_output(in_str):
return in_str.replace('\n\n', '\n').replace('\\"', '"')
process_list = []
i = 0
call_dict = {}
# 首先收集所有工具调用输出
for row in result:
if isinstance(row, list):
# 处理列表:递归处理列表中的每个项目
for item in row:
if isinstance(item, dict) and item.get('type', '') == 'function_call_output':
call_id = item['call_id']
call_dict[call_id] = item['output']
elif isinstance(row, dict) and row.get('type', '') == 'function_call_output':
call_id = row['call_id']
call_dict[call_id] = row['output']
# 然后处理每一行
for row in result:
if isinstance(row, list):
# 递归处理列表中的每个项目
for item in row:
if isinstance(item, dict):
process_row(item, process_list, call_dict, i)
i += 1
else:
# 直接处理字典项
process_row(row, process_list, call_dict, i)
i += 1
process_str = '\n'.join(process_list)
return process_str
def process_row(row, process_list, call_dict, i):
"""处理单个行项目,添加到处理列表中"""
def format_output(in_str):
return in_str.replace('\n\n', '\n').replace('\\"', '"')
if not isinstance(row, dict):
return
action = ''
out = ''
call_id = ''
role_ = row.get('role', '')
type_ = row.get('type', '')
if type_ == 'function_call':
action = f'工具调用-{row.get("name")}'
out = row['arguments']
call_id = row['call_id']
elif type_ == 'function_call_output':
return # 跳过函数调用输出,它们已经被收集到call_dict中
elif role_ in ('user', 'assistant'):
action = role_
if isinstance(row['content'], str):
out = row['content']
else:
content_text = ""
for this_c in row['content']:
if isinstance(this_c, dict) and 'text' in this_c:
content_text += this_c['text']
out = content_text
process_list.append('\n\n' + f'{i+1}. ' + '## ' + action + ' ' * 4 + '-' * 32 + '\n')
process_list.append(format_output(str(out)))
# 如果存在对应的工具输出,添加它
if call_id and call_id in call_dict:
process_list.append('\n\n' + f'{i+2}. ' + '## ' + '工具输出' + ' ' * 4 + '-' * 32 + '\n')
process_list.append(format_output(call_dict[call_id]))