| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150 |
- import asyncio
- import json
- from pathlib import Path
- from typing import Dict, List, Optional
- from datetime import datetime
- from collections import deque
- from fastapi import FastAPI, HTTPException, Request, BackgroundTasks, UploadFile, File
- from fastapi.staticfiles import StaticFiles
- from fastapi.responses import HTMLResponse
- from pydantic import BaseModel
- import uvicorn
- import sys
app = FastAPI(title="Pipeline Dashboard")

# Every path is derived from the directory holding this module, so the
# dashboard behaves the same regardless of the process's working directory.
BASE_DIR = Path(__file__).parent
OUTPUT_DIR = BASE_DIR.joinpath("output")            # per-requirement artifact dirs: 001/, 002/, ...
DB_PATH = BASE_DIR.joinpath("db_requirements.json")  # JSON list of requirement texts
PROMPTS_DIR = BASE_DIR.joinpath("prompts")           # *.prompt files
SCRATCH_DIR = BASE_DIR.joinpath("scratch")           # uploaded ad-hoc scripts, one sub-folder each

# The script endpoints assume the scratch area exists.
SCRATCH_DIR.mkdir(exist_ok=True)
class ActiveRun:
    """In-memory bookkeeping for one pipeline run.

    Tracks the subprocess handle (None until one is attached), a rolling
    buffer of the most recent 200 log lines, a free-form status label, and
    the local-time ISO-8601 timestamp at which the record was created.
    """

    def __init__(self):
        # Naive local time, ISO-8601 string.
        self.start_time: str = datetime.now().isoformat()
        # Lifecycle label; get_requirements reports "running" when it sees it.
        self.status: str = "starting"
        # Bounded buffer: appending past 200 entries silently drops the oldest.
        self.logs: deque = deque(maxlen=200)
        # Set by whatever code launches the run (not in this section).
        self.process: Optional[asyncio.subprocess.Process] = None


# Registry of runs keyed by requirement index (the same index used by the
# /api/requirements/{index} routes). Lives only as long as this process.
active_runs: Dict[int, ActiveRun] = {}
@app.post("/api/requirements/{index}/upload_source_ex")
async def upload_source_ex(index: int, file: UploadFile = File(...)):
    """Store an uploaded file verbatim as raw_cases/source_ex.json.

    The file goes to OUTPUT_DIR/NNN/raw_cases, where NNN is index + 1
    zero-padded to three digits; the directory is created if needed. The
    bytes are not validated, and any previous source_ex.json is overwritten.
    """
    raw_dir = OUTPUT_DIR / f"{(index+1):03d}" / "raw_cases"
    raw_dir.mkdir(parents=True, exist_ok=True)

    payload = await file.read()
    (raw_dir / "source_ex.json").write_bytes(payload)

    return {"status": "success", "filename": "source_ex.json"}
class RunRequest(BaseModel):
    """Request body for starting a pipeline run.

    Every field has a default, so an empty JSON object is a valid body.
    NOTE(review): the endpoint that consumes these options is outside the
    visible section; confirm the meaning of each field there.
    """

    platforms: Optional[str] = None
    use_claude_sdk: bool = False
    restart_mode: Optional[str] = None
    phase: Optional[int] = None
    only_step: Optional[str] = None
    start_from: Optional[str] = None
    end_at: Optional[str] = None
    case_index: Optional[int] = None
    model: Optional[str] = None
class MemoRequest(BaseModel):
    """Request body for saving a requirement's memo; save_memo writes ``memo`` verbatim to memo.txt."""

    memo: str
class RequirementUpdateRequest(BaseModel):
    """Request body carrying one requirement's text (used by update_requirement and add_requirement)."""

    requirement: str
class PromptRequest(BaseModel):
    """Request body carrying prompt text.

    NOTE(review): the consuming endpoint is outside the visible section;
    presumably ``content`` is the prompt file body and ``schema_content`` an
    optional companion schema. Confirm there.
    """

    content: str
    schema_content: Optional[str] = None
class ImportExternalRequest(BaseModel):
    """Request body for import_source_ex: the id of the external case to copy into source.json."""

    case_id: str
class CaseFilterRequest(BaseModel):
    """Request body for filter_case / restore_case.

    ``reason`` groups filtered cases in filtered_cases.json. filter_case also
    falls back to "manual_delete" when it is sent as null or empty.
    """

    case_id: str
    reason: Optional[str] = "manual_delete"
- from typing import Dict, List, Optional, Any
class RunScriptRequest(BaseModel):
    """Request body for POST /api/scripts/run.

    ``folder``/``filename`` locate the script as SCRATCH_DIR/folder/filename.
    Each entry of ``args`` is a dict; run_script reads its "name", "value"
    and "is_positional" keys to build the command line.
    """

    folder: str
    filename: str
    args: List[Dict[str, Any]]
@app.post("/api/requirements/{index}/import_source_ex")
def import_source_ex(index: int, req: ImportExternalRequest):
    """Copy one case from raw_cases/source_ex.json into raw_cases/source.json.

    Both files may be a bare JSON list or an object whose cases live under
    "cases" or "sources". A case's id is the first non-empty of
    ``case_id``, ``_raw.case_id`` and ``post.channel_content_id``. If the
    id is already present in source.json, nothing is written. Otherwise
    source.json is rewritten in ``{"total", "sources"}`` form.

    Raises:
        HTTPException 404: source_ex.json is missing, or has no case with ``req.case_id``.
        HTTPException 400: a file is not valid JSON or has an unexpected top-level shape
            (previously an unhandled 500; source_ex.json is uploaded unvalidated).
    """
    raw_dir = OUTPUT_DIR / f"{(index+1):03d}" / "raw_cases"
    source_ex_path = raw_dir / "source_ex.json"
    source_path = raw_dir / "source.json"

    def load_json(path):
        try:
            with open(path, "r", encoding="utf-8") as f:
                return json.load(f)
        except ValueError as e:  # JSONDecodeError and UnicodeDecodeError
            raise HTTPException(status_code=400, detail=f"{path.name} is not valid JSON: {e}") from e

    def case_list(data, name):
        # Accept a bare list, or an object with a "cases"/"sources" list.
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            cases = data.get("cases") or data.get("sources") or []
            if isinstance(cases, list):
                return cases
        raise HTTPException(status_code=400, detail=f"{name} has an unexpected format")

    def case_id_of(case):
        # Null or non-dict "_raw"/"post" values are skipped rather than raising.
        if not isinstance(case, dict):
            return None
        raw, post = case.get("_raw"), case.get("post")
        return (
            case.get("case_id")
            or (raw.get("case_id") if isinstance(raw, dict) else None)
            or (post.get("channel_content_id") if isinstance(post, dict) else None)
        )

    if not source_ex_path.exists():
        raise HTTPException(status_code=404, detail="source_ex.json not found")

    ex_cases = case_list(load_json(source_ex_path), "source_ex.json")
    target_case = next((c for c in ex_cases if case_id_of(c) == req.case_id), None)
    if target_case is None:
        raise HTTPException(status_code=404, detail="Case not found in external sources")

    source_cases = []
    if source_path.exists():
        source_cases = case_list(load_json(source_path), "source.json")

    if any(case_id_of(c) == req.case_id for c in source_cases):
        return {"status": "success", "message": "Already imported"}

    source_cases.append(target_case)
    with open(source_path, "w", encoding="utf-8") as f:
        # source.json is always saved in object form.
        json.dump({"total": len(source_cases), "sources": source_cases}, f, ensure_ascii=False, indent=2)

    return {"status": "success"}
@app.post("/api/requirements/{index}/import_all_source_ex")
def import_all_source_ex(index: int):
    """Copy every not-yet-imported case from source_ex.json into source.json.

    A case's id is the first non-empty of ``case_id``, ``_raw.case_id`` and
    ``post.channel_content_id``. The following cases are skipped:
      - cases with no id;
      - cases whose id is already in source.json;
      - cases whose id appeared earlier in source_ex.json.
    source.json is rewritten in ``{"total", "sources"}`` form only when
    something was added.

    Returns:
        {"status": "success", "count": number of cases added}

    Raises:
        HTTPException 404: source_ex.json does not exist.
        HTTPException 400: a file is not valid JSON or has an unexpected top-level shape.
    """
    raw_dir = OUTPUT_DIR / f"{(index+1):03d}" / "raw_cases"
    source_ex_path = raw_dir / "source_ex.json"
    source_path = raw_dir / "source.json"

    def load_json(path):
        try:
            with open(path, "r", encoding="utf-8") as f:
                return json.load(f)
        except ValueError as e:  # JSONDecodeError and UnicodeDecodeError
            raise HTTPException(status_code=400, detail=f"{path.name} is not valid JSON: {e}") from e

    def case_list(data, name):
        # Accept a bare list, or an object with a "cases"/"sources" list.
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            cases = data.get("cases") or data.get("sources") or []
            if isinstance(cases, list):
                return cases
        raise HTTPException(status_code=400, detail=f"{name} has an unexpected format")

    def case_id_of(case):
        # Null or non-dict "_raw"/"post" values are skipped rather than raising.
        if not isinstance(case, dict):
            return None
        raw, post = case.get("_raw"), case.get("post")
        return (
            case.get("case_id")
            or (raw.get("case_id") if isinstance(raw, dict) else None)
            or (post.get("channel_content_id") if isinstance(post, dict) else None)
        )

    if not source_ex_path.exists():
        raise HTTPException(status_code=404, detail="source_ex.json not found")

    ex_cases = case_list(load_json(source_ex_path), "source_ex.json")

    source_cases = []
    if source_path.exists():
        source_cases = case_list(load_json(source_path), "source.json")

    existing_ids = {cid for cid in map(case_id_of, source_cases) if cid}

    added_count = 0
    for case in ex_cases:
        cid = case_id_of(case)
        if cid and cid not in existing_ids:
            source_cases.append(case)
            existing_ids.add(cid)
            added_count += 1

    if added_count > 0:
        with open(source_path, "w", encoding="utf-8") as f:
            json.dump({"total": len(source_cases), "sources": source_cases}, f, ensure_ascii=False, indent=2)

    return {"status": "success", "count": added_count}
@app.get("/api/requirements")
def get_requirements():
    """Summarize every requirement listed in db_requirements.json.

    Requirement i keeps its artifacts in OUTPUT_DIR/NNN, where NNN is i + 1
    zero-padded to three digits. Each summary reports:
      - which artifacts exist;
      - how many cases were found (from case.json, else by counting
        raw_cases/case_*.json);
      - the trimmed memo text;
      - a status, one of "running", "completed", "partial" or "pending".

    Status is "running" if active_runs says so. Otherwise it is "completed"
    when strategy.json exists, "partial" when any cases exist, and "pending"
    in all other cases.

    Raises:
        HTTPException 404: db_requirements.json does not exist.
    """
    if not DB_PATH.exists():
        raise HTTPException(status_code=404, detail="db_requirements.json not found")

    with open(DB_PATH, "r", encoding="utf-8") as f:
        requirements = json.load(f)

    def count_cases(req_dir: Path) -> int:
        # case.json is best-effort: unreadable or oddly shaped files count as 0.
        count = 0
        case_json = req_dir / "case.json"
        if case_json.exists():
            try:
                with open(case_json, "r", encoding="utf-8") as f:
                    count = len(json.load(f).get("cases", []))
            except Exception:
                pass
        raw_dir = req_dir / "raw_cases"
        if count == 0 and raw_dir.exists():
            count = sum(1 for _ in raw_dir.glob("case_*.json"))
        return count

    def read_memo(req_dir: Path) -> str:
        memo_path = req_dir / "memo.txt"
        if not memo_path.exists():
            return ""
        with open(memo_path, "r", encoding="utf-8") as fm:
            return fm.read().strip()

    summaries = []
    for position, requirement in enumerate(requirements):
        req_id = f"{(position+1):03d}"
        req_dir = OUTPUT_DIR / req_id
        has_strategy = (req_dir / "strategy.json").exists()
        case_count = count_cases(req_dir)

        run = active_runs.get(position)
        if run is not None and run.status == "running":
            status = "running"
        elif has_strategy:
            status = "completed"
        elif case_count > 0:
            status = "partial"
        else:
            status = "pending"

        summaries.append({
            "index": position,
            "id": req_id,
            "requirement": requirement,
            "status": status,
            "has_strategy": has_strategy,
            "has_blueprint": (req_dir / "blueprint.json").exists(),
            "has_caps": (req_dir / "capabilities_extracted.json").exists(),
            "raw_cases_count": case_count,
            "memo": read_memo(req_dir),
        })

    return summaries
@app.put("/api/requirements/{index}")
def update_requirement(index: int, req: RequirementUpdateRequest):
    """Replace the text of requirement ``index`` in db_requirements.json.

    Raises:
        HTTPException 404: the DB file is missing or ``index`` is out of range.
    """
    if not DB_PATH.exists():
        raise HTTPException(status_code=404, detail="db_requirements.json not found")

    with open(DB_PATH, "r", encoding="utf-8") as f:
        requirements = json.load(f)

    if not 0 <= index < len(requirements):
        raise HTTPException(status_code=404, detail="Requirement index out of range")

    requirements[index] = req.requirement
    with open(DB_PATH, "w", encoding="utf-8") as f:
        json.dump(requirements, f, ensure_ascii=False, indent=2)

    return {"status": "success", "requirement": req.requirement}
@app.post("/api/requirements/{index}/cases/filter")
def filter_case(index: int, req: CaseFilterRequest):
    """Move a case out of raw_cases/source.json into raw_cases/filtered_cases.json.

    Steps:
      1. Every source.json entry whose id equals ``req.case_id`` is removed.
      2. The matching case is tagged with ``filter_reason`` and stored under
         ``by_reason[reason]`` (reason defaults to "manual_delete").
      3. The case is also dropped from the requirement's case.json when that
         file lists it by ``case_id`` or ``_raw.case_id``.

    A bare-list source.json is accepted and saved back in object form.

    Raises:
        HTTPException 404: source.json is missing or does not contain the case.
        HTTPException 400: source.json or filtered_cases.json has an unexpected shape
            (checked before anything is written).
    """
    raw_dir = OUTPUT_DIR / f"{(index+1):03d}" / "raw_cases"
    source_path = raw_dir / "source.json"
    filtered_path = raw_dir / "filtered_cases.json"

    def case_id_of(case):
        # First non-empty of case_id / _raw.case_id / post.channel_content_id;
        # null or non-dict "_raw"/"post" no longer raise AttributeError.
        if not isinstance(case, dict):
            return None
        raw, post = case.get("_raw"), case.get("post")
        return (
            case.get("case_id")
            or (raw.get("case_id") if isinstance(raw, dict) else None)
            or (post.get("channel_content_id") if isinstance(post, dict) else None)
        )

    if not source_path.exists():
        raise HTTPException(status_code=404, detail="source.json not found")

    with open(source_path, "r", encoding="utf-8") as f:
        source_data = json.load(f)
    if isinstance(source_data, list):
        source_data = {"total": len(source_data), "sources": source_data}
    if not isinstance(source_data, dict) or not isinstance(source_data.get("sources", []), list):
        raise HTTPException(status_code=400, detail="source.json has an unexpected format")

    target_case = None
    new_sources = []
    for case in source_data.get("sources", []):
        if case_id_of(case) == req.case_id:
            target_case = case
        else:
            new_sources.append(case)

    if target_case is None:
        raise HTTPException(status_code=404, detail="Case not found in source.json")

    filtered_data = {"total": 0, "by_reason": {}}
    if filtered_path.exists():
        with open(filtered_path, "r", encoding="utf-8") as f:
            filtered_data = json.load(f)
    by_reason = filtered_data.setdefault("by_reason", {}) if isinstance(filtered_data, dict) else None
    if not isinstance(by_reason, dict):
        raise HTTPException(status_code=400, detail="filtered_cases.json has an unexpected format")

    reason = req.reason or "manual_delete"
    bucket = by_reason.setdefault(reason, {"total": 0, "sources": []})
    if not isinstance(bucket, dict) or not isinstance(bucket.setdefault("sources", []), list):
        raise HTTPException(status_code=400, detail="filtered_cases.json has an unexpected format")

    # Tag in place: target_case is no longer referenced from new_sources.
    target_case["filter_reason"] = reason
    bucket["sources"].append(target_case)
    bucket["total"] = len(bucket["sources"])
    filtered_data["total"] = sum(b.get("total", 0) for b in by_reason.values() if isinstance(b, dict))

    # Persist the filtered record before dropping the case from source.json,
    # so a failure between the two writes cannot lose the case.
    with open(filtered_path, "w", encoding="utf-8") as f:
        json.dump(filtered_data, f, ensure_ascii=False, indent=2)

    source_data["sources"] = new_sources
    source_data["total"] = len(new_sources)
    with open(source_path, "w", encoding="utf-8") as f:
        json.dump(source_data, f, ensure_ascii=False, indent=2)

    # Also drop the case from case.json, matching on case_id or _raw.case_id.
    case_path = raw_dir.parent / "case.json"
    if case_path.exists():
        with open(case_path, "r", encoding="utf-8") as f:
            case_data = json.load(f)
        if isinstance(case_data, dict) and isinstance(case_data.get("cases"), list):
            def listed_as_target(case):
                if not isinstance(case, dict):
                    return False
                raw = case.get("_raw")
                raw_id = raw.get("case_id") if isinstance(raw, dict) else None
                return case.get("case_id") == req.case_id or raw_id == req.case_id

            remaining = [c for c in case_data["cases"] if not listed_as_target(c)]
            if len(remaining) != len(case_data["cases"]):
                case_data["cases"] = remaining
                with open(case_path, "w", encoding="utf-8") as f:
                    json.dump(case_data, f, ensure_ascii=False, indent=2)

    return {"status": "success"}
@app.post("/api/requirements/{index}/cases/restore")
def restore_case(index: int, req: CaseFilterRequest):
    """Move a filtered case back from filtered_cases.json into source.json.

    Steps:
      1. In the first ``by_reason`` bucket containing ``req.case_id``, every
         entry with that id is removed.
      2. The (last) matching case loses its ``filter_reason``.
      3. The case is appended to source.json unless a case with the same id
         is already present there.

    A bare-list source.json is accepted and saved back in object form.
    case.json is deliberately not updated: it needs the normalization done by
    generate_case.py, so step 1.6 must be rerun.

    Raises:
        HTTPException 404: filtered_cases.json is missing or lacks the case.
        HTTPException 400: filtered_cases.json or source.json has an unexpected
            top-level shape (checked before anything is written).
    """
    raw_dir = OUTPUT_DIR / f"{(index+1):03d}" / "raw_cases"
    source_path = raw_dir / "source.json"
    filtered_path = raw_dir / "filtered_cases.json"

    def case_id_of(case):
        # First non-empty of case_id / _raw.case_id / post.channel_content_id;
        # null or non-dict "_raw"/"post" no longer raise AttributeError.
        if not isinstance(case, dict):
            return None
        raw, post = case.get("_raw"), case.get("post")
        return (
            case.get("case_id")
            or (raw.get("case_id") if isinstance(raw, dict) else None)
            or (post.get("channel_content_id") if isinstance(post, dict) else None)
        )

    if not filtered_path.exists():
        raise HTTPException(status_code=404, detail="filtered_cases.json not found")

    with open(filtered_path, "r", encoding="utf-8") as f:
        filtered_data = json.load(f)
    by_reason = filtered_data.get("by_reason", {}) if isinstance(filtered_data, dict) else None
    if not isinstance(by_reason, dict):
        raise HTTPException(status_code=400, detail="filtered_cases.json has an unexpected format")

    target_case = None
    for bucket in by_reason.values():
        if not isinstance(bucket, dict):
            continue
        kept = []
        for case in bucket.get("sources", []):
            if case_id_of(case) == req.case_id:
                target_case = case
            else:
                kept.append(case)
        if target_case is not None:
            bucket["sources"] = kept
            bucket["total"] = len(kept)
            break

    if target_case is None:
        raise HTTPException(status_code=404, detail="Case not found in filtered_cases.json")

    # Load and validate source.json before writing either file.
    source_data = {"total": 0, "sources": []}
    if source_path.exists():
        with open(source_path, "r", encoding="utf-8") as f:
            source_data = json.load(f)
    if isinstance(source_data, list):
        source_data = {"total": len(source_data), "sources": source_data}
    if not isinstance(source_data, dict) or not isinstance(source_data.setdefault("sources", []), list):
        raise HTTPException(status_code=400, detail="source.json has an unexpected format")

    target_case.pop("filter_reason", None)
    if not any(case_id_of(c) == req.case_id for c in source_data["sources"]):
        source_data["sources"].append(target_case)
    source_data["total"] = len(source_data["sources"])

    # Write source.json first: if the second write fails the case is in both
    # files (recoverable) rather than in neither.
    with open(source_path, "w", encoding="utf-8") as f:
        json.dump(source_data, f, ensure_ascii=False, indent=2)

    filtered_data["total"] = sum(b.get("total", 0) for b in by_reason.values() if isinstance(b, dict))
    with open(filtered_path, "w", encoding="utf-8") as f:
        json.dump(filtered_data, f, ensure_ascii=False, indent=2)

    return {"status": "success"}
@app.post("/api/requirements/{index}/upload_cluster")
async def upload_cluster(index: int, file: UploadFile = File(...)):
    """Append an uploaded JSON document to the requirement's cluster.json.

    cluster.json holds a list of uploaded documents; a legacy single-document
    file is wrapped into a one-element list first. As before, an existing
    cluster.json that cannot be parsed is replaced (best effort). Only parse
    errors are swallowed now; a bare ``except:`` used to hide everything,
    including KeyboardInterrupt.

    Raises:
        HTTPException 400: the upload is not valid JSON. File-system errors
            propagate as server errors instead of being reported as "Invalid JSON".
    """
    req_dir = OUTPUT_DIR / f"{(index+1):03d}"
    req_dir.mkdir(parents=True, exist_ok=True)
    cluster_path = req_dir / "cluster.json"

    content = await file.read()
    try:
        json_data = json.loads(content)
    except ValueError as e:  # JSONDecodeError and UnicodeDecodeError
        raise HTTPException(status_code=400, detail=f"Invalid JSON: {str(e)}") from e

    existing_data = []
    if cluster_path.exists():
        try:
            with open(cluster_path, "r", encoding="utf-8") as f:
                existing = json.load(f)
        except ValueError:
            pass  # unparsable cluster.json: start a fresh list
        else:
            existing_data = existing if isinstance(existing, list) else [existing]

    existing_data.append(json_data)

    with open(cluster_path, "w", encoding="utf-8") as f:
        json.dump(existing_data, f, ensure_ascii=False, indent=2)

    return {"status": "success"}
@app.post("/api/requirements/{index}/save_cluster")
async def save_cluster(index: int, request: Request):
    """Overwrite the requirement's cluster.json with the JSON request body.

    Raises:
        HTTPException 400: the request body is not valid JSON. Write failures
            now surface as server errors instead of a misleading 400.
    """
    req_dir = OUTPUT_DIR / f"{(index+1):03d}"
    req_dir.mkdir(parents=True, exist_ok=True)
    cluster_path = req_dir / "cluster.json"

    try:
        data = await request.json()
    except ValueError as e:  # JSONDecodeError and UnicodeDecodeError
        raise HTTPException(status_code=400, detail=f"Failed to save JSON: {str(e)}") from e

    with open(cluster_path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    return {"status": "success"}
@app.post("/api/requirements")
def add_requirement(req: RequirementUpdateRequest):
    """Append a new requirement to db_requirements.json and create its output directory.

    Returns the new requirement's index (its position in the list).

    Raises:
        HTTPException 404: db_requirements.json does not exist.
    """
    if not DB_PATH.exists():
        raise HTTPException(status_code=404, detail="db_requirements.json not found")

    with open(DB_PATH, "r", encoding="utf-8") as f:
        requirements = json.load(f)

    requirements.append(req.requirement)
    new_index = len(requirements) - 1

    with open(DB_PATH, "w", encoding="utf-8") as f:
        json.dump(requirements, f, ensure_ascii=False, indent=2)

    # Create OUTPUT_DIR/NNN up front so later uploads have somewhere to go.
    (OUTPUT_DIR / f"{(new_index+1):03d}").mkdir(parents=True, exist_ok=True)

    return {"status": "success", "index": new_index, "requirement": req.requirement}
@app.get("/api/requirements/{index}/data")
def get_requirement_data(index: int):
    """Return all JSON artifacts of a requirement in one payload.

    Missing files map to None. Unparsable files map to an error dict that
    carries the raw text. "blueprint" prefers process.json and
    "capabilities" prefers capabilities.json; each falls back to its older
    file when the preferred one is missing or empty. Every raw_cases/*.json
    is added under "raw_cases" keyed by file stem.
    """
    req_dir = OUTPUT_DIR / f"{(index+1):03d}"

    def safe_load_json(p: Path):
        """Parse ``p`` as JSON: None if absent, an error dict if unreadable/invalid."""
        if not p.exists():
            return None
        text = ""
        try:
            text = p.read_text(encoding="utf-8")
            return json.loads(text)
        except Exception as e:
            return {"error": "Failed to parse JSON", "raw_content": text, "details": str(e)}

    data = {
        "strategy": safe_load_json(req_dir / "strategy.json"),
        "blueprint": safe_load_json(req_dir / "process.json") or safe_load_json(req_dir / "blueprint.json"),
        "blueprint_temp": safe_load_json(req_dir / "blueprint_temp.json"),
        "cluster": safe_load_json(req_dir / "cluster.json"),
        "capabilities": (
            safe_load_json(req_dir / "capabilities.json")
            or safe_load_json(req_dir / "capabilities_extracted.json")
        ),
        "capabilities_temp": safe_load_json(req_dir / "capabilities_temp.json"),
        "raw_cases": {"case": safe_load_json(req_dir / "case.json")},
    }

    raw_dir = req_dir / "raw_cases"
    if raw_dir.exists():
        data["raw_cases"].update({p.stem: safe_load_json(p) for p in raw_dir.glob("*.json")})

    return data
@app.get("/api/requirements/{index}/memo")
def get_memo(index: int):
    """Return the requirement's memo text, or an empty string if none was saved."""
    memo_path = OUTPUT_DIR / f"{(index+1):03d}" / "memo.txt"
    if not memo_path.exists():
        return {"memo": ""}
    return {"memo": memo_path.read_text(encoding="utf-8")}
@app.get("/api/requirements/{index}/pipeline-status")
def get_pipeline_status(index: int):
    """Report, per pipeline stage, whether its output exists for a requirement.

    Most stages are detected by the presence of one file. The
    "workflow-extract" and "capability-extract-1" stages are detected by
    scanning case.json for cases with non-empty ``workflow_groups`` and for
    groups carrying a ``capability``. A legacy raw_cases/case_detailed.json
    also counts as generate-case and workflow output. Errors while reading
    case.json are ignored.
    """
    req_dir = OUTPUT_DIR / f"{(index+1):03d}"
    raw_dir = req_dir / "raw_cases"

    status = dict.fromkeys(
        (
            "research",
            "source",
            "generate-case",
            "workflow-extract",
            "capability-extract-1",
            "process-cluster",
            "process-score",
            "capability-extract",
            "capability-enrich",
            "strategy",
        ),
        False,
    )

    if raw_dir.exists():
        status["research"] = any(True for _ in raw_dir.glob("case_*.json"))
        status["source"] = (raw_dir / "source.json").exists()

    has_workflow = has_capability = False
    case_file = req_dir / "case.json"
    if case_file.exists():
        status["generate-case"] = True
        try:
            with open(case_file, "r", encoding="utf-8") as f:
                cases = json.load(f).get("cases") or []
            for case in cases:
                groups = case.get("workflow_groups") or []
                if groups:
                    has_workflow = True
                if any(isinstance(g, dict) and g.get("capability") for g in groups):
                    has_capability = True
                if has_workflow and has_capability:
                    break  # nothing more to learn from the remaining cases
        except Exception:
            pass

    if (raw_dir / "case_detailed.json").exists():
        status["generate-case"] = True
        has_workflow = True

    status["workflow-extract"] = has_workflow
    status["capability-extract-1"] = has_capability

    artifact_for_stage = {
        "process-cluster": "blueprint_temp.json",
        "process-score": "process.json",
        "capability-extract": "capabilities_temp.json",
        "capability-enrich": "capabilities.json",
        "strategy": "strategy.json",
    }
    for stage, filename in artifact_for_stage.items():
        status[stage] = (req_dir / filename).exists()

    return status
@app.post("/api/requirements/{index}/memo")
def save_memo(index: int, req: MemoRequest):
    """Overwrite the requirement's memo.txt with ``req.memo``, creating the directory if needed."""
    req_dir = OUTPUT_DIR / f"{(index+1):03d}"
    req_dir.mkdir(parents=True, exist_ok=True)
    with open(req_dir / "memo.txt", "w", encoding="utf-8") as f:
        f.write(req.memo)
    return {"status": "ok"}
@app.get("/api/requirements/{index}/files")
def list_req_files(index: int):
    """List files whose name contains a dot, recursively, under OUTPUT_DIR/NNN.

    Each entry has a forward-slash ``path`` relative to the requirement
    directory, the file ``name``, ``size`` in bytes and ``mtime`` (epoch
    seconds). ``.DS_Store`` is skipped. Files that vanish or cannot be
    stat'ed mid-scan are omitted. This replaces a bare ``except:`` that also
    swallowed KeyboardInterrupt/SystemExit. Size and mtime now come from a
    single stat() call, so they describe the same file state.
    """
    req_dir = OUTPUT_DIR / f"{(index+1):03d}"
    if not req_dir.exists():
        return {"files": []}

    files = []
    for p in req_dir.rglob("*.*"):
        if p.name == ".DS_Store" or not p.is_file():
            continue
        try:
            st = p.stat()
        except OSError:
            continue  # removed or inaccessible between listing and stat
        files.append({
            "path": str(p.relative_to(req_dir)).replace("\\", "/"),
            "name": p.name,
            "size": st.st_size,
            "mtime": st.st_mtime,
        })
    return {"files": files}
@app.get("/api/requirements/{index}/files/raw")
def get_req_file_raw(index: int, path: str):
    """Serve a file under OUTPUT_DIR/NNN as-is; ``path`` is relative to that directory.

    Raises:
        HTTPException 400: ``path`` contains "..", is absolute, or resolves
            outside the requirement directory.
        HTTPException 404: no such file.
    """
    from fastapi.responses import FileResponse

    if ".." in path or path.startswith("/"):
        raise HTTPException(status_code=400, detail="Invalid path")

    base_dir = OUTPUT_DIR / f"{(index+1):03d}"
    file_path = base_dir / path
    # The string checks miss Windows drive- or backslash-rooted paths and
    # symlinks; verify the resolved target really lies inside base_dir.
    try:
        file_path.resolve().relative_to(base_dir.resolve())
    except (ValueError, OSError):
        raise HTTPException(status_code=400, detail="Invalid path") from None

    if not file_path.exists() or not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(file_path)
@app.get("/api/requirements/{index}/files/content")
def get_req_file_content(index: int, path: str):
    """Return a requirement file's text as ``{"content": ...}``.

    ``path`` is relative to OUTPUT_DIR/NNN. Undecodable bytes are replaced
    rather than failing. If the file exists but cannot be read, ``content``
    holds an explanatory message instead of an error status.

    Raises:
        HTTPException 400: ``path`` contains "..", is absolute, or resolves
            outside the requirement directory.
        HTTPException 404: no such file.
    """
    if ".." in path or path.startswith("/"):
        raise HTTPException(status_code=400, detail="Invalid path")

    base_dir = OUTPUT_DIR / f"{(index+1):03d}"
    file_path = base_dir / path
    # The string checks miss Windows drive- or backslash-rooted paths and
    # symlinks; verify the resolved target really lies inside base_dir.
    try:
        file_path.resolve().relative_to(base_dir.resolve())
    except (ValueError, OSError):
        raise HTTPException(status_code=400, detail="Invalid path") from None

    if not file_path.exists() or not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found")

    try:
        return {"content": file_path.read_text(encoding="utf-8", errors="replace")}
    except OSError as e:
        return {"content": f"Unable to read file: {e}"}
# Run via `python -c` with the script path as argv[1]. Replaces
# ArgumentParser.parse_args so the script stops at its first parse_args()
# call and prints its parser's actions as JSON between two markers.
_ARGPARSE_INSPECTOR = """
import argparse, json, os, runpy, sys


def _mock_parse_args(self, args=None, namespace=None):
    actions_info = []
    for action in self._actions:
        if action.dest == 'help':
            continue
        choices = action.choices
        actions_info.append({
            'names': action.option_strings if action.option_strings else [action.dest],
            'dest': action.dest,
            'required': action.required,
            'default': action.default if action.default is not argparse.SUPPRESS else None,
            'desc': action.help,
            'type': str(action.type.__name__) if hasattr(action.type, '__name__') else str(action.type),
            'action_type': action.__class__.__name__,
            'choices': choices if choices is None or isinstance(choices, (list, str)) else list(choices),
            'is_positional': not action.option_strings,
        })
    print('ARGPARSE_METADATA_START')
    # default=str: a Path/enum default must not abort the whole dump.
    print(json.dumps(actions_info, ensure_ascii=False, default=str))
    print('ARGPARSE_METADATA_END')
    sys.exit(0)


argparse.ArgumentParser.parse_args = _mock_parse_args
_script = sys.argv[1]
sys.argv = [_script]
# Mirror `python script.py`: the script's own directory comes first on sys.path,
# so scripts importing sibling modules can still be inspected.
sys.path.insert(0, os.path.dirname(os.path.abspath(_script)))
try:
    runpy.run_path(_script, run_name='__main__')
except SystemExit:
    pass
except Exception:
    pass
"""


def _argparse_metadata(script_path: Path, env: dict):
    """Run the argparse inspector on ``script_path``.

    Returns the list of argument dicts, or None when the script produced no
    usable metadata (no argparse, crashed before parsing, garbled output).
    Raises subprocess.TimeoutExpired after 5 s.
    """
    import subprocess

    result = subprocess.run(
        [sys.executable, "-c", _ARGPARSE_INSPECTOR, str(script_path)],
        capture_output=True, text=True, encoding="utf-8", errors="replace",
        timeout=5, env=env,
    )
    out = result.stdout or ""
    start, end = "ARGPARSE_METADATA_START", "ARGPARSE_METADATA_END"
    if start not in out or end not in out:
        return None
    payload = out.split(start)[1].split(end)[0].strip()
    try:
        actions = json.loads(payload)
    except ValueError:
        return None
    keys = ("names", "desc", "required", "default", "choices", "is_positional", "action_type")
    return [{key: action[key] for key in keys} for action in actions]


def _parse_help_text(help_text: str) -> list:
    """Heuristically extract optional arguments from ``--help`` output.

    Expects argparse-like layout: an option column starting with "-",
    separated from its description by two or more spaces. Indented
    continuation lines extend the previous description, and a blank line
    ends it. The help option itself is skipped.
    """
    import re

    args = []
    current_arg = None
    for line in help_text.splitlines():
        stripped = line.strip()
        if not stripped:
            current_arg = None
            continue
        if stripped.startswith("-"):
            # Single spaces occur inside both columns ("-o OUT, --out OUT"),
            # so only a run of 2+ spaces separates option from description.
            parts = re.split(r"\s{2,}", stripped, maxsplit=1)
            names = [x.rstrip(",") for x in parts[0].split() if x.startswith("-")]
            if not names or "--help" in names:
                current_arg = None
                continue
            current_arg = {
                "names": names,
                "desc": parts[1].strip() if len(parts) > 1 else "",
                "required": False,
                "default": None,
                "is_positional": False,
            }
            args.append(current_arg)
        elif current_arg is not None:
            current_arg["desc"] = f"{current_arg['desc']} {stripped}" if current_arg["desc"] else stripped
    return args


def parse_script_args(script_path: Path):
    """Discover the command-line arguments a scratch script accepts.

    Python scripts are first run under an argparse inspector that reports
    the parser's actions as JSON. If that yields nothing, the script is run
    with ``--help`` (python for .py, bash otherwise) and the output is
    parsed heuristically. Both strategies execute the script, with a 5 s
    timeout each. The project root (two levels above BASE_DIR) is prepended
    to PYTHONPATH.

    Returns:
        (args, raw_help). ``args`` is a list of argument dicts. ``raw_help``
        is the raw --help text, "" when the inspector succeeded, or a
        "Failed to parse help: ..." message (with empty args) on any error.
    """
    import os
    import subprocess

    try:
        is_python = script_path.name.endswith(".py")

        env = os.environ.copy()
        project_root = str(BASE_DIR.parent.parent)
        if "PYTHONPATH" in env:
            env["PYTHONPATH"] = project_root + os.pathsep + env["PYTHONPATH"]
        else:
            env["PYTHONPATH"] = project_root

        if is_python:
            inspected = _argparse_metadata(script_path, env)
            if inspected is not None:
                return inspected, ""

        result = subprocess.run(
            [sys.executable if is_python else "bash", str(script_path), "--help"],
            capture_output=True, text=True, encoding="utf-8", errors="replace",
            timeout=5, env=env,
        )
        raw_help = result.stdout + result.stderr
        return _parse_help_text(raw_help), raw_help
    except Exception as e:
        return [], f"Failed to parse help: {e}"
@app.get("/api/scripts")
async def list_scripts():
    """List the .py and then .sh files found directly inside each scratch sub-folder."""
    if not SCRATCH_DIR.exists():
        return {"scripts": []}
    return {
        "scripts": [
            {"name": script.name, "folder": folder.name}
            for folder in SCRATCH_DIR.iterdir()
            if folder.is_dir()
            for pattern in ("*.py", "*.sh")
            for script in folder.glob(pattern)
        ]
    }
@app.get("/api/scripts/{folder}/{filename}/parse")
async def parse_existing_script(folder: str, filename: str):
    """Describe the command-line arguments of an already-uploaded script.

    Note that parsing executes the script (see parse_script_args), so
    ``folder``/``filename`` must name a file exactly one level inside a
    direct sub-folder of SCRATCH_DIR. Previously values like ".." could
    point it at any script under BASE_DIR.

    Raises:
        HTTPException 400: folder/filename resolve outside SCRATCH_DIR/folder.
        HTTPException 404: the script does not exist or is not a file.
    """
    script_dir = SCRATCH_DIR / folder
    script_path = script_dir / filename
    try:
        resolved_dir = script_dir.resolve()
        path_ok = (
            resolved_dir.parent == SCRATCH_DIR.resolve()
            and script_path.resolve().parent == resolved_dir
        )
    except OSError:
        path_ok = False
    if not path_ok:
        raise HTTPException(status_code=400, detail="Invalid script path")

    if not script_path.is_file():
        raise HTTPException(status_code=404, detail="Script not found")

    args, raw_help = parse_script_args(script_path)
    return {"status": "success", "filename": filename, "folder": folder, "args": args, "raw_help": raw_help}
@app.post("/api/scripts/upload")
async def upload_script(file: UploadFile = File(...)):
    """Save an uploaded script to SCRATCH_DIR/stem/name and parse its arguments.

    Only the final component of the client-supplied filename is used.
    Otherwise a name such as "../../x.py" would place the file outside
    SCRATCH_DIR, and a missing filename would crash.

    Raises:
        HTTPException 400: the upload has no usable filename.
    """
    filename = Path(file.filename or "").name
    if not filename or filename in (".", ".."):
        raise HTTPException(status_code=400, detail="Invalid filename")

    content = await file.read()
    script_stem = Path(filename).stem
    script_dir = SCRATCH_DIR / script_stem
    script_dir.mkdir(parents=True, exist_ok=True)

    script_path = script_dir / filename
    script_path.write_bytes(content)

    args, raw_help = parse_script_args(script_path)
    return {"status": "success", "filename": filename, "folder": script_stem, "args": args, "raw_help": raw_help}
# Live script subprocesses keyed by scratch folder name: run_script registers
# a process while it streams output, and stop_script looks it up to kill it.
RUNNING_PROCESSES: Dict[str, "asyncio.subprocess.Process"] = dict()
@app.post("/api/scripts/run")
async def run_script(req: RunScriptRequest):
    """Run a scratch script and stream its output as NDJSON.

    The command line is built from ``req.args``; each entry is a dict with
    "name", "value" and optional "is_positional". The script runs with cwd
    set to its folder. The stream emits:
      - ``{"stdout": line}`` or ``{"stderr": line}`` for each output line;
      - a final ``{"returncode", "files"}`` record, where ``files`` lists
        files at the folder top level or under outputs/ modified since about
        the start of the run;
      - on a launch/runtime error, a single ``{"stderr", "returncode": -1,
        "files": []}`` record instead.

    While running, the process is registered in RUNNING_PROCESSES under
    ``req.folder`` so stop_script can kill it.

    Raises:
        HTTPException 400: folder/filename resolve outside SCRATCH_DIR/folder.
        HTTPException 404: the script does not exist.
    """
    import os
    import time
    from fastapi.responses import StreamingResponse

    script_dir = SCRATCH_DIR / req.folder
    script_path = script_dir / req.filename
    # Body values are otherwise unchecked: require folder to be a direct child
    # of SCRATCH_DIR and filename a direct child of that folder.
    try:
        resolved_dir = script_dir.resolve()
        path_ok = (
            resolved_dir.parent == SCRATCH_DIR.resolve()
            and script_path.resolve().parent == resolved_dir
        )
    except OSError:
        path_ok = False
    if not path_ok:
        raise HTTPException(status_code=400, detail="Invalid script path")
    if not script_path.exists():
        raise HTTPException(status_code=404, detail="Script not found")

    (script_dir / "inputs").mkdir(exist_ok=True)
    outputs_dir = script_dir / "outputs"
    outputs_dir.mkdir(exist_ok=True)

    cmd = [sys.executable if req.filename.endswith(".py") else "bash", str(script_path)]
    for arg in req.args:
        name = arg.get("name")
        val = arg.get("value")
        if arg.get("is_positional", False):
            # Whitespace split only; quoting is not honoured.
            if isinstance(val, str) and val.strip():
                cmd.extend(val.split())
            continue
        if not name:
            continue
        if isinstance(val, bool):
            # Flags: passed bare when true, omitted when false.
            if val:
                cmd.append(name)
        elif val is not None and str(val).strip():
            # null means "not set"; str(None) would otherwise send the literal "None".
            cmd.extend([name, str(val)])

    env = os.environ.copy()
    project_root = str(BASE_DIR.parent.parent)
    if "PYTHONPATH" in env:
        env["PYTHONPATH"] = project_root + os.pathsep + env["PYTHONPATH"]
    else:
        env["PYTHONPATH"] = project_root
    env["PYTHONUNBUFFERED"] = "1"  # flush Python output line by line so it streams

    def recent_files(since: float) -> list:
        """Files (script excluded) at the folder top level or under outputs/ with mtime >= since - 2 s."""
        candidates = [p for p in script_dir.iterdir() if p.is_file() and p.name != req.filename]
        if outputs_dir.is_dir():
            candidates += [p for p in outputs_dir.rglob("*") if p.is_file()]
        found = []
        for p in candidates:
            try:
                st = p.stat()
            except OSError:
                continue  # removed between listing and stat
            # 2 s slack, presumably for coarse filesystem timestamps.
            if st.st_mtime >= since - 2:
                rel = str(p.relative_to(script_dir)).replace("\\", "/")
                found.append({"name": f"{req.folder}/{rel}", "size": st.st_size})
        return found

    async def generate_output():
        start_time = time.time()
        process = None  # stays None if the spawn itself fails
        readers = []
        try:
            # NOTE: on Windows, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP
            # would allow sending CTRL_BREAK_EVENT.
            process = await asyncio.create_subprocess_exec(
                *cmd,
                cwd=str(script_dir),
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                env=env,
            )
            RUNNING_PROCESSES[req.folder] = process

            queue = asyncio.Queue()

            async def pump(stream, key):
                # Forward each line as an NDJSON record; None marks end of stream.
                while True:
                    line = await stream.readline()
                    if not line:
                        break
                    await queue.put(json.dumps({key: line.decode("utf-8", errors="replace")}) + "\n")
                await queue.put(None)

            readers = [
                asyncio.create_task(pump(process.stdout, "stdout")),
                asyncio.create_task(pump(process.stderr, "stderr")),
            ]

            finished = 0
            while finished < len(readers):
                item = await queue.get()
                if item is None:
                    finished += 1
                else:
                    yield item

            await process.wait()
            yield json.dumps({"returncode": process.returncode, "files": recent_files(start_time)}) + "\n"
        except asyncio.CancelledError:
            # Client disconnected; cleanup happens in finally.
            raise
        except Exception as e:
            yield json.dumps({"stderr": str(e), "returncode": -1, "files": []}) + "\n"
        finally:
            for task in readers:
                task.cancel()
            if process is not None:
                # Deregister only our own process; a newer run of the same
                # folder may have replaced the entry.
                if RUNNING_PROCESSES.get(req.folder) is process:
                    del RUNNING_PROCESSES[req.folder]
                if process.returncode is None:
                    try:
                        process.terminate()
                    except Exception:
                        pass

    return StreamingResponse(generate_output(), media_type="application/x-ndjson")
@app.post("/api/scripts/{folder}/stop")
async def stop_script(folder: str):
    """Kill the script registered for ``folder``, including its descendants.

    psutil (imported lazily) kills the whole process tree. If that fails for
    any reason, including psutil being unavailable, only the direct
    subprocess is sent terminate().

    Returns:
        {"status": "success", ...} when a process is registered for the folder
        (even if it had already exited), otherwise {"status": "not_running"}.
    """
    process = RUNNING_PROCESSES.get(folder)
    if process is None:
        return {"status": "not_running"}

    if process.returncode is None:
        try:
            import psutil

            tree_root = psutil.Process(process.pid)
            for descendant in tree_root.children(recursive=True):
                descendant.kill()
            tree_root.kill()
        except Exception:
            try:
                process.terminate()
            except Exception:
                pass

    return {"status": "success", "message": "Process stopped"}
@app.get("/api/scripts/files/{filename:path}")
async def download_script_file(filename: str, inline: bool = False):
    """Serve a file under SCRATCH_DIR; ``filename`` is a path relative to it.

    With ``inline`` the response uses an inline Content-Disposition.
    Otherwise it is an attachment named after the file.

    Raises:
        HTTPException 400: the path contains "..", is absolute, or resolves
            outside SCRATCH_DIR.
        HTTPException 404: no such file.
    """
    from fastapi.responses import FileResponse

    if ".." in filename or filename.startswith("/"):
        raise HTTPException(status_code=400, detail="Invalid filename")

    file_path = SCRATCH_DIR / filename
    # The string checks miss Windows drive- or backslash-rooted paths and
    # symlinks; verify the resolved target really lies inside SCRATCH_DIR.
    try:
        file_path.resolve().relative_to(SCRATCH_DIR.resolve())
    except (ValueError, OSError):
        raise HTTPException(status_code=400, detail="Invalid filename") from None

    if not file_path.exists() or not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found")

    if inline:
        return FileResponse(file_path, content_disposition_type="inline")
    return FileResponse(file_path, filename=file_path.name)
@app.delete("/api/scripts/files/{filename:path}")
async def delete_script_file(filename: str):
    """Delete one file under SCRATCH_DIR; ``filename`` is a path relative to it.

    Raises:
        HTTPException 400: the path contains "..", is absolute, or resolves
            outside SCRATCH_DIR.
        HTTPException 404: no such file.
    """
    if ".." in filename or filename.startswith("/"):
        raise HTTPException(status_code=400, detail="Invalid filename")

    file_path = SCRATCH_DIR / filename
    # The string checks miss Windows drive- or backslash-rooted paths and
    # symlinks; on a delete endpoint that means deleting arbitrary files.
    try:
        file_path.resolve().relative_to(SCRATCH_DIR.resolve())
    except (ValueError, OSError):
        raise HTTPException(status_code=400, detail="Invalid filename") from None

    if not file_path.exists() or not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found")
    file_path.unlink()
    return {"status": "success"}
@app.get("/api/scripts/{folder}/files")
async def list_script_folder_files(folder: str):
    """List every regular file under a script folder, recursively.

    Each entry has the file ``name``, its ``path`` relative to SCRATCH_DIR
    with forward slashes (the form the file download/delete endpoints take),
    and its ``size`` in bytes. A folder that does not exist yields
    ``{"files": []}``.

    Raises:
        HTTPException: 400 if *folder* is not a plain direct child name of
            SCRATCH_DIR.
    """
    folder_path = SCRATCH_DIR / folder
    # Validate like the other folder endpoints: ".", ".." or a Windows drive
    # such as "C:" would otherwise point the recursive walk outside the
    # script folder (and relative_to() below would then raise).
    if ".." in folder or folder.startswith("/") or folder_path.parent != SCRATCH_DIR:
        raise HTTPException(status_code=400, detail="Invalid folder")
    if not folder_path.is_dir():
        return {"files": []}

    files = []
    for entry in folder_path.rglob("*"):
        try:
            if not entry.is_file():
                continue
            size = entry.stat().st_size
        except OSError:
            # A running script may remove a file between the walk and stat().
            continue
        files.append({
            "name": entry.name,
            "path": entry.relative_to(SCRATCH_DIR).as_posix(),
            "size": size,
        })
    return {"files": files}
- import shutil
@app.delete("/api/scripts/{folder}")
async def delete_script(folder: str):
    """Recursively delete a script folder directly under SCRATCH_DIR.

    Deleting a folder that does not exist is reported as success.

    Raises:
        HTTPException: 400 if *folder* is not a plain direct child name of
            SCRATCH_DIR.
    """
    script_dir = SCRATCH_DIR / folder
    # pathlib collapses "." segments, so folder "." would make script_dir
    # equal SCRATCH_DIR and rmtree() would delete every script; Windows
    # drive/root names ("C:", "\x") likewise escape SCRATCH_DIR. Only accept
    # targets whose parent is SCRATCH_DIR itself.
    if ".." in folder or folder.startswith("/") or script_dir.parent != SCRATCH_DIR:
        raise HTTPException(status_code=400, detail="Invalid folder")
    if script_dir.is_dir():
        shutil.rmtree(script_dir)
    return {"status": "success"}
@app.post("/api/scripts/{folder}/upload_data")
async def upload_script_data(folder: str, file: UploadFile = File(...)):
    """Store an uploaded file in the ``inputs/`` subdirectory of a script folder.

    Only the final component of the client-supplied filename is used, so a
    multipart filename such as ``../../server.py`` cannot write outside
    ``inputs/``.

    Returns:
        ``{"status": "success", "filename": "inputs/<name>"}``.

    Raises:
        HTTPException: 400 for an invalid folder or filename, 404 if the
            script folder does not exist.
    """
    import os

    script_dir = SCRATCH_DIR / folder
    if ".." in folder or folder.startswith("/") or script_dir.parent != SCRATCH_DIR:
        raise HTTPException(status_code=400, detail="Invalid folder")
    if not script_dir.is_dir():
        raise HTTPException(status_code=404, detail="Script folder not found")

    # The multipart filename is client-controlled and may be absent. Keep only
    # its last component; backslashes are normalised first so Windows-style
    # paths are stripped on every platform.
    safe_name = os.path.basename((file.filename or "").replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Invalid filename")

    inputs_dir = script_dir / "inputs"
    inputs_dir.mkdir(exist_ok=True)
    content = await file.read()
    (inputs_dir / safe_name).write_bytes(content)
    return {"status": "success", "filename": f"inputs/{safe_name}"}
@app.get("/api/prompts")
def list_prompts():
    """Return the file names of every ``*.prompt`` file in PROMPTS_DIR.

    A missing PROMPTS_DIR produces an empty list rather than an error.
    """
    names = []
    if PROMPTS_DIR.exists():
        names.extend(path.name for path in PROMPTS_DIR.glob("*.prompt"))
    return names
@app.get("/api/prompts/{name}")
def get_prompt(name: str):
    """Return a prompt's text together with its companion JSON schema.

    The schema is looked up next to the prompt as ``<stem>.schema.json``,
    where ``<stem>`` is *name* without a trailing ``.prompt``.
    ``schema_content`` is an empty string when no schema file exists.

    Raises:
        HTTPException: 400 if *name* contains a path separator, 404 if the
            prompt file does not exist.
    """
    if "/" in name or "\\" in name:
        raise HTTPException(status_code=400, detail="Invalid prompt name")
    prompt_path = PROMPTS_DIR / name
    if not prompt_path.is_file():
        raise HTTPException(status_code=404, detail="Prompt not found")

    # Strip only a *trailing* ".prompt". str.replace() would rewrite every
    # occurrence, and for a name without the suffix it returns the name
    # unchanged, which made the prompt file be served back as its own schema.
    stem = name[: -len(".prompt")] if name.endswith(".prompt") else name
    schema_path = PROMPTS_DIR / f"{stem}.schema.json"

    schema_content = ""
    if schema_path.is_file():
        schema_content = schema_path.read_text(encoding="utf-8")
    content = prompt_path.read_text(encoding="utf-8")
    return {"content": content, "schema_content": schema_content}
@app.post("/api/prompts/{name}")
def save_prompt(name: str, req: PromptRequest):
    """Overwrite an existing prompt and create, update or delete its schema.

    ``req.schema_content`` controls the companion ``<stem>.schema.json``
    (``<stem>`` is *name* without a trailing ``.prompt``):

    * ``None`` leaves the schema file untouched;
    * a blank string deletes it;
    * anything else must be valid JSON and is re-serialised with 2-space
      indentation.

    The prompt text is always written with LF line endings.

    Raises:
        HTTPException: 400 for a name containing a path separator or for
            invalid schema JSON; 404 if the prompt does not already exist.
    """
    if "/" in name or "\\" in name:
        raise HTTPException(status_code=400, detail="Invalid prompt name")
    prompt_path = PROMPTS_DIR / name
    if not prompt_path.is_file():
        raise HTTPException(status_code=404, detail="Prompt not found")

    # Strip only a trailing ".prompt"; with str.replace() a name lacking the
    # suffix mapped the "schema" onto the prompt file itself.
    stem = name[: -len(".prompt")] if name.endswith(".prompt") else name
    schema_path = PROMPTS_DIR / f"{stem}.schema.json"

    if req.schema_content is not None:
        if req.schema_content.strip():
            try:
                parsed_schema = json.loads(req.schema_content)
            except ValueError as e:  # json.JSONDecodeError subclasses ValueError
                raise HTTPException(status_code=400, detail=f"Invalid Schema JSON: {str(e)}") from e
            with open(schema_path, "w", encoding="utf-8") as f:
                json.dump(parsed_schema, f, ensure_ascii=False, indent=2)
        elif schema_path.exists():
            schema_path.unlink()  # remove if empty

    with open(prompt_path, "w", encoding="utf-8", newline="\n") as f:
        # Enforce Unix LF line endings to avoid unnecessary git diffs
        # and format inconsistencies between the file system and UI.
        f.write(req.content.replace('\r\n', '\n'))
    return {"status": "ok"}
@app.post("/api/prompts/{name}/update_schema")
async def api_update_schema(name: str):
    """Regenerate a prompt's JSON schema via ``script.update_schema``.

    The script is called with ``dry_run=False``, which is expected to write
    ``<stem>.schema.json`` (``<stem>`` = *name* without a trailing
    ``.prompt``). The resulting file contents, or an empty string if no such
    file exists afterwards, are returned so the UI can refresh.

    Raises:
        HTTPException: 400 for a name containing a path separator, 500 if the
            schema update fails for any reason.
    """
    if "/" in name or "\\" in name:
        raise HTTPException(status_code=400, detail="Invalid prompt name")
    # Strip only a trailing ".prompt"; str.replace() also removed ".prompt"
    # from the middle of a name, producing the wrong prompt/schema stem.
    prompt_name = name[: -len(".prompt")] if name.endswith(".prompt") else name

    from script.update_schema import update_schema as script_update_schema
    try:
        # Calls the script to update the schema (this writes to the file)
        await script_update_schema(prompt_name=prompt_name, model="openai/gpt-5.4", dry_run=False)

        schema_path = PROMPTS_DIR / f"{prompt_name}.schema.json"
        schema_content = ""
        if schema_path.is_file():
            schema_content = schema_path.read_text(encoding="utf-8")

        return {"status": "ok", "schema_content": schema_content}
    except Exception as e:
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Failed to update schema: {str(e)}") from e
def _build_pipeline_command(index: int, run_req: RunRequest) -> list:
    """Build the ``run_pipeline.py`` argv for one pipeline run.

    In ``--only-step`` mode the step may be narrowed by ``--case-index``
    (which takes precedence) or ``--phase``. Otherwise the run is bounded by
    the optional ``--start-from`` / ``--end-at`` steps. run_pipeline.py itself
    handles overwriting outputs when --only-step or --start-from is passed.
    """
    cmd = [sys.executable, str(BASE_DIR / "run_pipeline.py"), "--index", str(index)]

    if run_req.platforms:
        cmd += ["--platforms", run_req.platforms]
    if run_req.use_claude_sdk:
        cmd.append("--use-claude-sdk")
    if run_req.model:
        cmd += ["--model", run_req.model]

    if run_req.only_step:
        cmd += ["--only-step", run_req.only_step]
        if run_req.case_index is not None:
            cmd += ["--case-index", str(run_req.case_index)]
        elif run_req.phase is not None:
            cmd += ["--phase", str(run_req.phase)]
    else:
        if run_req.start_from:
            cmd += ["--start-from", run_req.start_from]
        if run_req.end_at:
            cmd += ["--end-at", run_req.end_at]
        # restart_mode (e.g. "smart") requires no extra CLI arguments.
    return cmd


async def run_pipeline_task(index: int, run_req: RunRequest):
    """Launch run_pipeline.py for *index* and stream its output into ``active_runs``.

    Marks the run as "running", then starts a background thread that spawns
    the subprocess (stderr merged into stdout) and appends each output line
    to ``run_state.logs``. The thread sets the status to "completed" on exit
    code 0 and "failed" otherwise. This coroutine returns as soon as the
    thread has been started.
    """
    import os
    import subprocess
    import threading

    run_state = active_runs[index]
    run_state.status = "running"

    cmd = _build_pipeline_command(index, run_req)
    run_state.logs.append(f"Starting command: {' '.join(cmd)}\n")

    def run_process():
        try:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                encoding="utf-8",
                # Tools launched by the pipeline may emit non-UTF-8 bytes;
                # with strict decoding a single bad byte makes readline()
                # raise, abandoning the pipe and the still-running child.
                errors="replace",
                bufsize=1,
                cwd=str(BASE_DIR),
                env=dict(os.environ, PYTHONIOENCODING="utf-8"),
            )
            run_state.process = process

            # `with` closes the pipe even if reading fails part-way.
            with process.stdout:
                for line in iter(process.stdout.readline, ""):
                    run_state.logs.append(line)
            return_code = process.wait()

            run_state.logs.append(f"\nProcess exited with code {return_code}")
            run_state.status = "completed" if return_code == 0 else "failed"
        except Exception as e:
            run_state.logs.append(f"\nException occurred: {repr(e)}")
            run_state.status = "failed"

    threading.Thread(target=run_process).start()
@app.post("/api/pipeline/run/{index}")
async def trigger_pipeline(index: int, req: RunRequest):
    """Start a pipeline run for *index* unless one is already running.

    Raises:
        HTTPException: 400 if the run for *index* is already "running".
    """
    if index in active_runs and active_runs[index].status == "running":
        raise HTTPException(status_code=400, detail="Pipeline already running for this index")

    active_runs[index] = ActiveRun()
    # Await directly rather than via an unreferenced asyncio.create_task().
    # run_pipeline_task contains no await points (it only starts a worker
    # thread), so it returns promptly. Awaiting it has three benefits:
    # - the status becomes "running" before another request is served,
    #   closing the window in which a second trigger could pass the check
    #   above;
    # - no task is left without a reference;
    # - setup errors reach the caller instead of an unobserved task.
    await run_pipeline_task(index, req)
    return {"message": "Pipeline started", "index": index}
@app.post("/api/pipeline/stop/{index}")
async def stop_pipeline(index: int):
    """Terminate the running pipeline for *index* and mark it as failed.

    Descendants of the pipeline process, enumerated with psutil when it is
    available, are terminated first. ``Popen.terminate()`` only signals the
    direct child, so anything it spawned would otherwise keep running.

    Raises:
        HTTPException: 400 if no run for *index* is "running"; 500 if the
            process handle is missing or terminating it fails.
    """
    if index not in active_runs or active_runs[index].status != "running":
        raise HTTPException(status_code=400, detail="No running pipeline found for this index")

    run_state = active_runs[index]
    process = run_state.process
    if not process:
        raise HTTPException(status_code=500, detail="Process handle missing")

    # Enumerate descendants before the parent dies; afterwards they are
    # reparented and can no longer be found through it.
    try:
        import psutil

        descendants = psutil.Process(process.pid).children(recursive=True)
    except Exception:
        descendants = []  # psutil unavailable or process already gone
    for child in descendants:
        try:
            child.terminate()
        except psutil.Error:
            pass  # best effort: the child may have exited meanwhile

    try:
        process.terminate()
        run_state.logs.append("\n[System] 🛑 Pipeline forcefully terminated by user.\n")
        run_state.status = "failed"
        return {"status": "stopped"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to terminate: {str(e)}") from e
@app.get("/api/pipeline/status")
def get_all_status():
    """Snapshot every tracked pipeline run, keyed by its index.

    Each entry holds the run's status, its start time and a copy of its log
    lines. Because the logs are copied, the response is unaffected by lines
    the worker thread appends afterwards.
    """
    return {
        idx: {
            "status": run.status,
            "start_time": run.start_time,
            "logs": list(run.logs),
        }
        for idx, run in active_runs.items()
    }
# Mount UI static files (the directory is created so the mount cannot fail).
ui_dir = BASE_DIR / "ui"
ui_dir.mkdir(exist_ok=True)
app.mount("/static", StaticFiles(directory=str(ui_dir)), name="static")
# Mount output directory for local resources (like images). StaticFiles
# raises RuntimeError at construction when its directory does not exist,
# so create it first, just as ui_dir is created above.
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
app.mount("/output", StaticFiles(directory=str(OUTPUT_DIR)), name="output")
@app.get("/")
def serve_ui():
    """Serve the dashboard's ``ui/index.html``, or a 404 page when it is absent."""
    index_html = ui_dir / "index.html"
    if index_html.exists():
        return HTMLResponse(index_html.read_text(encoding="utf-8"))
    return HTMLResponse("UI not found. Please create ui/index.html", status_code=404)
if __name__ == "__main__":
    # The server binds every interface (0.0.0.0), so advertise the loopback
    # host. 127.0.0.0 is the network address of 127.0.0.0/8, not the
    # conventional loopback host address.
    print("Starting Pipeline Dashboard server on http://127.0.0.1:18080")
    uvicorn.run("server:app", host="0.0.0.0", port=18080, reload=False)
|