|
|
@@ -0,0 +1,624 @@
|
|
|
+from typing import List, Dict, Any
|
|
|
+import json
|
|
|
+from .my_trace import get_current_time
|
|
|
+import re
|
|
|
+import uuid
|
|
|
+import datetime
|
|
|
+
|
|
|
+def parse_json_from_text(text: str) -> dict:
|
|
|
+ """
|
|
|
+ 从文本中解析JSON,支持多种格式的JSON代码块
|
|
|
+
|
|
|
+ Args:
|
|
|
+ text (str): 包含JSON的文本
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ dict: 解析后的JSON数据,解析失败返回空字典
|
|
|
+ """
|
|
|
+ if not text or not isinstance(text, str):
|
|
|
+ return {}
|
|
|
+
|
|
|
+ # 去除首尾空白字符
|
|
|
+ text = text.strip()
|
|
|
+
|
|
|
+ # 定义可能的JSON代码块标记
|
|
|
+ json_markers = [
|
|
|
+ ("'''json", "'''"),
|
|
|
+ ('"""json', '"""'),
|
|
|
+ ("```json", "```"),
|
|
|
+ ("```", "```")
|
|
|
+ ]
|
|
|
+
|
|
|
+ # 尝试提取JSON代码块
|
|
|
+ json_content = text
|
|
|
+ for start_marker, end_marker in json_markers:
|
|
|
+ if text.startswith(start_marker):
|
|
|
+ # 找到开始标记,查找结束标记
|
|
|
+ start_pos = len(start_marker)
|
|
|
+ end_pos = text.find(end_marker, start_pos)
|
|
|
+ if end_pos != -1:
|
|
|
+ json_content = text[start_pos:end_pos].strip()
|
|
|
+ break
|
|
|
+
|
|
|
+ # 如果没有找到代码块标记,检查是否以结束标记结尾并移除
|
|
|
+ if json_content == text:
|
|
|
+ for _, end_marker in json_markers:
|
|
|
+ if text.endswith(end_marker):
|
|
|
+ json_content = text[:-len(end_marker)].strip()
|
|
|
+ break
|
|
|
+
|
|
|
+ # 尝试解析JSON
|
|
|
+ try:
|
|
|
+ return json.loads(json_content)
|
|
|
+ except json.JSONDecodeError as e:
|
|
|
+ print(f"JSON解析失败: {e}")
|
|
|
+ # 如果直接解析失败,尝试查找第一个{到最后一个}的内容
|
|
|
+ try:
|
|
|
+ first_brace = json_content.find('{')
|
|
|
+ last_brace = json_content.rfind('}')
|
|
|
+ if first_brace != -1 and last_brace != -1 and first_brace < last_brace:
|
|
|
+ json_part = json_content[first_brace:last_brace + 1]
|
|
|
+ return json.loads(json_part)
|
|
|
+ except json.JSONDecodeError:
|
|
|
+ pass
|
|
|
+
|
|
|
+ return {}
|
|
|
+
|
|
|
+
|
|
|
+def get_safe_filename(filename: str) -> str:
|
|
|
+ """
|
|
|
+ 生成安全的文件名,移除不安全字符
|
|
|
+
|
|
|
+ Args:
|
|
|
+ filename: 原始文件名
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ str: 安全的文件名
|
|
|
+ """
|
|
|
+ # 移除不安全的字符,只保留字母、数字、下划线、连字符和点
|
|
|
+ return re.sub(r'[^\w\-\./]', '_', filename)
|
|
|
+
|
|
|
+
|
|
|
+def generate_image_filename(mime_type: str, prefix: str = "gemini_img") -> str:
|
|
|
+ """
|
|
|
+ 生成合理的图片文件名
|
|
|
+
|
|
|
+ Args:
|
|
|
+ mime_type: 文件MIME类型
|
|
|
+ prefix: 文件名前缀
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ str: 生成的文件名
|
|
|
+ """
|
|
|
+ # 获取当前时间戳
|
|
|
+ timestamp = datetime.datetime.now().strftime("%Y%m%d/%H%M%S")
|
|
|
+
|
|
|
+ # 获取文件扩展名
|
|
|
+ extension = mime_type.split('/')[-1]
|
|
|
+ if extension == "jpeg":
|
|
|
+ extension = "jpg"
|
|
|
+
|
|
|
+ # 生成唯一ID (短UUID)
|
|
|
+ unique_id = str(uuid.uuid4())[:4]
|
|
|
+
|
|
|
+ # 组合文件名
|
|
|
+ filename = f"{prefix}/{timestamp}_{unique_id}.{extension}"
|
|
|
+
|
|
|
+ # 确保文件名安全
|
|
|
+ return get_safe_filename(filename)
|
|
|
+
|
|
|
+def parse_multimodal_content(content: str) -> List[Dict[str, Any]]:
|
|
|
+ """解析多模态内容,保持上下文顺序,适用于AI参数传递 """
|
|
|
+
|
|
|
+ result = []
|
|
|
+ lines = content.split('\n')
|
|
|
+ role = ''
|
|
|
+
|
|
|
+ for line in lines:
|
|
|
+ line = line.strip()
|
|
|
+ if not line:
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 分割前缀和内容
|
|
|
+ if ':' in line:
|
|
|
+ prefix, content = line.split(':', 1)
|
|
|
+ prefix = prefix.strip().lower()
|
|
|
+ content = content.strip()
|
|
|
+ row = {}
|
|
|
+ if prefix == 'image':
|
|
|
+ row = {
|
|
|
+ "type": "image_url",
|
|
|
+ "image_url": {
|
|
|
+ "url": content
|
|
|
+ }
|
|
|
+ }
|
|
|
+ elif prefix == 'text':
|
|
|
+ row = {
|
|
|
+ "type": "text",
|
|
|
+ "text": content
|
|
|
+ }
|
|
|
+ elif prefix == 'role':
|
|
|
+ role = content
|
|
|
+ if row:
|
|
|
+ if role:
|
|
|
+ row['role'] = role
|
|
|
+ role = ''
|
|
|
+ result.append(row)
|
|
|
+
|
|
|
+ return result
|
|
|
+
|
|
|
+
|
|
|
+def read_json(file_path):
|
|
|
+ """
|
|
|
+ 读取JSON文件并返回解析后的数据
|
|
|
+
|
|
|
+ Args:
|
|
|
+ file_path: JSON文件路径
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ 解析后的JSON数据
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ with open(file_path, 'r', encoding='utf-8') as f:
|
|
|
+ return json.load(f)
|
|
|
+ except Exception as e:
|
|
|
+ print(f"读取JSON文件时出错: {e}")
|
|
|
+ return None
|
|
|
+
|
|
|
+def save_json(data, file_path):
|
|
|
+ """
|
|
|
+ 保存数据到JSON文件
|
|
|
+
|
|
|
+ Args:
|
|
|
+ data: 要保存的数据
|
|
|
+ file_path: 保存路径
|
|
|
+ """
|
|
|
+ with open(file_path, 'w', encoding='utf-8') as f:
|
|
|
+ json.dump(data, f, ensure_ascii=False, indent=2)
|
|
|
+
|
|
|
+
|
|
|
+def get_script_data(file_path):
|
|
|
+ """
|
|
|
+ 读取JSON文件并返回解析后的数据
|
|
|
+
|
|
|
+ Args:
|
|
|
+ file_path: JSON文件路径
|
|
|
+ """
|
|
|
+ return read_json(file_path)['脚本']
|
|
|
+
|
|
|
+import os
|
|
|
+import xml.etree.ElementTree as ET
|
|
|
+from typing import Dict, List, Any
|
|
|
+import re
|
|
|
+import unicodedata
|
|
|
+
|
|
|
+
|
|
|
+def get_model(model_name):
|
|
|
+ # return 'gemini/gemini-2.5-flash'
|
|
|
+ # return 'litellm/gemini/gemini-2.5-flash'
|
|
|
+ if model_name.startswith('litellm'):
|
|
|
+ return model_name
|
|
|
+ else:
|
|
|
+ from openai import AsyncOpenAI
|
|
|
+ from agents import OpenAIChatCompletionsModel
|
|
|
+ BASE_URL = os.getenv("EXAMPLE_BASE_URL") or "https://openrouter.ai/api/v1"
|
|
|
+ API_KEY = os.getenv("OPENROUTER_API_KEY") or ""
|
|
|
+ client = AsyncOpenAI(
|
|
|
+ base_url=BASE_URL,
|
|
|
+ api_key=API_KEY,
|
|
|
+ )
|
|
|
+ return OpenAIChatCompletionsModel(
|
|
|
+ # model='google/gemini-2.5-pro-preview',
|
|
|
+ # model='google/gemini-2.5-flash-preview-05-20',
|
|
|
+ # model='google/gemini-2.5-flash-preview-05-20',
|
|
|
+ # model='google/gemini-2.5-flash',
|
|
|
+ # model='google/gemini-2.5-flash',
|
|
|
+ # model='google/gemini-2.5-flash-preview-05-20:thinking',
|
|
|
+ # model='google/gemini-2.0-flash-001',
|
|
|
+ model=model_name,
|
|
|
+ openai_client=client,
|
|
|
+ )
|
|
|
+
|
|
|
+def read_file_as_string(file_path):
|
|
|
+ """读取文件内容并返回字符串"""
|
|
|
+ try:
|
|
|
+ with open(file_path, 'r', encoding='utf-8') as file:
|
|
|
+ content = file.read().strip()
|
|
|
+ return content
|
|
|
+ except Exception as e:
|
|
|
+ print(f"读取文件时出错: {e}")
|
|
|
+ return None
|
|
|
+def save_file_as_string(file_path, content):
|
|
|
+ """将字符串内容写入文件"""
|
|
|
+ with open(file_path, 'w', encoding='utf-8') as f:
|
|
|
+ f.write(content)
|
|
|
+
|
|
|
+def extract_html_from_markdown(text):
|
|
|
+ """
|
|
|
+ 从可能包含markdown或其他代码块的文本中提取HTML内容
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ text: 可能包含各种格式的文本
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ 提取出的纯HTML内容
|
|
|
+ """
|
|
|
+ # 处理```html```格式(反引号)
|
|
|
+ backtick_pattern = r"```(?:html)?\s*([\s\S]*?)```"
|
|
|
+ backtick_matches = re.findall(backtick_pattern, text)
|
|
|
+
|
|
|
+ # 处理'''html'''格式(单引号)
|
|
|
+ single_quote_pattern = r"'''(?:html)?\s*([\s\S]*?)'''"
|
|
|
+ single_quote_matches = re.findall(single_quote_pattern, text)
|
|
|
+
|
|
|
+ # 处理"""html"""格式(双引号)
|
|
|
+ double_quote_pattern = r'"""(?:html)?\s*([\s\S]*?)"""'
|
|
|
+ double_quote_matches = re.findall(double_quote_pattern, text)
|
|
|
+
|
|
|
+ if backtick_matches:
|
|
|
+ # 优先使用反引号格式
|
|
|
+ return backtick_matches[0].strip()
|
|
|
+ elif single_quote_matches:
|
|
|
+ # 其次使用单引号格式
|
|
|
+ return single_quote_matches[0].strip()
|
|
|
+ elif double_quote_matches:
|
|
|
+ # 再次使用双引号格式
|
|
|
+ return double_quote_matches[0].strip()
|
|
|
+ else:
|
|
|
+ # 如果没有代码块格式,直接返回原get_current_time始文本
|
|
|
+ return text
|
|
|
+
|
|
|
+def create_workspace_dir(current_time=None, make_dir=True):
|
|
|
+ if not current_time:
|
|
|
+ current_time = get_current_time()
|
|
|
+ task_dir = f"result/{current_time}"
|
|
|
+ if make_dir:
|
|
|
+ os.makedirs(task_dir, exist_ok=True)
|
|
|
+ task_dir_absolute = os.path.abspath(task_dir)
|
|
|
+ # print(f"任务目录的绝对路径: {task_dir_absolute}")
|
|
|
+ return task_dir_absolute, str(current_time)
|
|
|
+
|
|
|
+
|
|
|
+def extract_tag_content(text, tag_name):
|
|
|
+ """
|
|
|
+ 从文本中提取指定标签内的内容
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ text (str): 要处理的文本
|
|
|
+ tag_name (str): 要提取的标签名称
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ str: 标签内的内容,如果未找到则返回空字符串
|
|
|
+ """
|
|
|
+ import re
|
|
|
+ pattern = f"<{tag_name}>(.*?)</{tag_name}>"
|
|
|
+ match = re.search(pattern, text, re.DOTALL)
|
|
|
+ if match:
|
|
|
+ return match.group(1).strip()
|
|
|
+ return ""
|
|
|
+
|
|
|
+from typing import Dict, List, Optional
|
|
|
+def parse_tasks(tasks_xml: str) -> List[Dict]:
|
|
|
+ """Parse XML tasks into a list of task dictionaries."""
|
|
|
+ tasks = []
|
|
|
+ current_task = {}
|
|
|
+
|
|
|
+ for line in tasks_xml.split('\n'):
|
|
|
+ line = line.strip()
|
|
|
+ if not line:
|
|
|
+ continue
|
|
|
+
|
|
|
+ if line.startswith("<task>"):
|
|
|
+ current_task = {}
|
|
|
+ elif line.startswith("<name>"):
|
|
|
+ current_task["name"] = line[6:-7].strip()
|
|
|
+ elif line.startswith("<output>"):
|
|
|
+ current_task["output"] = line[12:-13].strip()
|
|
|
+ elif line.startswith("</task>"):
|
|
|
+ if "description" in current_task:
|
|
|
+ if "type" not in current_task:
|
|
|
+ current_task["type"] = "default"
|
|
|
+ tasks.append(current_task)
|
|
|
+
|
|
|
+ return tasks
|
|
|
+
|
|
|
+
|
|
|
+def parse_xml_content(xml_string: str) -> Dict[str, Any]:
|
|
|
+ """
|
|
|
+ 将XML字符串解析成字典,提取main_task、thoughts、tasks和resources
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ xml_string: 包含任务信息的XML字符串
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ 包含所有解析信息的字典
|
|
|
+ """
|
|
|
+ # 创建结果字典
|
|
|
+ result = {
|
|
|
+ "main_task": {},
|
|
|
+ "thoughts": "",
|
|
|
+ "tasks": [],
|
|
|
+ "resources": []
|
|
|
+ }
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 提取thoughts内容
|
|
|
+ thoughts_match = re.search(r'<thoughts>(.*?)</thoughts>', xml_string, re.DOTALL)
|
|
|
+ if thoughts_match:
|
|
|
+ result["thoughts"] = thoughts_match.group(1).strip()
|
|
|
+
|
|
|
+ # 提取main_task内容
|
|
|
+ main_task_match = re.search(r'<main_task>(.*?)</main_task>', xml_string, re.DOTALL)
|
|
|
+ if main_task_match:
|
|
|
+ main_task_content = main_task_match.group(1)
|
|
|
+ main_task = {}
|
|
|
+
|
|
|
+ # 获取主任务名称
|
|
|
+ name_match = re.search(r'<name>(.*?)</name>', main_task_content, re.DOTALL)
|
|
|
+ if name_match:
|
|
|
+ main_task['name'] = name_match.group(1).strip()
|
|
|
+
|
|
|
+ # 获取主任务输出
|
|
|
+ output_match = re.search(r'<output>(.*?)</output>', main_task_content, re.DOTALL)
|
|
|
+ if output_match:
|
|
|
+ main_task['output'] = output_match.group(1).strip()
|
|
|
+
|
|
|
+ # 获取主任务描述
|
|
|
+ description_match = re.search(r'<description>(.*?)</description>', main_task_content, re.DOTALL)
|
|
|
+ if description_match:
|
|
|
+ main_task['description'] = description_match.group(1).strip()
|
|
|
+
|
|
|
+ result["main_task"] = main_task
|
|
|
+
|
|
|
+ # 提取<tasks>...</tasks>部分
|
|
|
+ tasks_pattern = re.compile(r'<tasks>(.*?)</tasks>', re.DOTALL)
|
|
|
+ tasks_match = tasks_pattern.search(xml_string)
|
|
|
+
|
|
|
+ if tasks_match:
|
|
|
+ tasks_content = tasks_match.group(1)
|
|
|
+
|
|
|
+ # 提取每个task块
|
|
|
+ task_pattern = re.compile(r'<task>(.*?)</task>', re.DOTALL)
|
|
|
+ task_matches = task_pattern.finditer(tasks_content)
|
|
|
+
|
|
|
+ for task_match in task_matches:
|
|
|
+ task_content = task_match.group(1)
|
|
|
+ task_dict = {}
|
|
|
+
|
|
|
+ # 获取任务名称
|
|
|
+ name_match = re.search(r'<name>(.*?)</name>', task_content, re.DOTALL)
|
|
|
+ if not name_match:
|
|
|
+ continue # 跳过没有名称的任务
|
|
|
+
|
|
|
+ name = name_match.group(1).strip()
|
|
|
+ task_dict['name'] = name
|
|
|
+ # 获取输出信息
|
|
|
+ output_match = re.search(r'<output>(.*?)</output>', task_content, re.DOTALL)
|
|
|
+ task_dict['output'] = output_match.group(1).strip() if output_match else ""
|
|
|
+
|
|
|
+ # 获取描述信息
|
|
|
+ description_match = re.search(r'<description>(.*?)</description>', task_content, re.DOTALL)
|
|
|
+ task_dict['description'] = description_match.group(1).strip() if description_match else ""
|
|
|
+
|
|
|
+ # 获取依赖任务列表
|
|
|
+ depend_tasks = []
|
|
|
+ depend_tasks_section = re.search(r'<depend_tasks>(.*?)</depend_tasks>', task_content, re.DOTALL)
|
|
|
+ if depend_tasks_section:
|
|
|
+ depend_task_matches = re.finditer(r'<depend_task>(.*?)</depend_task>',
|
|
|
+ depend_tasks_section.group(1), re.DOTALL)
|
|
|
+ for dt_match in depend_task_matches:
|
|
|
+ if dt_match.group(1).strip():
|
|
|
+ depend_tasks.append(dt_match.group(1).strip())
|
|
|
+
|
|
|
+ task_dict['depend_tasks'] = depend_tasks
|
|
|
+
|
|
|
+ # 获取依赖资源列表
|
|
|
+ depend_resources = []
|
|
|
+ resources_match = re.search(r'<depend_resources>(.*?)</depend_resources>', task_content, re.DOTALL)
|
|
|
+ if resources_match and resources_match.group(1).strip():
|
|
|
+ resources_text = resources_match.group(1).strip()
|
|
|
+ depend_resources = [res.strip() for res in resources_text.split(',') if res.strip()]
|
|
|
+
|
|
|
+ task_dict['depend_resources'] = depend_resources
|
|
|
+
|
|
|
+ # 将任务添加到结果字典
|
|
|
+ result["tasks"].append(task_dict)
|
|
|
+
|
|
|
+ # 提取resources内容
|
|
|
+ resources_pattern = re.compile(r'<resources>(.*?)</resources>', re.DOTALL)
|
|
|
+ resources_match = resources_pattern.search(xml_string)
|
|
|
+
|
|
|
+ if resources_match:
|
|
|
+ resources_content = resources_match.group(1).strip()
|
|
|
+ result["resources"] = resources_content
|
|
|
+ return result
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ raise ValueError(f"处理XML数据时发生错误: {e}")
|
|
|
+
|
|
|
+
|
|
|
+def parse_planner_result(result):
|
|
|
+ """
|
|
|
+ 解析规划结果,并为每个任务添加任务目录名
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ result: 包含thoughts、main_task、tasks和resources的规划结果字符串
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ 解析后的完整规划信息字典
|
|
|
+ """
|
|
|
+ # 使用parse_xml_content解析完整内容
|
|
|
+ parsed_result = parse_xml_content(result)
|
|
|
+ task_name_to_index = {}
|
|
|
+ task_dict = {
|
|
|
+ 'tasks': {},
|
|
|
+ 'max_index': 1,
|
|
|
+ }
|
|
|
+
|
|
|
+ # 为每个任务添加task_dir字段
|
|
|
+ for i, task_info in enumerate(parsed_result["tasks"]):
|
|
|
+ # 使用sanitize_filename生成目录名
|
|
|
+ task_name = task_info.get("name", "task")
|
|
|
+ depend_tasks_dir = []
|
|
|
+ task_info['task_dir'] = get_task_dir(task_name, task_dict)
|
|
|
+ for depend_task in task_info.get("depend_tasks", []):
|
|
|
+ depend_tasks_dir.append(get_task_dir(depend_task, task_dict))
|
|
|
+ task_info['depend_tasks_dir'] = depend_tasks_dir
|
|
|
+ task_info['status'] = 'todo' # 任务状态,todo: 未开始,doing: 进行中,success: 已完成,fail: 失败
|
|
|
+ task_name_to_index[task_name] = i
|
|
|
+
|
|
|
+ # 为主任务也添加task_dir字段
|
|
|
+ if parsed_result["main_task"]:
|
|
|
+ main_task_name = parsed_result["main_task"].get("name", "main_task")
|
|
|
+ parsed_result["main_task"]["task_dir"] = sanitize_filename(main_task_name)
|
|
|
+
|
|
|
+ return parsed_result, task_name_to_index
|
|
|
+def get_task_dir(task_name, task_dict, append_index=True):
|
|
|
+ max_index = task_dict.get('max_index', 1)
|
|
|
+ if task_name in task_dict['tasks']:
|
|
|
+ return task_dict['tasks'][task_name]
|
|
|
+ max_index_str = f"{max_index:02d}"
|
|
|
+ task_dir_raw = sanitize_filename(task_name)
|
|
|
+ if append_index:
|
|
|
+ task_dir = f"{max_index_str}_{task_dir_raw}"
|
|
|
+ else:
|
|
|
+ task_dir = task_dir_raw
|
|
|
+ task_dict['tasks'][task_name] = task_dir
|
|
|
+ task_dict['max_index'] = max_index + 1
|
|
|
+ return task_dir
|
|
|
+
|
|
|
+def sanitize_filename(task_name: str, max_length: int = 20) -> str:
|
|
|
+ """
|
|
|
+ 将任务名称转换为适合作为文件夹名称的字符串
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ task_name: 需要转换的任务名称
|
|
|
+ max_length: 文件名最大长度限制,默认80个字符
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ 处理后适合作为文件名/文件夹名的字符串
|
|
|
+ """
|
|
|
+ # 替换Windows和Unix系统中不允许的文件名字符
|
|
|
+ # 替换 / \ : * ? " < > | 等字符为下划线
|
|
|
+ sanitized = re.sub(r'[\\/*?:"<>|]', '_', task_name)
|
|
|
+
|
|
|
+ # 替换连续的空白字符为单个下划线
|
|
|
+ sanitized = re.sub(r'\s+', '_', sanitized)
|
|
|
+
|
|
|
+ # 移除开头和结尾的点和空格
|
|
|
+ sanitized = sanitized.strip('. ')
|
|
|
+
|
|
|
+ # 如果名称过长,截断它
|
|
|
+ if len(sanitized) > max_length:
|
|
|
+ # 保留前面的部分和后面的部分,中间用...连接
|
|
|
+ half_length = (max_length - 3) // 2
|
|
|
+ sanitized = sanitized[:half_length] + '...' + sanitized[-half_length:]
|
|
|
+
|
|
|
+ # 确保名称不为空
|
|
|
+ if not sanitized:
|
|
|
+ sanitized = "unnamed_task"
|
|
|
+
|
|
|
+ return sanitized
|
|
|
+
|
|
|
+def write_json(data, file_path: str) -> None:
|
|
|
+ """
|
|
|
+ 将数据写入JSON文件
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ data: 要写入的数据对象
|
|
|
+ file_path: 目标文件路径
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ 无
|
|
|
+ """
|
|
|
+ import json
|
|
|
+ with open(file_path, 'w', encoding='utf-8') as f:
|
|
|
+ json.dump(data, f, ensure_ascii=False, indent=2)
|
|
|
+def write_string_to_file(content: str, file_path: str) -> None:
|
|
|
+ """
|
|
|
+ 将字符串内容写入文件
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ content: 要写入的字符串内容
|
|
|
+ file_path: 目标文件路径
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ 无
|
|
|
+ """
|
|
|
+ with open(file_path, 'w', encoding='utf-8') as f:
|
|
|
+ f.write(content)
|
|
|
+
|
|
|
+def pretty_process(result):
|
|
|
+ def format_output(in_str):
|
|
|
+ return in_str.replace('\n\n', '\n').replace('\\"', '"')
|
|
|
+ process_list = []
|
|
|
+ i = 0
|
|
|
+ call_dict = {}
|
|
|
+
|
|
|
+ # 首先收集所有工具调用输出
|
|
|
+ for row in result:
|
|
|
+ if isinstance(row, list):
|
|
|
+ # 处理列表:递归处理列表中的每个项目
|
|
|
+ for item in row:
|
|
|
+ if isinstance(item, dict) and item.get('type', '') == 'function_call_output':
|
|
|
+ call_id = item['call_id']
|
|
|
+ call_dict[call_id] = item['output']
|
|
|
+ elif isinstance(row, dict) and row.get('type', '') == 'function_call_output':
|
|
|
+ call_id = row['call_id']
|
|
|
+ call_dict[call_id] = row['output']
|
|
|
+
|
|
|
+ # 然后处理每一行
|
|
|
+ for row in result:
|
|
|
+ if isinstance(row, list):
|
|
|
+ # 递归处理列表中的每个项目
|
|
|
+ for item in row:
|
|
|
+ if isinstance(item, dict):
|
|
|
+ process_row(item, process_list, call_dict, i)
|
|
|
+ i += 1
|
|
|
+ else:
|
|
|
+ # 直接处理字典项
|
|
|
+ process_row(row, process_list, call_dict, i)
|
|
|
+ i += 1
|
|
|
+
|
|
|
+ process_str = '\n'.join(process_list)
|
|
|
+ return process_str
|
|
|
+
|
|
|
+def process_row(row, process_list, call_dict, i):
|
|
|
+ """处理单个行项目,添加到处理列表中"""
|
|
|
+ def format_output(in_str):
|
|
|
+ return in_str.replace('\n\n', '\n').replace('\\"', '"')
|
|
|
+
|
|
|
+ if not isinstance(row, dict):
|
|
|
+ return
|
|
|
+
|
|
|
+ action = ''
|
|
|
+ out = ''
|
|
|
+ call_id = ''
|
|
|
+ role_ = row.get('role', '')
|
|
|
+ type_ = row.get('type', '')
|
|
|
+
|
|
|
+ if type_ == 'function_call':
|
|
|
+ action = f'工具调用-{row.get("name")}'
|
|
|
+ out = row['arguments']
|
|
|
+ call_id = row['call_id']
|
|
|
+ elif type_ == 'function_call_output':
|
|
|
+ return # 跳过函数调用输出,它们已经被收集到call_dict中
|
|
|
+ elif role_ in ('user', 'assistant'):
|
|
|
+ action = role_
|
|
|
+ if isinstance(row['content'], str):
|
|
|
+ out = row['content']
|
|
|
+ else:
|
|
|
+ content_text = ""
|
|
|
+ for this_c in row['content']:
|
|
|
+ if isinstance(this_c, dict) and 'text' in this_c:
|
|
|
+ content_text += this_c['text']
|
|
|
+ out = content_text
|
|
|
+
|
|
|
+ process_list.append('\n\n' + f'{i+1}. ' + '## ' + action + ' ' * 4 + '-' * 32 + '\n')
|
|
|
+ process_list.append(format_output(str(out)))
|
|
|
+
|
|
|
+ # 如果存在对应的工具输出,添加它
|
|
|
+ if call_id and call_id in call_dict:
|
|
|
+ process_list.append('\n\n' + f'{i+2}. ' + '## ' + '工具输出' + ' ' * 4 + '-' * 32 + '\n')
|
|
|
+ process_list.append(format_output(call_dict[call_id]))
|
|
|
+
|