# server.py
import asyncio
import json
import os
import subprocess
import sys
import threading
from collections import deque
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

import uvicorn
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
  13. app = FastAPI(title="Pipeline Dashboard")
  14. BASE_DIR = Path(__file__).parent
  15. OUTPUT_DIR = BASE_DIR / "output"
  16. DB_PATH = BASE_DIR / "db_requirements.json"
  17. PROMPTS_DIR = BASE_DIR / "prompts"
  18. # In-memory storage for active runs
  19. class ActiveRun:
  20. def __init__(self):
  21. self.process: Optional[asyncio.subprocess.Process] = None
  22. self.logs: deque = deque(maxlen=200) # Keep last 200 lines
  23. self.status: str = "starting"
  24. self.start_time: str = datetime.now().isoformat()
  25. active_runs: Dict[int, ActiveRun] = {}
  26. class RunRequest(BaseModel):
  27. skip_research: bool = False
  28. research_only: bool = False
  29. platforms: str = "xhs,youtube,bili,x"
  30. use_claude_sdk: bool = False
  31. restart_mode: str = "smart"
  32. class MemoRequest(BaseModel):
  33. memo: str
  34. class PromptRequest(BaseModel):
  35. content: str
  36. @app.get("/api/requirements")
  37. def get_requirements():
  38. if not DB_PATH.exists():
  39. raise HTTPException(status_code=404, detail="db_requirements.json not found")
  40. with open(DB_PATH, "r", encoding="utf-8") as f:
  41. reqs = json.load(f)
  42. results = []
  43. for i, req in enumerate(reqs):
  44. idx_str = f"{(i+1):03d}"
  45. dir_path = OUTPUT_DIR / idx_str
  46. has_strategy = (dir_path / "strategy.json").exists()
  47. has_blueprint = (dir_path / "blueprint.json").exists()
  48. has_caps = (dir_path / "capabilities_extracted.json").exists()
  49. raw_cases_count = 0
  50. raw_cases_dir = dir_path / "raw_cases"
  51. if raw_cases_dir.exists():
  52. raw_cases_count = len(list(raw_cases_dir.glob("case_*.json")))
  53. status = "completed" if has_strategy else ("partial" if raw_cases_count > 0 else "pending")
  54. if i in active_runs and active_runs[i].status == "running":
  55. status = "running"
  56. memo_content = ""
  57. memo_path = dir_path / "memo.txt"
  58. if memo_path.exists():
  59. with open(memo_path, "r", encoding="utf-8") as fm:
  60. memo_content = fm.read().strip()
  61. results.append({
  62. "index": i,
  63. "id": idx_str,
  64. "requirement": req,
  65. "status": status,
  66. "has_strategy": has_strategy,
  67. "has_blueprint": has_blueprint,
  68. "has_caps": has_caps,
  69. "raw_cases_count": raw_cases_count,
  70. "memo": memo_content
  71. })
  72. return results
  73. @app.get("/api/requirements/{index}/data")
  74. def get_requirement_data(index: int):
  75. idx_str = f"{(index+1):03d}"
  76. dir_path = OUTPUT_DIR / idx_str
  77. def safe_load_json(p: Path):
  78. if not p.exists():
  79. return None
  80. try:
  81. with open(p, "r", encoding="utf-8") as f:
  82. return json.load(f)
  83. except Exception:
  84. return {"error": "Failed to parse JSON"}
  85. data = {
  86. "strategy": safe_load_json(dir_path / "strategy.json"),
  87. "blueprint": safe_load_json(dir_path / "blueprint.json"),
  88. "capabilities": safe_load_json(dir_path / "capabilities_extracted.json"),
  89. "raw_cases": {}
  90. }
  91. raw_cases_dir = dir_path / "raw_cases"
  92. if raw_cases_dir.exists():
  93. for f in raw_cases_dir.glob("case_*.json"):
  94. data["raw_cases"][f.stem] = safe_load_json(f)
  95. return data
  96. @app.get("/api/requirements/{index}/memo")
  97. def get_memo(index: int):
  98. idx_str = f"{(index+1):03d}"
  99. memo_path = OUTPUT_DIR / idx_str / "memo.txt"
  100. if memo_path.exists():
  101. with open(memo_path, "r", encoding="utf-8") as f:
  102. return {"memo": f.read()}
  103. return {"memo": ""}
  104. @app.post("/api/requirements/{index}/memo")
  105. def save_memo(index: int, req: MemoRequest):
  106. idx_str = f"{(index+1):03d}"
  107. dir_path = OUTPUT_DIR / idx_str
  108. dir_path.mkdir(parents=True, exist_ok=True)
  109. memo_path = dir_path / "memo.txt"
  110. with open(memo_path, "w", encoding="utf-8") as f:
  111. f.write(req.memo)
  112. return {"status": "ok"}
  113. @app.get("/api/prompts")
  114. def list_prompts():
  115. if not PROMPTS_DIR.exists():
  116. return []
  117. return [f.name for f in PROMPTS_DIR.glob("*.prompt")]
  118. @app.get("/api/prompts/{name}")
  119. def get_prompt(name: str):
  120. if "/" in name or "\\" in name:
  121. raise HTTPException(status_code=400, detail="Invalid prompt name")
  122. prompt_path = PROMPTS_DIR / name
  123. if not prompt_path.exists() or not prompt_path.is_file():
  124. raise HTTPException(status_code=404, detail="Prompt not found")
  125. with open(prompt_path, "r", encoding="utf-8") as f:
  126. return {"content": f.read()}
  127. @app.post("/api/prompts/{name}")
  128. def save_prompt(name: str, req: PromptRequest):
  129. if "/" in name or "\\" in name:
  130. raise HTTPException(status_code=400, detail="Invalid prompt name")
  131. prompt_path = PROMPTS_DIR / name
  132. if not prompt_path.exists() or not prompt_path.is_file():
  133. raise HTTPException(status_code=404, detail="Prompt not found")
  134. with open(prompt_path, "w", encoding="utf-8") as f:
  135. f.write(req.content)
  136. return {"status": "ok"}
  137. async def run_pipeline_task(index: int, run_req: RunRequest):
  138. run_state = active_runs[index]
  139. run_state.status = "running"
  140. dir_path = OUTPUT_DIR / f"{(index+1):03d}"
  141. mode = run_req.restart_mode
  142. if mode in ['phase1_platforms', 'phase2_capabilities', 'phase2_blueprint', 'phase2_all', 'phase3']:
  143. if (dir_path / "strategy.json").exists(): (dir_path / "strategy.json").unlink()
  144. if mode in ['phase1_platforms', 'phase2_all', 'phase2_blueprint']:
  145. if (dir_path / "blueprint.json").exists(): (dir_path / "blueprint.json").unlink()
  146. if mode in ['phase1_platforms', 'phase2_all', 'phase2_capabilities']:
  147. if (dir_path / "capabilities_extracted.json").exists(): (dir_path / "capabilities_extracted.json").unlink()
  148. if mode == 'phase1_platforms':
  149. raw_cases_dir = dir_path / "raw_cases"
  150. if raw_cases_dir.exists() and run_req.platforms:
  151. plats = [p.strip() for p in run_req.platforms.split(",") if p.strip()]
  152. for p in plats:
  153. f = raw_cases_dir / f"case_{p}.json"
  154. if f.exists():
  155. f.unlink()
  156. run_req.skip_research = False
  157. elif mode in ['phase2_capabilities', 'phase2_blueprint', 'phase2_all', 'phase3']:
  158. run_req.skip_research = True
  159. # build command
  160. script_path = BASE_DIR / "run_pipeline.py"
  161. cmd = [sys.executable, str(script_path), "--index", str(index)]
  162. if run_req.skip_research:
  163. cmd.append("--skip-research")
  164. if run_req.research_only:
  165. cmd.append("--research-only")
  166. if run_req.platforms:
  167. cmd.extend(["--platforms", run_req.platforms])
  168. if run_req.use_claude_sdk:
  169. cmd.append("--use-claude-sdk")
  170. run_state.logs.append(f"Starting command: {' '.join(cmd)}\n")
  171. import threading
  172. import subprocess
  173. import os
  174. def run_process():
  175. try:
  176. process = subprocess.Popen(
  177. cmd,
  178. stdout=subprocess.PIPE,
  179. stderr=subprocess.STDOUT,
  180. text=True,
  181. encoding="utf-8",
  182. bufsize=1,
  183. cwd=str(BASE_DIR),
  184. env=dict(os.environ, PYTHONIOENCODING="utf-8")
  185. )
  186. run_state.process = process
  187. for line in iter(process.stdout.readline, ''):
  188. if line:
  189. run_state.logs.append(line)
  190. process.stdout.close()
  191. return_code = process.wait()
  192. run_state.logs.append(f"\nProcess exited with code {return_code}")
  193. run_state.status = "completed" if return_code == 0 else "failed"
  194. except Exception as e:
  195. run_state.logs.append(f"\nException occurred: {repr(e)}")
  196. run_state.status = "failed"
  197. thread = threading.Thread(target=run_process)
  198. thread.start()
  199. @app.post("/api/pipeline/run/{index}")
  200. async def trigger_pipeline(index: int, req: RunRequest):
  201. if index in active_runs and active_runs[index].status == "running":
  202. raise HTTPException(status_code=400, detail="Pipeline already running for this index")
  203. active_runs[index] = ActiveRun()
  204. asyncio.create_task(run_pipeline_task(index, req))
  205. return {"message": "Pipeline started", "index": index}
  206. @app.get("/api/pipeline/status")
  207. def get_all_status():
  208. res = {}
  209. for idx, run in active_runs.items():
  210. res[idx] = {
  211. "status": run.status,
  212. "start_time": run.start_time,
  213. "logs": list(run.logs)
  214. }
  215. return res
  216. # Mount UI static files
  217. ui_dir = BASE_DIR / "ui"
  218. ui_dir.mkdir(exist_ok=True)
  219. app.mount("/static", StaticFiles(directory=str(ui_dir)), name="static")
  220. @app.get("/")
  221. def serve_ui():
  222. index_html = ui_dir / "index.html"
  223. if not index_html.exists():
  224. return HTMLResponse("UI not found. Please create ui/index.html", status_code=404)
  225. with open(index_html, "r", encoding="utf-8") as f:
  226. return HTMLResponse(f.read())
  227. if __name__ == "__main__":
  228. print("Starting Pipeline Dashboard server on http://127.0.0.1:8080")
  229. uvicorn.run("server:app", host="0.0.0.0", port=8080, reload=False)