| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- prepare-subtask.py — Phase 2 子 Agent 任务切片提取工具.
- 功能:
- 解析 workflow.json 与 input/case-N.json, 提取出 Phase 2A 和 Phase 2B 子任务所需的最小切片。
- 特别支持「按需图片分流」: 自动识别 IO 变量所关联的图片编号 (如 图05), 并从输入文件中映射出对应的 URL/本地路径。
- """
- import argparse
- import json
- import re
- import sys
- from pathlib import Path
# Best-effort: switch stdout/stderr to UTF-8 so the non-ASCII (Chinese)
# messages this script emits do not fail on consoles with a narrower default
# encoding. errors='replace' substitutes anything that still cannot be encoded.
for stream in (sys.stdout, sys.stderr):
    # Stream objects without a reconfigure() method are left untouched.
    if not hasattr(stream, 'reconfigure'):
        continue
    try:
        stream.reconfigure(encoding='utf-8', errors='replace')
    except Exception:
        # Deliberately ignored: failing to reconfigure must never stop the tool.
        pass
def parse_args(argv=None):
    """Parse the command-line options of the Phase 2 subtask slicing tool.

    Args:
        argv: Optional sequence of argument strings to parse. When ``None``
            (the default) argparse reads the process arguments, which is the
            behavior existing callers rely on.

    Returns:
        An ``argparse.Namespace`` with ``workflow``, ``source`` and
        ``out_dir`` attributes, each converted to a ``pathlib.Path``.

    Raises:
        SystemExit: raised by argparse when a required option is missing or
            an argument is invalid.
    """
    ap = argparse.ArgumentParser(description="Phase 2 子 Agent 任务切片提取工具")
    ap.add_argument("--workflow", type=Path, required=True, help="目标 workflow.json 路径")
    ap.add_argument("--source", type=Path, required=True, help="对应的输入 case JSON 路径")
    ap.add_argument("--out-dir", type=Path, required=True, help="输出任务切片的目标文件夹 (通常是 outputs/case-N/_scratch)")
    return ap.parse_args(argv)
# Matches figure mentions such as "图05", "图 05", "图1" or "图 10". The captured
# group is the figure number; int() ignores any remaining leading zeros.
_IMAGE_REF_RE = re.compile(r'图\s*0?(\d+)')


def extract_image_refs(text: str, image_list: list) -> list:
    """Map figure mentions in ``text`` to entries of ``image_list``.

    A mention "图N" is treated as 1-indexed: "图1" selects ``image_list[0]``.
    An entry may be a plain string or a dict carrying an ``"image_url"`` key;
    only string references are collected.

    Args:
        text: Text to scan. Anything that is not a non-empty string (``None``,
            numbers, lists, ...) yields no references instead of raising.
        image_list: Image entries from the input JSON. Anything that is not a
            non-empty list/tuple yields no references.

    Returns:
        A sorted list of the distinct referenced image strings. Mentions whose
        number falls outside ``image_list`` are ignored.
    """
    if not isinstance(text, str) or not text:
        return []
    if not isinstance(image_list, (list, tuple)) or not image_list:
        return []

    refs = set()
    for number in _IMAGE_REF_RE.findall(text):
        try:
            idx = int(number) - 1
        except ValueError:
            # Absurdly long digit runs can exceed the interpreter's
            # int-conversion limit; they cannot be a valid figure number.
            continue
        if not 0 <= idx < len(image_list):
            continue
        item = image_list[idx]
        url = item.get("image_url") if isinstance(item, dict) else item
        # Keep strings only so that de-duplication and sorting cannot fail on
        # unhashable or mixed-type entries.
        if isinstance(url, str):
            refs.add(url)
    return sorted(refs)
def _read_json_object(path, label):
    """Load ``path`` as UTF-8 JSON and return its top-level object.

    Exits the process with an ``Error reading <label>: ...`` message when the
    file cannot be read or decoded, or when the top-level value is not a JSON
    object (the rest of the tool calls ``.get`` on it).
    """
    try:
        data = json.loads(path.read_text(encoding='utf-8'))
    except (OSError, ValueError) as e:
        # ValueError covers both JSON syntax errors and invalid UTF-8.
        sys.exit(f"Error reading {label}: {e}")
    if not isinstance(data, dict):
        sys.exit(f"Error reading {label}: top-level JSON value must be an object")
    return data


def _image_refs_for(texts, image_list):
    """Return the sorted, de-duplicated image references found in ``texts``.

    Non-string entries (numbers, lists, null values from the JSON) are
    skipped, since only text can mention a figure.
    """
    refs = set()
    for text in texts:
        if isinstance(text, str):
            refs.update(extract_image_refs(text, image_list))
    return sorted(refs)


def _build_io_items(ios, base_path, image_list):
    """Build one slice entry per IO variable of a step.

    ``base_path`` is the ``<proc>.<step>.inputs`` / ``.outputs`` prefix; the
    position of the variable is appended as ``[idx]``.
    """
    items = []
    for idx, io in enumerate(ios):
        name = io.get("name", "")
        val = io.get("value", "")
        items.append({
            "path": f"{base_path}[{idx}]",
            "name": name,
            "value": val,
            "type": io.get("type", ""),
            # Scan the value first and the name as well, so a figure that is
            # mentioned only in the variable name is not missed.
            "related_images": _image_refs_for((val, name), image_list),
        })
    return items


def main():
    """Slice workflow.json into the Phase 2A and Phase 2B subtask files.

    Reads the workflow and the source case JSON given on the command line,
    then writes ``task_2a.json`` (one entry per step, with its inputs and
    outputs) and ``task_2b.json`` (a flat list of every IO variable) into the
    output directory. Each IO variable carries the images referenced by its
    name or value.

    Exits with an error message when an input file is missing, unreadable,
    not valid JSON, or not a JSON object at the top level.
    """
    args = parse_args()

    if not args.workflow.exists():
        sys.exit(f"Error: workflow.json 不存在: {args.workflow}")
    if not args.source.exists():
        sys.exit(f"Error: source json 不存在: {args.source}")

    # Create the output folder if it does not exist yet.
    args.out_dir.mkdir(parents=True, exist_ok=True)

    wf_data = _read_json_object(args.workflow, "workflow.json")
    src_data = _read_json_object(args.source, "source JSON")

    image_list = src_data.get("image_url_list", [])
    # The case id is the name of the folder that contains workflow.json.
    case_id = args.workflow.parent.name

    # Phase 2A subtask: effect / action / type normalization.
    task_2a = {
        "case_id": case_id,
        "task": "Phase 2A (effect/action/type normalization)",
        "image_url_list": image_list,
        "steps": []
    }

    # Phase 2B subtask: substance / form vocabulary matching.
    task_2b = {
        "case_id": case_id,
        "task": "Phase 2B (substance/form matching)",
        "image_url_list": image_list,
        "items_to_match": []
    }

    # "or []" treats an explicit JSON null the same as a missing list.
    for p_idx, proc in enumerate(wf_data.get("procedures") or []):
        proc_id = proc.get("id", f"p{p_idx+1}")

        # Attached to every 2A step so custom types can be validated there.
        type_registry = proc.get("type_registry", {})

        for s in proc.get("steps") or []:
            sid = s.get("id")
            if not sid:
                # Steps without an id cannot be addressed by path; skip them.
                continue

            step_path = f"{proc_id}.{sid}"
            inputs = _build_io_items(
                s.get("inputs") or [], f"{step_path}.inputs", image_list)
            outputs = _build_io_items(
                s.get("outputs") or [], f"{step_path}.outputs", image_list)

            task_2a["steps"].append({
                "path": step_path,
                "name": s.get("name", ""),
                "action": s.get("action", ""),
                "effect": s.get("effect", ""),
                "kind": s.get("kind", "step"),
                "type_registry": type_registry,
                "inputs": inputs,
                "outputs": outputs
            })

            # 2B receives the same variables as a flat list: the inputs of a
            # step first, then its outputs. Copies keep both tasks independent.
            task_2b["items_to_match"].extend(
                dict(item) for item in inputs + outputs)

    # Write task_2a.json and task_2b.json to disk.
    file_2a = args.out_dir / "task_2a.json"
    file_2b = args.out_dir / "task_2b.json"

    file_2a.write_text(json.dumps(task_2a, ensure_ascii=False, indent=2), encoding='utf-8')
    file_2b.write_text(json.dumps(task_2b, ensure_ascii=False, indent=2), encoding='utf-8')

    print(f"[success] Generated subtask files in {args.out_dir}:")
    print(f" - task_2a.json ({len(task_2a['steps'])} steps)")
    print(f" - task_2b.json ({len(task_2b['items_to_match'])} items mapped with images)")
# Run the tool only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|