utils.py 21 KB


  1. from typing import List, Dict, Any
  2. import json
  3. from .my_trace import get_current_time
  4. import re
  5. import uuid
  6. import datetime
  7. def parse_json_from_text(text: str) -> dict:
  8. """
  9. 从文本中解析JSON,支持多种格式的JSON代码块
  10. Args:
  11. text (str): 包含JSON的文本
  12. Returns:
  13. dict: 解析后的JSON数据,解析失败返回空字典
  14. """
  15. if not text or not isinstance(text, str):
  16. return {}
  17. # 去除首尾空白字符
  18. text = text.strip()
  19. # 定义可能的JSON代码块标记
  20. json_markers = [
  21. ("'''json", "'''"),
  22. ('"""json', '"""'),
  23. ("```json", "```"),
  24. ("```", "```")
  25. ]
  26. # 尝试提取JSON代码块
  27. json_content = text
  28. for start_marker, end_marker in json_markers:
  29. if text.startswith(start_marker):
  30. # 找到开始标记,查找结束标记
  31. start_pos = len(start_marker)
  32. end_pos = text.find(end_marker, start_pos)
  33. if end_pos != -1:
  34. json_content = text[start_pos:end_pos].strip()
  35. break
  36. # 如果没有找到代码块标记,检查是否以结束标记结尾并移除
  37. if json_content == text:
  38. for _, end_marker in json_markers:
  39. if text.endswith(end_marker):
  40. json_content = text[:-len(end_marker)].strip()
  41. break
  42. # 尝试解析JSON
  43. try:
  44. return json.loads(json_content)
  45. except json.JSONDecodeError as e:
  46. # 打印详细的解析失败信息
  47. print(f"JSON解析失败: {e}")
  48. print(f"原始文本长度: {len(text)}")
  49. print(f"提取的JSON内容长度: {len(json_content)}")
  50. print(f"原始文本内容预览 (前500字符):\n{text[:500]}")
  51. print(f"提取的JSON内容预览 (前500字符):\n{json_content[:500]}")
  52. print("-" * 80)
  53. # 如果直接解析失败,尝试查找第一个{到最后一个}的内容
  54. try:
  55. first_brace = json_content.find('{')
  56. last_brace = json_content.rfind('}')
  57. if first_brace != -1 and last_brace != -1 and first_brace < last_brace:
  58. json_part = json_content[first_brace:last_brace + 1]
  59. return json.loads(json_part)
  60. except json.JSONDecodeError as e2:
  61. print(f"二次解析也失败: {e2}")
  62. if first_brace != -1 and last_brace != -1:
  63. print(f"尝试解析的内容:\n{json_part[:500]}")
  64. return {}
  65. def get_safe_filename(filename: str) -> str:
  66. """
  67. 生成安全的文件名,移除不安全字符
  68. Args:
  69. filename: 原始文件名
  70. Returns:
  71. str: 安全的文件名
  72. """
  73. # 移除不安全的字符,只保留字母、数字、下划线、连字符和点
  74. return re.sub(r'[^\w\-\./]', '_', filename)
  75. def generate_image_filename(mime_type: str, prefix: str = "gemini_img") -> str:
  76. """
  77. 生成合理的图片文件名
  78. Args:
  79. mime_type: 文件MIME类型
  80. prefix: 文件名前缀
  81. Returns:
  82. str: 生成的文件名
  83. """
  84. # 获取当前时间戳
  85. timestamp = datetime.datetime.now().strftime("%Y%m%d/%H%M%S")
  86. # 获取文件扩展名
  87. extension = mime_type.split('/')[-1]
  88. if extension == "jpeg":
  89. extension = "jpg"
  90. # 生成唯一ID (短UUID)
  91. unique_id = str(uuid.uuid4())[:4]
  92. # 组合文件名
  93. filename = f"{prefix}/{timestamp}_{unique_id}.{extension}"
  94. # 确保文件名安全
  95. return get_safe_filename(filename)
  96. def parse_multimodal_content(content: str) -> List[Dict[str, Any]]:
  97. """解析多模态内容,保持上下文顺序,适用于AI参数传递 """
  98. result = []
  99. lines = content.split('\n')
  100. role = ''
  101. for line in lines:
  102. line = line.strip()
  103. if not line:
  104. continue
  105. # 分割前缀和内容
  106. if ':' in line:
  107. prefix, content = line.split(':', 1)
  108. prefix = prefix.strip().lower()
  109. content = content.strip()
  110. row = {}
  111. if prefix == 'image':
  112. row = {
  113. "type": "image_url",
  114. "image_url": {
  115. "url": content
  116. }
  117. }
  118. elif prefix == 'text':
  119. row = {
  120. "type": "text",
  121. "text": content
  122. }
  123. elif prefix == 'role':
  124. role = content
  125. if row:
  126. if role:
  127. row['role'] = role
  128. role = ''
  129. result.append(row)
  130. return result
  131. def read_json(file_path):
  132. """
  133. 读取JSON文件并返回解析后的数据
  134. Args:
  135. file_path: JSON文件路径
  136. Returns:
  137. 解析后的JSON数据
  138. """
  139. try:
  140. with open(file_path, 'r', encoding='utf-8') as f:
  141. return json.load(f)
  142. except Exception as e:
  143. print(f"读取JSON文件时出错: {e}")
  144. return None
  145. def save_json(data, file_path):
  146. """
  147. 保存数据到JSON文件
  148. Args:
  149. data: 要保存的数据
  150. file_path: 保存路径
  151. """
  152. with open(file_path, 'w', encoding='utf-8') as f:
  153. json.dump(data, f, ensure_ascii=False, indent=2)
  154. def get_script_data(file_path):
  155. """
  156. 读取JSON文件并返回解析后的数据
  157. Args:
  158. file_path: JSON文件路径
  159. """
  160. return read_json(file_path)['脚本']
  161. import os
  162. import xml.etree.ElementTree as ET
  163. from typing import Dict, List, Any
  164. import re
  165. import unicodedata
  166. def get_model(model_name):
  167. # return 'gemini/gemini-2.5-flash'
  168. # return 'litellm/gemini/gemini-2.5-flash'
  169. if model_name.startswith('litellm'):
  170. return model_name
  171. else:
  172. from openai import AsyncOpenAI
  173. from agents import OpenAIChatCompletionsModel
  174. BASE_URL = os.getenv("EXAMPLE_BASE_URL") or "https://openrouter.ai/api/v1"
  175. API_KEY = os.getenv("OPENROUTER_API_KEY") or ""
  176. client = AsyncOpenAI(
  177. base_url=BASE_URL,
  178. api_key=API_KEY,
  179. )
  180. return OpenAIChatCompletionsModel(
  181. # model='google/gemini-2.5-pro-preview',
  182. # model='google/gemini-2.5-flash-preview-05-20',
  183. # model='google/gemini-2.5-flash-preview-05-20',
  184. # model='google/gemini-2.5-flash',
  185. # model='google/gemini-2.5-flash',
  186. # model='google/gemini-2.5-flash-preview-05-20:thinking',
  187. # model='google/gemini-2.0-flash-001',
  188. model=model_name,
  189. openai_client=client,
  190. )
  191. def read_file_as_string(file_path):
  192. """读取文件内容并返回字符串"""
  193. try:
  194. with open(file_path, 'r', encoding='utf-8') as file:
  195. content = file.read().strip()
  196. return content
  197. except Exception as e:
  198. print(f"读取文件时出错: {e}")
  199. return None
  200. def save_file_as_string(file_path, content):
  201. """将字符串内容写入文件"""
  202. with open(file_path, 'w', encoding='utf-8') as f:
  203. f.write(content)
  204. def extract_html_from_markdown(text):
  205. """
  206. 从可能包含markdown或其他代码块的文本中提取HTML内容
  207. 参数:
  208. text: 可能包含各种格式的文本
  209. 返回:
  210. 提取出的纯HTML内容
  211. """
  212. # 处理```html```格式(反引号)
  213. backtick_pattern = r"```(?:html)?\s*([\s\S]*?)```"
  214. backtick_matches = re.findall(backtick_pattern, text)
  215. # 处理'''html'''格式(单引号)
  216. single_quote_pattern = r"'''(?:html)?\s*([\s\S]*?)'''"
  217. single_quote_matches = re.findall(single_quote_pattern, text)
  218. # 处理"""html"""格式(双引号)
  219. double_quote_pattern = r'"""(?:html)?\s*([\s\S]*?)"""'
  220. double_quote_matches = re.findall(double_quote_pattern, text)
  221. if backtick_matches:
  222. # 优先使用反引号格式
  223. return backtick_matches[0].strip()
  224. elif single_quote_matches:
  225. # 其次使用单引号格式
  226. return single_quote_matches[0].strip()
  227. elif double_quote_matches:
  228. # 再次使用双引号格式
  229. return double_quote_matches[0].strip()
  230. else:
  231. # 如果没有代码块格式,直接返回原get_current_time始文本
  232. return text
  233. def create_workspace_dir(current_time=None, make_dir=True):
  234. if not current_time:
  235. current_time = get_current_time()
  236. task_dir = f"result/{current_time}"
  237. if make_dir:
  238. os.makedirs(task_dir, exist_ok=True)
  239. task_dir_absolute = os.path.abspath(task_dir)
  240. # print(f"任务目录的绝对路径: {task_dir_absolute}")
  241. return task_dir_absolute, str(current_time)
  242. def extract_tag_content(text, tag_name):
  243. """
  244. 从文本中提取指定标签内的内容
  245. 参数:
  246. text (str): 要处理的文本
  247. tag_name (str): 要提取的标签名称
  248. 返回:
  249. str: 标签内的内容,如果未找到则返回空字符串
  250. """
  251. import re
  252. pattern = f"<{tag_name}>(.*?)</{tag_name}>"
  253. match = re.search(pattern, text, re.DOTALL)
  254. if match:
  255. return match.group(1).strip()
  256. return ""
  257. from typing import Dict, List, Optional
  258. def parse_tasks(tasks_xml: str) -> List[Dict]:
  259. """Parse XML tasks into a list of task dictionaries."""
  260. tasks = []
  261. current_task = {}
  262. for line in tasks_xml.split('\n'):
  263. line = line.strip()
  264. if not line:
  265. continue
  266. if line.startswith("<task>"):
  267. current_task = {}
  268. elif line.startswith("<name>"):
  269. current_task["name"] = line[6:-7].strip()
  270. elif line.startswith("<output>"):
  271. current_task["output"] = line[12:-13].strip()
  272. elif line.startswith("</task>"):
  273. if "description" in current_task:
  274. if "type" not in current_task:
  275. current_task["type"] = "default"
  276. tasks.append(current_task)
  277. return tasks
  278. def parse_xml_content(xml_string: str) -> Dict[str, Any]:
  279. """
  280. 将XML字符串解析成字典,提取main_task、thoughts、tasks和resources
  281. 参数:
  282. xml_string: 包含任务信息的XML字符串
  283. 返回:
  284. 包含所有解析信息的字典
  285. """
  286. # 创建结果字典
  287. result = {
  288. "main_task": {},
  289. "thoughts": "",
  290. "tasks": [],
  291. "resources": []
  292. }
  293. try:
  294. # 提取thoughts内容
  295. thoughts_match = re.search(r'<thoughts>(.*?)</thoughts>', xml_string, re.DOTALL)
  296. if thoughts_match:
  297. result["thoughts"] = thoughts_match.group(1).strip()
  298. # 提取main_task内容
  299. main_task_match = re.search(r'<main_task>(.*?)</main_task>', xml_string, re.DOTALL)
  300. if main_task_match:
  301. main_task_content = main_task_match.group(1)
  302. main_task = {}
  303. # 获取主任务名称
  304. name_match = re.search(r'<name>(.*?)</name>', main_task_content, re.DOTALL)
  305. if name_match:
  306. main_task['name'] = name_match.group(1).strip()
  307. # 获取主任务输出
  308. output_match = re.search(r'<output>(.*?)</output>', main_task_content, re.DOTALL)
  309. if output_match:
  310. main_task['output'] = output_match.group(1).strip()
  311. # 获取主任务描述
  312. description_match = re.search(r'<description>(.*?)</description>', main_task_content, re.DOTALL)
  313. if description_match:
  314. main_task['description'] = description_match.group(1).strip()
  315. result["main_task"] = main_task
  316. # 提取<tasks>...</tasks>部分
  317. tasks_pattern = re.compile(r'<tasks>(.*?)</tasks>', re.DOTALL)
  318. tasks_match = tasks_pattern.search(xml_string)
  319. if tasks_match:
  320. tasks_content = tasks_match.group(1)
  321. # 提取每个task块
  322. task_pattern = re.compile(r'<task>(.*?)</task>', re.DOTALL)
  323. task_matches = task_pattern.finditer(tasks_content)
  324. for task_match in task_matches:
  325. task_content = task_match.group(1)
  326. task_dict = {}
  327. # 获取任务名称
  328. name_match = re.search(r'<name>(.*?)</name>', task_content, re.DOTALL)
  329. if not name_match:
  330. continue # 跳过没有名称的任务
  331. name = name_match.group(1).strip()
  332. task_dict['name'] = name
  333. # 获取输出信息
  334. output_match = re.search(r'<output>(.*?)</output>', task_content, re.DOTALL)
  335. task_dict['output'] = output_match.group(1).strip() if output_match else ""
  336. # 获取描述信息
  337. description_match = re.search(r'<description>(.*?)</description>', task_content, re.DOTALL)
  338. task_dict['description'] = description_match.group(1).strip() if description_match else ""
  339. # 获取依赖任务列表
  340. depend_tasks = []
  341. depend_tasks_section = re.search(r'<depend_tasks>(.*?)</depend_tasks>', task_content, re.DOTALL)
  342. if depend_tasks_section:
  343. depend_task_matches = re.finditer(r'<depend_task>(.*?)</depend_task>',
  344. depend_tasks_section.group(1), re.DOTALL)
  345. for dt_match in depend_task_matches:
  346. if dt_match.group(1).strip():
  347. depend_tasks.append(dt_match.group(1).strip())
  348. task_dict['depend_tasks'] = depend_tasks
  349. # 获取依赖资源列表
  350. depend_resources = []
  351. resources_match = re.search(r'<depend_resources>(.*?)</depend_resources>', task_content, re.DOTALL)
  352. if resources_match and resources_match.group(1).strip():
  353. resources_text = resources_match.group(1).strip()
  354. depend_resources = [res.strip() for res in resources_text.split(',') if res.strip()]
  355. task_dict['depend_resources'] = depend_resources
  356. # 将任务添加到结果字典
  357. result["tasks"].append(task_dict)
  358. # 提取resources内容
  359. resources_pattern = re.compile(r'<resources>(.*?)</resources>', re.DOTALL)
  360. resources_match = resources_pattern.search(xml_string)
  361. if resources_match:
  362. resources_content = resources_match.group(1).strip()
  363. result["resources"] = resources_content
  364. return result
  365. except Exception as e:
  366. raise ValueError(f"处理XML数据时发生错误: {e}")
  367. def parse_planner_result(result):
  368. """
  369. 解析规划结果,并为每个任务添加任务目录名
  370. 参数:
  371. result: 包含thoughts、main_task、tasks和resources的规划结果字符串
  372. 返回:
  373. 解析后的完整规划信息字典
  374. """
  375. # 使用parse_xml_content解析完整内容
  376. parsed_result = parse_xml_content(result)
  377. task_name_to_index = {}
  378. task_dict = {
  379. 'tasks': {},
  380. 'max_index': 1,
  381. }
  382. # 为每个任务添加task_dir字段
  383. for i, task_info in enumerate(parsed_result["tasks"]):
  384. # 使用sanitize_filename生成目录名
  385. task_name = task_info.get("name", "task")
  386. depend_tasks_dir = []
  387. task_info['task_dir'] = get_task_dir(task_name, task_dict)
  388. for depend_task in task_info.get("depend_tasks", []):
  389. depend_tasks_dir.append(get_task_dir(depend_task, task_dict))
  390. task_info['depend_tasks_dir'] = depend_tasks_dir
  391. task_info['status'] = 'todo' # 任务状态,todo: 未开始,doing: 进行中,success: 已完成,fail: 失败
  392. task_name_to_index[task_name] = i
  393. # 为主任务也添加task_dir字段
  394. if parsed_result["main_task"]:
  395. main_task_name = parsed_result["main_task"].get("name", "main_task")
  396. parsed_result["main_task"]["task_dir"] = sanitize_filename(main_task_name)
  397. return parsed_result, task_name_to_index
  398. def get_task_dir(task_name, task_dict, append_index=True):
  399. max_index = task_dict.get('max_index', 1)
  400. if task_name in task_dict['tasks']:
  401. return task_dict['tasks'][task_name]
  402. max_index_str = f"{max_index:02d}"
  403. task_dir_raw = sanitize_filename(task_name)
  404. if append_index:
  405. task_dir = f"{max_index_str}_{task_dir_raw}"
  406. else:
  407. task_dir = task_dir_raw
  408. task_dict['tasks'][task_name] = task_dir
  409. task_dict['max_index'] = max_index + 1
  410. return task_dir
  411. def sanitize_filename(task_name: str, max_length: int = 20) -> str:
  412. """
  413. 将任务名称转换为适合作为文件夹名称的字符串
  414. 参数:
  415. task_name: 需要转换的任务名称
  416. max_length: 文件名最大长度限制,默认80个字符
  417. 返回:
  418. 处理后适合作为文件名/文件夹名的字符串
  419. """
  420. # 替换Windows和Unix系统中不允许的文件名字符
  421. # 替换 / \ : * ? " < > | 等字符为下划线
  422. sanitized = re.sub(r'[\\/*?:"<>|]', '_', task_name)
  423. # 替换连续的空白字符为单个下划线
  424. sanitized = re.sub(r'\s+', '_', sanitized)
  425. # 移除开头和结尾的点和空格
  426. sanitized = sanitized.strip('. ')
  427. # 如果名称过长,截断它
  428. if len(sanitized) > max_length:
  429. # 保留前面的部分和后面的部分,中间用...连接
  430. half_length = (max_length - 3) // 2
  431. sanitized = sanitized[:half_length] + '...' + sanitized[-half_length:]
  432. # 确保名称不为空
  433. if not sanitized:
  434. sanitized = "unnamed_task"
  435. return sanitized
  436. def write_json(data, file_path: str) -> None:
  437. """
  438. 将数据写入JSON文件
  439. 参数:
  440. data: 要写入的数据对象
  441. file_path: 目标文件路径
  442. 返回:
  443. """
  444. import json
  445. with open(file_path, 'w', encoding='utf-8') as f:
  446. json.dump(data, f, ensure_ascii=False, indent=2)
  447. def write_string_to_file(content: str, file_path: str) -> None:
  448. """
  449. 将字符串内容写入文件
  450. 参数:
  451. content: 要写入的字符串内容
  452. file_path: 目标文件路径
  453. 返回:
  454. """
  455. with open(file_path, 'w', encoding='utf-8') as f:
  456. f.write(content)
  457. def pretty_process(result):
  458. def format_output(in_str):
  459. return in_str.replace('\n\n', '\n').replace('\\"', '"')
  460. process_list = []
  461. i = 0
  462. call_dict = {}
  463. # 首先收集所有工具调用输出
  464. for row in result:
  465. if isinstance(row, list):
  466. # 处理列表:递归处理列表中的每个项目
  467. for item in row:
  468. if isinstance(item, dict) and item.get('type', '') == 'function_call_output':
  469. call_id = item['call_id']
  470. call_dict[call_id] = item['output']
  471. elif isinstance(row, dict) and row.get('type', '') == 'function_call_output':
  472. call_id = row['call_id']
  473. call_dict[call_id] = row['output']
  474. # 然后处理每一行
  475. for row in result:
  476. if isinstance(row, list):
  477. # 递归处理列表中的每个项目
  478. for item in row:
  479. if isinstance(item, dict):
  480. process_row(item, process_list, call_dict, i)
  481. i += 1
  482. else:
  483. # 直接处理字典项
  484. process_row(row, process_list, call_dict, i)
  485. i += 1
  486. process_str = '\n'.join(process_list)
  487. return process_str
  488. def process_row(row, process_list, call_dict, i):
  489. """处理单个行项目,添加到处理列表中"""
  490. def format_output(in_str):
  491. return in_str.replace('\n\n', '\n').replace('\\"', '"')
  492. if not isinstance(row, dict):
  493. return
  494. action = ''
  495. out = ''
  496. call_id = ''
  497. role_ = row.get('role', '')
  498. type_ = row.get('type', '')
  499. if type_ == 'function_call':
  500. action = f'工具调用-{row.get("name")}'
  501. out = row['arguments']
  502. call_id = row['call_id']
  503. elif type_ == 'function_call_output':
  504. return # 跳过函数调用输出,它们已经被收集到call_dict中
  505. elif role_ in ('user', 'assistant'):
  506. action = role_
  507. if isinstance(row['content'], str):
  508. out = row['content']
  509. else:
  510. content_text = ""
  511. for this_c in row['content']:
  512. if isinstance(this_c, dict) and 'text' in this_c:
  513. content_text += this_c['text']
  514. out = content_text
  515. process_list.append('\n\n' + f'{i+1}. ' + '## ' + action + ' ' * 4 + '-' * 32 + '\n')
  516. process_list.append(format_output(str(out)))
  517. # 如果存在对应的工具输出,添加它
  518. if call_id and call_id in call_dict:
  519. process_list.append('\n\n' + f'{i+2}. ' + '## ' + '工具输出' + ' ' * 4 + '-' * 32 + '\n')
  520. process_list.append(format_output(call_dict[call_id]))