#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""prepare-subtask.py -- task-slice extraction tool for Phase 2 sub-agents.

Parses workflow.json together with input/case-N.json and extracts the minimal
slices needed by the Phase 2A and Phase 2B subtasks.

On-demand image routing: figure numbers mentioned in an IO variable
(e.g. "图05") are detected and mapped to the corresponding URL / local path
taken from the ``image_url_list`` of the input file.
"""
import argparse
import json
import re
import sys
from pathlib import Path

# Force UTF-8 on the standard streams so the non-ASCII messages below print
# on consoles with a legacy default encoding. Best effort only: a stream that
# cannot be reconfigured is deliberately left untouched.
for stream in (sys.stdout, sys.stderr):
    if hasattr(stream, 'reconfigure'):
        try:
            stream.reconfigure(encoding='utf-8', errors='replace')
        except Exception:
            pass

# Matches figure mentions such as "图05", "图 05", "图1", "图 10"; the captured
# group is the figure number with at most one leading zero stripped.
_FIGURE_REF_RE = re.compile(r'图\s*0?(\d+)')


def parse_args():
    """Parse the command line.

    Returns:
        argparse.Namespace with ``workflow``, ``source`` and ``out_dir``,
        each a ``pathlib.Path``.
    """
    ap = argparse.ArgumentParser(description="Phase 2 子 Agent 任务切片提取工具")
    ap.add_argument("--workflow", type=Path, required=True,
                    help="目标 workflow.json 路径")
    ap.add_argument("--source", type=Path, required=True,
                    help="对应的输入 case JSON 路径")
    ap.add_argument("--out-dir", type=Path, required=True,
                    help="输出任务切片的目标文件夹 (通常是 outputs/case-N/_scratch)")
    return ap.parse_args()


def extract_image_refs(text: object, image_list: list) -> list:
    """Map figure mentions in *text* to entries of *image_list*.

    Figure numbers are 1-indexed: "图1" refers to ``image_list[0]``. Each
    entry of *image_list* may be a plain string or a dict carrying an
    ``"image_url"`` key.

    Args:
        text: Text to scan. A non-string value (for example a list or dict
            loaded from JSON) is converted with ``str()`` before scanning
            instead of raising ``TypeError``.
        image_list: Sequence of image entries from the source JSON.

    Returns:
        Sorted list of the distinct string URLs/paths that were referenced.
        Out-of-range figure numbers and entries without a string URL are
        ignored. Returns ``[]`` when either argument is empty or when
        *image_list* is not a list/tuple.
    """
    if not text or not image_list:
        return []
    if not isinstance(image_list, (list, tuple)):
        return []

    haystack = text if isinstance(text, str) else str(text)

    refs = set()
    for number in _FIGURE_REF_RE.findall(haystack):
        try:
            idx = int(number) - 1  # "图1" -> first image in the list
        except ValueError:
            # Only reachable for absurdly long digit runs; not a real figure.
            continue
        if not 0 <= idx < len(image_list):
            continue
        item = image_list[idx]
        url = item.get("image_url") if isinstance(item, dict) else item
        # Keep only string URLs so the final sort cannot hit mixed types.
        if isinstance(url, str):
            refs.add(url)
    return sorted(refs)


def _collect_io_items(entries, path_prefix, image_list):
    """Build one slice dict per IO entry, tagged with its related images.

    Images are looked up from both the entry's ``value`` and its ``name`` so
    that a figure mentioned in either place is picked up.
    """
    items = []
    for idx, io in enumerate(entries):
        name = io.get("name", "")
        val = io.get("value", "")
        related = set(extract_image_refs(val, image_list))
        related.update(extract_image_refs(name, image_list))
        items.append({
            "path": f"{path_prefix}[{idx}]",
            "name": name,
            "value": val,
            "type": io.get("type", ""),
            "related_images": sorted(related),
        })
    return items


def main():
    """Read the workflow and source JSON and write task_2a.json / task_2b.json.

    Exits through ``sys.exit`` with a message when an input file is missing
    or cannot be read/parsed as JSON. The output directory is created if it
    does not exist.
    """
    args = parse_args()

    if not args.workflow.exists():
        sys.exit(f"Error: workflow.json 不存在: {args.workflow}")
    if not args.source.exists():
        sys.exit(f"Error: source json 不存在: {args.source}")

    # Create the output folder if it does not exist yet.
    args.out_dir.mkdir(parents=True, exist_ok=True)

    try:
        wf_data = json.loads(args.workflow.read_text(encoding='utf-8'))
    except Exception as e:
        sys.exit(f"Error reading workflow.json: {e}")

    try:
        src_data = json.loads(args.source.read_text(encoding='utf-8'))
    except Exception as e:
        sys.exit(f"Error reading source JSON: {e}")

    image_list = src_data.get("image_url_list", [])
    case_id = args.workflow.parent.name

    # Phase 2A subtask (effect / action / type normalization).
    task_2a = {
        "case_id": case_id,
        "task": "Phase 2A (effect/action/type normalization)",
        "image_url_list": image_list,
        "steps": [],
    }

    # Phase 2B subtask (substance / form vocabulary matching).
    task_2b = {
        "case_id": case_id,
        "task": "Phase 2B (substance/form matching)",
        "image_url_list": image_list,
        "items_to_match": [],
    }

    for p_idx, proc in enumerate(wf_data.get("procedures", [])):
        proc_id = proc.get("id", f"p{p_idx+1}")
        # 2A carries the procedure's type_registry on every step so custom
        # types can be validated.
        type_registry = proc.get("type_registry", {})

        for s in proc.get("steps", []):
            sid = s.get("id")
            if not sid:
                continue  # steps without an id cannot be addressed by path

            step_item_2a = {
                "path": f"{proc_id}.{sid}",
                "name": s.get("name", ""),
                "action": s.get("action", ""),
                "effect": s.get("effect", ""),
                "kind": s.get("kind", "step"),
                "type_registry": type_registry,
                "inputs": [],
                "outputs": [],
            }

            # Inputs first, then outputs: this fixes the order of the 2B items.
            for direction in ("inputs", "outputs"):
                items = _collect_io_items(
                    s.get(direction, []),
                    f"{proc_id}.{sid}.{direction}",
                    image_list,
                )
                step_item_2a[direction] = items
                # 2B receives its own dict per variable, as 2A keeps the others.
                task_2b["items_to_match"].extend(dict(item) for item in items)

            task_2a["steps"].append(step_item_2a)

    # Persist task_2a.json and task_2b.json.
    file_2a = args.out_dir / "task_2a.json"
    file_2b = args.out_dir / "task_2b.json"

    file_2a.write_text(json.dumps(task_2a, ensure_ascii=False, indent=2), encoding='utf-8')
    file_2b.write_text(json.dumps(task_2b, ensure_ascii=False, indent=2), encoding='utf-8')

    print(f"[success] Generated subtask files in {args.out_dir}:")
    print(f" - task_2a.json ({len(task_2a['steps'])} steps)")
    print(f" - task_2b.json ({len(task_2b['items_to_match'])} items mapped with images)")


if __name__ == "__main__":
    main()