Parcourir la source

update case history management

guantao il y a 12 heures
Parent
commit
37d05014f2

+ 17 - 3
examples/process_pipeline/run_pipeline.py

@@ -677,10 +677,19 @@ async def main():
     raw_cases_dir = output_dir / "raw_cases"
     raw_cases_dir.mkdir(parents=True, exist_ok=True)
 
-    # 0.5 stdout/stderr 同时落到 output_dir 下的运行日志(思路1:tee)
-    # 留痕用,文件名带时间戳保留每次跑的历史
-    _run_log_path = output_dir / f"run_{datetime.now():%Y%m%d_%H%M%S}.log"
+    # 0.5 本次运行的快照盒:output_dir/history/<run_id>/{case.json, run.log}
+    # run_id 同时作为 case_history 的活动 run id,让快照和 log 落在同一个文件夹
+    from examples.process_pipeline.script.case_history import set_run_id, snapshot_case_file
+    _run_id = datetime.now().strftime("%Y%m%d_%H%M%S")
+    set_run_id(_run_id)
+    _run_dir = output_dir / "history" / _run_id
+    _run_dir.mkdir(parents=True, exist_ok=True)
+    _run_log_path = _run_dir / "run.log"
     _run_log_file = open(_run_log_path, "w", encoding="utf-8")
+    # 启动时立刻快照一次原 case.json(如果已存在),作为本次运行的回滚点
+    _initial_case_file = output_dir / "case.json"
+    if _initial_case_file.exists():
+        snapshot_case_file(_initial_case_file, step="run_start")
 
     class _Tee:
         def __init__(self, *streams):
@@ -924,6 +933,11 @@ async def main():
                         if c.get("index") == args.case_index:
                             original_data["cases"][i] = updated_case
                             break
+                    # 写前快照:把旧 case.json 复制到 history/ 留底(first-call-wins,本次 run 已快照过则复用已有快照)
+                    from examples.process_pipeline.script.case_history import snapshot_case_file
+                    snap = snapshot_case_file(case_file, step="workflow_merge")
+                    if snap:
+                        print(f"   [snapshot] {snap.name}")
                     with open(case_file, "w", encoding="utf-8") as f:
                         json.dump(original_data, f, ensure_ascii=False, indent=2)
                     temp_case_file.unlink()  # 删除临时文件

+ 75 - 0
examples/process_pipeline/script/case_history.py

@@ -0,0 +1,75 @@
+"""
+case.json 写前快照工具
+
+每次 pipeline 运行(一次 run_pipeline.py 调用)会被分配一个 run_id(时间戳),
+对应 {output_dir}/history/{run_id}/ 文件夹。该文件夹里:
+  - case.json       本次运行开始前的 case.json 完整快照(用于回滚)
+  - run.log         本次运行的 stdout/stderr 全文(由 run_pipeline.py 的 tee 写入)
+
+设计原则:
+- 一次 run = 一个 history 子文件夹 = 一个回滚点
+- snapshot_case_file 是 first-call-wins:本次运行已快照过就不再覆盖(避免被中
+  途的中间状态污染回滚点)
+- 失败不阻塞:快照失败只打印一行警告,不中断主流程(并非完全静默)
+"""
+
+import shutil
+from datetime import datetime
+from pathlib import Path
+from typing import Optional, Union
+
+
+_RUN_ID: Optional[str] = None
+
+
+def set_run_id(rid: str) -> None:
+    """由 run_pipeline.py 在启动时调用,固定本次 run 的 id(建议时间戳格式)"""
+    global _RUN_ID
+    _RUN_ID = rid
+
+
+def get_run_id() -> str:
+    """取当前 run id;未 set 过则用 fallback 时间戳(一般不应走到这里)"""
+    if _RUN_ID is None:
+        return datetime.now().strftime("%Y%m%d_%H%M%S")
+    return _RUN_ID
+
+
+def get_run_history_dir(case_file: Union[str, Path]) -> Path:
+    """根据 case_file 位置算出本次 run 的 history 子目录路径"""
+    p = Path(case_file)
+    return p.parent / "history" / get_run_id()
+
+
+def snapshot_case_file(
+    case_file: Union[str, Path],
+    step: str = "unknown",
+) -> Optional[Path]:
+    """
+    在 case_file 被覆写之前调用。把当前内容复制到本次 run 对应的 history 子目录。
+
+    First-call-wins:若该 run 的快照已存在,直接返回已有路径,不覆盖。
+    这样无论是 step 启动还是 merge-back 调,本次运行的回滚点都是"运行开始前的状态"。
+
+    Args:
+        case_file: 即将被覆写的 case.json 路径
+        step: 当前实现未使用(不进入路径,也不打日志),仅作调用方语义标注
+
+    Returns:
+        快照文件路径;原文件不存在或异常时返回 None
+    """
+    p = Path(case_file)
+    if not p.exists():
+        return None
+
+    try:
+        run_dir = get_run_history_dir(p)
+        run_dir.mkdir(parents=True, exist_ok=True)
+        dst = run_dir / "case.json"
+        if dst.exists():
+            return dst  # first-call-wins
+        shutil.copy2(p, dst)
+        return dst
+    except Exception as e:
+        print(f"  ⚠ snapshot failed: {type(e).__name__}: {e}")
+        return None

+ 19 - 5
examples/process_pipeline/script/extract_workflow.py

@@ -280,11 +280,20 @@ async def extract_workflow(
             print(f"  <- [{index}] [{case_id}] workflow {status}")
 
             result = dict(case_item)
-            result["workflow_groups"] = workflow_groups if workflow_groups is not None else []
-            result.pop("workflow", None)
-            result.pop("capability", None)
-            result.pop("capabilities", None)
-            result.pop("fragments", None)
+            if workflow_groups:
+                # 提取成功:写入新数据并清理旧格式残留
+                result["workflow_groups"] = workflow_groups
+                result.pop("workflow", None)
+                result.pop("capability", None)
+                result.pop("capabilities", None)
+                result.pop("fragments", None)
+            else:
+                # 提取失败:保留 case_item 原有内容(包括旧 workflow_groups 或 legacy workflow/capability)
+                if case_item.get("workflow_groups") or case_item.get("workflow"):
+                    print(f"  ⚠ [{index}] [{case_id}] extraction failed, preserving previous workflow data")
+                else:
+                    # 之前也没数据,给一个空数组让结构一致
+                    result["workflow_groups"] = []
             return result, cost
 
     tasks = [process_with_semaphore(case) for case in cases_to_process]
@@ -311,6 +320,11 @@ async def extract_workflow(
     case_data["cases"] = results
 
     case_file.parent.mkdir(parents=True, exist_ok=True)
+    # 写前快照:把旧 case.json 复制到 history/ 留底,便于失败回滚(first-call-wins,已快照过则复用)
+    from examples.process_pipeline.script.case_history import snapshot_case_file
+    snap = snapshot_case_file(case_file, step="workflow")
+    if snap:
+        print(f"  [snapshot] {snap.name}")
     with open(case_file, "w", encoding="utf-8") as f:
         json.dump(case_data, f, ensure_ascii=False, indent=2)
 

+ 200 - 0
examples/process_pipeline/script/recover.py

@@ -0,0 +1,200 @@
+"""
+case.json 历史快照恢复工具
+
+每次 pipeline 运行会在 {output_dir}/history/<run_id>/ 留下快照盒:
+  - case.json   运行开始前的 case.json 完整快照
+  - run.log     该次运行的全文日志(stdout + stderr)
+
+本脚本用于查看运行清单、整文件回滚,或恢复单个/多个指定 case。
+
+用法:
+  # 列出所有 run(含 case 数、有 workflow 数、log 大小)
+  python script/recover.py list  output/112
+
+  # 全文件回滚到某次 run 开始前的状态
+  python script/recover.py restore output/112  20260509_141802
+
+  # 仅恢复指定 case(其他 case 不动)
+  python script/recover.py restore output/112  20260509_141802  --case-index 12
+
+  # 批量恢复多个 case
+  python script/recover.py restore output/112  20260509_141802  --case-index 1,5,12
+"""
+
+import argparse
+import json
+import shutil
+import sys
+from datetime import datetime
+from pathlib import Path
+from typing import List, Optional
+
+
+def _summarize_case_file(p: Path) -> dict:
+    """读取 case.json 并返回简要统计"""
+    try:
+        with open(p, "r", encoding="utf-8") as f:
+            d = json.load(f)
+    except Exception as e:
+        return {"error": f"{type(e).__name__}: {e}"}
+    cases = d.get("cases", []) if isinstance(d, dict) else []
+    with_wf = sum(
+        1 for c in cases
+        if (c.get("workflow_groups")
+            or (c.get("workflow") and c.get("workflow", {}).get("steps")))
+    )
+    return {
+        "size_bytes": p.stat().st_size,
+        "total_cases": len(cases),
+        "with_workflow": with_wf,
+    }
+
+
+def cmd_list(output_dir: Path) -> int:
+    history_dir = output_dir / "history"
+    if not history_dir.exists():
+        print(f"No history directory at: {history_dir}")
+        return 0
+
+    # 每次 run 是 history/<run_id>/,每个里面有 case.json 和 run.log
+    run_dirs = sorted(
+        [d for d in history_dir.iterdir() if d.is_dir()],
+        key=lambda d: d.name,
+        reverse=True,
+    )
+    if not run_dirs:
+        print(f"History directory empty: {history_dir}")
+        return 0
+
+    current = output_dir / "case.json"
+    if current.exists():
+        info = _summarize_case_file(current)
+        print(f"\n=== Current case.json ===")
+        print(f"  total={info.get('total_cases', 0)}  with_workflow={info.get('with_workflow', 0)}  size={info.get('size_bytes', 0):,}B")
+
+    print(f"\n=== Run history in {history_dir} (newest first, {len(run_dirs)} runs) ===")
+    print(f"  {'run_id':<20}  {'cases':>5}  {'with_wf':>7}  {'case.json':>10}  {'log':>9}")
+    for d in run_dirs:
+        snap = d / "case.json"
+        log = d / "run.log"
+        if snap.exists():
+            info = _summarize_case_file(snap)
+            cases_str = str(info.get("total_cases", 0))
+            wf_str = str(info.get("with_workflow", 0))
+            snap_size = f"{info.get('size_bytes', 0):,}B"
+        else:
+            cases_str, wf_str, snap_size = "-", "-", "(missing)"
+        log_size = f"{log.stat().st_size:,}B" if log.exists() else "(none)"
+        print(f"  {d.name:<20}  {cases_str:>5}  {wf_str:>7}  {snap_size:>10}  {log_size:>9}")
+    return 0
+
+
+def _parse_case_indices(s: str) -> List[int]:
+    """把 '1,5,12' 拆成 [1, 5, 12],单数字也支持"""
+    return [int(x.strip()) for x in s.split(",") if x.strip()]
+
+
+def cmd_restore(
+    output_dir: Path,
+    run_id: str,
+    case_indices: Optional[List[int]],
+) -> int:
+    history_dir = output_dir / "history"
+    snap_path = history_dir / run_id / "case.json"
+    if not snap_path.exists():
+        print(f"Snapshot not found: {snap_path}", file=sys.stderr)
+        return 1
+
+    target = output_dir / "case.json"
+    if not target.exists():
+        print(f"Target case.json not found: {target}", file=sys.stderr)
+        return 1
+
+    # 恢复前先快照当前 case.json — 让 restore 操作本身也可回滚
+    # 用 set_run_id("restore_<ts>") 让快照落到独立 run 文件夹
+    from examples.process_pipeline.script.case_history import set_run_id, snapshot_case_file
+    set_run_id(f"restore_{datetime.now():%Y%m%d_%H%M%S}")
+    pre_snap = snapshot_case_file(target, step="pre_restore")
+    if pre_snap:
+        print(f"  [snapshot] saved current state to history/{pre_snap.parent.name}/{pre_snap.name}")
+
+    # 加载快照
+    with open(snap_path, "r", encoding="utf-8") as f:
+        snap_data = json.load(f)
+
+    if case_indices is None:
+        # 整文件恢复
+        shutil.copy2(snap_path, target)
+        info = _summarize_case_file(target)
+        print(f"  ✓ restored full case.json from run {run_id}")
+        print(f"    now: total={info.get('total_cases', 0)}  with_workflow={info.get('with_workflow', 0)}")
+        return 0
+
+    # 单/多 case 恢复
+    snap_cases = {c.get("index"): c for c in snap_data.get("cases", [])}
+    missing = [i for i in case_indices if i not in snap_cases]
+    if missing:
+        print(f"  ✗ case index {missing} not found in snapshot", file=sys.stderr)
+        return 1
+
+    with open(target, "r", encoding="utf-8") as f:
+        target_data = json.load(f)
+
+    target_cases = target_data.get("cases", [])
+    target_index_map = {c.get("index"): i for i, c in enumerate(target_cases)}
+    restored: List[int] = []
+    appended: List[int] = []
+    for idx in case_indices:
+        snap_case = snap_cases[idx]
+        if idx in target_index_map:
+            target_cases[target_index_map[idx]] = snap_case
+            restored.append(idx)
+        else:
+            target_cases.append(snap_case)
+            appended.append(idx)
+
+    target_cases.sort(key=lambda c: c.get("index", 0))
+    target_data["cases"] = target_cases
+
+    with open(target, "w", encoding="utf-8") as f:
+        json.dump(target_data, f, ensure_ascii=False, indent=2)
+
+    print(f"  ✓ restored {len(restored)} case(s) from run {run_id}: {restored}")
+    if appended:
+        print(f"  + appended {len(appended)} new case(s) (not present in current): {appended}")
+    return 0
+
+
+def main() -> int:
+    parser = argparse.ArgumentParser(description="Restore case.json from history snapshots")
+    sub = parser.add_subparsers(dest="cmd", required=True)
+
+    p_list = sub.add_parser("list", help="List snapshots in {output_dir}/history/")
+    p_list.add_argument("output_dir", type=Path, help="output dir, e.g. examples/process_pipeline/output/112")
+
+    p_restore = sub.add_parser("restore", help="Restore from a run snapshot (auto-snapshots current state first)")
+    p_restore.add_argument("output_dir", type=Path)
+    p_restore.add_argument("run_id", help="run id (timestamp folder name), e.g. 20260509_141802")
+    p_restore.add_argument(
+        "--case-index",
+        type=str,
+        default=None,
+        help="restore only specified case(s), comma-separated (e.g. '12' or '1,5,12'); omit to restore full file",
+    )
+
+    args = parser.parse_args()
+
+    if args.cmd == "list":
+        return cmd_list(args.output_dir)
+    elif args.cmd == "restore":
+        indices = _parse_case_indices(args.case_index) if args.case_index else None
+        return cmd_restore(args.output_dir, args.run_id, indices)
+    return 1
+
+
+if __name__ == "__main__":
+    # 让脚本能在 repo 根目录直接 python script/recover.py 跑
+    repo_root = Path(__file__).resolve().parent.parent.parent.parent
+    if str(repo_root) not in sys.path:
+        sys.path.insert(0, str(repo_root))
+    sys.exit(main())

+ 5 - 0
examples/process_pipeline/ui/index.html

@@ -143,6 +143,7 @@
                 <!-- Merged Tabs in Topbar -->
                 <div class="data-tabs-pill" style="display: flex; gap: 6px; margin-right: 15px;">
                     <button class="tab-btn-pill active" data-target="tab-raw">Case 案例</button>
+                    <button class="tab-btn-pill" data-target="tab-capability">Capability 能力</button>
                     <button class="tab-btn-pill" data-target="tab-blueprint">Strategy 工序</button>
                 </div>
                 <!-- Search Input -->
@@ -185,6 +186,10 @@
                         <div class="content-viewer" id="json-raw">加载中...</div>
                     </div>
 
+                    <div class="tab-content" id="tab-capability" style="height: 100%; padding: 0;">
+                        <iframe src="http://localhost:9999/capabilities/embed" style="width: 100%; height: 100%; border: none; border-radius: 8px;"></iframe>
+                    </div>
+
                     <div class="tab-content" id="tab-blueprint">
                         <div class="content-viewer" id="json-blueprint">加载中...</div>
                     </div>

+ 3 - 3
knowhub/frontend/public/viz4.html

@@ -158,7 +158,7 @@
         align-items: center;
         gap: 6px;
       }
-      .item:hover:not(.disabled) {
+      .item:hover:not(.disabled):not(.active) {
         background: var(--panel2);
       }
       .item.active {
@@ -271,7 +271,7 @@
         border: 1px solid var(--border);
         background: var(--panel2);
       }
-      .iset:hover:not(.disabled) {
+      .iset:hover:not(.disabled):not(.active) {
         border-color: var(--accent);
       }
       .iset.active {
@@ -379,7 +379,7 @@
         font-size: 12px;
         line-height: 1.35;
       }
-      .tline:hover:not(.disabled) {
+      .tline:hover:not(.disabled):not(.active) {
         background: var(--panel2);
       }
       .tline.active {

+ 4 - 0
knowhub/frontend/src/App.tsx

@@ -39,6 +39,10 @@ function App() {
     navigate(TAB_TO_PATH[tab]);
   };
 
+  if (location.pathname === '/capabilities/embed') {
+    return <Capabilities />;
+  }
+
   return (
     <MainLayout activeTab={activeTab} onTabChange={handleTabChange}>
       {(tab) => {