from typing import List, Dict, Any import json from .my_trace import get_current_time import re import uuid import datetime def parse_json_from_text(text: str) -> dict: """ 从文本中解析JSON,支持多种格式的JSON代码块 Args: text (str): 包含JSON的文本 Returns: dict: 解析后的JSON数据,解析失败返回空字典 """ if not text or not isinstance(text, str): return {} # 去除首尾空白字符 text = text.strip() # 定义可能的JSON代码块标记 json_markers = [ ("'''json", "'''"), ('"""json', '"""'), ("```json", "```"), ("```", "```") ] # 尝试提取JSON代码块 json_content = text for start_marker, end_marker in json_markers: if text.startswith(start_marker): # 找到开始标记,查找结束标记 start_pos = len(start_marker) end_pos = text.find(end_marker, start_pos) if end_pos != -1: json_content = text[start_pos:end_pos].strip() break # 如果没有找到代码块标记,检查是否以结束标记结尾并移除 if json_content == text: for _, end_marker in json_markers: if text.endswith(end_marker): json_content = text[:-len(end_marker)].strip() break # 尝试解析JSON try: return json.loads(json_content) except json.JSONDecodeError as e: print(f"JSON解析失败: {e}") # 如果直接解析失败,尝试查找第一个{到最后一个}的内容 try: first_brace = json_content.find('{') last_brace = json_content.rfind('}') if first_brace != -1 and last_brace != -1 and first_brace < last_brace: json_part = json_content[first_brace:last_brace + 1] return json.loads(json_part) except json.JSONDecodeError: pass return {} def get_safe_filename(filename: str) -> str: """ 生成安全的文件名,移除不安全字符 Args: filename: 原始文件名 Returns: str: 安全的文件名 """ # 移除不安全的字符,只保留字母、数字、下划线、连字符和点 return re.sub(r'[^\w\-\./]', '_', filename) def generate_image_filename(mime_type: str, prefix: str = "gemini_img") -> str: """ 生成合理的图片文件名 Args: mime_type: 文件MIME类型 prefix: 文件名前缀 Returns: str: 生成的文件名 """ # 获取当前时间戳 timestamp = datetime.datetime.now().strftime("%Y%m%d/%H%M%S") # 获取文件扩展名 extension = mime_type.split('/')[-1] if extension == "jpeg": extension = "jpg" # 生成唯一ID (短UUID) unique_id = str(uuid.uuid4())[:4] # 组合文件名 filename = f"{prefix}/{timestamp}_{unique_id}.{extension}" # 确保文件名安全 return get_safe_filename(filename) def parse_multimodal_content(content: str) -> List[Dict[str, Any]]: """解析多模态内容,保持上下文顺序,适用于AI参数传递 """ result = [] lines = content.split('\n') role = '' for line in lines: line = line.strip() if not line: continue # 分割前缀和内容 if ':' in line: prefix, content = line.split(':', 1) prefix = prefix.strip().lower() content = content.strip() row = {} if prefix == 'image': row = { "type": "image_url", "image_url": { "url": content } } elif prefix == 'text': row = { "type": "text", "text": content } elif prefix == 'role': role = content if row: if role: row['role'] = role role = '' result.append(row) return result def read_json(file_path): """ 读取JSON文件并返回解析后的数据 Args: file_path: JSON文件路径 Returns: 解析后的JSON数据 """ try: with open(file_path, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: print(f"读取JSON文件时出错: {e}") return None def save_json(data, file_path): """ 保存数据到JSON文件 Args: data: 要保存的数据 file_path: 保存路径 """ with open(file_path, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) def get_script_data(file_path): """ 读取JSON文件并返回解析后的数据 Args: file_path: JSON文件路径 """ return read_json(file_path)['脚本'] import os import xml.etree.ElementTree as ET from typing import Dict, List, Any import re import unicodedata def get_model(model_name): # return 'gemini/gemini-2.5-flash' # return 'litellm/gemini/gemini-2.5-flash' if model_name.startswith('litellm'): return model_name else: from openai import AsyncOpenAI from agents import OpenAIChatCompletionsModel BASE_URL = os.getenv("EXAMPLE_BASE_URL") or "https://openrouter.ai/api/v1" API_KEY = os.getenv("OPENROUTER_API_KEY") or "" client = AsyncOpenAI( base_url=BASE_URL, api_key=API_KEY, ) return OpenAIChatCompletionsModel( # model='google/gemini-2.5-pro-preview', # model='google/gemini-2.5-flash-preview-05-20', # model='google/gemini-2.5-flash-preview-05-20', # model='google/gemini-2.5-flash', # model='google/gemini-2.5-flash', # model='google/gemini-2.5-flash-preview-05-20:thinking', # model='google/gemini-2.0-flash-001', model=model_name, openai_client=client, ) def read_file_as_string(file_path): """读取文件内容并返回字符串""" try: with open(file_path, 'r', encoding='utf-8') as file: content = file.read().strip() return content except Exception as e: print(f"读取文件时出错: {e}") return None def save_file_as_string(file_path, content): """将字符串内容写入文件""" with open(file_path, 'w', encoding='utf-8') as f: f.write(content) def extract_html_from_markdown(text): """ 从可能包含markdown或其他代码块的文本中提取HTML内容 参数: text: 可能包含各种格式的文本 返回: 提取出的纯HTML内容 """ # 处理```html```格式(反引号) backtick_pattern = r"```(?:html)?\s*([\s\S]*?)```" backtick_matches = re.findall(backtick_pattern, text) # 处理'''html'''格式(单引号) single_quote_pattern = r"'''(?:html)?\s*([\s\S]*?)'''" single_quote_matches = re.findall(single_quote_pattern, text) # 处理"""html"""格式(双引号) double_quote_pattern = r'"""(?:html)?\s*([\s\S]*?)"""' double_quote_matches = re.findall(double_quote_pattern, text) if backtick_matches: # 优先使用反引号格式 return backtick_matches[0].strip() elif single_quote_matches: # 其次使用单引号格式 return single_quote_matches[0].strip() elif double_quote_matches: # 再次使用双引号格式 return double_quote_matches[0].strip() else: # 如果没有代码块格式,直接返回原get_current_time始文本 return text def create_workspace_dir(current_time=None, make_dir=True): if not current_time: current_time = get_current_time() task_dir = f"result/{current_time}" if make_dir: os.makedirs(task_dir, exist_ok=True) task_dir_absolute = os.path.abspath(task_dir) # print(f"任务目录的绝对路径: {task_dir_absolute}") return task_dir_absolute, str(current_time) def extract_tag_content(text, tag_name): """ 从文本中提取指定标签内的内容 参数: text (str): 要处理的文本 tag_name (str): 要提取的标签名称 返回: str: 标签内的内容,如果未找到则返回空字符串 """ import re pattern = f"<{tag_name}>(.*?)" match = re.search(pattern, text, re.DOTALL) if match: return match.group(1).strip() return "" from typing import Dict, List, Optional def parse_tasks(tasks_xml: str) -> List[Dict]: """Parse XML tasks into a list of task dictionaries.""" tasks = [] current_task = {} for line in tasks_xml.split('\n'): line = line.strip() if not line: continue if line.startswith(""): current_task = {} elif line.startswith(""): current_task["name"] = line[6:-7].strip() elif line.startswith(""): current_task["output"] = line[12:-13].strip() elif line.startswith(""): if "description" in current_task: if "type" not in current_task: current_task["type"] = "default" tasks.append(current_task) return tasks def parse_xml_content(xml_string: str) -> Dict[str, Any]: """ 将XML字符串解析成字典,提取main_task、thoughts、tasks和resources 参数: xml_string: 包含任务信息的XML字符串 返回: 包含所有解析信息的字典 """ # 创建结果字典 result = { "main_task": {}, "thoughts": "", "tasks": [], "resources": [] } try: # 提取thoughts内容 thoughts_match = re.search(r'(.*?)', xml_string, re.DOTALL) if thoughts_match: result["thoughts"] = thoughts_match.group(1).strip() # 提取main_task内容 main_task_match = re.search(r'(.*?)', xml_string, re.DOTALL) if main_task_match: main_task_content = main_task_match.group(1) main_task = {} # 获取主任务名称 name_match = re.search(r'(.*?)', main_task_content, re.DOTALL) if name_match: main_task['name'] = name_match.group(1).strip() # 获取主任务输出 output_match = re.search(r'(.*?)', main_task_content, re.DOTALL) if output_match: main_task['output'] = output_match.group(1).strip() # 获取主任务描述 description_match = re.search(r'(.*?)', main_task_content, re.DOTALL) if description_match: main_task['description'] = description_match.group(1).strip() result["main_task"] = main_task # 提取...部分 tasks_pattern = re.compile(r'(.*?)', re.DOTALL) tasks_match = tasks_pattern.search(xml_string) if tasks_match: tasks_content = tasks_match.group(1) # 提取每个task块 task_pattern = re.compile(r'(.*?)', re.DOTALL) task_matches = task_pattern.finditer(tasks_content) for task_match in task_matches: task_content = task_match.group(1) task_dict = {} # 获取任务名称 name_match = re.search(r'(.*?)', task_content, re.DOTALL) if not name_match: continue # 跳过没有名称的任务 name = name_match.group(1).strip() task_dict['name'] = name # 获取输出信息 output_match = re.search(r'(.*?)', task_content, re.DOTALL) task_dict['output'] = output_match.group(1).strip() if output_match else "" # 获取描述信息 description_match = re.search(r'(.*?)', task_content, re.DOTALL) task_dict['description'] = description_match.group(1).strip() if description_match else "" # 获取依赖任务列表 depend_tasks = [] depend_tasks_section = re.search(r'(.*?)', task_content, re.DOTALL) if depend_tasks_section: depend_task_matches = re.finditer(r'(.*?)', depend_tasks_section.group(1), re.DOTALL) for dt_match in depend_task_matches: if dt_match.group(1).strip(): depend_tasks.append(dt_match.group(1).strip()) task_dict['depend_tasks'] = depend_tasks # 获取依赖资源列表 depend_resources = [] resources_match = re.search(r'(.*?)', task_content, re.DOTALL) if resources_match and resources_match.group(1).strip(): resources_text = resources_match.group(1).strip() depend_resources = [res.strip() for res in resources_text.split(',') if res.strip()] task_dict['depend_resources'] = depend_resources # 将任务添加到结果字典 result["tasks"].append(task_dict) # 提取resources内容 resources_pattern = re.compile(r'(.*?)', re.DOTALL) resources_match = resources_pattern.search(xml_string) if resources_match: resources_content = resources_match.group(1).strip() result["resources"] = resources_content return result except Exception as e: raise ValueError(f"处理XML数据时发生错误: {e}") def parse_planner_result(result): """ 解析规划结果,并为每个任务添加任务目录名 参数: result: 包含thoughts、main_task、tasks和resources的规划结果字符串 返回: 解析后的完整规划信息字典 """ # 使用parse_xml_content解析完整内容 parsed_result = parse_xml_content(result) task_name_to_index = {} task_dict = { 'tasks': {}, 'max_index': 1, } # 为每个任务添加task_dir字段 for i, task_info in enumerate(parsed_result["tasks"]): # 使用sanitize_filename生成目录名 task_name = task_info.get("name", "task") depend_tasks_dir = [] task_info['task_dir'] = get_task_dir(task_name, task_dict) for depend_task in task_info.get("depend_tasks", []): depend_tasks_dir.append(get_task_dir(depend_task, task_dict)) task_info['depend_tasks_dir'] = depend_tasks_dir task_info['status'] = 'todo' # 任务状态,todo: 未开始,doing: 进行中,success: 已完成,fail: 失败 task_name_to_index[task_name] = i # 为主任务也添加task_dir字段 if parsed_result["main_task"]: main_task_name = parsed_result["main_task"].get("name", "main_task") parsed_result["main_task"]["task_dir"] = sanitize_filename(main_task_name) return parsed_result, task_name_to_index def get_task_dir(task_name, task_dict, append_index=True): max_index = task_dict.get('max_index', 1) if task_name in task_dict['tasks']: return task_dict['tasks'][task_name] max_index_str = f"{max_index:02d}" task_dir_raw = sanitize_filename(task_name) if append_index: task_dir = f"{max_index_str}_{task_dir_raw}" else: task_dir = task_dir_raw task_dict['tasks'][task_name] = task_dir task_dict['max_index'] = max_index + 1 return task_dir def sanitize_filename(task_name: str, max_length: int = 20) -> str: """ 将任务名称转换为适合作为文件夹名称的字符串 参数: task_name: 需要转换的任务名称 max_length: 文件名最大长度限制,默认80个字符 返回: 处理后适合作为文件名/文件夹名的字符串 """ # 替换Windows和Unix系统中不允许的文件名字符 # 替换 / \ : * ? " < > | 等字符为下划线 sanitized = re.sub(r'[\\/*?:"<>|]', '_', task_name) # 替换连续的空白字符为单个下划线 sanitized = re.sub(r'\s+', '_', sanitized) # 移除开头和结尾的点和空格 sanitized = sanitized.strip('. ') # 如果名称过长,截断它 if len(sanitized) > max_length: # 保留前面的部分和后面的部分,中间用...连接 half_length = (max_length - 3) // 2 sanitized = sanitized[:half_length] + '...' + sanitized[-half_length:] # 确保名称不为空 if not sanitized: sanitized = "unnamed_task" return sanitized def write_json(data, file_path: str) -> None: """ 将数据写入JSON文件 参数: data: 要写入的数据对象 file_path: 目标文件路径 返回: 无 """ import json with open(file_path, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) def write_string_to_file(content: str, file_path: str) -> None: """ 将字符串内容写入文件 参数: content: 要写入的字符串内容 file_path: 目标文件路径 返回: 无 """ with open(file_path, 'w', encoding='utf-8') as f: f.write(content) def pretty_process(result): def format_output(in_str): return in_str.replace('\n\n', '\n').replace('\\"', '"') process_list = [] i = 0 call_dict = {} # 首先收集所有工具调用输出 for row in result: if isinstance(row, list): # 处理列表:递归处理列表中的每个项目 for item in row: if isinstance(item, dict) and item.get('type', '') == 'function_call_output': call_id = item['call_id'] call_dict[call_id] = item['output'] elif isinstance(row, dict) and row.get('type', '') == 'function_call_output': call_id = row['call_id'] call_dict[call_id] = row['output'] # 然后处理每一行 for row in result: if isinstance(row, list): # 递归处理列表中的每个项目 for item in row: if isinstance(item, dict): process_row(item, process_list, call_dict, i) i += 1 else: # 直接处理字典项 process_row(row, process_list, call_dict, i) i += 1 process_str = '\n'.join(process_list) return process_str def process_row(row, process_list, call_dict, i): """处理单个行项目,添加到处理列表中""" def format_output(in_str): return in_str.replace('\n\n', '\n').replace('\\"', '"') if not isinstance(row, dict): return action = '' out = '' call_id = '' role_ = row.get('role', '') type_ = row.get('type', '') if type_ == 'function_call': action = f'工具调用-{row.get("name")}' out = row['arguments'] call_id = row['call_id'] elif type_ == 'function_call_output': return # 跳过函数调用输出,它们已经被收集到call_dict中 elif role_ in ('user', 'assistant'): action = role_ if isinstance(row['content'], str): out = row['content'] else: content_text = "" for this_c in row['content']: if isinstance(this_c, dict) and 'text' in this_c: content_text += this_c['text'] out = content_text process_list.append('\n\n' + f'{i+1}. ' + '## ' + action + ' ' * 4 + '-' * 32 + '\n') process_list.append(format_output(str(out))) # 如果存在对应的工具输出,添加它 if call_id and call_id in call_dict: process_list.append('\n\n' + f'{i+2}. ' + '## ' + '工具输出' + ' ' * 4 + '-' * 32 + '\n') process_list.append(format_output(call_dict[call_id]))