| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297 |
- import asyncio
- import json
- from pathlib import Path
- from typing import Dict, List, Optional
- from datetime import datetime
- from collections import deque
- from fastapi import FastAPI, HTTPException, Request
- from fastapi.staticfiles import StaticFiles
- from fastapi.responses import HTMLResponse
- from pydantic import BaseModel
- import uvicorn
- import sys
- app = FastAPI(title="Pipeline Dashboard")
- BASE_DIR = Path(__file__).parent
- OUTPUT_DIR = BASE_DIR / "output"
- DB_PATH = BASE_DIR / "db_requirements.json"
- PROMPTS_DIR = BASE_DIR / "prompts"
- # In-memory storage for active runs
- class ActiveRun:
- def __init__(self):
- self.process: Optional[asyncio.subprocess.Process] = None
- self.logs: deque = deque(maxlen=200) # Keep last 200 lines
- self.status: str = "starting"
- self.start_time: str = datetime.now().isoformat()
- active_runs: Dict[int, ActiveRun] = {}
- class RunRequest(BaseModel):
- skip_research: bool = False
- research_only: bool = False
- platforms: str = "xhs,youtube,bili,x"
- use_claude_sdk: bool = False
- restart_mode: str = "smart"
- class MemoRequest(BaseModel):
- memo: str
- class PromptRequest(BaseModel):
- content: str
- @app.get("/api/requirements")
- def get_requirements():
- if not DB_PATH.exists():
- raise HTTPException(status_code=404, detail="db_requirements.json not found")
-
- with open(DB_PATH, "r", encoding="utf-8") as f:
- reqs = json.load(f)
-
- results = []
- for i, req in enumerate(reqs):
- idx_str = f"{(i+1):03d}"
- dir_path = OUTPUT_DIR / idx_str
-
- has_strategy = (dir_path / "strategy.json").exists()
- has_blueprint = (dir_path / "blueprint.json").exists()
- has_caps = (dir_path / "capabilities_extracted.json").exists()
-
- raw_cases_count = 0
- raw_cases_dir = dir_path / "raw_cases"
- if raw_cases_dir.exists():
- raw_cases_count = len(list(raw_cases_dir.glob("case_*.json")))
-
- status = "completed" if has_strategy else ("partial" if raw_cases_count > 0 else "pending")
- if i in active_runs and active_runs[i].status == "running":
- status = "running"
-
- memo_content = ""
- memo_path = dir_path / "memo.txt"
- if memo_path.exists():
- with open(memo_path, "r", encoding="utf-8") as fm:
- memo_content = fm.read().strip()
- results.append({
- "index": i,
- "id": idx_str,
- "requirement": req,
- "status": status,
- "has_strategy": has_strategy,
- "has_blueprint": has_blueprint,
- "has_caps": has_caps,
- "raw_cases_count": raw_cases_count,
- "memo": memo_content
- })
-
- return results
- @app.get("/api/requirements/{index}/data")
- def get_requirement_data(index: int):
- idx_str = f"{(index+1):03d}"
- dir_path = OUTPUT_DIR / idx_str
-
- def safe_load_json(p: Path):
- if not p.exists():
- return None
- content = ""
- try:
- with open(p, "r", encoding="utf-8") as f:
- content = f.read()
- return json.loads(content)
- except Exception as e:
- return {"error": "Failed to parse JSON", "raw_content": content, "details": str(e)}
- data = {
- "strategy": safe_load_json(dir_path / "strategy.json"),
- "blueprint": safe_load_json(dir_path / "blueprint.json"),
- "capabilities": safe_load_json(dir_path / "capabilities_extracted.json"),
- "raw_cases": {}
- }
-
- raw_cases_dir = dir_path / "raw_cases"
- if raw_cases_dir.exists():
- for f in raw_cases_dir.glob("*.json"):
- data["raw_cases"][f.stem] = safe_load_json(f)
-
- return data
- @app.get("/api/requirements/{index}/memo")
- def get_memo(index: int):
- idx_str = f"{(index+1):03d}"
- memo_path = OUTPUT_DIR / idx_str / "memo.txt"
- if memo_path.exists():
- with open(memo_path, "r", encoding="utf-8") as f:
- return {"memo": f.read()}
- return {"memo": ""}
- @app.post("/api/requirements/{index}/memo")
- def save_memo(index: int, req: MemoRequest):
- idx_str = f"{(index+1):03d}"
- dir_path = OUTPUT_DIR / idx_str
- dir_path.mkdir(parents=True, exist_ok=True)
- memo_path = dir_path / "memo.txt"
- with open(memo_path, "w", encoding="utf-8") as f:
- f.write(req.memo)
- return {"status": "ok"}
- @app.get("/api/prompts")
- def list_prompts():
- if not PROMPTS_DIR.exists():
- return []
- return [f.name for f in PROMPTS_DIR.glob("*.prompt")]
- @app.get("/api/prompts/{name}")
- def get_prompt(name: str):
- if "/" in name or "\\" in name:
- raise HTTPException(status_code=400, detail="Invalid prompt name")
- prompt_path = PROMPTS_DIR / name
- if not prompt_path.exists() or not prompt_path.is_file():
- raise HTTPException(status_code=404, detail="Prompt not found")
- with open(prompt_path, "r", encoding="utf-8") as f:
- return {"content": f.read()}
- @app.post("/api/prompts/{name}")
- def save_prompt(name: str, req: PromptRequest):
- if "/" in name or "\\" in name:
- raise HTTPException(status_code=400, detail="Invalid prompt name")
- prompt_path = PROMPTS_DIR / name
- if not prompt_path.exists() or not prompt_path.is_file():
- raise HTTPException(status_code=404, detail="Prompt not found")
- with open(prompt_path, "w", encoding="utf-8") as f:
- f.write(req.content)
- return {"status": "ok"}
- async def run_pipeline_task(index: int, run_req: RunRequest):
- run_state = active_runs[index]
- run_state.status = "running"
-
- dir_path = OUTPUT_DIR / f"{(index+1):03d}"
-
- mode = run_req.restart_mode
-
- if mode in ['phase1_platforms', 'phase2_capabilities', 'phase2_blueprint', 'phase2_all', 'phase3', 'single_strategy']:
- if (dir_path / "strategy.json").exists(): (dir_path / "strategy.json").unlink()
-
- if mode in ['phase1_platforms', 'phase2_all', 'phase2_blueprint', 'single_blueprint']:
- if (dir_path / "blueprint.json").exists(): (dir_path / "blueprint.json").unlink()
-
- if mode in ['phase1_platforms', 'phase2_all', 'phase2_capabilities', 'single_capabilities']:
- if (dir_path / "capabilities_extracted.json").exists(): (dir_path / "capabilities_extracted.json").unlink()
-
- if mode in ['phase1_platforms', 'single_platforms']:
- raw_cases_dir = dir_path / "raw_cases"
- if raw_cases_dir.exists() and run_req.platforms:
- plats = [p.strip() for p in run_req.platforms.split(",") if p.strip()]
- for p in plats:
- f = raw_cases_dir / f"case_{p}.json"
- if f.exists():
- f.unlink()
- run_req.skip_research = False
- elif mode in ['phase2_capabilities', 'phase2_blueprint', 'phase2_all', 'phase3', 'single_blueprint', 'single_capabilities', 'single_strategy']:
- run_req.skip_research = True
-
- # build command
- script_path = BASE_DIR / "run_pipeline.py"
- cmd = [sys.executable, str(script_path), "--index", str(index)]
- if run_req.skip_research:
- cmd.append("--skip-research")
- if run_req.research_only:
- cmd.append("--research-only")
- if run_req.platforms and not run_req.skip_research:
- cmd.extend(["--platforms", run_req.platforms])
- if run_req.use_claude_sdk:
- cmd.append("--use-claude-sdk")
- if run_req.restart_mode:
- cmd.extend(["--restart-mode", run_req.restart_mode])
-
- run_state.logs.append(f"Starting command: {' '.join(cmd)}\n")
-
- import threading
- import subprocess
- import os
-
- def run_process():
- try:
- process = subprocess.Popen(
- cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- text=True,
- encoding="utf-8",
- bufsize=1,
- cwd=str(BASE_DIR),
- env=dict(os.environ, PYTHONIOENCODING="utf-8")
- )
- run_state.process = process
-
- for line in iter(process.stdout.readline, ''):
- if line:
- run_state.logs.append(line)
-
- process.stdout.close()
- return_code = process.wait()
-
- run_state.logs.append(f"\nProcess exited with code {return_code}")
- run_state.status = "completed" if return_code == 0 else "failed"
- except Exception as e:
- run_state.logs.append(f"\nException occurred: {repr(e)}")
- run_state.status = "failed"
-
- thread = threading.Thread(target=run_process)
- thread.start()
- @app.post("/api/pipeline/run/{index}")
- async def trigger_pipeline(index: int, req: RunRequest):
- if index in active_runs and active_runs[index].status == "running":
- raise HTTPException(status_code=400, detail="Pipeline already running for this index")
-
- active_runs[index] = ActiveRun()
- asyncio.create_task(run_pipeline_task(index, req))
- return {"message": "Pipeline started", "index": index}
- @app.post("/api/pipeline/stop/{index}")
- async def stop_pipeline(index: int):
- if index not in active_runs or active_runs[index].status != "running":
- raise HTTPException(status_code=400, detail="No running pipeline found for this index")
-
- process = active_runs[index].process
- if process:
- try:
- process.terminate()
- active_runs[index].logs.append("\n[System] 🛑 Pipeline forcefully terminated by user.\n")
- active_runs[index].status = "failed"
- return {"status": "stopped"}
- except Exception as e:
- raise HTTPException(status_code=500, detail=f"Failed to terminate: {str(e)}")
- raise HTTPException(status_code=500, detail="Process handle missing")
- @app.get("/api/pipeline/status")
- def get_all_status():
- res = {}
- for idx, run in active_runs.items():
- res[idx] = {
- "status": run.status,
- "start_time": run.start_time,
- "logs": list(run.logs)
- }
- return res
- # Mount UI static files
- ui_dir = BASE_DIR / "ui"
- ui_dir.mkdir(exist_ok=True)
- app.mount("/static", StaticFiles(directory=str(ui_dir)), name="static")
- @app.get("/")
- def serve_ui():
- index_html = ui_dir / "index.html"
- if not index_html.exists():
- return HTMLResponse("UI not found. Please create ui/index.html", status_code=404)
- with open(index_html, "r", encoding="utf-8") as f:
- return HTMLResponse(f.read())
- if __name__ == "__main__":
- print("Starting Pipeline Dashboard server on http://127.0.0.1:8080")
- uvicorn.run("server:app", host="0.0.0.0", port=8080, reload=False)
|