# server.py

import asyncio
import json
import os
import subprocess
import sys
import threading
from pathlib import Path
from typing import Dict, List, Optional
from datetime import datetime
from collections import deque
from fastapi import FastAPI, HTTPException, Request, BackgroundTasks, UploadFile, File
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
import uvicorn

app = FastAPI(title="Pipeline Dashboard")

BASE_DIR = Path(__file__).parent
OUTPUT_DIR = BASE_DIR / "output"
DB_PATH = BASE_DIR / "db_requirements.json"
PROMPTS_DIR = BASE_DIR / "prompts"


# In-memory storage for active runs
class ActiveRun:
    def __init__(self):
        self.process: Optional[subprocess.Popen] = None
        self.logs: deque = deque(maxlen=200)  # Keep last 200 lines
        self.status: str = "starting"
        self.start_time: str = datetime.now().isoformat()


active_runs: Dict[int, ActiveRun] = {}

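# --- External source upload / import --------------------------------------
# Requirement indexes are 0-based in the API but map to 1-based, zero-padded
# output directories (index 0 -> output/001/).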
@app.post("/api/requirements/{index}/upload_source_ex")
async def upload_source_ex(index: int, file: UploadFile = File(...)):
    idx_str = f"{(index+1):03d}"
    dir_path = OUTPUT_DIR / idx_str / "raw_cases"
    dir_path.mkdir(parents=True, exist_ok=True)
    file_path = dir_path / "source_ex.json"
    content = await file.read()
    with open(file_path, "wb") as f:
        f.write(content)
    return {"status": "success", "filename": "source_ex.json"}

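# Request payload models shared by the endpoints below.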
class RunRequest(BaseModel):
    platforms: Optional[str] = None
    use_claude_sdk: bool = False
    restart_mode: Optional[str] = None
    phase: Optional[int] = None
    only_step: Optional[str] = None
    start_from: Optional[str] = None
    end_at: Optional[str] = None
    case_index: Optional[int] = None


class MemoRequest(BaseModel):
    memo: str


class RequirementUpdateRequest(BaseModel):
    requirement: str


class PromptRequest(BaseModel):
    content: str
    schema_content: Optional[str] = None


class ImportExternalRequest(BaseModel):
    case_id: str


class CaseFilterRequest(BaseModel):
    case_id: str
    reason: Optional[str] = "manual_delete"

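# Copy a single case from raw_cases/source_ex.json into raw_cases/source.json,
# matching on case_id (or the nested _raw / post identifiers). Cases that were
# already imported are skipped.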
@app.post("/api/requirements/{index}/import_source_ex")
def import_source_ex(index: int, req: ImportExternalRequest):
    idx_str = f"{(index+1):03d}"
    dir_path = OUTPUT_DIR / idx_str / "raw_cases"
    source_ex_path = dir_path / "source_ex.json"
    source_path = dir_path / "source.json"
    if not source_ex_path.exists():
        raise HTTPException(status_code=404, detail="source_ex.json not found")
    with open(source_ex_path, "r", encoding="utf-8") as f:
        ex_data = json.load(f)
    ex_cases = ex_data if isinstance(ex_data, list) else (ex_data.get("cases") or ex_data.get("sources") or [])
    target_case = None
    for c in ex_cases:
        cId = c.get("case_id") or (c.get("_raw") and c["_raw"].get("case_id")) or (c.get("post") and c["post"].get("channel_content_id"))
        if cId == req.case_id:
            target_case = c
            break
    if not target_case:
        raise HTTPException(status_code=404, detail="Case not found in external sources")
    source_cases = []
    if source_path.exists():
        with open(source_path, "r", encoding="utf-8") as f:
            s_data = json.load(f)
        source_cases = s_data if isinstance(s_data, list) else (s_data.get("cases") or s_data.get("sources") or [])
    # Check if already exists
    for c in source_cases:
        cId = c.get("case_id") or (c.get("_raw") and c["_raw"].get("case_id")) or (c.get("post") and c["post"].get("channel_content_id"))
        if cId == req.case_id:
            return {"status": "success", "message": "Already imported"}
    # Append
    source_cases.append(target_case)
    with open(source_path, "w", encoding="utf-8") as f:
        # Save as object format for source.json
        json.dump({"total": len(source_cases), "sources": source_cases}, f, ensure_ascii=False, indent=2)
    return {"status": "success"}

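# Merge every not-yet-imported case from source_ex.json into source.json,
# deduplicating by case id.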
@app.post("/api/requirements/{index}/import_all_source_ex")
def import_all_source_ex(index: int):
    idx_str = f"{(index+1):03d}"
    dir_path = OUTPUT_DIR / idx_str / "raw_cases"
    source_ex_path = dir_path / "source_ex.json"
    source_path = dir_path / "source.json"
    if not source_ex_path.exists():
        raise HTTPException(status_code=404, detail="source_ex.json not found")
    with open(source_ex_path, "r", encoding="utf-8") as f:
        ex_data = json.load(f)
    ex_cases = ex_data if isinstance(ex_data, list) else (ex_data.get("cases") or ex_data.get("sources") or [])
    source_cases = []
    if source_path.exists():
        with open(source_path, "r", encoding="utf-8") as f:
            s_data = json.load(f)
        source_cases = s_data if isinstance(s_data, list) else (s_data.get("cases") or s_data.get("sources") or [])
    existing_ids = set()
    for c in source_cases:
        cId = c.get("case_id") or (c.get("_raw") and c["_raw"].get("case_id")) or (c.get("post") and c["post"].get("channel_content_id"))
        if cId:
            existing_ids.add(cId)
    added_count = 0
    for c in ex_cases:
        cId = c.get("case_id") or (c.get("_raw") and c["_raw"].get("case_id")) or (c.get("post") and c["post"].get("channel_content_id"))
        if cId and cId not in existing_ids:
            source_cases.append(c)
            existing_ids.add(cId)
            added_count += 1
    if added_count > 0:
        with open(source_path, "w", encoding="utf-8") as f:
            json.dump({"total": len(source_cases), "sources": source_cases}, f, ensure_ascii=False, indent=2)
    return {"status": "success", "count": added_count}

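# List all requirements from db_requirements.json together with a derived
# per-requirement status (pending / partial / completed / running) based on
# which output artifacts exist and whether a pipeline run is active.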
@app.get("/api/requirements")
def get_requirements():
    if not DB_PATH.exists():
        raise HTTPException(status_code=404, detail="db_requirements.json not found")
    with open(DB_PATH, "r", encoding="utf-8") as f:
        reqs = json.load(f)
    results = []
    for i, req in enumerate(reqs):
        idx_str = f"{(i+1):03d}"
        dir_path = OUTPUT_DIR / idx_str
        has_strategy = (dir_path / "strategy.json").exists()
        has_blueprint = (dir_path / "blueprint.json").exists()
        has_caps = (dir_path / "capabilities_extracted.json").exists()
        raw_cases_count = 0
        case_json_path = dir_path / "case.json"
        if case_json_path.exists():
            try:
                with open(case_json_path, "r", encoding="utf-8") as f:
                    case_data = json.load(f)
                raw_cases_count = len(case_data.get("cases", []))
            except Exception:
                pass
        if raw_cases_count == 0:
            raw_cases_dir = dir_path / "raw_cases"
            if raw_cases_dir.exists():
                raw_cases_count = len(list(raw_cases_dir.glob("case_*.json")))
        status = "completed" if has_strategy else ("partial" if raw_cases_count > 0 else "pending")
        if i in active_runs and active_runs[i].status == "running":
            status = "running"
        memo_content = ""
        memo_path = dir_path / "memo.txt"
        if memo_path.exists():
            with open(memo_path, "r", encoding="utf-8") as fm:
                memo_content = fm.read().strip()
        results.append({
            "index": i,
            "id": idx_str,
            "requirement": req,
            "status": status,
            "has_strategy": has_strategy,
            "has_blueprint": has_blueprint,
            "has_caps": has_caps,
            "raw_cases_count": raw_cases_count,
            "memo": memo_content
        })
    return results

@app.put("/api/requirements/{index}")
def update_requirement(index: int, req: RequirementUpdateRequest):
    if not DB_PATH.exists():
        raise HTTPException(status_code=404, detail="db_requirements.json not found")
    with open(DB_PATH, "r", encoding="utf-8") as f:
        reqs = json.load(f)
    if index < 0 or index >= len(reqs):
        raise HTTPException(status_code=404, detail="Requirement index out of range")
    reqs[index] = req.requirement
    with open(DB_PATH, "w", encoding="utf-8") as f:
        json.dump(reqs, f, ensure_ascii=False, indent=2)
    return {"status": "success", "requirement": req.requirement}

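# Case filtering: move a case out of source.json into filtered_cases.json
# (grouped by reason), and drop it from case.json if present there.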
@app.post("/api/requirements/{index}/cases/filter")
def filter_case(index: int, req: CaseFilterRequest):
    idx_str = f"{(index+1):03d}"
    dir_path = OUTPUT_DIR / idx_str / "raw_cases"
    source_path = dir_path / "source.json"
    filtered_path = dir_path / "filtered_cases.json"
    if not source_path.exists():
        raise HTTPException(status_code=404, detail="source.json not found")
    with open(source_path, "r", encoding="utf-8") as f:
        source_data = json.load(f)
    target_case = None
    new_sources = []
    for c in source_data.get("sources", []):
        c_id = c.get("case_id") or c.get("_raw", {}).get("case_id") or c.get("post", {}).get("channel_content_id")
        if c_id == req.case_id:
            target_case = c
        else:
            new_sources.append(c)
    if not target_case:
        raise HTTPException(status_code=404, detail="Case not found in source.json")
    source_data["sources"] = new_sources
    source_data["total"] = len(new_sources)
    with open(source_path, "w", encoding="utf-8") as f:
        json.dump(source_data, f, ensure_ascii=False, indent=2)
    filtered_data = {"total": 0, "by_reason": {}}
    if filtered_path.exists():
        with open(filtered_path, "r", encoding="utf-8") as f:
            filtered_data = json.load(f)
    reason = req.reason or "manual_delete"
    if reason not in filtered_data["by_reason"]:
        filtered_data["by_reason"][reason] = {"total": 0, "sources": []}
    # Copy and add filter_reason
    target_case["filter_reason"] = reason
    filtered_data["by_reason"][reason]["sources"].append(target_case)
    filtered_data["by_reason"][reason]["total"] = len(filtered_data["by_reason"][reason]["sources"])
    total_filtered = sum(r.get("total", 0) for r in filtered_data["by_reason"].values())
    filtered_data["total"] = total_filtered
    with open(filtered_path, "w", encoding="utf-8") as f:
        json.dump(filtered_data, f, ensure_ascii=False, indent=2)
    # Also attempt to remove from case.json if it exists
    case_path = dir_path.parent / "case.json"
    if case_path.exists():
        with open(case_path, "r", encoding="utf-8") as f:
            case_data = json.load(f)
        if "cases" in case_data:
            original_len = len(case_data["cases"])
            case_data["cases"] = [c for c in case_data["cases"] if c.get("case_id") != req.case_id and c.get("_raw", {}).get("case_id") != req.case_id]
            if len(case_data["cases"]) != original_len:
                with open(case_path, "w", encoding="utf-8") as f:
                    json.dump(case_data, f, ensure_ascii=False, indent=2)
    return {"status": "success"}

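# Reverse of the filter endpoint: move a case from filtered_cases.json back
# into source.json. case.json is not rebuilt here; the generate-case step must
# be rerun for that.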
@app.post("/api/requirements/{index}/cases/restore")
def restore_case(index: int, req: CaseFilterRequest):
    idx_str = f"{(index+1):03d}"
    dir_path = OUTPUT_DIR / idx_str / "raw_cases"
    source_path = dir_path / "source.json"
    filtered_path = dir_path / "filtered_cases.json"
    if not filtered_path.exists():
        raise HTTPException(status_code=404, detail="filtered_cases.json not found")
    with open(filtered_path, "r", encoding="utf-8") as f:
        filtered_data = json.load(f)
    target_case = None
    # Find and remove
    for reason, reason_obj in filtered_data.get("by_reason", {}).items():
        new_sources = []
        for c in reason_obj.get("sources", []):
            c_id = c.get("case_id") or c.get("_raw", {}).get("case_id") or c.get("post", {}).get("channel_content_id")
            if c_id == req.case_id:
                target_case = c
            else:
                new_sources.append(c)
        if target_case:
            reason_obj["sources"] = new_sources
            reason_obj["total"] = len(new_sources)
            break
    if not target_case:
        raise HTTPException(status_code=404, detail="Case not found in filtered_cases.json")
    total_filtered = sum(r.get("total", 0) for r in filtered_data.get("by_reason", {}).values())
    filtered_data["total"] = total_filtered
    with open(filtered_path, "w", encoding="utf-8") as f:
        json.dump(filtered_data, f, ensure_ascii=False, indent=2)
    source_data = {"total": 0, "sources": []}
    if source_path.exists():
        with open(source_path, "r", encoding="utf-8") as f:
            source_data = json.load(f)
    # Clean up filter_reason
    target_case.pop("filter_reason", None)
    source_data["sources"].append(target_case)
    source_data["total"] = len(source_data["sources"])
    with open(source_path, "w", encoding="utf-8") as f:
        json.dump(source_data, f, ensure_ascii=False, indent=2)
    # Note: We do not automatically inject it back into case.json because case.json
    # requires format normalization (which generate_case.py does). The user must rerun step 1.6.
    return {"status": "success"}

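# Cluster data: uploads are appended to cluster.json (older single-object
# files are wrapped into a list); save_cluster overwrites the file with the
# posted JSON body.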
@app.post("/api/requirements/{index}/upload_cluster")
async def upload_cluster(index: int, file: UploadFile = File(...)):
    idx_str = f"{(index+1):03d}"
    dir_path = OUTPUT_DIR / idx_str
    dir_path.mkdir(parents=True, exist_ok=True)
    cluster_path = dir_path / "cluster.json"
    content = await file.read()
    try:
        json_data = json.loads(content)
        # Load existing data or start new list
        existing_data = []
        if cluster_path.exists():
            with open(cluster_path, "r", encoding="utf-8") as f:
                try:
                    existing = json.load(f)
                    if isinstance(existing, list):
                        existing_data = existing
                    else:
                        existing_data = [existing]  # Wrap old format
                except Exception:
                    pass
        existing_data.append(json_data)
        with open(cluster_path, "w", encoding="utf-8") as f:
            json.dump(existing_data, f, ensure_ascii=False, indent=2)
        return {"status": "success"}
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid JSON: {str(e)}")

@app.post("/api/requirements/{index}/save_cluster")
async def save_cluster(index: int, request: Request):
    idx_str = f"{(index+1):03d}"
    dir_path = OUTPUT_DIR / idx_str
    dir_path.mkdir(parents=True, exist_ok=True)
    cluster_path = dir_path / "cluster.json"
    try:
        data = await request.json()
        with open(cluster_path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        return {"status": "success"}
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Failed to save JSON: {str(e)}")

@app.post("/api/requirements")
def add_requirement(req: RequirementUpdateRequest):
    if not DB_PATH.exists():
        raise HTTPException(status_code=404, detail="db_requirements.json not found")
    with open(DB_PATH, "r", encoding="utf-8") as f:
        reqs = json.load(f)
    reqs.append(req.requirement)
    new_index = len(reqs) - 1
    with open(DB_PATH, "w", encoding="utf-8") as f:
        json.dump(reqs, f, ensure_ascii=False, indent=2)
    # Pre-create output directory
    idx_str = f"{(new_index+1):03d}"
    dir_path = OUTPUT_DIR / idx_str
    dir_path.mkdir(parents=True, exist_ok=True)
    return {"status": "success", "index": new_index, "requirement": req.requirement}

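# Aggregate every JSON artifact for one requirement into a single response;
# files that fail to parse are returned with their raw content and the error.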
@app.get("/api/requirements/{index}/data")
def get_requirement_data(index: int):
    idx_str = f"{(index+1):03d}"
    dir_path = OUTPUT_DIR / idx_str

    def safe_load_json(p: Path):
        if not p.exists():
            return None
        content = ""
        try:
            with open(p, "r", encoding="utf-8") as f:
                content = f.read()
            return json.loads(content)
        except Exception as e:
            return {"error": "Failed to parse JSON", "raw_content": content, "details": str(e)}

    data = {
        "strategy": safe_load_json(dir_path / "strategy.json"),
        "blueprint": safe_load_json(dir_path / "process.json") or safe_load_json(dir_path / "blueprint.json"),
        "blueprint_temp": safe_load_json(dir_path / "blueprint_temp.json"),
        "cluster": safe_load_json(dir_path / "cluster.json"),
        "capabilities": safe_load_json(dir_path / "capabilities.json") or safe_load_json(dir_path / "capabilities_extracted.json"),
        "capabilities_temp": safe_load_json(dir_path / "capabilities_temp.json"),
        "raw_cases": {
            "case": safe_load_json(dir_path / "case.json")
        }
    }
    raw_cases_dir = dir_path / "raw_cases"
    if raw_cases_dir.exists():
        for f in raw_cases_dir.glob("*.json"):
            data["raw_cases"][f.stem] = safe_load_json(f)
    return data

@app.get("/api/requirements/{index}/memo")
def get_memo(index: int):
    idx_str = f"{(index+1):03d}"
    memo_path = OUTPUT_DIR / idx_str / "memo.txt"
    if memo_path.exists():
        with open(memo_path, "r", encoding="utf-8") as f:
            return {"memo": f.read()}
    return {"memo": ""}

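# Derive a step-by-step pipeline status map from which output files exist and
# from markers inside case.json (workflow_groups and their capability fields).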
@app.get("/api/requirements/{index}/pipeline-status")
def get_pipeline_status(index: int):
    idx_str = f"{(index+1):03d}"
    dir_path = OUTPUT_DIR / idx_str
    raw_cases_dir = dir_path / "raw_cases"
    status = {
        "research": False,
        "source": False,
        "generate-case": False,
        "workflow-extract": False,
        "capability-extract-1": False,
        "process-cluster": False,
        "process-score": False,
        "capability-extract": False,
        "capability-enrich": False,
        "strategy": False
    }
    if raw_cases_dir.exists():
        if list(raw_cases_dir.glob("case_*.json")):
            status["research"] = True
        if (raw_cases_dir / "source.json").exists():
            status["source"] = True
    case_file = dir_path / "case.json"
    legacy_detailed = raw_cases_dir / "case_detailed.json"
    has_workflow = False
    has_capability = False
    if case_file.exists():
        status["generate-case"] = True
        try:
            with open(case_file, "r", encoding="utf-8") as f:
                cdata = json.load(f)
            if cdata.get("cases"):
                for c in cdata["cases"]:
                    if c.get("workflow_groups"):
                        has_workflow = True
                    if any(
                        isinstance(group, dict) and group.get("capability")
                        for group in (c.get("workflow_groups") or [])
                    ):
                        has_capability = True
                    if has_workflow and has_capability:
                        break
        except Exception:
            pass
    if legacy_detailed.exists():
        status["generate-case"] = True
        has_workflow = True
    if has_workflow:
        status["workflow-extract"] = True
    if has_capability:
        status["capability-extract-1"] = True
    if (dir_path / "blueprint_temp.json").exists():
        status["process-cluster"] = True
    if (dir_path / "process.json").exists():
        status["process-score"] = True
    if (dir_path / "capabilities_temp.json").exists():
        status["capability-extract"] = True
    if (dir_path / "capabilities.json").exists():
        status["capability-enrich"] = True
    if (dir_path / "strategy.json").exists():
        status["strategy"] = True
    return status

@app.post("/api/requirements/{index}/memo")
def save_memo(index: int, req: MemoRequest):
    idx_str = f"{(index+1):03d}"
    dir_path = OUTPUT_DIR / idx_str
    dir_path.mkdir(parents=True, exist_ok=True)
    memo_path = dir_path / "memo.txt"
    with open(memo_path, "w", encoding="utf-8") as f:
        f.write(req.memo)
    return {"status": "ok"}

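# Prompt management: prompts live as *.prompt files in PROMPTS_DIR, each with
# an optional sibling *.schema.json.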
@app.get("/api/prompts")
def list_prompts():
    if not PROMPTS_DIR.exists():
        return []
    return [f.name for f in PROMPTS_DIR.glob("*.prompt")]


@app.get("/api/prompts/{name}")
def get_prompt(name: str):
    if "/" in name or "\\" in name:
        raise HTTPException(status_code=400, detail="Invalid prompt name")
    prompt_path = PROMPTS_DIR / name
    if not prompt_path.exists() or not prompt_path.is_file():
        raise HTTPException(status_code=404, detail="Prompt not found")
    schema_name = name.replace(".prompt", ".schema.json")
    schema_path = PROMPTS_DIR / schema_name
    schema_content = ""
    if schema_path.exists() and schema_path.is_file():
        with open(schema_path, "r", encoding="utf-8") as f:
            schema_content = f.read()
    with open(prompt_path, "r", encoding="utf-8") as f:
        return {"content": f.read(), "schema_content": schema_content}

@app.post("/api/prompts/{name}")
def save_prompt(name: str, req: PromptRequest):
    if "/" in name or "\\" in name:
        raise HTTPException(status_code=400, detail="Invalid prompt name")
    prompt_path = PROMPTS_DIR / name
    if not prompt_path.exists() or not prompt_path.is_file():
        raise HTTPException(status_code=404, detail="Prompt not found")
    schema_name = name.replace(".prompt", ".schema.json")
    schema_path = PROMPTS_DIR / schema_name
    if req.schema_content is not None:
        if req.schema_content.strip():
            try:
                parsed_schema = json.loads(req.schema_content)
            except Exception as e:
                raise HTTPException(status_code=400, detail=f"Invalid Schema JSON: {str(e)}")
            with open(schema_path, "w", encoding="utf-8") as f:
                json.dump(parsed_schema, f, ensure_ascii=False, indent=2)
        else:
            if schema_path.exists():
                schema_path.unlink()  # remove if empty
    with open(prompt_path, "w", encoding="utf-8", newline="\n") as f:
        # Enforce Unix LF line endings to avoid unnecessary git diffs
        # and format inconsistencies between the file system and UI.
        f.write(req.content.replace("\r\n", "\n"))
    return {"status": "ok"}

@app.post("/api/prompts/{name}/update_schema")
async def api_update_schema(name: str):
    if "/" in name or "\\" in name:
        raise HTTPException(status_code=400, detail="Invalid prompt name")
    prompt_name = name.replace(".prompt", "")
    from script.update_schema import update_schema as script_update_schema
    try:
        # Calls the script to update the schema (this writes to the file)
        await script_update_schema(prompt_name=prompt_name, model="openai/gpt-5.4", dry_run=False)
        schema_name = name.replace(".prompt", ".schema.json")
        schema_path = PROMPTS_DIR / schema_name
        schema_content = ""
        if schema_path.exists() and schema_path.is_file():
            with open(schema_path, "r", encoding="utf-8") as f:
                schema_content = f.read()
        return {"status": "ok", "schema_content": schema_content}
    except Exception as e:
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Failed to update schema: {str(e)}")

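# Launch run_pipeline.py as a subprocess in a background thread and stream its
# combined stdout/stderr into the in-memory log buffer for this index.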
async def run_pipeline_task(index: int, run_req: RunRequest):
    run_state = active_runs[index]
    run_state.status = "running"
    dir_path = OUTPUT_DIR / f"{(index+1):03d}"
    # We no longer manually delete files here for most modes;
    # run_pipeline.py handles overwriting when --only-step or --start-from is passed.
    # Build command
    script_path = BASE_DIR / "run_pipeline.py"
    cmd = [sys.executable, str(script_path), "--index", str(index)]
    if run_req.platforms:
        cmd.extend(["--platforms", run_req.platforms])
    if run_req.use_claude_sdk:
        cmd.append("--use-claude-sdk")
    if run_req.only_step:
        cmd.extend(["--only-step", run_req.only_step])
    if run_req.case_index is not None:
        cmd.extend(["--case-index", str(run_req.case_index)])
    elif run_req.phase is not None:
        cmd.extend(["--phase", str(run_req.phase)])
    else:
        if run_req.start_from:
            cmd.extend(["--start-from", run_req.start_from])
        if run_req.end_at:
            cmd.extend(["--end-at", run_req.end_at])
    if run_req.restart_mode and run_req.restart_mode == "smart":
        pass  # Smart mode requires no extra args
    run_state.logs.append(f"Starting command: {' '.join(cmd)}\n")

    def run_process():
        try:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                encoding="utf-8",
                bufsize=1,
                cwd=str(BASE_DIR),
                env=dict(os.environ, PYTHONIOENCODING="utf-8")
            )
            run_state.process = process
            for line in iter(process.stdout.readline, ''):
                if line:
                    run_state.logs.append(line)
            process.stdout.close()
            return_code = process.wait()
            run_state.logs.append(f"\nProcess exited with code {return_code}")
            run_state.status = "completed" if return_code == 0 else "failed"
        except Exception as e:
            run_state.logs.append(f"\nException occurred: {repr(e)}")
            run_state.status = "failed"

    thread = threading.Thread(target=run_process)
    thread.start()

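# Pipeline control endpoints. Example (hypothetical values): POST
# /api/pipeline/run/0 with body {"only_step": "strategy"} runs
# "python run_pipeline.py --index 0 --only-step strategy" for requirement 0.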
@app.post("/api/pipeline/run/{index}")
async def trigger_pipeline(index: int, req: RunRequest):
    if index in active_runs and active_runs[index].status == "running":
        raise HTTPException(status_code=400, detail="Pipeline already running for this index")
    active_runs[index] = ActiveRun()
    asyncio.create_task(run_pipeline_task(index, req))
    return {"message": "Pipeline started", "index": index}


@app.post("/api/pipeline/stop/{index}")
async def stop_pipeline(index: int):
    if index not in active_runs or active_runs[index].status != "running":
        raise HTTPException(status_code=400, detail="No running pipeline found for this index")
    process = active_runs[index].process
    if process:
        try:
            process.terminate()
            active_runs[index].logs.append("\n[System] 🛑 Pipeline forcefully terminated by user.\n")
            active_runs[index].status = "failed"
            return {"status": "stopped"}
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Failed to terminate: {str(e)}")
    raise HTTPException(status_code=500, detail="Process handle missing")


@app.get("/api/pipeline/status")
def get_all_status():
    res = {}
    for idx, run in active_runs.items():
        res[idx] = {
            "status": run.status,
            "start_time": run.start_time,
            "logs": list(run.logs)
        }
    return res

# Mount UI static files
ui_dir = BASE_DIR / "ui"
ui_dir.mkdir(exist_ok=True)
app.mount("/static", StaticFiles(directory=str(ui_dir)), name="static")

# Mount output directory for local resources (like images)
OUTPUT_DIR.mkdir(exist_ok=True)  # StaticFiles requires the directory to exist
app.mount("/output", StaticFiles(directory=str(OUTPUT_DIR)), name="output")


@app.get("/")
def serve_ui():
    index_html = ui_dir / "index.html"
    if not index_html.exists():
        return HTMLResponse("UI not found. Please create ui/index.html", status_code=404)
    with open(index_html, "r", encoding="utf-8") as f:
        return HTMLResponse(f.read())

if __name__ == "__main__":
    print("Starting Pipeline Dashboard server on http://127.0.0.1:18080")
    uvicorn.run("server:app", host="0.0.0.0", port=18080, reload=False)