server.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550
  1. import asyncio
  2. import json
  3. from pathlib import Path
  4. from typing import Dict, List, Optional
  5. from datetime import datetime
  6. from collections import deque
  7. from fastapi import FastAPI, HTTPException, Request, BackgroundTasks, UploadFile, File
  8. from fastapi.staticfiles import StaticFiles
  9. from fastapi.responses import HTMLResponse
  10. from pydantic import BaseModel
  11. import uvicorn
  12. import sys
app = FastAPI(title="Pipeline Dashboard")

# Paths are resolved relative to this file so the server works from any CWD.
BASE_DIR = Path(__file__).parent
OUTPUT_DIR = BASE_DIR / "output"  # per-requirement pipeline artifacts (001/, 002/, ...)
DB_PATH = BASE_DIR / "db_requirements.json"  # flat JSON list of requirement strings
PROMPTS_DIR = BASE_DIR / "prompts"  # *.prompt templates plus optional *.schema.json files
# In-memory storage for active runs
class ActiveRun:
    """Tracks one pipeline run: subprocess handle, rolling log buffer, status."""

    def __init__(self):
        # Set by the worker thread once the subprocess has been spawned.
        # NOTE(review): the worker actually stores a subprocess.Popen here, so
        # this asyncio annotation looks stale — confirm before relying on it.
        self.process: Optional[asyncio.subprocess.Process] = None
        self.logs: deque = deque(maxlen=200)  # Keep last 200 lines
        # Lifecycle: "starting" -> "running" -> "completed" | "failed".
        self.status: str = "starting"
        self.start_time: str = datetime.now().isoformat()


# Keyed by requirement index; entries persist after completion so the UI can
# still fetch final logs and status.
active_runs: Dict[int, ActiveRun] = {}
  26. @app.post("/api/requirements/{index}/upload_source_ex")
  27. async def upload_source_ex(index: int, file: UploadFile = File(...)):
  28. idx_str = f"{(index+1):03d}"
  29. dir_path = OUTPUT_DIR / idx_str / "raw_cases"
  30. dir_path.mkdir(parents=True, exist_ok=True)
  31. file_path = dir_path / "source_ex.json"
  32. content = await file.read()
  33. with open(file_path, "wb") as f:
  34. f.write(content)
  35. return {"status": "success", "filename": "source_ex.json"}
class RunRequest(BaseModel):
    # Options forwarded to run_pipeline.py as CLI flags; all optional.
    platforms: Optional[str] = None  # --platforms value
    use_claude_sdk: bool = False  # adds --use-claude-sdk when true
    restart_mode: Optional[str] = None  # "smart" is accepted but adds no flags
    phase: Optional[int] = None  # --phase (ignored when case_index is set)
    only_step: Optional[str] = None  # --only-step
    start_from: Optional[str] = None  # --start-from (range mode)
    end_at: Optional[str] = None  # --end-at (range mode)
    case_index: Optional[int] = None  # --case-index (takes precedence over phase)


class MemoRequest(BaseModel):
    memo: str  # free-form note persisted to memo.txt


class RequirementUpdateRequest(BaseModel):
    requirement: str  # requirement text stored in db_requirements.json


class PromptRequest(BaseModel):
    content: str  # prompt template body
    schema_content: Optional[str] = None  # JSON schema text; empty string deletes the schema file


class ImportExternalRequest(BaseModel):
    case_id: str  # id of the case to copy from source_ex.json
  54. @app.post("/api/requirements/{index}/import_source_ex")
  55. def import_source_ex(index: int, req: ImportExternalRequest):
  56. idx_str = f"{(index+1):03d}"
  57. dir_path = OUTPUT_DIR / idx_str / "raw_cases"
  58. source_ex_path = dir_path / "source_ex.json"
  59. source_path = dir_path / "source.json"
  60. if not source_ex_path.exists():
  61. raise HTTPException(status_code=404, detail="source_ex.json not found")
  62. with open(source_ex_path, "r", encoding="utf-8") as f:
  63. ex_data = json.load(f)
  64. ex_cases = ex_data if isinstance(ex_data, list) else (ex_data.get("cases") or ex_data.get("sources") or [])
  65. target_case = None
  66. for c in ex_cases:
  67. cId = c.get("case_id") or (c.get("_raw") and c["_raw"].get("case_id")) or (c.get("post") and c["post"].get("channel_content_id"))
  68. if cId == req.case_id:
  69. target_case = c
  70. break
  71. if not target_case:
  72. raise HTTPException(status_code=404, detail="Case not found in external sources")
  73. source_cases = []
  74. if source_path.exists():
  75. with open(source_path, "r", encoding="utf-8") as f:
  76. s_data = json.load(f)
  77. source_cases = s_data if isinstance(s_data, list) else (s_data.get("cases") or s_data.get("sources") or [])
  78. # Check if already exists
  79. for c in source_cases:
  80. cId = c.get("case_id") or (c.get("_raw") and c["_raw"].get("case_id")) or (c.get("post") and c["post"].get("channel_content_id"))
  81. if cId == req.case_id:
  82. return {"status": "success", "message": "Already imported"}
  83. # Append
  84. source_cases.append(target_case)
  85. with open(source_path, "w", encoding="utf-8") as f:
  86. # Save as object format for source.json
  87. json.dump({"total": len(source_cases), "sources": source_cases}, f, ensure_ascii=False, indent=2)
  88. return {"status": "success"}
  89. @app.post("/api/requirements/{index}/import_all_source_ex")
  90. def import_all_source_ex(index: int):
  91. idx_str = f"{(index+1):03d}"
  92. dir_path = OUTPUT_DIR / idx_str / "raw_cases"
  93. source_ex_path = dir_path / "source_ex.json"
  94. source_path = dir_path / "source.json"
  95. if not source_ex_path.exists():
  96. raise HTTPException(status_code=404, detail="source_ex.json not found")
  97. with open(source_ex_path, "r", encoding="utf-8") as f:
  98. ex_data = json.load(f)
  99. ex_cases = ex_data if isinstance(ex_data, list) else (ex_data.get("cases") or ex_data.get("sources") or [])
  100. source_cases = []
  101. if source_path.exists():
  102. with open(source_path, "r", encoding="utf-8") as f:
  103. s_data = json.load(f)
  104. source_cases = s_data if isinstance(s_data, list) else (s_data.get("cases") or s_data.get("sources") or [])
  105. existing_ids = set()
  106. for c in source_cases:
  107. cId = c.get("case_id") or (c.get("_raw") and c["_raw"].get("case_id")) or (c.get("post") and c["post"].get("channel_content_id"))
  108. if cId:
  109. existing_ids.add(cId)
  110. added_count = 0
  111. for c in ex_cases:
  112. cId = c.get("case_id") or (c.get("_raw") and c["_raw"].get("case_id")) or (c.get("post") and c["post"].get("channel_content_id"))
  113. if cId and cId not in existing_ids:
  114. source_cases.append(c)
  115. existing_ids.add(cId)
  116. added_count += 1
  117. if added_count > 0:
  118. with open(source_path, "w", encoding="utf-8") as f:
  119. json.dump({"total": len(source_cases), "sources": source_cases}, f, ensure_ascii=False, indent=2)
  120. return {"status": "success", "count": added_count}
  121. @app.get("/api/requirements")
  122. def get_requirements():
  123. if not DB_PATH.exists():
  124. raise HTTPException(status_code=404, detail="db_requirements.json not found")
  125. with open(DB_PATH, "r", encoding="utf-8") as f:
  126. reqs = json.load(f)
  127. results = []
  128. for i, req in enumerate(reqs):
  129. idx_str = f"{(i+1):03d}"
  130. dir_path = OUTPUT_DIR / idx_str
  131. has_strategy = (dir_path / "strategy.json").exists()
  132. has_blueprint = (dir_path / "blueprint.json").exists()
  133. has_caps = (dir_path / "capabilities_extracted.json").exists()
  134. raw_cases_count = 0
  135. case_json_path = dir_path / "case.json"
  136. if case_json_path.exists():
  137. try:
  138. import json
  139. with open(case_json_path, 'r', encoding='utf-8') as f:
  140. case_data = json.load(f)
  141. raw_cases_count = len(case_data.get('cases', []))
  142. except:
  143. pass
  144. if raw_cases_count == 0:
  145. raw_cases_dir = dir_path / "raw_cases"
  146. if raw_cases_dir.exists():
  147. raw_cases_count = len(list(raw_cases_dir.glob("case_*.json")))
  148. status = "completed" if has_strategy else ("partial" if raw_cases_count > 0 else "pending")
  149. if i in active_runs and active_runs[i].status == "running":
  150. status = "running"
  151. memo_content = ""
  152. memo_path = dir_path / "memo.txt"
  153. if memo_path.exists():
  154. with open(memo_path, "r", encoding="utf-8") as fm:
  155. memo_content = fm.read().strip()
  156. results.append({
  157. "index": i,
  158. "id": idx_str,
  159. "requirement": req,
  160. "status": status,
  161. "has_strategy": has_strategy,
  162. "has_blueprint": has_blueprint,
  163. "has_caps": has_caps,
  164. "raw_cases_count": raw_cases_count,
  165. "memo": memo_content
  166. })
  167. return results
  168. @app.put("/api/requirements/{index}")
  169. def update_requirement(index: int, req: RequirementUpdateRequest):
  170. if not DB_PATH.exists():
  171. raise HTTPException(status_code=404, detail="db_requirements.json not found")
  172. with open(DB_PATH, "r", encoding="utf-8") as f:
  173. reqs = json.load(f)
  174. if index < 0 or index >= len(reqs):
  175. raise HTTPException(status_code=404, detail="Requirement index out of range")
  176. reqs[index] = req.requirement
  177. with open(DB_PATH, "w", encoding="utf-8") as f:
  178. json.dump(reqs, f, ensure_ascii=False, indent=2)
  179. return {"status": "success", "requirement": req.requirement}
  180. @app.post("/api/requirements")
  181. def add_requirement(req: RequirementUpdateRequest):
  182. if not DB_PATH.exists():
  183. raise HTTPException(status_code=404, detail="db_requirements.json not found")
  184. with open(DB_PATH, "r", encoding="utf-8") as f:
  185. reqs = json.load(f)
  186. reqs.append(req.requirement)
  187. new_index = len(reqs) - 1
  188. with open(DB_PATH, "w", encoding="utf-8") as f:
  189. json.dump(reqs, f, ensure_ascii=False, indent=2)
  190. # Pre-create output directory
  191. idx_str = f"{(new_index+1):03d}"
  192. dir_path = OUTPUT_DIR / idx_str
  193. dir_path.mkdir(parents=True, exist_ok=True)
  194. return {"status": "success", "index": new_index, "requirement": req.requirement}
  195. @app.get("/api/requirements/{index}/data")
  196. def get_requirement_data(index: int):
  197. idx_str = f"{(index+1):03d}"
  198. dir_path = OUTPUT_DIR / idx_str
  199. def safe_load_json(p: Path):
  200. if not p.exists():
  201. return None
  202. content = ""
  203. try:
  204. with open(p, "r", encoding="utf-8") as f:
  205. content = f.read()
  206. return json.loads(content)
  207. except Exception as e:
  208. return {"error": "Failed to parse JSON", "raw_content": content, "details": str(e)}
  209. data = {
  210. "strategy": safe_load_json(dir_path / "strategy.json"),
  211. "blueprint": safe_load_json(dir_path / "process.json") or safe_load_json(dir_path / "blueprint.json"),
  212. "blueprint_temp": safe_load_json(dir_path / "blueprint_temp.json"),
  213. "capabilities": safe_load_json(dir_path / "capabilities.json") or safe_load_json(dir_path / "capabilities_extracted.json"),
  214. "capabilities_temp": safe_load_json(dir_path / "capabilities_temp.json"),
  215. "raw_cases": {
  216. "case": safe_load_json(dir_path / "case.json")
  217. }
  218. }
  219. raw_cases_dir = dir_path / "raw_cases"
  220. if raw_cases_dir.exists():
  221. for f in raw_cases_dir.glob("*.json"):
  222. data["raw_cases"][f.stem] = safe_load_json(f)
  223. return data
  224. @app.get("/api/requirements/{index}/memo")
  225. def get_memo(index: int):
  226. idx_str = f"{(index+1):03d}"
  227. memo_path = OUTPUT_DIR / idx_str / "memo.txt"
  228. if memo_path.exists():
  229. with open(memo_path, "r", encoding="utf-8") as f:
  230. return {"memo": f.read()}
  231. return {"memo": ""}
  232. @app.get("/api/requirements/{index}/pipeline-status")
  233. def get_pipeline_status(index: int):
  234. idx_str = f"{(index+1):03d}"
  235. dir_path = OUTPUT_DIR / idx_str
  236. raw_cases_dir = dir_path / "raw_cases"
  237. status = {
  238. "research": False,
  239. "source": False,
  240. "generate-case": False,
  241. "workflow-extract": False,
  242. "capability-extract-1": False,
  243. "process-cluster": False,
  244. "process-score": False,
  245. "capability-extract": False,
  246. "capability-enrich": False,
  247. "strategy": False
  248. }
  249. if raw_cases_dir.exists():
  250. if list(raw_cases_dir.glob("case_*.json")):
  251. status["research"] = True
  252. if (raw_cases_dir / "source.json").exists():
  253. status["source"] = True
  254. case_file = dir_path / "case.json"
  255. legacy_detailed = raw_cases_dir / "case_detailed.json"
  256. has_workflow = False
  257. has_capability = False
  258. if case_file.exists():
  259. status["generate-case"] = True
  260. try:
  261. with open(case_file, "r", encoding="utf-8") as f:
  262. cdata = json.load(f)
  263. if cdata.get("cases"):
  264. for c in cdata["cases"]:
  265. if c.get("workflow"):
  266. has_workflow = True
  267. if c.get("capabilities"):
  268. has_capability = True
  269. if has_workflow and has_capability:
  270. break
  271. except Exception:
  272. pass
  273. if legacy_detailed.exists():
  274. status["generate-case"] = True
  275. has_workflow = True
  276. if has_workflow:
  277. status["workflow-extract"] = True
  278. if has_capability:
  279. status["capability-extract-1"] = True
  280. if (dir_path / "blueprint_temp.json").exists():
  281. status["process-cluster"] = True
  282. if (dir_path / "process.json").exists():
  283. status["process-score"] = True
  284. if (dir_path / "capabilities_temp.json").exists():
  285. status["capability-extract"] = True
  286. if (dir_path / "capabilities.json").exists():
  287. status["capability-enrich"] = True
  288. if (dir_path / "strategy.json").exists():
  289. status["strategy"] = True
  290. return status
  291. @app.post("/api/requirements/{index}/memo")
  292. def save_memo(index: int, req: MemoRequest):
  293. idx_str = f"{(index+1):03d}"
  294. dir_path = OUTPUT_DIR / idx_str
  295. dir_path.mkdir(parents=True, exist_ok=True)
  296. memo_path = dir_path / "memo.txt"
  297. with open(memo_path, "w", encoding="utf-8") as f:
  298. f.write(req.memo)
  299. return {"status": "ok"}
  300. @app.get("/api/prompts")
  301. def list_prompts():
  302. if not PROMPTS_DIR.exists():
  303. return []
  304. return [f.name for f in PROMPTS_DIR.glob("*.prompt")]
  305. @app.get("/api/prompts/{name}")
  306. def get_prompt(name: str):
  307. if "/" in name or "\\" in name:
  308. raise HTTPException(status_code=400, detail="Invalid prompt name")
  309. prompt_path = PROMPTS_DIR / name
  310. if not prompt_path.exists() or not prompt_path.is_file():
  311. raise HTTPException(status_code=404, detail="Prompt not found")
  312. schema_name = name.replace(".prompt", ".schema.json")
  313. schema_path = PROMPTS_DIR / schema_name
  314. schema_content = ""
  315. if schema_path.exists() and schema_path.is_file():
  316. with open(schema_path, "r", encoding="utf-8") as f:
  317. schema_content = f.read()
  318. with open(prompt_path, "r", encoding="utf-8") as f:
  319. return {"content": f.read(), "schema_content": schema_content}
  320. @app.post("/api/prompts/{name}")
  321. def save_prompt(name: str, req: PromptRequest):
  322. if "/" in name or "\\" in name:
  323. raise HTTPException(status_code=400, detail="Invalid prompt name")
  324. prompt_path = PROMPTS_DIR / name
  325. if not prompt_path.exists() or not prompt_path.is_file():
  326. raise HTTPException(status_code=404, detail="Prompt not found")
  327. schema_name = name.replace(".prompt", ".schema.json")
  328. schema_path = PROMPTS_DIR / schema_name
  329. if req.schema_content is not None:
  330. if req.schema_content.strip():
  331. try:
  332. parsed_schema = json.loads(req.schema_content)
  333. except Exception as e:
  334. raise HTTPException(status_code=400, detail=f"Invalid Schema JSON: {str(e)}")
  335. with open(schema_path, "w", encoding="utf-8") as f:
  336. json.dump(parsed_schema, f, ensure_ascii=False, indent=2)
  337. else:
  338. if schema_path.exists():
  339. schema_path.unlink() # remove if empty
  340. with open(prompt_path, "w", encoding="utf-8", newline="\n") as f:
  341. # Enforce Unix LF line endings to avoid unnecessary git diffs
  342. # and format inconsistencies between the file system and UI.
  343. f.write(req.content.replace('\r\n', '\n'))
  344. return {"status": "ok"}
  345. async def run_pipeline_task(index: int, run_req: RunRequest):
  346. run_state = active_runs[index]
  347. run_state.status = "running"
  348. dir_path = OUTPUT_DIR / f"{(index+1):03d}"
  349. # We no longer manually delete files here for most modes,
  350. # run_pipeline.py handles overwriting when --only-step or --start-from is passed.
  351. # build command
  352. script_path = BASE_DIR / "run_pipeline.py"
  353. cmd = [sys.executable, str(script_path), "--index", str(index)]
  354. if run_req.platforms:
  355. cmd.extend(["--platforms", run_req.platforms])
  356. if run_req.use_claude_sdk:
  357. cmd.append("--use-claude-sdk")
  358. if run_req.only_step:
  359. cmd.extend(["--only-step", run_req.only_step])
  360. if run_req.case_index is not None:
  361. cmd.extend(["--case-index", str(run_req.case_index)])
  362. elif run_req.phase is not None:
  363. cmd.extend(["--phase", str(run_req.phase)])
  364. else:
  365. if run_req.start_from:
  366. cmd.extend(["--start-from", run_req.start_from])
  367. if run_req.end_at:
  368. cmd.extend(["--end-at", run_req.end_at])
  369. if run_req.restart_mode and run_req.restart_mode == "smart":
  370. pass # Smart mode requires no extra args
  371. run_state.logs.append(f"Starting command: {' '.join(cmd)}\n")
  372. import threading
  373. import subprocess
  374. import os
  375. def run_process():
  376. try:
  377. process = subprocess.Popen(
  378. cmd,
  379. stdout=subprocess.PIPE,
  380. stderr=subprocess.STDOUT,
  381. text=True,
  382. encoding="utf-8",
  383. bufsize=1,
  384. cwd=str(BASE_DIR),
  385. env=dict(os.environ, PYTHONIOENCODING="utf-8")
  386. )
  387. run_state.process = process
  388. for line in iter(process.stdout.readline, ''):
  389. if line:
  390. run_state.logs.append(line)
  391. process.stdout.close()
  392. return_code = process.wait()
  393. run_state.logs.append(f"\nProcess exited with code {return_code}")
  394. run_state.status = "completed" if return_code == 0 else "failed"
  395. except Exception as e:
  396. run_state.logs.append(f"\nException occurred: {repr(e)}")
  397. run_state.status = "failed"
  398. thread = threading.Thread(target=run_process)
  399. thread.start()
  400. @app.post("/api/pipeline/run/{index}")
  401. async def trigger_pipeline(index: int, req: RunRequest):
  402. if index in active_runs and active_runs[index].status == "running":
  403. raise HTTPException(status_code=400, detail="Pipeline already running for this index")
  404. active_runs[index] = ActiveRun()
  405. asyncio.create_task(run_pipeline_task(index, req))
  406. return {"message": "Pipeline started", "index": index}
  407. @app.post("/api/pipeline/stop/{index}")
  408. async def stop_pipeline(index: int):
  409. if index not in active_runs or active_runs[index].status != "running":
  410. raise HTTPException(status_code=400, detail="No running pipeline found for this index")
  411. process = active_runs[index].process
  412. if process:
  413. try:
  414. process.terminate()
  415. active_runs[index].logs.append("\n[System] 🛑 Pipeline forcefully terminated by user.\n")
  416. active_runs[index].status = "failed"
  417. return {"status": "stopped"}
  418. except Exception as e:
  419. raise HTTPException(status_code=500, detail=f"Failed to terminate: {str(e)}")
  420. raise HTTPException(status_code=500, detail="Process handle missing")
  421. @app.get("/api/pipeline/status")
  422. def get_all_status():
  423. res = {}
  424. for idx, run in active_runs.items():
  425. res[idx] = {
  426. "status": run.status,
  427. "start_time": run.start_time,
  428. "logs": list(run.logs)
  429. }
  430. return res
  431. # Mount UI static files
  432. ui_dir = BASE_DIR / "ui"
  433. ui_dir.mkdir(exist_ok=True)
  434. app.mount("/static", StaticFiles(directory=str(ui_dir)), name="static")
  435. # Mount output directory for local resources (like images)
  436. app.mount("/output", StaticFiles(directory=str(OUTPUT_DIR)), name="output")
  437. @app.get("/")
  438. def serve_ui():
  439. index_html = ui_dir / "index.html"
  440. if not index_html.exists():
  441. return HTMLResponse("UI not found. Please create ui/index.html", status_code=404)
  442. with open(index_html, "r", encoding="utf-8") as f:
  443. return HTMLResponse(f.read())
  444. if __name__ == "__main__":
  445. print("Starting Pipeline Dashboard server on http://127.0.0.0:18080")
  446. uvicorn.run("server:app", host="0.0.0.0", port=18080, reload=False)