| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752 |
- import asyncio
- import json
- from pathlib import Path
- from typing import Dict, List, Optional
- from datetime import datetime
- from collections import deque
- from fastapi import FastAPI, HTTPException, Request, BackgroundTasks, UploadFile, File
- from fastapi.staticfiles import StaticFiles
- from fastapi.responses import HTMLResponse
- from pydantic import BaseModel
- import uvicorn
- import sys
# FastAPI application instance; all routes below attach to it.
app = FastAPI(title="Pipeline Dashboard")
# All paths resolve relative to this file's directory.
BASE_DIR = Path(__file__).parent
OUTPUT_DIR = BASE_DIR / "output"  # per-requirement artifact folders ("001", "002", ...)
DB_PATH = BASE_DIR / "db_requirements.json"  # flat JSON list of requirement strings
PROMPTS_DIR = BASE_DIR / "prompts"  # *.prompt files plus *.schema.json companions
# In-memory storage for active runs
class ActiveRun:
    """Bookkeeping for one in-flight pipeline run: process handle, log ring, state."""

    def __init__(self):
        # Lifecycle state: "starting" -> "running" -> "completed" / "failed".
        self.status: str = "starting"
        # ISO-8601 timestamp captured when the run record is created.
        self.start_time: str = datetime.now().isoformat()
        # Subprocess handle; filled in once the worker thread spawns it.
        self.process: Optional[asyncio.subprocess.Process] = None
        # Rolling buffer of the most recent 200 output lines.
        self.logs: deque = deque(maxlen=200)


# Requirement index -> its active run record.
active_runs: Dict[int, ActiveRun] = {}
@app.post("/api/requirements/{index}/upload_source_ex")
async def upload_source_ex(index: int, file: UploadFile = File(...)):
    """Store an uploaded external-cases file as raw_cases/source_ex.json."""
    target_dir = OUTPUT_DIR / f"{index + 1:03d}" / "raw_cases"
    target_dir.mkdir(parents=True, exist_ok=True)

    payload = await file.read()
    (target_dir / "source_ex.json").write_bytes(payload)

    return {"status": "success", "filename": "source_ex.json"}
class RunRequest(BaseModel):
    """Options for POST /api/pipeline/run/{index}; mirrors run_pipeline.py CLI flags
    (see run_pipeline_task for the exact flag mapping)."""
    platforms: Optional[str] = None  # forwarded as --platforms when set
    use_claude_sdk: bool = False  # adds --use-claude-sdk
    restart_mode: Optional[str] = None  # "smart" currently adds no extra flags
    phase: Optional[int] = None  # forwarded as --phase (only when only_step unset)
    only_step: Optional[str] = None  # forwarded as --only-step; takes precedence over phase/range
    start_from: Optional[str] = None  # forwarded as --start-from
    end_at: Optional[str] = None  # forwarded as --end-at
    case_index: Optional[int] = None  # forwarded as --case-index together with only_step
class MemoRequest(BaseModel):
    """Body for POST /api/requirements/{index}/memo."""
    memo: str
class RequirementUpdateRequest(BaseModel):
    """Body for creating (POST /api/requirements) or updating (PUT .../{index}) a requirement."""
    requirement: str
class PromptRequest(BaseModel):
    """Body for POST /api/prompts/{name}."""
    content: str
    # None = leave schema file untouched; "" = delete it; JSON text = validate and write.
    schema_content: Optional[str] = None
class ImportExternalRequest(BaseModel):
    """Body for importing a single case from source_ex.json by its id."""
    case_id: str
class CaseFilterRequest(BaseModel):
    """Body for filtering out / restoring a case by id."""
    case_id: str
    reason: Optional[str] = "manual_delete"  # bucket name used in filtered_cases.json
@app.post("/api/requirements/{index}/import_source_ex")
def import_source_ex(index: int, req: ImportExternalRequest):
    """Copy one case (matched by id) from raw_cases/source_ex.json into raw_cases/source.json."""
    raw_dir = OUTPUT_DIR / f"{index + 1:03d}" / "raw_cases"
    ex_path = raw_dir / "source_ex.json"
    src_path = raw_dir / "source.json"

    if not ex_path.exists():
        raise HTTPException(status_code=404, detail="source_ex.json not found")

    def case_id_of(case):
        # A case may carry its id directly, under "_raw", or as the post's content id.
        return (
            case.get("case_id")
            or (case.get("_raw") and case["_raw"].get("case_id"))
            or (case.get("post") and case["post"].get("channel_content_id"))
        )

    def case_list(data):
        # Files may be a bare list or an object keyed by "cases"/"sources".
        if isinstance(data, list):
            return data
        return data.get("cases") or data.get("sources") or []

    with open(ex_path, "r", encoding="utf-8") as f:
        ex_cases = case_list(json.load(f))

    wanted = next((c for c in ex_cases if case_id_of(c) == req.case_id), None)
    if not wanted:
        raise HTTPException(status_code=404, detail="Case not found in external sources")

    existing = []
    if src_path.exists():
        with open(src_path, "r", encoding="utf-8") as f:
            existing = case_list(json.load(f))

    # No-op if this case was imported previously.
    if any(case_id_of(c) == req.case_id for c in existing):
        return {"status": "success", "message": "Already imported"}

    existing.append(wanted)
    with open(src_path, "w", encoding="utf-8") as f:
        # source.json is always persisted in object form.
        json.dump({"total": len(existing), "sources": existing}, f, ensure_ascii=False, indent=2)

    return {"status": "success"}
@app.post("/api/requirements/{index}/import_all_source_ex")
def import_all_source_ex(index: int):
    """Merge every not-yet-imported case from source_ex.json into source.json."""
    raw_dir = OUTPUT_DIR / f"{index + 1:03d}" / "raw_cases"
    ex_path = raw_dir / "source_ex.json"
    src_path = raw_dir / "source.json"

    if not ex_path.exists():
        raise HTTPException(status_code=404, detail="source_ex.json not found")

    def case_id_of(case):
        # A case may carry its id directly, under "_raw", or as the post's content id.
        return (
            case.get("case_id")
            or (case.get("_raw") and case["_raw"].get("case_id"))
            or (case.get("post") and case["post"].get("channel_content_id"))
        )

    def case_list(data):
        # Files may be a bare list or an object keyed by "cases"/"sources".
        if isinstance(data, list):
            return data
        return data.get("cases") or data.get("sources") or []

    with open(ex_path, "r", encoding="utf-8") as f:
        ex_cases = case_list(json.load(f))

    merged = []
    if src_path.exists():
        with open(src_path, "r", encoding="utf-8") as f:
            merged = case_list(json.load(f))

    # Ids already present; cases with no derivable id are never deduplicated.
    seen = {cid for cid in (case_id_of(c) for c in merged) if cid}

    added = 0
    for case in ex_cases:
        cid = case_id_of(case)
        if cid and cid not in seen:
            merged.append(case)
            seen.add(cid)
            added += 1

    if added > 0:
        with open(src_path, "w", encoding="utf-8") as f:
            json.dump({"total": len(merged), "sources": merged}, f, ensure_ascii=False, indent=2)

    return {"status": "success", "count": added}
@app.get("/api/requirements")
def get_requirements():
    """List all requirements with a derived status and artifact summary.

    Status is "running" when an active run exists for the index, else
    "completed" once strategy.json was produced, "partial" when raw cases
    exist, and "pending" otherwise.
    """
    if not DB_PATH.exists():
        raise HTTPException(status_code=404, detail="db_requirements.json not found")

    with open(DB_PATH, "r", encoding="utf-8") as f:
        reqs = json.load(f)

    results = []
    for i, req in enumerate(reqs):
        # Output folders are 1-based and zero-padded ("001", "002", ...).
        idx_str = f"{(i+1):03d}"
        dir_path = OUTPUT_DIR / idx_str

        has_strategy = (dir_path / "strategy.json").exists()
        has_blueprint = (dir_path / "blueprint.json").exists()
        has_caps = (dir_path / "capabilities_extracted.json").exists()

        # Prefer the normalized case.json count; fall back to counting raw files.
        raw_cases_count = 0
        case_json_path = dir_path / "case.json"
        if case_json_path.exists():
            try:
                with open(case_json_path, 'r', encoding='utf-8') as f:
                    case_data = json.load(f)
                raw_cases_count = len(case_data.get('cases', []))
            except Exception:
                # Unparsable case.json: keep 0 so the fallback below kicks in.
                pass

        if raw_cases_count == 0:
            raw_cases_dir = dir_path / "raw_cases"
            if raw_cases_dir.exists():
                raw_cases_count = len(list(raw_cases_dir.glob("case_*.json")))

        status = "completed" if has_strategy else ("partial" if raw_cases_count > 0 else "pending")
        # A live run overrides the artifact-derived status.
        if i in active_runs and active_runs[i].status == "running":
            status = "running"

        memo_content = ""
        memo_path = dir_path / "memo.txt"
        if memo_path.exists():
            with open(memo_path, "r", encoding="utf-8") as fm:
                memo_content = fm.read().strip()
        results.append({
            "index": i,
            "id": idx_str,
            "requirement": req,
            "status": status,
            "has_strategy": has_strategy,
            "has_blueprint": has_blueprint,
            "has_caps": has_caps,
            "raw_cases_count": raw_cases_count,
            "memo": memo_content
        })

    return results
@app.put("/api/requirements/{index}")
def update_requirement(index: int, req: RequirementUpdateRequest):
    """Overwrite the requirement text stored at `index` in db_requirements.json."""
    if not DB_PATH.exists():
        raise HTTPException(status_code=404, detail="db_requirements.json not found")

    with open(DB_PATH, "r", encoding="utf-8") as fp:
        requirements = json.load(fp)

    if not 0 <= index < len(requirements):
        raise HTTPException(status_code=404, detail="Requirement index out of range")

    requirements[index] = req.requirement
    with open(DB_PATH, "w", encoding="utf-8") as fp:
        json.dump(requirements, fp, ensure_ascii=False, indent=2)

    return {"status": "success", "requirement": req.requirement}
@app.post("/api/requirements/{index}/cases/filter")
def filter_case(index: int, req: CaseFilterRequest):
    """Move a case out of source.json into filtered_cases.json under a reason bucket.

    Steps:
      1. Remove the case (matched by id) from raw_cases/source.json.
      2. Append it, tagged with `filter_reason`, to filtered_cases.json.
      3. Best-effort: drop it from the normalized case.json too.

    Raises 404 when source.json is missing or the case id is unknown.
    """
    idx_str = f"{(index+1):03d}"
    dir_path = OUTPUT_DIR / idx_str / "raw_cases"
    source_path = dir_path / "source.json"
    filtered_path = dir_path / "filtered_cases.json"

    if not source_path.exists():
        raise HTTPException(status_code=404, detail="source.json not found")

    with open(source_path, "r", encoding="utf-8") as f:
        source_data = json.load(f)

    target_case = None
    new_sources = []
    for c in source_data.get("sources", []):
        # `or {}` (not get(key, {})) guards entries whose "_raw"/"post" key is
        # present but null — dict.get(key, default) returns None in that case
        # and the chained .get() would raise AttributeError.
        c_id = c.get("case_id") or (c.get("_raw") or {}).get("case_id") or (c.get("post") or {}).get("channel_content_id")
        if c_id == req.case_id:
            target_case = c
        else:
            new_sources.append(c)

    if not target_case:
        raise HTTPException(status_code=404, detail="Case not found in source.json")

    source_data["sources"] = new_sources
    source_data["total"] = len(new_sources)
    with open(source_path, "w", encoding="utf-8") as f:
        json.dump(source_data, f, ensure_ascii=False, indent=2)

    filtered_data = {"total": 0, "by_reason": {}}
    if filtered_path.exists():
        with open(filtered_path, "r", encoding="utf-8") as f:
            filtered_data = json.load(f)

    reason = req.reason or "manual_delete"
    if reason not in filtered_data["by_reason"]:
        filtered_data["by_reason"][reason] = {"total": 0, "sources": []}

    # Tag the case so a later restore knows which bucket it came from.
    target_case["filter_reason"] = reason
    bucket = filtered_data["by_reason"][reason]
    bucket["sources"].append(target_case)
    bucket["total"] = len(bucket["sources"])

    filtered_data["total"] = sum(r.get("total", 0) for r in filtered_data["by_reason"].values())

    with open(filtered_path, "w", encoding="utf-8") as f:
        json.dump(filtered_data, f, ensure_ascii=False, indent=2)

    # Also attempt to remove from case.json if it exists (best-effort cleanup
    # so the UI and downstream steps stop seeing the filtered case).
    case_path = dir_path.parent / "case.json"
    if case_path.exists():
        with open(case_path, "r", encoding="utf-8") as f:
            case_data = json.load(f)
        if "cases" in case_data:
            original_len = len(case_data["cases"])
            case_data["cases"] = [
                c for c in case_data["cases"]
                if c.get("case_id") != req.case_id
                and (c.get("_raw") or {}).get("case_id") != req.case_id
            ]
            if len(case_data["cases"]) != original_len:
                # Distinct handle name: the outer `f` is already bound above.
                with open(case_path, "w", encoding="utf-8") as cf:
                    json.dump(case_data, cf, ensure_ascii=False, indent=2)

    return {"status": "success"}
@app.post("/api/requirements/{index}/cases/restore")
def restore_case(index: int, req: CaseFilterRequest):
    """Move a previously filtered case back from filtered_cases.json into source.json.

    Note: the case is NOT re-injected into case.json, because case.json requires
    the format normalization that generate_case.py performs; the user must rerun
    step 1.6.
    """
    idx_str = f"{(index+1):03d}"
    dir_path = OUTPUT_DIR / idx_str / "raw_cases"
    source_path = dir_path / "source.json"
    filtered_path = dir_path / "filtered_cases.json"

    if not filtered_path.exists():
        raise HTTPException(status_code=404, detail="filtered_cases.json not found")

    with open(filtered_path, "r", encoding="utf-8") as f:
        filtered_data = json.load(f)

    target_case = None
    # Scan each reason bucket; stop at the first one holding the case.
    for reason, reason_obj in filtered_data.get("by_reason", {}).items():
        new_sources = []
        for c in reason_obj.get("sources", []):
            # `or {}` guards entries whose "_raw"/"post" key is present but null;
            # dict.get(key, {}) would return None there and crash on .get().
            c_id = c.get("case_id") or (c.get("_raw") or {}).get("case_id") or (c.get("post") or {}).get("channel_content_id")
            if c_id == req.case_id:
                target_case = c
            else:
                new_sources.append(c)
        if target_case:
            reason_obj["sources"] = new_sources
            reason_obj["total"] = len(new_sources)
            break

    if not target_case:
        raise HTTPException(status_code=404, detail="Case not found in filtered_cases.json")

    filtered_data["total"] = sum(r.get("total", 0) for r in filtered_data.get("by_reason", {}).values())

    with open(filtered_path, "w", encoding="utf-8") as f:
        json.dump(filtered_data, f, ensure_ascii=False, indent=2)

    source_data = {"total": 0, "sources": []}
    if source_path.exists():
        with open(source_path, "r", encoding="utf-8") as f:
            source_data = json.load(f)
    if not isinstance(source_data, dict):
        # Legacy list-form source.json: normalize to the object form all writers use.
        source_data = {"total": len(source_data), "sources": source_data}

    # The tag only matters while the case sits in filtered_cases.json.
    target_case.pop("filter_reason", None)

    # setdefault guards an existing object-form file that lacks the "sources" key.
    source_data.setdefault("sources", []).append(target_case)
    source_data["total"] = len(source_data["sources"])

    with open(source_path, "w", encoding="utf-8") as f:
        json.dump(source_data, f, ensure_ascii=False, indent=2)

    return {"status": "success"}
@app.post("/api/requirements/{index}/upload_cluster")
async def upload_cluster(index: int, file: UploadFile = File(...)):
    """Append an uploaded cluster JSON document to output/<idx>/cluster.json (a list)."""
    idx_str = f"{(index+1):03d}"
    dir_path = OUTPUT_DIR / idx_str
    dir_path.mkdir(parents=True, exist_ok=True)
    cluster_path = dir_path / "cluster.json"

    content = await file.read()
    # Only the uploaded payload can be "Invalid JSON"; local file errors should
    # surface as 500s instead of being mislabeled 400s (the old blanket except
    # swallowed everything). ValueError covers JSONDecodeError and the
    # UnicodeDecodeError raised for undecodable bytes.
    try:
        json_data = json.loads(content)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=f"Invalid JSON: {str(e)}")

    # Load existing data or start a new list.
    existing_data = []
    if cluster_path.exists():
        with open(cluster_path, "r", encoding="utf-8") as f:
            try:
                existing = json.load(f)
            except ValueError:
                # Corrupt on-disk file: start fresh rather than failing the upload.
                pass
            else:
                # Wrap the old single-object format in a list.
                existing_data = existing if isinstance(existing, list) else [existing]

    existing_data.append(json_data)

    with open(cluster_path, "w", encoding="utf-8") as f:
        json.dump(existing_data, f, ensure_ascii=False, indent=2)

    return {"status": "success"}
@app.post("/api/requirements/{index}/save_cluster")
async def save_cluster(index: int, request: Request):
    """Replace output/<idx>/cluster.json with the request's raw JSON body."""
    out_dir = OUTPUT_DIR / f"{index + 1:03d}"
    out_dir.mkdir(parents=True, exist_ok=True)

    try:
        payload = await request.json()
        with open(out_dir / "cluster.json", "w", encoding="utf-8") as fp:
            json.dump(payload, fp, ensure_ascii=False, indent=2)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Failed to save JSON: {str(e)}")
    return {"status": "success"}
@app.post("/api/requirements")
def add_requirement(req: RequirementUpdateRequest):
    """Append a new requirement to db_requirements.json and pre-create its output dir."""
    if not DB_PATH.exists():
        raise HTTPException(status_code=404, detail="db_requirements.json not found")

    with open(DB_PATH, "r", encoding="utf-8") as fp:
        requirements = json.load(fp)

    requirements.append(req.requirement)
    new_index = len(requirements) - 1

    with open(DB_PATH, "w", encoding="utf-8") as fp:
        json.dump(requirements, fp, ensure_ascii=False, indent=2)

    # Pre-create the output directory for the new entry.
    (OUTPUT_DIR / f"{new_index + 1:03d}").mkdir(parents=True, exist_ok=True)

    return {"status": "success", "index": new_index, "requirement": req.requirement}
@app.get("/api/requirements/{index}/data")
def get_requirement_data(index: int):
    """Collect every known artifact JSON for one requirement into a single payload."""
    dir_path = OUTPUT_DIR / f"{index + 1:03d}"

    def safe_load_json(p: Path):
        """Missing file -> None; unparsable file -> error wrapper carrying the raw text."""
        if not p.exists():
            return None
        content = ""
        try:
            with open(p, "r", encoding="utf-8") as fp:
                content = fp.read()
            return json.loads(content)
        except Exception as exc:
            return {"error": "Failed to parse JSON", "raw_content": content, "details": str(exc)}

    # Every *.json in raw_cases/ is exposed under its stem, next to case.json.
    raw_cases = {"case": safe_load_json(dir_path / "case.json")}
    raw_dir = dir_path / "raw_cases"
    if raw_dir.exists():
        for entry in raw_dir.glob("*.json"):
            raw_cases[entry.stem] = safe_load_json(entry)

    # Newer artifact names take precedence over their legacy counterparts.
    return {
        "strategy": safe_load_json(dir_path / "strategy.json"),
        "blueprint": safe_load_json(dir_path / "process.json") or safe_load_json(dir_path / "blueprint.json"),
        "blueprint_temp": safe_load_json(dir_path / "blueprint_temp.json"),
        "cluster": safe_load_json(dir_path / "cluster.json"),
        "capabilities": safe_load_json(dir_path / "capabilities.json") or safe_load_json(dir_path / "capabilities_extracted.json"),
        "capabilities_temp": safe_load_json(dir_path / "capabilities_temp.json"),
        "raw_cases": raw_cases,
    }
@app.get("/api/requirements/{index}/memo")
def get_memo(index: int):
    """Return the free-form memo for a requirement ("" when none exists)."""
    memo_path = OUTPUT_DIR / f"{index + 1:03d}" / "memo.txt"
    if not memo_path.exists():
        return {"memo": ""}
    with open(memo_path, "r", encoding="utf-8") as fp:
        return {"memo": fp.read()}
@app.get("/api/requirements/{index}/pipeline-status")
def get_pipeline_status(index: int):
    """Report which pipeline stages have produced artifacts for one requirement.

    Each key maps a stage name to True once its output file (or inferred
    marker data) exists on disk; the UI uses this for per-step progress.
    """
    idx_str = f"{(index+1):03d}"
    dir_path = OUTPUT_DIR / idx_str
    raw_cases_dir = dir_path / "raw_cases"

    # Stage flags in pipeline order; pessimistically False until proven.
    status = {
        "research": False,
        "source": False,
        "generate-case": False,
        "workflow-extract": False,
        "capability-extract-1": False,
        "process-cluster": False,
        "process-score": False,
        "capability-extract": False,
        "capability-enrich": False,
        "strategy": False
    }

    if raw_cases_dir.exists():
        # Any case_*.json file means the research step ran.
        if list(raw_cases_dir.glob("case_*.json")):
            status["research"] = True
        if (raw_cases_dir / "source.json").exists():
            status["source"] = True

    case_file = dir_path / "case.json"
    legacy_detailed = raw_cases_dir / "case_detailed.json"

    # Workflow/capability extraction leave no marker file of their own; they
    # are inferred from the contents of case.json below.
    has_workflow = False
    has_capability = False

    if case_file.exists():
        status["generate-case"] = True
        try:
            with open(case_file, "r", encoding="utf-8") as f:
                cdata = json.load(f)
                if cdata.get("cases"):
                    for c in cdata["cases"]:
                        if c.get("workflow_groups"):
                            has_workflow = True
                        if any(
                            isinstance(group, dict)
                            and group.get("capability")
                            for group in (c.get("workflow_groups") or [])
                        ):
                            has_capability = True
                        if has_workflow and has_capability:
                            break  # both found; no need to scan further
        except Exception:
            # Unparsable case.json: leave the inferred flags False.
            pass

    # Legacy layout: a case_detailed.json implies generation + workflow ran.
    if legacy_detailed.exists():
        status["generate-case"] = True
        has_workflow = True

    if has_workflow:
        status["workflow-extract"] = True
    if has_capability:
        status["capability-extract-1"] = True

    # Later stages each leave a dedicated artifact file.
    if (dir_path / "blueprint_temp.json").exists():
        status["process-cluster"] = True
    if (dir_path / "process.json").exists():
        status["process-score"] = True
    if (dir_path / "capabilities_temp.json").exists():
        status["capability-extract"] = True
    if (dir_path / "capabilities.json").exists():
        status["capability-enrich"] = True
    if (dir_path / "strategy.json").exists():
        status["strategy"] = True

    return status
@app.post("/api/requirements/{index}/memo")
def save_memo(index: int, req: MemoRequest):
    """Persist the memo text to output/<idx>/memo.txt, creating the directory if needed."""
    out_dir = OUTPUT_DIR / f"{index + 1:03d}"
    out_dir.mkdir(parents=True, exist_ok=True)
    with open(out_dir / "memo.txt", "w", encoding="utf-8") as fp:
        fp.write(req.memo)
    return {"status": "ok"}
@app.get("/api/prompts")
def list_prompts():
    """List the *.prompt file names available in the prompts directory."""
    if not PROMPTS_DIR.exists():
        return []
    return [entry.name for entry in PROMPTS_DIR.glob("*.prompt")]
@app.get("/api/prompts/{name}")
def get_prompt(name: str):
    """Return a prompt file's text plus its companion .schema.json content (if any)."""
    # Reject path separators so `name` cannot escape the prompts directory.
    if "/" in name or "\\" in name:
        raise HTTPException(status_code=400, detail="Invalid prompt name")
    prompt_path = PROMPTS_DIR / name
    if not (prompt_path.exists() and prompt_path.is_file()):
        raise HTTPException(status_code=404, detail="Prompt not found")

    schema_path = PROMPTS_DIR / name.replace(".prompt", ".schema.json")
    schema_content = ""
    # is_file() is False for missing paths, so no separate exists() check needed.
    if schema_path.is_file():
        with open(schema_path, "r", encoding="utf-8") as fp:
            schema_content = fp.read()

    with open(prompt_path, "r", encoding="utf-8") as fp:
        return {"content": fp.read(), "schema_content": schema_content}
@app.post("/api/prompts/{name}")
def save_prompt(name: str, req: PromptRequest):
    """Persist edited prompt text, and optionally its companion .schema.json.

    schema_content semantics: None = leave the schema file untouched;
    non-empty = validate as JSON and write; empty/whitespace = delete it.
    Raises 400 for bad names or invalid schema JSON, 404 for unknown prompts.
    """
    # Reject path separators so `name` cannot escape the prompts directory.
    if "/" in name or "\\" in name:
        raise HTTPException(status_code=400, detail="Invalid prompt name")
    prompt_path = PROMPTS_DIR / name
    if not prompt_path.exists() or not prompt_path.is_file():
        raise HTTPException(status_code=404, detail="Prompt not found")

    schema_name = name.replace(".prompt", ".schema.json")
    schema_path = PROMPTS_DIR / schema_name

    if req.schema_content is not None:
        if req.schema_content.strip():
            # Validate before writing so a bad schema never clobbers the old file.
            try:
                parsed_schema = json.loads(req.schema_content)
            except Exception as e:
                raise HTTPException(status_code=400, detail=f"Invalid Schema JSON: {str(e)}")
            with open(schema_path, "w", encoding="utf-8") as f:
                json.dump(parsed_schema, f, ensure_ascii=False, indent=2)
        else:
            if schema_path.exists():
                schema_path.unlink()  # remove if empty

    with open(prompt_path, "w", encoding="utf-8", newline="\n") as f:
        # Enforce Unix LF line endings to avoid unnecessary git diffs
        # and format inconsistencies between the file system and UI.
        f.write(req.content.replace('\r\n', '\n'))
    return {"status": "ok"}
@app.post("/api/prompts/{name}/update_schema")
async def api_update_schema(name: str):
    """Regenerate a prompt's .schema.json via the update_schema script, then return it.

    The script writes the schema file itself; we re-read it afterwards so the
    caller gets the fresh content in the response.
    """
    # Reject path separators so `name` cannot escape the prompts directory.
    if "/" in name or "\\" in name:
        raise HTTPException(status_code=400, detail="Invalid prompt name")
    prompt_name = name.replace(".prompt", "")

    # Deferred import of a project-local script.
    # NOTE(review): presumably an LLM-backed generator given the model id — confirm.
    from script.update_schema import update_schema as script_update_schema
    try:
        # Calls the script to update the schema (this writes to the file)
        await script_update_schema(prompt_name=prompt_name, model="openai/gpt-5.4", dry_run=False)

        schema_name = name.replace(".prompt", ".schema.json")
        schema_path = PROMPTS_DIR / schema_name

        # Re-read whatever the script produced; empty string when nothing was written.
        schema_content = ""
        if schema_path.exists() and schema_path.is_file():
            with open(schema_path, "r", encoding="utf-8") as f:
                schema_content = f.read()

        return {"status": "ok", "schema_content": schema_content}
    except Exception as e:
        # Surface the traceback server-side; clients get a generic 500 detail.
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Failed to update schema: {str(e)}")
async def run_pipeline_task(index: int, run_req: RunRequest):
    """Launch run_pipeline.py for one requirement and stream its output into the run log.

    The subprocess is driven from a worker thread so the event loop stays
    responsive. `active_runs[index]` must already exist when this is scheduled
    (trigger_pipeline creates it).
    """
    run_state = active_runs[index]
    run_state.status = "running"

    # We no longer manually delete files here for most modes;
    # run_pipeline.py handles overwriting when --only-step or --start-from is passed.

    # Translate RunRequest fields into run_pipeline.py CLI flags.
    script_path = BASE_DIR / "run_pipeline.py"
    cmd = [sys.executable, str(script_path), "--index", str(index)]

    if run_req.platforms:
        cmd.extend(["--platforms", run_req.platforms])
    if run_req.use_claude_sdk:
        cmd.append("--use-claude-sdk")

    if run_req.only_step:
        # A single step takes precedence over phase / range selection.
        cmd.extend(["--only-step", run_req.only_step])
        if run_req.case_index is not None:
            cmd.extend(["--case-index", str(run_req.case_index)])
    elif run_req.phase is not None:
        cmd.extend(["--phase", str(run_req.phase)])
    else:
        if run_req.start_from:
            cmd.extend(["--start-from", run_req.start_from])
        if run_req.end_at:
            cmd.extend(["--end-at", run_req.end_at])
        # restart_mode == "smart" intentionally adds no extra flags.

    run_state.logs.append(f"Starting command: {' '.join(cmd)}\n")

    import threading
    import subprocess
    import os

    def run_process():
        # Runs in the worker thread: spawn, pump stdout into the log ring, record exit.
        try:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,  # interleave stderr into the same stream
                text=True,
                encoding="utf-8",
                bufsize=1,  # line-buffered so logs appear promptly
                cwd=str(BASE_DIR),
                env=dict(os.environ, PYTHONIOENCODING="utf-8")
            )
            run_state.process = process

            for line in iter(process.stdout.readline, ''):
                if line:
                    run_state.logs.append(line)

            process.stdout.close()
            return_code = process.wait()

            run_state.logs.append(f"\nProcess exited with code {return_code}")
            run_state.status = "completed" if return_code == 0 else "failed"
        except Exception as e:
            run_state.logs.append(f"\nException occurred: {repr(e)}")
            run_state.status = "failed"

    # Daemon thread: a hung pipeline must not block interpreter shutdown.
    thread = threading.Thread(target=run_process, daemon=True)
    thread.start()
@app.post("/api/pipeline/run/{index}")
async def trigger_pipeline(index: int, req: RunRequest):
    """Start a pipeline run for `index`; rejected while one is already running."""
    current = active_runs.get(index)
    if current is not None and current.status == "running":
        raise HTTPException(status_code=400, detail="Pipeline already running for this index")

    active_runs[index] = ActiveRun()
    asyncio.create_task(run_pipeline_task(index, req))
    return {"message": "Pipeline started", "index": index}
@app.post("/api/pipeline/stop/{index}")
async def stop_pipeline(index: int):
    """Terminate the subprocess behind a running pipeline and mark the run failed."""
    run = active_runs.get(index)
    if run is None or run.status != "running":
        raise HTTPException(status_code=400, detail="No running pipeline found for this index")

    process = run.process
    if process is None:
        raise HTTPException(status_code=500, detail="Process handle missing")

    try:
        process.terminate()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to terminate: {str(e)}")

    run.logs.append("\n[System] 🛑 Pipeline forcefully terminated by user.\n")
    run.status = "failed"
    return {"status": "stopped"}
@app.get("/api/pipeline/status")
def get_all_status():
    """Snapshot status, start time and buffered logs for every known run."""
    return {
        idx: {
            "status": run.status,
            "start_time": run.start_time,
            "logs": list(run.logs),  # materialize the deque for JSON serialization
        }
        for idx, run in active_runs.items()
    }
# Mount UI static files
ui_dir = BASE_DIR / "ui"
ui_dir.mkdir(exist_ok=True)  # ensure the mount target exists before StaticFiles validates it
app.mount("/static", StaticFiles(directory=str(ui_dir)), name="static")
# Mount output directory for local resources (like images)
app.mount("/output", StaticFiles(directory=str(OUTPUT_DIR)), name="output")
@app.get("/")
def serve_ui():
    """Serve ui/index.html, or a 404 hint when the UI has not been built yet."""
    index_html = ui_dir / "index.html"
    if not index_html.exists():
        return HTMLResponse("UI not found. Please create ui/index.html", status_code=404)
    with open(index_html, "r", encoding="utf-8") as fp:
        return HTMLResponse(fp.read())
if __name__ == "__main__":
    # The server binds 0.0.0.0 (all interfaces); advertise the loopback URL.
    # (The previous message printed "127.0.0.0", which is not a usable address.)
    print("Starting Pipeline Dashboard server on http://127.0.0.1:18080")
    uvicorn.run("server:app", host="0.0.0.0", port=18080, reload=False)
|