"""Pipeline Dashboard server.

FastAPI app that manages requirement entries (db_requirements.json), their
generated artifacts under output/NNN/, prompt files under prompts/, and
launches/monitors run_pipeline.py subprocesses.
"""
import asyncio
import json
import os
import subprocess
import sys
import threading
from collections import deque
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

import uvicorn
from fastapi import FastAPI, HTTPException, Request, BackgroundTasks, UploadFile, File
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

app = FastAPI(title="Pipeline Dashboard")

BASE_DIR = Path(__file__).parent
OUTPUT_DIR = BASE_DIR / "output"
DB_PATH = BASE_DIR / "db_requirements.json"
PROMPTS_DIR = BASE_DIR / "prompts"


class ActiveRun:
    """In-memory state for one pipeline subprocess (logs, status, start time)."""

    def __init__(self):
        # NOTE: despite the old asyncio annotation, run_pipeline_task stores a
        # subprocess.Popen here (fixed annotation — behavior unchanged).
        self.process: Optional[subprocess.Popen] = None
        self.logs: deque = deque(maxlen=200)  # Keep last 200 lines
        self.status: str = "starting"
        self.start_time: str = datetime.now().isoformat()


# Active runs keyed by requirement index (not persisted across restarts).
active_runs: Dict[int, ActiveRun] = {}


def _case_id(case: dict) -> Optional[str]:
    """Extract a case's id: top-level, then _raw, then post fallback.

    Defensive against `_raw`/`post` being present but null (the old inline
    `c.get("_raw", {}).get(...)` variant raised AttributeError in that case).
    """
    return (
        case.get("case_id")
        or (case.get("_raw") or {}).get("case_id")
        or (case.get("post") or {}).get("channel_content_id")
    )


def _extract_cases(data) -> list:
    """Normalize a loaded JSON payload (list, or object with cases/sources) to a list."""
    if isinstance(data, list):
        return data
    return data.get("cases") or data.get("sources") or []


# ---------------------------------------------------------------------------
# Request models
# ---------------------------------------------------------------------------

class RunRequest(BaseModel):
    platforms: Optional[str] = None
    use_claude_sdk: bool = False
    restart_mode: Optional[str] = None
    phase: Optional[int] = None
    only_step: Optional[str] = None
    start_from: Optional[str] = None
    end_at: Optional[str] = None
    case_index: Optional[int] = None


class MemoRequest(BaseModel):
    memo: str


class RequirementUpdateRequest(BaseModel):
    requirement: str


class PromptRequest(BaseModel):
    content: str
    schema_content: Optional[str] = None


class ImportExternalRequest(BaseModel):
    case_id: str


class CaseFilterRequest(BaseModel):
    case_id: str
    reason: Optional[str] = "manual_delete"


# ---------------------------------------------------------------------------
# External source (source_ex.json) endpoints
# ---------------------------------------------------------------------------

@app.post("/api/requirements/{index}/upload_source_ex")
async def upload_source_ex(index: int, file: UploadFile = File(...)):
    """Store an uploaded external-sources file as raw_cases/source_ex.json."""
    dir_path = OUTPUT_DIR / f"{(index + 1):03d}" / "raw_cases"
    dir_path.mkdir(parents=True, exist_ok=True)
    content = await file.read()
    with open(dir_path / "source_ex.json", "wb") as f:
        f.write(content)
    return {"status": "success", "filename": "source_ex.json"}


@app.post("/api/requirements/{index}/import_source_ex")
def import_source_ex(index: int, req: ImportExternalRequest):
    """Copy one case from source_ex.json into source.json (idempotent)."""
    dir_path = OUTPUT_DIR / f"{(index + 1):03d}" / "raw_cases"
    source_ex_path = dir_path / "source_ex.json"
    source_path = dir_path / "source.json"
    if not source_ex_path.exists():
        raise HTTPException(status_code=404, detail="source_ex.json not found")
    with open(source_ex_path, "r", encoding="utf-8") as f:
        ex_cases = _extract_cases(json.load(f))
    target_case = next((c for c in ex_cases if _case_id(c) == req.case_id), None)
    if not target_case:
        raise HTTPException(status_code=404, detail="Case not found in external sources")
    source_cases = []
    if source_path.exists():
        with open(source_path, "r", encoding="utf-8") as f:
            source_cases = _extract_cases(json.load(f))
    # Check if already exists
    if any(_case_id(c) == req.case_id for c in source_cases):
        return {"status": "success", "message": "Already imported"}
    source_cases.append(target_case)
    with open(source_path, "w", encoding="utf-8") as f:
        # Save as object format for source.json
        json.dump({"total": len(source_cases), "sources": source_cases},
                  f, ensure_ascii=False, indent=2)
    return {"status": "success"}


@app.post("/api/requirements/{index}/import_all_source_ex")
def import_all_source_ex(index: int):
    """Merge every not-yet-imported case from source_ex.json into source.json."""
    dir_path = OUTPUT_DIR / f"{(index + 1):03d}" / "raw_cases"
    source_ex_path = dir_path / "source_ex.json"
    source_path = dir_path / "source.json"
    if not source_ex_path.exists():
        raise HTTPException(status_code=404, detail="source_ex.json not found")
    with open(source_ex_path, "r", encoding="utf-8") as f:
        ex_cases = _extract_cases(json.load(f))
    source_cases = []
    if source_path.exists():
        with open(source_path, "r", encoding="utf-8") as f:
            source_cases = _extract_cases(json.load(f))
    existing_ids = {cid for cid in (_case_id(c) for c in source_cases) if cid}
    added_count = 0
    for c in ex_cases:
        cid = _case_id(c)
        if cid and cid not in existing_ids:
            source_cases.append(c)
            existing_ids.add(cid)
            added_count += 1
    if added_count > 0:
        with open(source_path, "w", encoding="utf-8") as f:
            json.dump({"total": len(source_cases), "sources": source_cases},
                      f, ensure_ascii=False, indent=2)
    return {"status": "success", "count": added_count}


# ---------------------------------------------------------------------------
# Requirement CRUD and status
# ---------------------------------------------------------------------------

@app.get("/api/requirements")
def get_requirements():
    """List all requirements with derived artifact/status/memo information."""
    if not DB_PATH.exists():
        raise HTTPException(status_code=404, detail="db_requirements.json not found")
    with open(DB_PATH, "r", encoding="utf-8") as f:
        reqs = json.load(f)
    results = []
    for i, req in enumerate(reqs):
        idx_str = f"{(i + 1):03d}"
        dir_path = OUTPUT_DIR / idx_str
        has_strategy = (dir_path / "strategy.json").exists()
        has_blueprint = (dir_path / "blueprint.json").exists()
        has_caps = (dir_path / "capabilities_extracted.json").exists()
        raw_cases_count = 0
        case_json_path = dir_path / "case.json"
        if case_json_path.exists():
            try:
                with open(case_json_path, "r", encoding="utf-8") as cf:
                    case_data = json.load(cf)
                raw_cases_count = len(case_data.get("cases", []))
            except Exception:
                pass  # Unparseable case.json: fall back to raw_cases count below.
        if raw_cases_count == 0:
            raw_cases_dir = dir_path / "raw_cases"
            if raw_cases_dir.exists():
                raw_cases_count = len(list(raw_cases_dir.glob("case_*.json")))
        status = "completed" if has_strategy else ("partial" if raw_cases_count > 0 else "pending")
        # A live subprocess overrides the file-derived status.
        if i in active_runs and active_runs[i].status == "running":
            status = "running"
        memo_content = ""
        memo_path = dir_path / "memo.txt"
        if memo_path.exists():
            with open(memo_path, "r", encoding="utf-8") as fm:
                memo_content = fm.read().strip()
        results.append({
            "index": i,
            "id": idx_str,
            "requirement": req,
            "status": status,
            "has_strategy": has_strategy,
            "has_blueprint": has_blueprint,
            "has_caps": has_caps,
            "raw_cases_count": raw_cases_count,
            "memo": memo_content,
        })
    return results


@app.put("/api/requirements/{index}")
def update_requirement(index: int, req: RequirementUpdateRequest):
    """Replace the requirement text at the given index."""
    if not DB_PATH.exists():
        raise HTTPException(status_code=404, detail="db_requirements.json not found")
    with open(DB_PATH, "r", encoding="utf-8") as f:
        reqs = json.load(f)
    if index < 0 or index >= len(reqs):
        raise HTTPException(status_code=404, detail="Requirement index out of range")
    reqs[index] = req.requirement
    with open(DB_PATH, "w", encoding="utf-8") as f:
        json.dump(reqs, f, ensure_ascii=False, indent=2)
    return {"status": "success", "requirement": req.requirement}


@app.post("/api/requirements")
def add_requirement(req: RequirementUpdateRequest):
    """Append a new requirement and pre-create its output directory."""
    if not DB_PATH.exists():
        raise HTTPException(status_code=404, detail="db_requirements.json not found")
    with open(DB_PATH, "r", encoding="utf-8") as f:
        reqs = json.load(f)
    reqs.append(req.requirement)
    new_index = len(reqs) - 1
    with open(DB_PATH, "w", encoding="utf-8") as f:
        json.dump(reqs, f, ensure_ascii=False, indent=2)
    # Pre-create output directory
    (OUTPUT_DIR / f"{(new_index + 1):03d}").mkdir(parents=True, exist_ok=True)
    return {"status": "success", "index": new_index, "requirement": req.requirement}


# ---------------------------------------------------------------------------
# Case filtering / restoration
# ---------------------------------------------------------------------------

@app.post("/api/requirements/{index}/cases/filter")
def filter_case(index: int, req: CaseFilterRequest):
    """Move a case from source.json into filtered_cases.json under a reason bucket."""
    dir_path = OUTPUT_DIR / f"{(index + 1):03d}" / "raw_cases"
    source_path = dir_path / "source.json"
    filtered_path = dir_path / "filtered_cases.json"
    if not source_path.exists():
        raise HTTPException(status_code=404, detail="source.json not found")
    with open(source_path, "r", encoding="utf-8") as f:
        source_data = json.load(f)
    target_case = None
    new_sources = []
    for c in source_data.get("sources", []):
        if _case_id(c) == req.case_id:
            target_case = c
        else:
            new_sources.append(c)
    if not target_case:
        raise HTTPException(status_code=404, detail="Case not found in source.json")
    source_data["sources"] = new_sources
    source_data["total"] = len(new_sources)
    with open(source_path, "w", encoding="utf-8") as f:
        json.dump(source_data, f, ensure_ascii=False, indent=2)
    filtered_data = {"total": 0, "by_reason": {}}
    if filtered_path.exists():
        with open(filtered_path, "r", encoding="utf-8") as f:
            filtered_data = json.load(f)
    reason = req.reason or "manual_delete"
    # setdefault guards against a hand-edited file missing "by_reason".
    by_reason = filtered_data.setdefault("by_reason", {})
    bucket = by_reason.setdefault(reason, {"total": 0, "sources": []})
    # Copy and add filter_reason
    target_case["filter_reason"] = reason
    bucket["sources"].append(target_case)
    bucket["total"] = len(bucket["sources"])
    filtered_data["total"] = sum(r.get("total", 0) for r in by_reason.values())
    with open(filtered_path, "w", encoding="utf-8") as f:
        json.dump(filtered_data, f, ensure_ascii=False, indent=2)
    # Also attempt to remove from case.json if it exists
    case_path = dir_path.parent / "case.json"
    if case_path.exists():
        with open(case_path, "r", encoding="utf-8") as f:
            case_data = json.load(f)
        if "cases" in case_data:
            original_len = len(case_data["cases"])
            case_data["cases"] = [
                c for c in case_data["cases"]
                if c.get("case_id") != req.case_id
                and (c.get("_raw") or {}).get("case_id") != req.case_id
            ]
            if len(case_data["cases"]) != original_len:
                with open(case_path, "w", encoding="utf-8") as f:
                    json.dump(case_data, f, ensure_ascii=False, indent=2)
    return {"status": "success"}


@app.post("/api/requirements/{index}/cases/restore")
def restore_case(index: int, req: CaseFilterRequest):
    """Move a case back from filtered_cases.json into source.json."""
    dir_path = OUTPUT_DIR / f"{(index + 1):03d}" / "raw_cases"
    source_path = dir_path / "source.json"
    filtered_path = dir_path / "filtered_cases.json"
    if not filtered_path.exists():
        raise HTTPException(status_code=404, detail="filtered_cases.json not found")
    with open(filtered_path, "r", encoding="utf-8") as f:
        filtered_data = json.load(f)
    target_case = None
    # Find and remove from whichever reason bucket holds the case.
    for reason, reason_obj in filtered_data.get("by_reason", {}).items():
        remaining = []
        for c in reason_obj.get("sources", []):
            if _case_id(c) == req.case_id:
                target_case = c
            else:
                remaining.append(c)
        if target_case:
            reason_obj["sources"] = remaining
            reason_obj["total"] = len(remaining)
            break
    if not target_case:
        raise HTTPException(status_code=404, detail="Case not found in filtered_cases.json")
    filtered_data["total"] = sum(
        r.get("total", 0) for r in filtered_data.get("by_reason", {}).values()
    )
    with open(filtered_path, "w", encoding="utf-8") as f:
        json.dump(filtered_data, f, ensure_ascii=False, indent=2)
    source_data = {"total": 0, "sources": []}
    if source_path.exists():
        with open(source_path, "r", encoding="utf-8") as f:
            source_data = json.load(f)
    # Clean up filter_reason
    target_case.pop("filter_reason", None)
    # setdefault guards against a malformed source.json without "sources".
    source_data.setdefault("sources", []).append(target_case)
    source_data["total"] = len(source_data["sources"])
    with open(source_path, "w", encoding="utf-8") as f:
        json.dump(source_data, f, ensure_ascii=False, indent=2)
    # Note: We do not automatically inject it back into case.json because case.json
    # requires format normalization (which generate_case.py does). The user must rerun step 1.6.
    return {"status": "success"}


# ---------------------------------------------------------------------------
# Cluster upload/save
# ---------------------------------------------------------------------------

@app.post("/api/requirements/{index}/upload_cluster")
async def upload_cluster(index: int, file: UploadFile = File(...)):
    """Append an uploaded cluster JSON document to cluster.json (kept as a list)."""
    dir_path = OUTPUT_DIR / f"{(index + 1):03d}"
    dir_path.mkdir(parents=True, exist_ok=True)
    cluster_path = dir_path / "cluster.json"
    content = await file.read()
    try:
        json_data = json.loads(content)
        # Load existing data or start new list
        existing_data = []
        if cluster_path.exists():
            with open(cluster_path, "r", encoding="utf-8") as f:
                try:
                    existing = json.load(f)
                    existing_data = existing if isinstance(existing, list) else [existing]  # Wrap old format
                except Exception:
                    # Corrupt existing file: start over with just the new upload.
                    pass
        existing_data.append(json_data)
        with open(cluster_path, "w", encoding="utf-8") as f:
            json.dump(existing_data, f, ensure_ascii=False, indent=2)
        return {"status": "success"}
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid JSON: {str(e)}")


@app.post("/api/requirements/{index}/save_cluster")
async def save_cluster(index: int, request: Request):
    """Overwrite cluster.json with the request body (raw JSON)."""
    dir_path = OUTPUT_DIR / f"{(index + 1):03d}"
    dir_path.mkdir(parents=True, exist_ok=True)
    cluster_path = dir_path / "cluster.json"
    try:
        data = await request.json()
        with open(cluster_path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        return {"status": "success"}
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Failed to save JSON: {str(e)}")


# ---------------------------------------------------------------------------
# Artifact inspection
# ---------------------------------------------------------------------------

@app.get("/api/requirements/{index}/data")
def get_requirement_data(index: int):
    """Return all known artifacts for a requirement, tolerating unparseable files."""
    dir_path = OUTPUT_DIR / f"{(index + 1):03d}"

    def safe_load_json(p: Path):
        # Returns None when missing; on parse failure returns an error object
        # carrying the raw content so the UI can still display it.
        if not p.exists():
            return None
        content = ""
        try:
            with open(p, "r", encoding="utf-8") as f:
                content = f.read()
            return json.loads(content)
        except Exception as e:
            return {"error": "Failed to parse JSON", "raw_content": content, "details": str(e)}

    data = {
        "strategy": safe_load_json(dir_path / "strategy.json"),
        "blueprint": safe_load_json(dir_path / "process.json") or safe_load_json(dir_path / "blueprint.json"),
        "blueprint_temp": safe_load_json(dir_path / "blueprint_temp.json"),
        "cluster": safe_load_json(dir_path / "cluster.json"),
        "capabilities": safe_load_json(dir_path / "capabilities.json") or safe_load_json(dir_path / "capabilities_extracted.json"),
        "capabilities_temp": safe_load_json(dir_path / "capabilities_temp.json"),
        "raw_cases": {"case": safe_load_json(dir_path / "case.json")},
    }
    raw_cases_dir = dir_path / "raw_cases"
    if raw_cases_dir.exists():
        for fp in raw_cases_dir.glob("*.json"):
            data["raw_cases"][fp.stem] = safe_load_json(fp)
    return data


@app.get("/api/requirements/{index}/memo")
def get_memo(index: int):
    """Return the memo text for a requirement ('' when absent)."""
    memo_path = OUTPUT_DIR / f"{(index + 1):03d}" / "memo.txt"
    if memo_path.exists():
        with open(memo_path, "r", encoding="utf-8") as f:
            return {"memo": f.read()}
    return {"memo": ""}


@app.get("/api/requirements/{index}/pipeline-status")
def get_pipeline_status(index: int):
    """Derive per-step completion flags from which artifact files exist."""
    dir_path = OUTPUT_DIR / f"{(index + 1):03d}"
    raw_cases_dir = dir_path / "raw_cases"
    status = {
        "research": False,
        "source": False,
        "generate-case": False,
        "workflow-extract": False,
        "capability-extract-1": False,
        "process-cluster": False,
        "process-score": False,
        "capability-extract": False,
        "capability-enrich": False,
        "strategy": False,
    }
    if raw_cases_dir.exists():
        if list(raw_cases_dir.glob("case_*.json")):
            status["research"] = True
        if (raw_cases_dir / "source.json").exists():
            status["source"] = True
    case_file = dir_path / "case.json"
    legacy_detailed = raw_cases_dir / "case_detailed.json"
    has_workflow = False
    has_capability = False
    if case_file.exists():
        status["generate-case"] = True
        try:
            with open(case_file, "r", encoding="utf-8") as f:
                cdata = json.load(f)
            if cdata.get("cases"):
                for c in cdata["cases"]:
                    if c.get("workflow"):
                        has_workflow = True
                    if c.get("capability"):
                        has_capability = True
                    if has_workflow and has_capability:
                        break
        except Exception:
            pass  # Unreadable case.json still counts as generate-case done.
    if legacy_detailed.exists():
        # Legacy layout implies workflows were already extracted.
        status["generate-case"] = True
        has_workflow = True
    if has_workflow:
        status["workflow-extract"] = True
    if has_capability:
        status["capability-extract-1"] = True
    if (dir_path / "blueprint_temp.json").exists():
        status["process-cluster"] = True
    if (dir_path / "process.json").exists():
        status["process-score"] = True
    if (dir_path / "capabilities_temp.json").exists():
        status["capability-extract"] = True
    if (dir_path / "capabilities.json").exists():
        status["capability-enrich"] = True
    if (dir_path / "strategy.json").exists():
        status["strategy"] = True
    return status


@app.post("/api/requirements/{index}/memo")
def save_memo(index: int, req: MemoRequest):
    """Write the memo text for a requirement."""
    dir_path = OUTPUT_DIR / f"{(index + 1):03d}"
    dir_path.mkdir(parents=True, exist_ok=True)
    with open(dir_path / "memo.txt", "w", encoding="utf-8") as f:
        f.write(req.memo)
    return {"status": "ok"}


# ---------------------------------------------------------------------------
# Prompt management
# ---------------------------------------------------------------------------

@app.get("/api/prompts")
def list_prompts():
    """List available *.prompt file names."""
    if not PROMPTS_DIR.exists():
        return []
    return [f.name for f in PROMPTS_DIR.glob("*.prompt")]


@app.get("/api/prompts/{name}")
def get_prompt(name: str):
    """Return a prompt's content plus its companion *.schema.json (if any)."""
    if "/" in name or "\\" in name:
        # Reject path separators to prevent directory traversal.
        raise HTTPException(status_code=400, detail="Invalid prompt name")
    prompt_path = PROMPTS_DIR / name
    if not prompt_path.exists() or not prompt_path.is_file():
        raise HTTPException(status_code=404, detail="Prompt not found")
    schema_path = PROMPTS_DIR / name.replace(".prompt", ".schema.json")
    schema_content = ""
    if schema_path.exists() and schema_path.is_file():
        with open(schema_path, "r", encoding="utf-8") as f:
            schema_content = f.read()
    with open(prompt_path, "r", encoding="utf-8") as f:
        return {"content": f.read(), "schema_content": schema_content}


@app.post("/api/prompts/{name}")
def save_prompt(name: str, req: PromptRequest):
    """Save a prompt's content and create/update/delete its companion schema."""
    if "/" in name or "\\" in name:
        raise HTTPException(status_code=400, detail="Invalid prompt name")
    prompt_path = PROMPTS_DIR / name
    if not prompt_path.exists() or not prompt_path.is_file():
        raise HTTPException(status_code=404, detail="Prompt not found")
    schema_path = PROMPTS_DIR / name.replace(".prompt", ".schema.json")
    if req.schema_content is not None:
        if req.schema_content.strip():
            try:
                parsed_schema = json.loads(req.schema_content)
            except Exception as e:
                raise HTTPException(status_code=400, detail=f"Invalid Schema JSON: {str(e)}")
            with open(schema_path, "w", encoding="utf-8") as f:
                json.dump(parsed_schema, f, ensure_ascii=False, indent=2)
        elif schema_path.exists():
            schema_path.unlink()  # remove if empty
    with open(prompt_path, "w", encoding="utf-8", newline="\n") as f:
        # Enforce Unix LF line endings to avoid unnecessary git diffs
        # and format inconsistencies between the file system and UI.
        f.write(req.content.replace('\r\n', '\n'))
    return {"status": "ok"}


@app.post("/api/prompts/{name}/update_schema")
async def api_update_schema(name: str):
    """Regenerate a prompt's schema via the update_schema script and return it."""
    if "/" in name or "\\" in name:
        raise HTTPException(status_code=400, detail="Invalid prompt name")
    prompt_name = name.replace(".prompt", "")
    # Project-local import kept at call site to avoid import cost at startup.
    from script.update_schema import update_schema as script_update_schema
    try:
        # Calls the script to update the schema (this writes to the file)
        await script_update_schema(prompt_name=prompt_name, model="openai/gpt-5.4", dry_run=False)
        schema_path = PROMPTS_DIR / name.replace(".prompt", ".schema.json")
        schema_content = ""
        if schema_path.exists() and schema_path.is_file():
            with open(schema_path, "r", encoding="utf-8") as f:
                schema_content = f.read()
        return {"status": "ok", "schema_content": schema_content}
    except Exception as e:
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Failed to update schema: {str(e)}")


# ---------------------------------------------------------------------------
# Pipeline execution
# ---------------------------------------------------------------------------

async def run_pipeline_task(index: int, run_req: RunRequest):
    """Build the run_pipeline.py command and stream its output into run logs.

    The subprocess is driven from a daemonless worker thread so blocking
    readline() calls never stall the event loop.
    """
    run_state = active_runs[index]
    run_state.status = "running"
    # We no longer manually delete files here for most modes,
    # run_pipeline.py handles overwriting when --only-step or --start-from is passed.

    # build command
    script_path = BASE_DIR / "run_pipeline.py"
    cmd = [sys.executable, str(script_path), "--index", str(index)]
    if run_req.platforms:
        cmd.extend(["--platforms", run_req.platforms])
    if run_req.use_claude_sdk:
        cmd.append("--use-claude-sdk")
    if run_req.only_step:
        cmd.extend(["--only-step", run_req.only_step])
    # Precedence: case_index > phase > start_from/end_at range.
    if run_req.case_index is not None:
        cmd.extend(["--case-index", str(run_req.case_index)])
    elif run_req.phase is not None:
        cmd.extend(["--phase", str(run_req.phase)])
    else:
        if run_req.start_from:
            cmd.extend(["--start-from", run_req.start_from])
        if run_req.end_at:
            cmd.extend(["--end-at", run_req.end_at])
    if run_req.restart_mode and run_req.restart_mode == "smart":
        pass  # Smart mode requires no extra args

    run_state.logs.append(f"Starting command: {' '.join(cmd)}\n")

    def run_process():
        try:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,  # interleave stderr into the log stream
                text=True,
                encoding="utf-8",
                bufsize=1,  # line-buffered so logs appear promptly
                cwd=str(BASE_DIR),
                env=dict(os.environ, PYTHONIOENCODING="utf-8"),
            )
            run_state.process = process
            for line in iter(process.stdout.readline, ''):
                if line:
                    run_state.logs.append(line)
            process.stdout.close()
            return_code = process.wait()
            run_state.logs.append(f"\nProcess exited with code {return_code}")
            run_state.status = "completed" if return_code == 0 else "failed"
        except Exception as e:
            run_state.logs.append(f"\nException occurred: {repr(e)}")
            run_state.status = "failed"

    threading.Thread(target=run_process).start()


@app.post("/api/pipeline/run/{index}")
async def trigger_pipeline(index: int, req: RunRequest):
    """Start a pipeline run for the given index; rejects concurrent runs."""
    if index in active_runs and active_runs[index].status == "running":
        raise HTTPException(status_code=400, detail="Pipeline already running for this index")
    active_runs[index] = ActiveRun()
    asyncio.create_task(run_pipeline_task(index, req))
    return {"message": "Pipeline started", "index": index}


@app.post("/api/pipeline/stop/{index}")
async def stop_pipeline(index: int):
    """Terminate a running pipeline subprocess."""
    if index not in active_runs or active_runs[index].status != "running":
        raise HTTPException(status_code=400, detail="No running pipeline found for this index")
    process = active_runs[index].process
    if process:
        try:
            process.terminate()
            active_runs[index].logs.append("\n[System] 🛑 Pipeline forcefully terminated by user.\n")
            active_runs[index].status = "failed"
            return {"status": "stopped"}
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Failed to terminate: {str(e)}")
    raise HTTPException(status_code=500, detail="Process handle missing")


@app.get("/api/pipeline/status")
def get_all_status():
    """Return status, start time, and buffered logs for every known run."""
    return {
        idx: {
            "status": run.status,
            "start_time": run.start_time,
            "logs": list(run.logs),
        }
        for idx, run in active_runs.items()
    }


# ---------------------------------------------------------------------------
# Static UI
# ---------------------------------------------------------------------------

# Mount UI static files
ui_dir = BASE_DIR / "ui"
ui_dir.mkdir(exist_ok=True)
app.mount("/static", StaticFiles(directory=str(ui_dir)), name="static")
# Mount output directory for local resources (like images)
app.mount("/output", StaticFiles(directory=str(OUTPUT_DIR)), name="output")


@app.get("/")
def serve_ui():
    """Serve ui/index.html, or a 404 page when the UI is not built."""
    index_html = ui_dir / "index.html"
    if not index_html.exists():
        return HTMLResponse("UI not found. Please create ui/index.html", status_code=404)
    with open(index_html, "r", encoding="utf-8") as f:
        return HTMLResponse(f.read())


if __name__ == "__main__":
    # Fixed: previously printed 127.0.0.0, which is not a usable loopback address.
    print("Starting Pipeline Dashboard server on http://127.0.0.1:18080")
    uvicorn.run("server:app", host="0.0.0.0", port=18080, reload=False)