prepare-subtask.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. prepare-subtask.py — Phase 2 子 Agent 任务切片提取工具.
  5. 功能:
  6. 解析 workflow.json 与 input/case-N.json, 提取出 Phase 2A 和 Phase 2B 子任务所需的最小切片。
  7. 特别支持「按需图片分流」: 自动识别 IO 变量所关联的图片编号 (如 图05), 并从输入文件中映射出对应的 URL/本地路径。
  8. """
  9. import argparse
  10. import json
  11. import re
  12. import sys
  13. from pathlib import Path
  14. # Configure UTF-8
  15. for stream in (sys.stdout, sys.stderr):
  16. if hasattr(stream, 'reconfigure'):
  17. try:
  18. stream.reconfigure(encoding='utf-8', errors='replace')
  19. except Exception:
  20. pass
  21. def parse_args():
  22. ap = argparse.ArgumentParser(description="Phase 2 子 Agent 任务切片提取工具")
  23. ap.add_argument("--workflow", type=Path, required=True, help="目标 workflow.json 路径")
  24. ap.add_argument("--source", type=Path, required=True, help="对应的输入 case JSON 路径")
  25. ap.add_argument("--out-dir", type=Path, required=True, help="输出任务切片的目标文件夹 (通常是 outputs/case-N/_scratch)")
  26. return ap.parse_args()
  27. def extract_image_refs(text: str, image_list: list) -> list:
  28. """
  29. 通过正则匹配文本中的“图05”或“图 05”等字样,映射到输入 JSON 的 image_url_list 对应图片。
  30. """
  31. if not text or not image_list:
  32. return []
  33. # 匹配 "图05" "图 05" "图1" "图 10" 等
  34. matches = re.findall(r'图\s*0?(\d+)', text)
  35. if not matches:
  36. return []
  37. refs = []
  38. for m in matches:
  39. try:
  40. idx = int(m) - 1 # 通常文章中 "图1" 对应图集里的第1张图 (1-indexed)
  41. if 0 <= idx < len(image_list):
  42. img_item = image_list[idx]
  43. if isinstance(img_item, dict) and "image_url" in img_item:
  44. refs.append(img_item["image_url"])
  45. elif isinstance(img_item, str):
  46. refs.append(img_item)
  47. except Exception:
  48. pass
  49. return sorted(list(set(refs)))
  50. def main():
  51. args = parse_args()
  52. if not args.workflow.exists():
  53. sys.exit(f"Error: workflow.json 不存在: {args.workflow}")
  54. if not args.source.exists():
  55. sys.exit(f"Error: source json 不存在: {args.source}")
  56. # Create outputs folder if not exist
  57. args.out_dir.mkdir(parents=True, exist_ok=True)
  58. try:
  59. wf_data = json.loads(args.workflow.read_text(encoding='utf-8'))
  60. except Exception as e:
  61. sys.exit(f"Error reading workflow.json: {e}")
  62. try:
  63. src_data = json.loads(args.source.read_text(encoding='utf-8'))
  64. except Exception as e:
  65. sys.exit(f"Error reading source JSON: {e}")
  66. image_list = src_data.get("image_url_list", [])
  67. # ──── 提取 2A 子任务 (作用/动作/类型归一化) ──────────────────────────────────
  68. task_2a = {
  69. "case_id": args.workflow.parent.name,
  70. "task": "Phase 2A (effect/action/type normalization)",
  71. "image_url_list": image_list,
  72. "steps": []
  73. }
  74. # ──── 提取 2B 子任务 (实质/形式词表查询匹配) ──────────────────────────────────
  75. task_2b = {
  76. "case_id": args.workflow.parent.name,
  77. "task": "Phase 2B (substance/form matching)",
  78. "image_url_list": image_list,
  79. "items_to_match": []
  80. }
  81. for p_idx, proc in enumerate(wf_data.get("procedures", [])):
  82. proc_id = proc.get("id", f"p{p_idx+1}")
  83. # 2A: 包含 type_registry,供自定义类型校验
  84. type_registry = proc.get("type_registry", {})
  85. for s in proc.get("steps", []):
  86. sid = s.get("id")
  87. if not sid:
  88. continue
  89. # 2A Step Item
  90. step_item_2a = {
  91. "path": f"{proc_id}.{sid}",
  92. "name": s.get("name", ""),
  93. "action": s.get("action", ""),
  94. "effect": s.get("effect", ""),
  95. "kind": s.get("kind", "step"),
  96. "type_registry": type_registry,
  97. "inputs": [],
  98. "outputs": []
  99. }
  100. # Extract inputs
  101. for idx, io in enumerate(s.get("inputs", [])):
  102. name = io.get("name", "")
  103. val = io.get("value", "")
  104. io_type = io.get("type", "")
  105. # 2B Variable Item
  106. related_imgs = extract_image_refs(val, image_list)
  107. # 也从 name 里提取,防漏
  108. name_imgs = extract_image_refs(name, image_list)
  109. all_imgs = sorted(list(set(related_imgs + name_imgs)))
  110. step_item_2a["inputs"].append({
  111. "path": f"{proc_id}.{sid}.inputs[{idx}]",
  112. "name": name,
  113. "value": val,
  114. "type": io_type,
  115. "related_images": all_imgs
  116. })
  117. task_2b["items_to_match"].append({
  118. "path": f"{proc_id}.{sid}.inputs[{idx}]",
  119. "name": name,
  120. "value": val,
  121. "type": io_type,
  122. "related_images": all_imgs
  123. })
  124. # Extract outputs
  125. for idx, io in enumerate(s.get("outputs", [])):
  126. name = io.get("name", "")
  127. val = io.get("value", "")
  128. io_type = io.get("type", "")
  129. # 2B Variable Item
  130. related_imgs = extract_image_refs(val, image_list)
  131. name_imgs = extract_image_refs(name, image_list)
  132. all_imgs = sorted(list(set(related_imgs + name_imgs)))
  133. step_item_2a["outputs"].append({
  134. "path": f"{proc_id}.{sid}.outputs[{idx}]",
  135. "name": name,
  136. "value": val,
  137. "type": io_type,
  138. "related_images": all_imgs
  139. })
  140. task_2b["items_to_match"].append({
  141. "path": f"{proc_id}.{sid}.outputs[{idx}]",
  142. "name": name,
  143. "value": val,
  144. "type": io_type,
  145. "related_images": all_imgs
  146. })
  147. task_2a["steps"].append(step_item_2a)
  148. # 落盘 task_2a.json 与 task_2b.json
  149. file_2a = args.out_dir / "task_2a.json"
  150. file_2b = args.out_dir / "task_2b.json"
  151. file_2a.write_text(json.dumps(task_2a, ensure_ascii=False, indent=2), encoding='utf-8')
  152. file_2b.write_text(json.dumps(task_2b, ensure_ascii=False, indent=2), encoding='utf-8')
  153. print(f"[success] Generated subtask files in {args.out_dir}:")
  154. print(f" - task_2a.json ({len(task_2a['steps'])} steps)")
  155. print(f" - task_2b.json ({len(task_2b['items_to_match'])} items mapped with images)")
  156. if __name__ == "__main__":
  157. main()