"""Pipeline Dashboard server: FastAPI backend serving the pipeline UI and run APIs."""
  1. import asyncio
  2. import json
  3. from pathlib import Path
  4. from typing import Dict, List, Optional
  5. from datetime import datetime
  6. from collections import deque
  7. from fastapi import FastAPI, HTTPException, Request, BackgroundTasks, UploadFile, File
  8. from fastapi.staticfiles import StaticFiles
  9. from fastapi.responses import HTMLResponse
  10. from pydantic import BaseModel
  11. import uvicorn
  12. import sys
  13. app = FastAPI(title="Pipeline Dashboard")
  14. BASE_DIR = Path(__file__).parent
  15. OUTPUT_DIR = BASE_DIR / "output"
  16. DB_PATH = BASE_DIR / "db_requirements.json"
  17. PROMPTS_DIR = BASE_DIR / "prompts"
  18. # In-memory storage for active runs
  19. class ActiveRun:
  20. def __init__(self):
  21. self.process: Optional[asyncio.subprocess.Process] = None
  22. self.logs: deque = deque(maxlen=200) # Keep last 200 lines
  23. self.status: str = "starting"
  24. self.start_time: str = datetime.now().isoformat()
  25. active_runs: Dict[int, ActiveRun] = {}
  26. @app.post("/api/requirements/{index}/upload_source_ex")
  27. async def upload_source_ex(index: int, file: UploadFile = File(...)):
  28. idx_str = f"{(index+1):03d}"
  29. dir_path = OUTPUT_DIR / idx_str / "raw_cases"
  30. dir_path.mkdir(parents=True, exist_ok=True)
  31. file_path = dir_path / "source_ex.json"
  32. content = await file.read()
  33. with open(file_path, "wb") as f:
  34. f.write(content)
  35. return {"status": "success", "filename": "source_ex.json"}
  36. class RunRequest(BaseModel):
  37. platforms: Optional[str] = None
  38. use_claude_sdk: bool = False
  39. restart_mode: Optional[str] = None
  40. phase: Optional[int] = None
  41. only_step: Optional[str] = None
  42. start_from: Optional[str] = None
  43. end_at: Optional[str] = None
  44. case_index: Optional[int] = None
  45. class MemoRequest(BaseModel):
  46. memo: str
  47. class RequirementUpdateRequest(BaseModel):
  48. requirement: str
  49. class PromptRequest(BaseModel):
  50. content: str
  51. schema_content: Optional[str] = None
  52. class ImportExternalRequest(BaseModel):
  53. case_id: str
  54. @app.post("/api/requirements/{index}/import_source_ex")
  55. def import_source_ex(index: int, req: ImportExternalRequest):
  56. idx_str = f"{(index+1):03d}"
  57. dir_path = OUTPUT_DIR / idx_str / "raw_cases"
  58. source_ex_path = dir_path / "source_ex.json"
  59. source_path = dir_path / "source.json"
  60. if not source_ex_path.exists():
  61. raise HTTPException(status_code=404, detail="source_ex.json not found")
  62. with open(source_ex_path, "r", encoding="utf-8") as f:
  63. ex_data = json.load(f)
  64. ex_cases = ex_data if isinstance(ex_data, list) else (ex_data.get("cases") or ex_data.get("sources") or [])
  65. target_case = None
  66. for c in ex_cases:
  67. cId = c.get("case_id") or (c.get("_raw") and c["_raw"].get("case_id")) or (c.get("post") and c["post"].get("channel_content_id"))
  68. if cId == req.case_id:
  69. target_case = c
  70. break
  71. if not target_case:
  72. raise HTTPException(status_code=404, detail="Case not found in external sources")
  73. source_cases = []
  74. if source_path.exists():
  75. with open(source_path, "r", encoding="utf-8") as f:
  76. s_data = json.load(f)
  77. source_cases = s_data if isinstance(s_data, list) else (s_data.get("cases") or s_data.get("sources") or [])
  78. # Check if already exists
  79. for c in source_cases:
  80. cId = c.get("case_id") or (c.get("_raw") and c["_raw"].get("case_id")) or (c.get("post") and c["post"].get("channel_content_id"))
  81. if cId == req.case_id:
  82. return {"status": "success", "message": "Already imported"}
  83. # Append
  84. source_cases.append(target_case)
  85. with open(source_path, "w", encoding="utf-8") as f:
  86. # Save as object format for source.json
  87. json.dump({"total": len(source_cases), "sources": source_cases}, f, ensure_ascii=False, indent=2)
  88. return {"status": "success"}
  89. @app.post("/api/requirements/{index}/import_all_source_ex")
  90. def import_all_source_ex(index: int):
  91. idx_str = f"{(index+1):03d}"
  92. dir_path = OUTPUT_DIR / idx_str / "raw_cases"
  93. source_ex_path = dir_path / "source_ex.json"
  94. source_path = dir_path / "source.json"
  95. if not source_ex_path.exists():
  96. raise HTTPException(status_code=404, detail="source_ex.json not found")
  97. with open(source_ex_path, "r", encoding="utf-8") as f:
  98. ex_data = json.load(f)
  99. ex_cases = ex_data if isinstance(ex_data, list) else (ex_data.get("cases") or ex_data.get("sources") or [])
  100. source_cases = []
  101. if source_path.exists():
  102. with open(source_path, "r", encoding="utf-8") as f:
  103. s_data = json.load(f)
  104. source_cases = s_data if isinstance(s_data, list) else (s_data.get("cases") or s_data.get("sources") or [])
  105. existing_ids = set()
  106. for c in source_cases:
  107. cId = c.get("case_id") or (c.get("_raw") and c["_raw"].get("case_id")) or (c.get("post") and c["post"].get("channel_content_id"))
  108. if cId:
  109. existing_ids.add(cId)
  110. added_count = 0
  111. for c in ex_cases:
  112. cId = c.get("case_id") or (c.get("_raw") and c["_raw"].get("case_id")) or (c.get("post") and c["post"].get("channel_content_id"))
  113. if cId and cId not in existing_ids:
  114. source_cases.append(c)
  115. existing_ids.add(cId)
  116. added_count += 1
  117. if added_count > 0:
  118. with open(source_path, "w", encoding="utf-8") as f:
  119. json.dump({"total": len(source_cases), "sources": source_cases}, f, ensure_ascii=False, indent=2)
  120. return {"status": "success", "count": added_count}
  121. @app.get("/api/requirements")
  122. def get_requirements():
  123. if not DB_PATH.exists():
  124. raise HTTPException(status_code=404, detail="db_requirements.json not found")
  125. with open(DB_PATH, "r", encoding="utf-8") as f:
  126. reqs = json.load(f)
  127. results = []
  128. for i, req in enumerate(reqs):
  129. idx_str = f"{(i+1):03d}"
  130. dir_path = OUTPUT_DIR / idx_str
  131. has_strategy = (dir_path / "strategy.json").exists()
  132. has_blueprint = (dir_path / "blueprint.json").exists()
  133. has_caps = (dir_path / "capabilities_extracted.json").exists()
  134. raw_cases_count = 0
  135. case_json_path = dir_path / "case.json"
  136. if case_json_path.exists():
  137. try:
  138. with open(case_json_path, 'r', encoding='utf-8') as f:
  139. case_data = json.load(f)
  140. raw_cases_count = len(case_data.get('cases', []))
  141. except Exception:
  142. pass
  143. if raw_cases_count == 0:
  144. raw_cases_dir = dir_path / "raw_cases"
  145. if raw_cases_dir.exists():
  146. raw_cases_count = len(list(raw_cases_dir.glob("case_*.json")))
  147. status = "completed" if has_strategy else ("partial" if raw_cases_count > 0 else "pending")
  148. if i in active_runs and active_runs[i].status == "running":
  149. status = "running"
  150. memo_content = ""
  151. memo_path = dir_path / "memo.txt"
  152. if memo_path.exists():
  153. with open(memo_path, "r", encoding="utf-8") as fm:
  154. memo_content = fm.read().strip()
  155. results.append({
  156. "index": i,
  157. "id": idx_str,
  158. "requirement": req,
  159. "status": status,
  160. "has_strategy": has_strategy,
  161. "has_blueprint": has_blueprint,
  162. "has_caps": has_caps,
  163. "raw_cases_count": raw_cases_count,
  164. "memo": memo_content
  165. })
  166. return results
  167. @app.put("/api/requirements/{index}")
  168. def update_requirement(index: int, req: RequirementUpdateRequest):
  169. if not DB_PATH.exists():
  170. raise HTTPException(status_code=404, detail="db_requirements.json not found")
  171. with open(DB_PATH, "r", encoding="utf-8") as f:
  172. reqs = json.load(f)
  173. if index < 0 or index >= len(reqs):
  174. raise HTTPException(status_code=404, detail="Requirement index out of range")
  175. reqs[index] = req.requirement
  176. with open(DB_PATH, "w", encoding="utf-8") as f:
  177. json.dump(reqs, f, ensure_ascii=False, indent=2)
  178. return {"status": "success", "requirement": req.requirement}
  179. @app.post("/api/requirements")
  180. def add_requirement(req: RequirementUpdateRequest):
  181. if not DB_PATH.exists():
  182. raise HTTPException(status_code=404, detail="db_requirements.json not found")
  183. with open(DB_PATH, "r", encoding="utf-8") as f:
  184. reqs = json.load(f)
  185. reqs.append(req.requirement)
  186. new_index = len(reqs) - 1
  187. with open(DB_PATH, "w", encoding="utf-8") as f:
  188. json.dump(reqs, f, ensure_ascii=False, indent=2)
  189. # Pre-create output directory
  190. idx_str = f"{(new_index+1):03d}"
  191. dir_path = OUTPUT_DIR / idx_str
  192. dir_path.mkdir(parents=True, exist_ok=True)
  193. return {"status": "success", "index": new_index, "requirement": req.requirement}
  194. @app.get("/api/requirements/{index}/data")
  195. def get_requirement_data(index: int):
  196. idx_str = f"{(index+1):03d}"
  197. dir_path = OUTPUT_DIR / idx_str
  198. def safe_load_json(p: Path):
  199. if not p.exists():
  200. return None
  201. content = ""
  202. try:
  203. with open(p, "r", encoding="utf-8") as f:
  204. content = f.read()
  205. return json.loads(content)
  206. except Exception as e:
  207. return {"error": "Failed to parse JSON", "raw_content": content, "details": str(e)}
  208. data = {
  209. "strategy": safe_load_json(dir_path / "strategy.json"),
  210. "blueprint": safe_load_json(dir_path / "process.json") or safe_load_json(dir_path / "blueprint.json"),
  211. "blueprint_temp": safe_load_json(dir_path / "blueprint_temp.json"),
  212. "capabilities": safe_load_json(dir_path / "capabilities.json") or safe_load_json(dir_path / "capabilities_extracted.json"),
  213. "capabilities_temp": safe_load_json(dir_path / "capabilities_temp.json"),
  214. "raw_cases": {
  215. "case": safe_load_json(dir_path / "case.json")
  216. }
  217. }
  218. raw_cases_dir = dir_path / "raw_cases"
  219. if raw_cases_dir.exists():
  220. for f in raw_cases_dir.glob("*.json"):
  221. data["raw_cases"][f.stem] = safe_load_json(f)
  222. return data
  223. @app.get("/api/requirements/{index}/memo")
  224. def get_memo(index: int):
  225. idx_str = f"{(index+1):03d}"
  226. memo_path = OUTPUT_DIR / idx_str / "memo.txt"
  227. if memo_path.exists():
  228. with open(memo_path, "r", encoding="utf-8") as f:
  229. return {"memo": f.read()}
  230. return {"memo": ""}
  231. @app.get("/api/requirements/{index}/pipeline-status")
  232. def get_pipeline_status(index: int):
  233. idx_str = f"{(index+1):03d}"
  234. dir_path = OUTPUT_DIR / idx_str
  235. raw_cases_dir = dir_path / "raw_cases"
  236. status = {
  237. "research": False,
  238. "source": False,
  239. "generate-case": False,
  240. "workflow-extract": False,
  241. "capability-extract-1": False,
  242. "process-cluster": False,
  243. "process-score": False,
  244. "capability-extract": False,
  245. "capability-enrich": False,
  246. "strategy": False
  247. }
  248. if raw_cases_dir.exists():
  249. if list(raw_cases_dir.glob("case_*.json")):
  250. status["research"] = True
  251. if (raw_cases_dir / "source.json").exists():
  252. status["source"] = True
  253. case_file = dir_path / "case.json"
  254. legacy_detailed = raw_cases_dir / "case_detailed.json"
  255. has_workflow = False
  256. has_capability = False
  257. if case_file.exists():
  258. status["generate-case"] = True
  259. try:
  260. with open(case_file, "r", encoding="utf-8") as f:
  261. cdata = json.load(f)
  262. if cdata.get("cases"):
  263. for c in cdata["cases"]:
  264. if c.get("workflow"):
  265. has_workflow = True
  266. if c.get("capabilities"):
  267. has_capability = True
  268. if has_workflow and has_capability:
  269. break
  270. except Exception:
  271. pass
  272. if legacy_detailed.exists():
  273. status["generate-case"] = True
  274. has_workflow = True
  275. if has_workflow:
  276. status["workflow-extract"] = True
  277. if has_capability:
  278. status["capability-extract-1"] = True
  279. if (dir_path / "blueprint_temp.json").exists():
  280. status["process-cluster"] = True
  281. if (dir_path / "process.json").exists():
  282. status["process-score"] = True
  283. if (dir_path / "capabilities_temp.json").exists():
  284. status["capability-extract"] = True
  285. if (dir_path / "capabilities.json").exists():
  286. status["capability-enrich"] = True
  287. if (dir_path / "strategy.json").exists():
  288. status["strategy"] = True
  289. return status
  290. @app.post("/api/requirements/{index}/memo")
  291. def save_memo(index: int, req: MemoRequest):
  292. idx_str = f"{(index+1):03d}"
  293. dir_path = OUTPUT_DIR / idx_str
  294. dir_path.mkdir(parents=True, exist_ok=True)
  295. memo_path = dir_path / "memo.txt"
  296. with open(memo_path, "w", encoding="utf-8") as f:
  297. f.write(req.memo)
  298. return {"status": "ok"}
  299. @app.get("/api/prompts")
  300. def list_prompts():
  301. if not PROMPTS_DIR.exists():
  302. return []
  303. return [f.name for f in PROMPTS_DIR.glob("*.prompt")]
  304. @app.get("/api/prompts/{name}")
  305. def get_prompt(name: str):
  306. if "/" in name or "\\" in name:
  307. raise HTTPException(status_code=400, detail="Invalid prompt name")
  308. prompt_path = PROMPTS_DIR / name
  309. if not prompt_path.exists() or not prompt_path.is_file():
  310. raise HTTPException(status_code=404, detail="Prompt not found")
  311. schema_name = name.replace(".prompt", ".schema.json")
  312. schema_path = PROMPTS_DIR / schema_name
  313. schema_content = ""
  314. if schema_path.exists() and schema_path.is_file():
  315. with open(schema_path, "r", encoding="utf-8") as f:
  316. schema_content = f.read()
  317. with open(prompt_path, "r", encoding="utf-8") as f:
  318. return {"content": f.read(), "schema_content": schema_content}
  319. @app.post("/api/prompts/{name}")
  320. def save_prompt(name: str, req: PromptRequest):
  321. if "/" in name or "\\" in name:
  322. raise HTTPException(status_code=400, detail="Invalid prompt name")
  323. prompt_path = PROMPTS_DIR / name
  324. if not prompt_path.exists() or not prompt_path.is_file():
  325. raise HTTPException(status_code=404, detail="Prompt not found")
  326. schema_name = name.replace(".prompt", ".schema.json")
  327. schema_path = PROMPTS_DIR / schema_name
  328. if req.schema_content is not None:
  329. if req.schema_content.strip():
  330. try:
  331. parsed_schema = json.loads(req.schema_content)
  332. except Exception as e:
  333. raise HTTPException(status_code=400, detail=f"Invalid Schema JSON: {str(e)}")
  334. with open(schema_path, "w", encoding="utf-8") as f:
  335. json.dump(parsed_schema, f, ensure_ascii=False, indent=2)
  336. else:
  337. if schema_path.exists():
  338. schema_path.unlink() # remove if empty
  339. with open(prompt_path, "w", encoding="utf-8", newline="\n") as f:
  340. # Enforce Unix LF line endings to avoid unnecessary git diffs
  341. # and format inconsistencies between the file system and UI.
  342. f.write(req.content.replace('\r\n', '\n'))
  343. return {"status": "ok"}
  344. async def run_pipeline_task(index: int, run_req: RunRequest):
  345. run_state = active_runs[index]
  346. run_state.status = "running"
  347. dir_path = OUTPUT_DIR / f"{(index+1):03d}"
  348. # We no longer manually delete files here for most modes,
  349. # run_pipeline.py handles overwriting when --only-step or --start-from is passed.
  350. # build command
  351. script_path = BASE_DIR / "run_pipeline.py"
  352. cmd = [sys.executable, str(script_path), "--index", str(index)]
  353. if run_req.platforms:
  354. cmd.extend(["--platforms", run_req.platforms])
  355. if run_req.use_claude_sdk:
  356. cmd.append("--use-claude-sdk")
  357. if run_req.only_step:
  358. cmd.extend(["--only-step", run_req.only_step])
  359. if run_req.case_index is not None:
  360. cmd.extend(["--case-index", str(run_req.case_index)])
  361. elif run_req.phase is not None:
  362. cmd.extend(["--phase", str(run_req.phase)])
  363. else:
  364. if run_req.start_from:
  365. cmd.extend(["--start-from", run_req.start_from])
  366. if run_req.end_at:
  367. cmd.extend(["--end-at", run_req.end_at])
  368. if run_req.restart_mode and run_req.restart_mode == "smart":
  369. pass # Smart mode requires no extra args
  370. run_state.logs.append(f"Starting command: {' '.join(cmd)}\n")
  371. import threading
  372. import subprocess
  373. import os
  374. def run_process():
  375. try:
  376. process = subprocess.Popen(
  377. cmd,
  378. stdout=subprocess.PIPE,
  379. stderr=subprocess.STDOUT,
  380. text=True,
  381. encoding="utf-8",
  382. bufsize=1,
  383. cwd=str(BASE_DIR),
  384. env=dict(os.environ, PYTHONIOENCODING="utf-8")
  385. )
  386. run_state.process = process
  387. for line in iter(process.stdout.readline, ''):
  388. if line:
  389. run_state.logs.append(line)
  390. process.stdout.close()
  391. return_code = process.wait()
  392. run_state.logs.append(f"\nProcess exited with code {return_code}")
  393. run_state.status = "completed" if return_code == 0 else "failed"
  394. except Exception as e:
  395. run_state.logs.append(f"\nException occurred: {repr(e)}")
  396. run_state.status = "failed"
  397. thread = threading.Thread(target=run_process)
  398. thread.start()
  399. @app.post("/api/pipeline/run/{index}")
  400. async def trigger_pipeline(index: int, req: RunRequest):
  401. if index in active_runs and active_runs[index].status == "running":
  402. raise HTTPException(status_code=400, detail="Pipeline already running for this index")
  403. active_runs[index] = ActiveRun()
  404. asyncio.create_task(run_pipeline_task(index, req))
  405. return {"message": "Pipeline started", "index": index}
  406. @app.post("/api/pipeline/stop/{index}")
  407. async def stop_pipeline(index: int):
  408. if index not in active_runs or active_runs[index].status != "running":
  409. raise HTTPException(status_code=400, detail="No running pipeline found for this index")
  410. process = active_runs[index].process
  411. if process:
  412. try:
  413. process.terminate()
  414. active_runs[index].logs.append("\n[System] 🛑 Pipeline forcefully terminated by user.\n")
  415. active_runs[index].status = "failed"
  416. return {"status": "stopped"}
  417. except Exception as e:
  418. raise HTTPException(status_code=500, detail=f"Failed to terminate: {str(e)}")
  419. raise HTTPException(status_code=500, detail="Process handle missing")
  420. @app.get("/api/pipeline/status")
  421. def get_all_status():
  422. res = {}
  423. for idx, run in active_runs.items():
  424. res[idx] = {
  425. "status": run.status,
  426. "start_time": run.start_time,
  427. "logs": list(run.logs)
  428. }
  429. return res
  430. # Mount UI static files
  431. ui_dir = BASE_DIR / "ui"
  432. ui_dir.mkdir(exist_ok=True)
  433. app.mount("/static", StaticFiles(directory=str(ui_dir)), name="static")
  434. # Mount output directory for local resources (like images)
  435. app.mount("/output", StaticFiles(directory=str(OUTPUT_DIR)), name="output")
  436. @app.get("/")
  437. def serve_ui():
  438. index_html = ui_dir / "index.html"
  439. if not index_html.exists():
  440. return HTMLResponse("UI not found. Please create ui/index.html", status_code=404)
  441. with open(index_html, "r", encoding="utf-8") as f:
  442. return HTMLResponse(f.read())
  443. if __name__ == "__main__":
  444. print("Starting Pipeline Dashboard server on http://127.0.0.0:18080")
  445. uvicorn.run("server:app", host="0.0.0.0", port=18080, reload=False)