| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624 |
- from typing import List, Dict, Any
- import json
- from .my_trace import get_current_time
- import re
- import uuid
- import datetime
- def parse_json_from_text(text: str) -> dict:
- """
- 从文本中解析JSON,支持多种格式的JSON代码块
-
- Args:
- text (str): 包含JSON的文本
-
- Returns:
- dict: 解析后的JSON数据,解析失败返回空字典
- """
- if not text or not isinstance(text, str):
- return {}
-
- # 去除首尾空白字符
- text = text.strip()
-
- # 定义可能的JSON代码块标记
- json_markers = [
- ("'''json", "'''"),
- ('"""json', '"""'),
- ("```json", "```"),
- ("```", "```")
- ]
-
- # 尝试提取JSON代码块
- json_content = text
- for start_marker, end_marker in json_markers:
- if text.startswith(start_marker):
- # 找到开始标记,查找结束标记
- start_pos = len(start_marker)
- end_pos = text.find(end_marker, start_pos)
- if end_pos != -1:
- json_content = text[start_pos:end_pos].strip()
- break
-
- # 如果没有找到代码块标记,检查是否以结束标记结尾并移除
- if json_content == text:
- for _, end_marker in json_markers:
- if text.endswith(end_marker):
- json_content = text[:-len(end_marker)].strip()
- break
-
- # 尝试解析JSON
- try:
- return json.loads(json_content)
- except json.JSONDecodeError as e:
- print(f"JSON解析失败: {e}")
- # 如果直接解析失败,尝试查找第一个{到最后一个}的内容
- try:
- first_brace = json_content.find('{')
- last_brace = json_content.rfind('}')
- if first_brace != -1 and last_brace != -1 and first_brace < last_brace:
- json_part = json_content[first_brace:last_brace + 1]
- return json.loads(json_part)
- except json.JSONDecodeError:
- pass
-
- return {}
- def get_safe_filename(filename: str) -> str:
- """
- 生成安全的文件名,移除不安全字符
-
- Args:
- filename: 原始文件名
-
- Returns:
- str: 安全的文件名
- """
- # 移除不安全的字符,只保留字母、数字、下划线、连字符和点
- return re.sub(r'[^\w\-\./]', '_', filename)
- def generate_image_filename(mime_type: str, prefix: str = "gemini_img") -> str:
- """
- 生成合理的图片文件名
- Args:
- mime_type: 文件MIME类型
- prefix: 文件名前缀
- Returns:
- str: 生成的文件名
- """
- # 获取当前时间戳
- timestamp = datetime.datetime.now().strftime("%Y%m%d/%H%M%S")
- # 获取文件扩展名
- extension = mime_type.split('/')[-1]
- if extension == "jpeg":
- extension = "jpg"
- # 生成唯一ID (短UUID)
- unique_id = str(uuid.uuid4())[:4]
- # 组合文件名
- filename = f"{prefix}/{timestamp}_{unique_id}.{extension}"
- # 确保文件名安全
- return get_safe_filename(filename)
- def parse_multimodal_content(content: str) -> List[Dict[str, Any]]:
- """解析多模态内容,保持上下文顺序,适用于AI参数传递 """
-
- result = []
- lines = content.split('\n')
- role = ''
-
- for line in lines:
- line = line.strip()
- if not line:
- continue
-
- # 分割前缀和内容
- if ':' in line:
- prefix, content = line.split(':', 1)
- prefix = prefix.strip().lower()
- content = content.strip()
- row = {}
- if prefix == 'image':
- row = {
- "type": "image_url",
- "image_url": {
- "url": content
- }
- }
- elif prefix == 'text':
- row = {
- "type": "text",
- "text": content
- }
- elif prefix == 'role':
- role = content
- if row:
- if role:
- row['role'] = role
- role = ''
- result.append(row)
-
- return result
- def read_json(file_path):
- """
- 读取JSON文件并返回解析后的数据
-
- Args:
- file_path: JSON文件路径
-
- Returns:
- 解析后的JSON数据
- """
- try:
- with open(file_path, 'r', encoding='utf-8') as f:
- return json.load(f)
- except Exception as e:
- print(f"读取JSON文件时出错: {e}")
- return None
- def save_json(data, file_path):
- """
- 保存数据到JSON文件
-
- Args:
- data: 要保存的数据
- file_path: 保存路径
- """
- with open(file_path, 'w', encoding='utf-8') as f:
- json.dump(data, f, ensure_ascii=False, indent=2)
-
- def get_script_data(file_path):
- """
- 读取JSON文件并返回解析后的数据
-
- Args:
- file_path: JSON文件路径
- """
- return read_json(file_path)['脚本']
- import os
- import xml.etree.ElementTree as ET
- from typing import Dict, List, Any
- import re
- import unicodedata
- def get_model(model_name):
- # return 'gemini/gemini-2.5-flash'
- # return 'litellm/gemini/gemini-2.5-flash'
- if model_name.startswith('litellm'):
- return model_name
- else:
- from openai import AsyncOpenAI
- from agents import OpenAIChatCompletionsModel
- BASE_URL = os.getenv("EXAMPLE_BASE_URL") or "https://openrouter.ai/api/v1"
- API_KEY = os.getenv("OPENROUTER_API_KEY") or ""
- client = AsyncOpenAI(
- base_url=BASE_URL,
- api_key=API_KEY,
- )
- return OpenAIChatCompletionsModel(
- # model='google/gemini-2.5-pro-preview',
- # model='google/gemini-2.5-flash-preview-05-20',
- # model='google/gemini-2.5-flash-preview-05-20',
- # model='google/gemini-2.5-flash',
- # model='google/gemini-2.5-flash',
- # model='google/gemini-2.5-flash-preview-05-20:thinking',
- # model='google/gemini-2.0-flash-001',
- model=model_name,
- openai_client=client,
- )
- def read_file_as_string(file_path):
- """读取文件内容并返回字符串"""
- try:
- with open(file_path, 'r', encoding='utf-8') as file:
- content = file.read().strip()
- return content
- except Exception as e:
- print(f"读取文件时出错: {e}")
- return None
- def save_file_as_string(file_path, content):
- """将字符串内容写入文件"""
- with open(file_path, 'w', encoding='utf-8') as f:
- f.write(content)
- def extract_html_from_markdown(text):
- """
- 从可能包含markdown或其他代码块的文本中提取HTML内容
-
- 参数:
- text: 可能包含各种格式的文本
-
- 返回:
- 提取出的纯HTML内容
- """
- # 处理```html```格式(反引号)
- backtick_pattern = r"```(?:html)?\s*([\s\S]*?)```"
- backtick_matches = re.findall(backtick_pattern, text)
-
- # 处理'''html'''格式(单引号)
- single_quote_pattern = r"'''(?:html)?\s*([\s\S]*?)'''"
- single_quote_matches = re.findall(single_quote_pattern, text)
-
- # 处理"""html"""格式(双引号)
- double_quote_pattern = r'"""(?:html)?\s*([\s\S]*?)"""'
- double_quote_matches = re.findall(double_quote_pattern, text)
-
- if backtick_matches:
- # 优先使用反引号格式
- return backtick_matches[0].strip()
- elif single_quote_matches:
- # 其次使用单引号格式
- return single_quote_matches[0].strip()
- elif double_quote_matches:
- # 再次使用双引号格式
- return double_quote_matches[0].strip()
- else:
- # 如果没有代码块格式,直接返回原get_current_time始文本
- return text
-
- def create_workspace_dir(current_time=None, make_dir=True):
- if not current_time:
- current_time = get_current_time()
- task_dir = f"result/{current_time}"
- if make_dir:
- os.makedirs(task_dir, exist_ok=True)
- task_dir_absolute = os.path.abspath(task_dir)
- # print(f"任务目录的绝对路径: {task_dir_absolute}")
- return task_dir_absolute, str(current_time)
- def extract_tag_content(text, tag_name):
- """
- 从文本中提取指定标签内的内容
-
- 参数:
- text (str): 要处理的文本
- tag_name (str): 要提取的标签名称
-
- 返回:
- str: 标签内的内容,如果未找到则返回空字符串
- """
- import re
- pattern = f"<{tag_name}>(.*?)</{tag_name}>"
- match = re.search(pattern, text, re.DOTALL)
- if match:
- return match.group(1).strip()
- return ""
- from typing import Dict, List, Optional
- def parse_tasks(tasks_xml: str) -> List[Dict]:
- """Parse XML tasks into a list of task dictionaries."""
- tasks = []
- current_task = {}
-
- for line in tasks_xml.split('\n'):
- line = line.strip()
- if not line:
- continue
-
- if line.startswith("<task>"):
- current_task = {}
- elif line.startswith("<name>"):
- current_task["name"] = line[6:-7].strip()
- elif line.startswith("<output>"):
- current_task["output"] = line[12:-13].strip()
- elif line.startswith("</task>"):
- if "description" in current_task:
- if "type" not in current_task:
- current_task["type"] = "default"
- tasks.append(current_task)
-
- return tasks
-
-
- def parse_xml_content(xml_string: str) -> Dict[str, Any]:
- """
- 将XML字符串解析成字典,提取main_task、thoughts、tasks和resources
-
- 参数:
- xml_string: 包含任务信息的XML字符串
-
- 返回:
- 包含所有解析信息的字典
- """
- # 创建结果字典
- result = {
- "main_task": {},
- "thoughts": "",
- "tasks": [],
- "resources": []
- }
-
- try:
- # 提取thoughts内容
- thoughts_match = re.search(r'<thoughts>(.*?)</thoughts>', xml_string, re.DOTALL)
- if thoughts_match:
- result["thoughts"] = thoughts_match.group(1).strip()
-
- # 提取main_task内容
- main_task_match = re.search(r'<main_task>(.*?)</main_task>', xml_string, re.DOTALL)
- if main_task_match:
- main_task_content = main_task_match.group(1)
- main_task = {}
-
- # 获取主任务名称
- name_match = re.search(r'<name>(.*?)</name>', main_task_content, re.DOTALL)
- if name_match:
- main_task['name'] = name_match.group(1).strip()
-
- # 获取主任务输出
- output_match = re.search(r'<output>(.*?)</output>', main_task_content, re.DOTALL)
- if output_match:
- main_task['output'] = output_match.group(1).strip()
-
- # 获取主任务描述
- description_match = re.search(r'<description>(.*?)</description>', main_task_content, re.DOTALL)
- if description_match:
- main_task['description'] = description_match.group(1).strip()
-
- result["main_task"] = main_task
-
- # 提取<tasks>...</tasks>部分
- tasks_pattern = re.compile(r'<tasks>(.*?)</tasks>', re.DOTALL)
- tasks_match = tasks_pattern.search(xml_string)
-
- if tasks_match:
- tasks_content = tasks_match.group(1)
-
- # 提取每个task块
- task_pattern = re.compile(r'<task>(.*?)</task>', re.DOTALL)
- task_matches = task_pattern.finditer(tasks_content)
-
- for task_match in task_matches:
- task_content = task_match.group(1)
- task_dict = {}
-
- # 获取任务名称
- name_match = re.search(r'<name>(.*?)</name>', task_content, re.DOTALL)
- if not name_match:
- continue # 跳过没有名称的任务
-
- name = name_match.group(1).strip()
- task_dict['name'] = name
- # 获取输出信息
- output_match = re.search(r'<output>(.*?)</output>', task_content, re.DOTALL)
- task_dict['output'] = output_match.group(1).strip() if output_match else ""
-
- # 获取描述信息
- description_match = re.search(r'<description>(.*?)</description>', task_content, re.DOTALL)
- task_dict['description'] = description_match.group(1).strip() if description_match else ""
-
- # 获取依赖任务列表
- depend_tasks = []
- depend_tasks_section = re.search(r'<depend_tasks>(.*?)</depend_tasks>', task_content, re.DOTALL)
- if depend_tasks_section:
- depend_task_matches = re.finditer(r'<depend_task>(.*?)</depend_task>',
- depend_tasks_section.group(1), re.DOTALL)
- for dt_match in depend_task_matches:
- if dt_match.group(1).strip():
- depend_tasks.append(dt_match.group(1).strip())
-
- task_dict['depend_tasks'] = depend_tasks
-
- # 获取依赖资源列表
- depend_resources = []
- resources_match = re.search(r'<depend_resources>(.*?)</depend_resources>', task_content, re.DOTALL)
- if resources_match and resources_match.group(1).strip():
- resources_text = resources_match.group(1).strip()
- depend_resources = [res.strip() for res in resources_text.split(',') if res.strip()]
-
- task_dict['depend_resources'] = depend_resources
-
- # 将任务添加到结果字典
- result["tasks"].append(task_dict)
-
- # 提取resources内容
- resources_pattern = re.compile(r'<resources>(.*?)</resources>', re.DOTALL)
- resources_match = resources_pattern.search(xml_string)
-
- if resources_match:
- resources_content = resources_match.group(1).strip()
- result["resources"] = resources_content
- return result
-
- except Exception as e:
- raise ValueError(f"处理XML数据时发生错误: {e}")
- def parse_planner_result(result):
- """
- 解析规划结果,并为每个任务添加任务目录名
-
- 参数:
- result: 包含thoughts、main_task、tasks和resources的规划结果字符串
-
- 返回:
- 解析后的完整规划信息字典
- """
- # 使用parse_xml_content解析完整内容
- parsed_result = parse_xml_content(result)
- task_name_to_index = {}
- task_dict = {
- 'tasks': {},
- 'max_index': 1,
- }
-
- # 为每个任务添加task_dir字段
- for i, task_info in enumerate(parsed_result["tasks"]):
- # 使用sanitize_filename生成目录名
- task_name = task_info.get("name", "task")
- depend_tasks_dir = []
- task_info['task_dir'] = get_task_dir(task_name, task_dict)
- for depend_task in task_info.get("depend_tasks", []):
- depend_tasks_dir.append(get_task_dir(depend_task, task_dict))
- task_info['depend_tasks_dir'] = depend_tasks_dir
- task_info['status'] = 'todo' # 任务状态,todo: 未开始,doing: 进行中,success: 已完成,fail: 失败
- task_name_to_index[task_name] = i
-
- # 为主任务也添加task_dir字段
- if parsed_result["main_task"]:
- main_task_name = parsed_result["main_task"].get("name", "main_task")
- parsed_result["main_task"]["task_dir"] = sanitize_filename(main_task_name)
-
- return parsed_result, task_name_to_index
- def get_task_dir(task_name, task_dict, append_index=True):
- max_index = task_dict.get('max_index', 1)
- if task_name in task_dict['tasks']:
- return task_dict['tasks'][task_name]
- max_index_str = f"{max_index:02d}"
- task_dir_raw = sanitize_filename(task_name)
- if append_index:
- task_dir = f"{max_index_str}_{task_dir_raw}"
- else:
- task_dir = task_dir_raw
- task_dict['tasks'][task_name] = task_dir
- task_dict['max_index'] = max_index + 1
- return task_dir
-
- def sanitize_filename(task_name: str, max_length: int = 20) -> str:
- """
- 将任务名称转换为适合作为文件夹名称的字符串
-
- 参数:
- task_name: 需要转换的任务名称
- max_length: 文件名最大长度限制,默认80个字符
-
- 返回:
- 处理后适合作为文件名/文件夹名的字符串
- """
- # 替换Windows和Unix系统中不允许的文件名字符
- # 替换 / \ : * ? " < > | 等字符为下划线
- sanitized = re.sub(r'[\\/*?:"<>|]', '_', task_name)
-
- # 替换连续的空白字符为单个下划线
- sanitized = re.sub(r'\s+', '_', sanitized)
-
- # 移除开头和结尾的点和空格
- sanitized = sanitized.strip('. ')
-
- # 如果名称过长,截断它
- if len(sanitized) > max_length:
- # 保留前面的部分和后面的部分,中间用...连接
- half_length = (max_length - 3) // 2
- sanitized = sanitized[:half_length] + '...' + sanitized[-half_length:]
-
- # 确保名称不为空
- if not sanitized:
- sanitized = "unnamed_task"
-
- return sanitized
- def write_json(data, file_path: str) -> None:
- """
- 将数据写入JSON文件
-
- 参数:
- data: 要写入的数据对象
- file_path: 目标文件路径
-
- 返回:
- 无
- """
- import json
- with open(file_path, 'w', encoding='utf-8') as f:
- json.dump(data, f, ensure_ascii=False, indent=2)
- def write_string_to_file(content: str, file_path: str) -> None:
- """
- 将字符串内容写入文件
-
- 参数:
- content: 要写入的字符串内容
- file_path: 目标文件路径
-
- 返回:
- 无
- """
- with open(file_path, 'w', encoding='utf-8') as f:
- f.write(content)
- def pretty_process(result):
- def format_output(in_str):
- return in_str.replace('\n\n', '\n').replace('\\"', '"')
- process_list = []
- i = 0
- call_dict = {}
-
- # 首先收集所有工具调用输出
- for row in result:
- if isinstance(row, list):
- # 处理列表:递归处理列表中的每个项目
- for item in row:
- if isinstance(item, dict) and item.get('type', '') == 'function_call_output':
- call_id = item['call_id']
- call_dict[call_id] = item['output']
- elif isinstance(row, dict) and row.get('type', '') == 'function_call_output':
- call_id = row['call_id']
- call_dict[call_id] = row['output']
-
- # 然后处理每一行
- for row in result:
- if isinstance(row, list):
- # 递归处理列表中的每个项目
- for item in row:
- if isinstance(item, dict):
- process_row(item, process_list, call_dict, i)
- i += 1
- else:
- # 直接处理字典项
- process_row(row, process_list, call_dict, i)
- i += 1
-
- process_str = '\n'.join(process_list)
- return process_str
- def process_row(row, process_list, call_dict, i):
- """处理单个行项目,添加到处理列表中"""
- def format_output(in_str):
- return in_str.replace('\n\n', '\n').replace('\\"', '"')
-
- if not isinstance(row, dict):
- return
-
- action = ''
- out = ''
- call_id = ''
- role_ = row.get('role', '')
- type_ = row.get('type', '')
-
- if type_ == 'function_call':
- action = f'工具调用-{row.get("name")}'
- out = row['arguments']
- call_id = row['call_id']
- elif type_ == 'function_call_output':
- return # 跳过函数调用输出,它们已经被收集到call_dict中
- elif role_ in ('user', 'assistant'):
- action = role_
- if isinstance(row['content'], str):
- out = row['content']
- else:
- content_text = ""
- for this_c in row['content']:
- if isinstance(this_c, dict) and 'text' in this_c:
- content_text += this_c['text']
- out = content_text
-
- process_list.append('\n\n' + f'{i+1}. ' + '## ' + action + ' ' * 4 + '-' * 32 + '\n')
- process_list.append(format_output(str(out)))
-
- # 如果存在对应的工具输出,添加它
- if call_id and call_id in call_dict:
- process_list.append('\n\n' + f'{i+2}. ' + '## ' + '工具输出' + ' ' * 4 + '-' * 32 + '\n')
- process_list.append(format_output(call_dict[call_id]))
|