server.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. import asyncio
  2. import json
  3. from pathlib import Path
  4. from typing import Dict, List, Optional
  5. from datetime import datetime
  6. from collections import deque
  7. from fastapi import FastAPI, HTTPException, Request
  8. from fastapi.staticfiles import StaticFiles
  9. from fastapi.responses import HTMLResponse
  10. from pydantic import BaseModel
  11. import uvicorn
  12. import sys
  # FastAPI application object; the route decorators in this file register onto it.
  app = FastAPI(title="Pipeline Dashboard")

  # Every on-disk location is derived from the directory that contains this file.
  BASE_DIR = Path(__file__).parent
  # Per-requirement result folders ("001", "002", ...) live under here.
  OUTPUT_DIR = BASE_DIR.joinpath("output")
  # JSON file holding the list of requirements shown by the dashboard.
  DB_PATH = BASE_DIR.joinpath("db_requirements.json")
  # Folder scanned for editable "*.prompt" files.
  PROMPTS_DIR = BASE_DIR.joinpath("prompts")
  # In-memory storage for active runs
  class ActiveRun:
      """Mutable bookkeeping record for a single pipeline launch.

      A fresh record starts in the ``"starting"`` state with an empty log
      buffer and no process handle; the code that launches and monitors the
      pipeline updates the attributes afterwards.
      """

      def __init__(self):
          # ISO-8601 timestamp (naive local time) taken when the record is created.
          self.start_time: str = datetime.now().isoformat()
          # Lifecycle marker; other code in this file moves it to
          # "running", "completed" or "failed".
          self.status: str = "starting"
          # Bounded buffer: only the newest 200 log entries are retained.
          self.logs: deque = deque(maxlen=200)
          # Handle of the launched process; None until one has been attached.
          # NOTE(review): run_pipeline_task stores a subprocess.Popen here, so
          # this annotation does not match actual usage -- confirm and adjust.
          self.process: Optional[asyncio.subprocess.Process] = None


  # Registry of runs, keyed by the integer index used in the API routes.
  active_runs: Dict[int, ActiveRun] = {}
  class RunRequest(BaseModel):
      """Request body describing how a pipeline run should be launched.

      Each field is translated by ``run_pipeline_task`` into a command-line
      flag for ``run_pipeline.py``.
      """

      # When true, "--skip-research" is passed to the pipeline script.
      skip_research: bool = False
      # When true, "--research-only" is passed to the pipeline script.
      research_only: bool = False
      # Comma-separated platform names, forwarded via "--platforms".
      platforms: str = "xhs,youtube,bili,x"
      # When true, "--use-claude-sdk" is passed to the pipeline script.
      use_claude_sdk: bool = False
      # Restart mode name; forwarded via "--restart-mode" and also used to
      # decide which previously generated files are removed before the run.
      restart_mode: str = "smart"
  class MemoRequest(BaseModel):
      """Request body carrying the memo text to store for a requirement."""

      # Full memo text; written verbatim to the requirement's memo.txt.
      memo: str
  class PromptRequest(BaseModel):
      """Request body carrying the new contents of a prompt file."""

      # Full replacement text; written verbatim to the prompt file.
      content: str
  @app.get("/api/requirements")
  def get_requirements():
      """Return one summary entry per requirement stored in the database file.

      For the requirement at position ``i`` the matching output folder is
      ``OUTPUT_DIR/<i+1 zero-padded to 3 digits>``. Each entry reports which
      result files exist there, how many ``case_*.json`` files are present
      under ``raw_cases``, the stripped memo text, and a status:

      * ``"running"``   -- an entry in ``active_runs`` for ``i`` is running;
      * ``"completed"`` -- ``strategy.json`` exists;
      * ``"partial"``   -- at least one raw case file exists;
      * ``"pending"``   -- none of the above.

      Raises:
          HTTPException: 404 when the database file does not exist.
      """
      if not DB_PATH.exists():
          raise HTTPException(status_code=404, detail="db_requirements.json not found")

      requirements = json.loads(DB_PATH.read_text(encoding="utf-8"))

      def summarize(position, requirement):
          # Folder names are 1-based and zero-padded: position 0 -> "001".
          folder_name = f"{(position+1):03d}"
          folder = OUTPUT_DIR / folder_name

          strategy_done = (folder / "strategy.json").exists()
          blueprint_done = (folder / "blueprint.json").exists()
          caps_done = (folder / "capabilities_extracted.json").exists()

          cases_dir = folder / "raw_cases"
          if cases_dir.exists():
              case_total = len(list(cases_dir.glob("case_*.json")))
          else:
              case_total = 0

          # A live run takes precedence over whatever is on disk.
          live_run = active_runs.get(position)
          if live_run is not None and live_run.status == "running":
              state = "running"
          elif strategy_done:
              state = "completed"
          elif case_total > 0:
              state = "partial"
          else:
              state = "pending"

          memo_file = folder / "memo.txt"
          if memo_file.exists():
              memo_text = memo_file.read_text(encoding="utf-8").strip()
          else:
              memo_text = ""

          return {
              "index": position,
              "id": folder_name,
              "requirement": requirement,
              "status": state,
              "has_strategy": strategy_done,
              "has_blueprint": blueprint_done,
              "has_caps": caps_done,
              "raw_cases_count": case_total,
              "memo": memo_text,
          }

      return [summarize(i, req) for i, req in enumerate(requirements)]
  @app.get("/api/requirements/{index}/data")
  def get_requirement_data(index: int):
      """Return every JSON artifact stored for one requirement.

      The response has the keys ``strategy``, ``blueprint``, ``capabilities``
      (each the parsed file, or ``None`` when the file is absent) and
      ``raw_cases`` (a mapping of file stem to parsed content for every
      ``*.json`` file in the ``raw_cases`` sub-folder).

      A file that cannot be read or parsed is reported in place as a dict
      with ``error``, ``raw_content`` and ``details`` keys instead of failing
      the whole request.
      """
      folder = OUTPUT_DIR / f"{(index+1):03d}"

      def read_json_or_error(path: Path):
          if not path.exists():
              return None
          # Stays empty if the read itself fails, so the error payload is
          # still well-formed.
          text = ""
          try:
              text = path.read_text(encoding="utf-8")
              return json.loads(text)
          except Exception as exc:
              return {"error": "Failed to parse JSON", "raw_content": text, "details": str(exc)}

      payload = {}
      for key, filename in (
          ("strategy", "strategy.json"),
          ("blueprint", "blueprint.json"),
          ("capabilities", "capabilities_extracted.json"),
      ):
          payload[key] = read_json_or_error(folder / filename)
      payload["raw_cases"] = {}

      cases_dir = folder / "raw_cases"
      if cases_dir.exists():
          for case_file in cases_dir.glob("*.json"):
              payload["raw_cases"][case_file.stem] = read_json_or_error(case_file)

      return payload
  @app.get("/api/requirements/{index}/memo")
  def get_memo(index: int):
      """Return the memo stored for a requirement.

      The memo is read unmodified from ``memo.txt`` in the requirement's
      output folder; an empty string is returned when no memo file exists.
      """
      memo_file = OUTPUT_DIR / f"{(index+1):03d}" / "memo.txt"
      text = memo_file.read_text(encoding="utf-8") if memo_file.exists() else ""
      return {"memo": text}
  @app.post("/api/requirements/{index}/memo")
  def save_memo(index: int, req: MemoRequest):
      """Store the memo text for a requirement, replacing any previous memo.

      The requirement's output folder is created first if it is missing.
      Returns ``{"status": "ok"}`` once the file has been written.
      """
      target_dir = OUTPUT_DIR / f"{(index+1):03d}"
      target_dir.mkdir(parents=True, exist_ok=True)
      (target_dir / "memo.txt").write_text(req.memo, encoding="utf-8")
      return {"status": "ok"}
  @app.get("/api/prompts")
  def list_prompts():
      """Return the file names of all ``*.prompt`` files in the prompts folder.

      An empty list is returned when the prompts folder does not exist.
      """
      if PROMPTS_DIR.exists():
          names = []
          for entry in PROMPTS_DIR.glob("*.prompt"):
              names.append(entry.name)
          return names
      return []
  @app.get("/api/prompts/{name}")
  def get_prompt(name: str):
      """Return the text of one file from the prompts folder.

      Raises:
          HTTPException: 400 when ``name`` contains a path separator,
              404 when no regular file of that name exists.
      """
      # Reject separators so the name cannot point outside the prompts folder.
      if any(separator in name for separator in ("/", "\\")):
          raise HTTPException(status_code=400, detail="Invalid prompt name")

      prompt_file = PROMPTS_DIR / name
      if not (prompt_file.exists() and prompt_file.is_file()):
          raise HTTPException(status_code=404, detail="Prompt not found")

      return {"content": prompt_file.read_text(encoding="utf-8")}
  @app.post("/api/prompts/{name}")
  def save_prompt(name: str, req: PromptRequest):
      """Overwrite an existing file in the prompts folder with new text.

      Only files that already exist can be written; this endpoint never
      creates a new prompt file.

      Raises:
          HTTPException: 400 when ``name`` contains a path separator,
              404 when no regular file of that name exists.
      """
      # Reject separators so the name cannot point outside the prompts folder.
      if any(separator in name for separator in ("/", "\\")):
          raise HTTPException(status_code=400, detail="Invalid prompt name")

      prompt_file = PROMPTS_DIR / name
      if not (prompt_file.exists() and prompt_file.is_file()):
          raise HTTPException(status_code=404, detail="Prompt not found")

      prompt_file.write_text(req.content, encoding="utf-8")
      return {"status": "ok"}
  async def run_pipeline_task(index: int, run_req: RunRequest):
      """Prepare a requirement's output folder and launch ``run_pipeline.py``.

      Steps:

      1. Mark the ``active_runs[index]`` record as ``"running"``.
      2. Depending on ``run_req.restart_mode``, delete previously generated
         files (strategy, blueprint, capabilities, per-platform raw cases) so
         the pipeline regenerates them, and force ``run_req.skip_research``
         on or off for the modes that imply it. ``run_req`` is mutated.
      3. Build the command line and start the script in a worker thread that
         copies the child's combined stdout/stderr into the run's log buffer
         and finally sets the status to ``"completed"`` (exit code 0) or
         ``"failed"``.

      The coroutine returns as soon as the worker thread has been started.
      Any failure during preparation or while supervising the child is
      recorded in the run's log and leaves the run in the ``"failed"`` state.

      Args:
          index: Zero-based requirement index; the output folder is
              ``OUTPUT_DIR/<index+1 zero-padded to 3 digits>``.
          run_req: Launch options received from the HTTP request.
      """
      import os
      import subprocess
      import threading

      run_state = active_runs[index]
      run_state.status = "running"

      dir_path = OUTPUT_DIR / f"{(index+1):03d}"
      mode = run_req.restart_mode

      # Restart modes grouped by the artifact they invalidate.
      drops_strategy = {
          'phase1_platforms', 'phase2_capabilities', 'phase2_blueprint',
          'phase2_all', 'phase3', 'single_strategy',
      }
      drops_blueprint = {
          'phase1_platforms', 'phase2_all', 'phase2_blueprint', 'single_blueprint',
      }
      drops_capabilities = {
          'phase1_platforms', 'phase2_all', 'phase2_capabilities', 'single_capabilities',
      }
      research_modes = {'phase1_platforms', 'single_platforms'}
      post_research_modes = {
          'phase2_capabilities', 'phase2_blueprint', 'phase2_all', 'phase3',
          'single_blueprint', 'single_capabilities', 'single_strategy',
      }

      def remove_if_present(path):
          # EAFP: avoids the exists()/unlink() race when the file disappears
          # between the check and the delete.
          try:
              path.unlink()
          except FileNotFoundError:
              pass

      try:
          if mode in drops_strategy:
              remove_if_present(dir_path / "strategy.json")
          if mode in drops_blueprint:
              remove_if_present(dir_path / "blueprint.json")
          if mode in drops_capabilities:
              remove_if_present(dir_path / "capabilities_extracted.json")

          if mode in research_modes:
              raw_cases_dir = dir_path / "raw_cases"
              if raw_cases_dir.exists() and run_req.platforms:
                  platforms = [p.strip() for p in run_req.platforms.split(",") if p.strip()]
                  for platform in platforms:
                      # Platform names come from the HTTP request: never let
                      # one address a file outside the raw_cases folder.
                      if "/" in platform or "\\" in platform:
                          continue
                      remove_if_present(raw_cases_dir / f"case_{platform}.json")
              run_req.skip_research = False
          elif mode in post_research_modes:
              run_req.skip_research = True
      except OSError as exc:
          # Nobody awaits this task, so an escaping exception would leave the
          # run stuck in "running" forever; record it and give up instead.
          run_state.logs.append(f"\nException occurred: {repr(exc)}")
          run_state.status = "failed"
          return

      # build command
      script_path = BASE_DIR / "run_pipeline.py"
      cmd = [sys.executable, str(script_path), "--index", str(index)]
      if run_req.skip_research:
          cmd.append("--skip-research")
      if run_req.research_only:
          cmd.append("--research-only")
      if run_req.platforms and not run_req.skip_research:
          cmd.extend(["--platforms", run_req.platforms])
      if run_req.use_claude_sdk:
          cmd.append("--use-claude-sdk")
      if run_req.restart_mode:
          cmd.extend(["--restart-mode", run_req.restart_mode])

      run_state.logs.append(f"Starting command: {' '.join(cmd)}\n")

      def run_process():
          process = None
          try:
              process = subprocess.Popen(
                  cmd,
                  stdout=subprocess.PIPE,
                  stderr=subprocess.STDOUT,
                  text=True,
                  encoding="utf-8",
                  # A stray non-UTF-8 byte in the child's output must not
                  # abort log collection (and orphan the child).
                  errors="replace",
                  bufsize=1,
                  cwd=str(BASE_DIR),
                  env=dict(os.environ, PYTHONIOENCODING="utf-8"),
              )
              run_state.process = process

              # readline() returns '' only at EOF, i.e. once the child has
              # closed its end of the pipe.
              for line in iter(process.stdout.readline, ''):
                  if line:
                      run_state.logs.append(line)

              return_code = process.wait()
              run_state.logs.append(f"\nProcess exited with code {return_code}")
              run_state.status = "completed" if return_code == 0 else "failed"
          except Exception as e:
              run_state.logs.append(f"\nException occurred: {repr(e)}")
              run_state.status = "failed"
              # Do not leave an unsupervised child behind after a failure.
              if process is not None and process.poll() is None:
                  process.kill()
                  process.wait()
          finally:
              if process is not None and process.stdout is not None:
                  process.stdout.close()

      thread = threading.Thread(target=run_process)
      thread.start()
  @app.post("/api/pipeline/run/{index}")
  async def trigger_pipeline(index: int, req: RunRequest):
      """Start a pipeline run for the requirement at ``index``.

      A fresh ``ActiveRun`` record replaces any finished one for the same
      index and the launch is scheduled as a background task; the response
      is returned without waiting for the pipeline.

      Raises:
          HTTPException: 400 when a run for this index is already starting
              or running.
      """
      current = active_runs.get(index)
      # "starting" counts as busy too: replacing a record that is about to
      # be launched would orphan that run.
      if current is not None and current.status in ("starting", "running"):
          raise HTTPException(status_code=400, detail="Pipeline already running for this index")

      run_state = ActiveRun()
      active_runs[index] = run_state
      # The event loop keeps only a weak reference to tasks, so hold a
      # strong one on the run record until the run is replaced.
      run_state.task = asyncio.create_task(run_pipeline_task(index, req))
      return {"message": "Pipeline started", "index": index}
  @app.post("/api/pipeline/stop/{index}")
  async def stop_pipeline(index: int):
      """Terminate the running pipeline process for ``index``.

      On success a system line is appended to the run's log, the run is
      marked ``"failed"`` and ``{"status": "stopped"}`` is returned.

      Raises:
          HTTPException: 400 when no run with status ``"running"`` exists
              for the index; 500 when the run has no process handle yet or
              terminating it raises.
      """
      run = active_runs.get(index)
      if run is None or run.status != "running":
          raise HTTPException(status_code=400, detail="No running pipeline found for this index")

      handle = run.process
      if not handle:
          raise HTTPException(status_code=500, detail="Process handle missing")

      try:
          handle.terminate()
          run.logs.append("\n[System] 🛑 Pipeline forcefully terminated by user.\n")
          run.status = "failed"
          return {"status": "stopped"}
      except Exception as exc:
          raise HTTPException(status_code=500, detail=f"Failed to terminate: {str(exc)}")
  @app.get("/api/pipeline/status")
  def get_all_status():
      """Return status, start time and buffered log lines for every known run.

      The result maps each run index to a dict with the keys ``status``,
      ``start_time`` and ``logs`` (a list copy of the run's log buffer).
      """
      # Iterate over a snapshot: this synchronous handler can execute while
      # the async trigger endpoint inserts a new entry into ``active_runs``,
      # and iterating the live dict would then raise
      # "dictionary changed size during iteration".
      snapshot = list(active_runs.items())
      return {
          idx: {
              "status": run.status,
              "start_time": run.start_time,
              "logs": list(run.logs),
          }
          for idx, run in snapshot
      }
  # Mount UI static files
  ui_dir = BASE_DIR.joinpath("ui")
  # Create the folder up front -- presumably so the static mount below always
  # finds an existing directory (TODO confirm against StaticFiles behaviour).
  ui_dir.mkdir(exist_ok=True)
  app.mount(
      "/static",
      StaticFiles(directory=str(ui_dir)),
      name="static",
  )
  @app.get("/")
  def serve_ui():
      """Serve ``ui/index.html`` as the dashboard landing page.

      Responds with a 404 HTML message when the file has not been created.
      """
      page = ui_dir / "index.html"
      if page.exists():
          return HTMLResponse(page.read_text(encoding="utf-8"))
      return HTMLResponse("UI not found. Please create ui/index.html", status_code=404)
  if __name__ == "__main__":
      # Script entry point: run the app under uvicorn without auto-reload.
      # NOTE(review): the banner advertises 127.0.0.1, but host "0.0.0.0"
      # binds every network interface -- confirm that exposure is intended.
      print("Starting Pipeline Dashboard server on http://127.0.0.1:8080")
      uvicorn.run(
          "server:app",
          host="0.0.0.0",
          port=8080,
          reload=False,
      )