"""Pipeline Dashboard: a small FastAPI server for inspecting and driving the pipeline.

Endpoints expose the requirements listed in ``db_requirements.json``, the
per-requirement artifacts written under ``output/NNN/`` (NNN is the 1-based
requirement index, zero-padded to three digits), editable ``memo.txt`` notes and
``*.prompt`` templates, and a runner that launches ``run_pipeline.py`` in a
subprocess while keeping the most recent output lines in memory. The browser UI
is served from ``ui/``.
"""

import asyncio
import json
import os
import subprocess
import sys
import threading
from collections import deque
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Set

import uvicorn
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

app = FastAPI(title="Pipeline Dashboard")

BASE_DIR = Path(__file__).parent
OUTPUT_DIR = BASE_DIR / "output"
DB_PATH = BASE_DIR / "db_requirements.json"
PROMPTS_DIR = BASE_DIR / "prompts"

# Run statuses that mean a pipeline for an index is still in flight. "starting"
# is included because a run sits in that state until its background task runs.
ACTIVE_STATUSES = ("starting", "running")

# Restart modes that reuse existing research and only redo later phases.
_LATER_PHASE_MODES = ("phase2_capabilities", "phase2_blueprint", "phase2_all", "phase3")


# In-memory storage for active runs
class ActiveRun:
    """Mutable in-memory state of one pipeline run (at most one per requirement index)."""

    def __init__(self):
        # Set by the reader thread once the subprocess has been spawned.
        self.process: Optional[subprocess.Popen] = None
        self.logs: deque = deque(maxlen=200)  # Keep last 200 lines
        # One of "starting", "running", "completed", "failed".
        self.status: str = "starting"
        self.start_time: str = datetime.now().isoformat()


active_runs: Dict[int, ActiveRun] = {}

# Strong references to fire-and-forget tasks: the event loop only keeps weak
# references, so an unreferenced task may be garbage-collected before it runs.
_background_tasks: Set[asyncio.Task] = set()


class RunRequest(BaseModel):
    skip_research: bool = False
    research_only: bool = False
    platforms: str = "xhs,youtube,bili,x"
    use_claude_sdk: bool = False
    restart_mode: str = "smart"


class MemoRequest(BaseModel):
    memo: str


class PromptRequest(BaseModel):
    content: str


def _validate_index(index: int) -> None:
    """Reject negative requirement indices (they map to bogus ``output/`` dirs).

    Raises:
        HTTPException: 400 if *index* is negative.
    """
    if index < 0:
        raise HTTPException(status_code=400, detail="Invalid requirement index")


def _output_dir(index: int) -> Path:
    """Return ``output/NNN`` for the 0-based requirement *index* (NNN is 1-based)."""
    return OUTPUT_DIR / f"{index + 1:03d}"


def _read_memo(dir_path: Path) -> str:
    """Return the raw contents of ``dir_path/memo.txt``, or "" if it does not exist."""
    memo_path = dir_path / "memo.txt"
    if not memo_path.exists():
        return ""
    return memo_path.read_text(encoding="utf-8")


@app.get("/api/requirements")
def get_requirements():
    """List every requirement from db_requirements.json with its output status.

    Status is "completed" when strategy.json exists, "partial" when at least one
    raw case file exists, "pending" otherwise, and "running" while a pipeline run
    for that index is starting or running.

    Raises:
        HTTPException: 404 if db_requirements.json is missing.
    """
    if not DB_PATH.exists():
        raise HTTPException(status_code=404, detail="db_requirements.json not found")

    with open(DB_PATH, "r", encoding="utf-8") as f:
        reqs = json.load(f)

    results = []
    for i, req in enumerate(reqs):
        dir_path = _output_dir(i)
        has_strategy = (dir_path / "strategy.json").exists()
        has_blueprint = (dir_path / "blueprint.json").exists()
        has_caps = (dir_path / "capabilities_extracted.json").exists()

        raw_cases_dir = dir_path / "raw_cases"
        raw_cases_count = (
            sum(1 for _ in raw_cases_dir.glob("case_*.json")) if raw_cases_dir.exists() else 0
        )

        if has_strategy:
            status = "completed"
        elif raw_cases_count > 0:
            status = "partial"
        else:
            status = "pending"

        run = active_runs.get(i)
        if run is not None and run.status in ACTIVE_STATUSES:
            status = "running"

        results.append({
            "index": i,
            "id": dir_path.name,
            "requirement": req,
            "status": status,
            "has_strategy": has_strategy,
            "has_blueprint": has_blueprint,
            "has_caps": has_caps,
            "raw_cases_count": raw_cases_count,
            "memo": _read_memo(dir_path).strip(),
        })
    return results


@app.get("/api/requirements/{index}/data")
def get_requirement_data(index: int):
    """Return the parsed JSON artifacts for one requirement.

    Missing files come back as ``None``. Unreadable or invalid JSON comes back as
    ``{"error": "Failed to parse JSON"}``, so one bad file does not fail the
    whole response.
    """
    _validate_index(index)
    dir_path = _output_dir(index)

    def safe_load_json(p: Path):
        if not p.exists():
            return None
        try:
            with open(p, "r", encoding="utf-8") as f:
                return json.load(f)
        except (OSError, ValueError):  # ValueError covers JSON and UTF-8 decode errors
            return {"error": "Failed to parse JSON"}

    data = {
        "strategy": safe_load_json(dir_path / "strategy.json"),
        "blueprint": safe_load_json(dir_path / "blueprint.json"),
        "capabilities": safe_load_json(dir_path / "capabilities_extracted.json"),
        "raw_cases": {},
    }

    raw_cases_dir = dir_path / "raw_cases"
    if raw_cases_dir.exists():
        # Sorted so the response order is stable across filesystems.
        for case_file in sorted(raw_cases_dir.glob("case_*.json")):
            data["raw_cases"][case_file.stem] = safe_load_json(case_file)

    return data


@app.get("/api/requirements/{index}/memo")
def get_memo(index: int):
    """Return the memo for one requirement ("" when none has been saved)."""
    _validate_index(index)
    return {"memo": _read_memo(_output_dir(index))}


@app.post("/api/requirements/{index}/memo")
def save_memo(index: int, req: MemoRequest):
    """Overwrite the memo for one requirement, creating its output dir if needed."""
    _validate_index(index)
    dir_path = _output_dir(index)
    dir_path.mkdir(parents=True, exist_ok=True)
    with open(dir_path / "memo.txt", "w", encoding="utf-8") as f:
        f.write(req.memo)
    return {"status": "ok"}


@app.get("/api/prompts")
def list_prompts():
    """Return the sorted file names of all ``*.prompt`` templates (empty if no dir)."""
    if not PROMPTS_DIR.exists():
        return []
    return sorted(f.name for f in PROMPTS_DIR.glob("*.prompt"))


def _existing_prompt_path(name: str) -> Path:
    """Resolve *name* to an existing file directly inside PROMPTS_DIR.

    Path separators are rejected so the name cannot escape the prompts dir.

    Raises:
        HTTPException: 400 for names containing a separator, 404 if not a file.
    """
    if "/" in name or "\\" in name:
        raise HTTPException(status_code=400, detail="Invalid prompt name")
    prompt_path = PROMPTS_DIR / name
    if not prompt_path.exists() or not prompt_path.is_file():
        raise HTTPException(status_code=404, detail="Prompt not found")
    return prompt_path


@app.get("/api/prompts/{name}")
def get_prompt(name: str):
    """Return the content of one existing prompt template."""
    prompt_path = _existing_prompt_path(name)
    with open(prompt_path, "r", encoding="utf-8") as f:
        return {"content": f.read()}


@app.post("/api/prompts/{name}")
def save_prompt(name: str, req: PromptRequest):
    """Overwrite an existing prompt template (new prompts cannot be created here)."""
    prompt_path = _existing_prompt_path(name)
    with open(prompt_path, "w", encoding="utf-8") as f:
        f.write(req.content)
    return {"status": "ok"}


def _remove_if_exists(path: Path) -> None:
    """Delete *path*; a file that is already gone is not an error."""
    try:
        path.unlink()
    except FileNotFoundError:
        pass


def _apply_restart_mode(dir_path: Path, run_req: RunRequest) -> None:
    """Delete stale artifacts for ``run_req.restart_mode`` and adjust ``skip_research``.

    Modes not handled below (e.g. the default "smart") leave files and flags as-is.
    Note: this mutates *run_req*.
    """
    mode = run_req.restart_mode
    if mode == "phase1_platforms" or mode in _LATER_PHASE_MODES:
        _remove_if_exists(dir_path / "strategy.json")
    if mode in ("phase1_platforms", "phase2_all", "phase2_blueprint"):
        _remove_if_exists(dir_path / "blueprint.json")
    if mode in ("phase1_platforms", "phase2_all", "phase2_capabilities"):
        _remove_if_exists(dir_path / "capabilities_extracted.json")

    if mode == "phase1_platforms":
        # Only the raw cases of the requested platforms are deleted; others are kept.
        raw_cases_dir = dir_path / "raw_cases"
        if raw_cases_dir.exists() and run_req.platforms:
            for platform in (p.strip() for p in run_req.platforms.split(",")):
                if platform:
                    _remove_if_exists(raw_cases_dir / f"case_{platform}.json")
        run_req.skip_research = False
    elif mode in _LATER_PHASE_MODES:
        # Later-phase restarts always pass --skip-research to the pipeline script.
        run_req.skip_research = True


def _build_command(index: int, run_req: RunRequest) -> List[str]:
    """Build the argv list for run_pipeline.py (executed without a shell)."""
    cmd = [sys.executable, str(BASE_DIR / "run_pipeline.py"), "--index", str(index)]
    if run_req.skip_research:
        cmd.append("--skip-research")
    if run_req.research_only:
        cmd.append("--research-only")
    if run_req.platforms:
        cmd.extend(["--platforms", run_req.platforms])
    if run_req.use_claude_sdk:
        cmd.append("--use-claude-sdk")
    return cmd


def _stream_process(cmd: List[str], run_state: ActiveRun) -> None:
    """Run *cmd* to completion, copying its combined stdout/stderr into run_state.logs.

    Intended to run on a worker thread so the event loop is never blocked.
    """
    process = None
    try:
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding="utf-8",
            # A stray non-UTF-8 byte must not abort the reader and orphan the child.
            errors="replace",
            bufsize=1,
            cwd=str(BASE_DIR),
            env=dict(os.environ, PYTHONIOENCODING="utf-8"),
        )
        run_state.process = process
        with process.stdout:
            for line in process.stdout:
                run_state.logs.append(line)
        return_code = process.wait()
        run_state.logs.append(f"\nProcess exited with code {return_code}")
        run_state.status = "completed" if return_code == 0 else "failed"
    except Exception as e:  # thread boundary: record every failure in the run log
        run_state.logs.append(f"\nException occurred: {repr(e)}")
        run_state.status = "failed"
        # Nobody reads the pipe any more, so stop the child instead of leaving it hung.
        if process is not None and process.poll() is None:
            process.kill()
            process.wait()


async def run_pipeline_task(index: int, run_req: RunRequest):
    """Prepare the output dir for *run_req* and launch the pipeline on a thread.

    Any failure before the subprocess starts marks the run "failed". Otherwise the
    index would stay "running" forever and refuse new runs.
    """
    run_state = active_runs[index]
    run_state.status = "running"
    try:
        _apply_restart_mode(_output_dir(index), run_req)
        cmd = _build_command(index, run_req)
    except Exception as e:  # background-task boundary: surface the error in the log
        run_state.logs.append(f"\nException occurred: {repr(e)}")
        run_state.status = "failed"
        return

    run_state.logs.append(f"Starting command: {' '.join(cmd)}\n")
    threading.Thread(target=_stream_process, args=(cmd, run_state)).start()


@app.post("/api/pipeline/run/{index}")
async def trigger_pipeline(index: int, req: RunRequest):
    """Start a pipeline run for *index* unless one is already starting or running.

    Raises:
        HTTPException: 400 for a negative index or when a run is already active.
    """
    _validate_index(index)
    existing = active_runs.get(index)
    if existing is not None and existing.status in ACTIVE_STATUSES:
        raise HTTPException(status_code=400, detail="Pipeline already running for this index")

    active_runs[index] = ActiveRun()
    task = asyncio.create_task(run_pipeline_task(index, req))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return {"message": "Pipeline started", "index": index}


@app.get("/api/pipeline/status")
def get_all_status():
    """Return status, start time and buffered log lines for every known run."""
    return {
        idx: {
            "status": run.status,
            "start_time": run.start_time,
            "logs": list(run.logs),
        }
        for idx, run in active_runs.items()
    }


# Mount UI static files
ui_dir = BASE_DIR / "ui"
ui_dir.mkdir(exist_ok=True)
app.mount("/static", StaticFiles(directory=str(ui_dir)), name="static")


@app.get("/")
def serve_ui():
    """Serve ui/index.html, or a 404 hint page when it has not been created."""
    index_html = ui_dir / "index.html"
    if not index_html.exists():
        return HTMLResponse("UI not found. Please create ui/index.html", status_code=404)
    with open(index_html, "r", encoding="utf-8") as f:
        return HTMLResponse(f.read())


if __name__ == "__main__":
    print("Starting Pipeline Dashboard server on http://127.0.0.1:8080")
    # Pass the app object rather than a "module:attr" string. This works whatever
    # the file is named and avoids importing this module a second time.
    # NOTE(review): 0.0.0.0 exposes unauthenticated endpoints that launch
    # subprocesses and write files to the whole network; consider "127.0.0.1".
    uvicorn.run(app, host="0.0.0.0", port=8080, reload=False)