Explorar el Código

feat(web-api): run dashboard read endpoints and DashboardService

- 新增 DashboardService:run 列表/详情聚合(DB 优先、runtime 文件回退,
  data_origin 标注来源)
- api.py 新增只读端点:GET /runs(分页+状态过滤+搜索)、/runs/{id}/dashboard、
  /queries、/timeline、/content-items、/runtime-files、/runtime-files/{filename}
- schemas.py 新增对应响应模型;tests/test_api.py 补端点断言

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Sam Lee hace 4 días
padre
commit
6ee0079
Se han modificado 4 ficheros con 1189 adiciones y 1 borrados
  1. 128 1
      content_agent/api.py
  2. 925 0
      content_agent/dashboard_service.py
  3. 79 0
      content_agent/schemas.py
  4. 57 0
      tests/test_api.py

+ 128 - 1
content_agent/api.py

@@ -1,18 +1,29 @@
 from __future__ import annotations
 
-from fastapi import FastAPI, HTTPException
+import os
+
+from fastapi import FastAPI, HTTPException, Query
 from fastapi.exceptions import RequestValidationError
 from fastapi.encoders import jsonable_encoder
+from fastapi.middleware.cors import CORSMiddleware
 from fastapi.responses import JSONResponse
 
+from content_agent.dashboard_service import DashboardService
 from content_agent.errors import ErrorCode, error_response, sanitize_error_detail
 from content_agent.run_service import RunService
 from content_agent.schemas import (
+    ContentItemsResponse,
+    DashboardResponse,
     JsonFileResponse,
+    QueryListResponse,
     RecordsResponse,
+    RunListResponse,
     RunStartRequest,
     RunStartResponse,
     RunSummaryResponse,
+    RuntimeFileResponse,
+    RuntimeFilesResponse,
+    TimelineResponse,
     ValidationResponse,
 )
 
@@ -20,6 +31,23 @@ from content_agent.schemas import (
 app = FastAPI(title="Content Agent V1")
 service = RunService.from_env()
 
+_cors_origins = [
+    origin.strip()
+    for origin in os.environ.get(
+        "CONTENT_AGENT_WEB_CORS_ORIGINS",
+        "http://localhost:3000,http://127.0.0.1:3000,http://localhost:3010,http://127.0.0.1:3010",
+    ).split(",")
+    if origin.strip()
+]
+if _cors_origins:
+    app.add_middleware(
+        CORSMiddleware,
+        allow_origins=_cors_origins,
+        allow_credentials=False,
+        allow_methods=["GET", "POST", "OPTIONS"],
+        allow_headers=["*"],
+    )
+
 
 @app.exception_handler(RequestValidationError)
 async def validation_exception_handler(request, exc: RequestValidationError):
@@ -58,12 +86,95 @@ def start_run(request: RunStartRequest) -> RunStartResponse:
     )
 
 
+@app.get("/runs", response_model=RunListResponse)
+def list_runs(
+    status: str | None = None,
+    platform: str | None = None,
+    platform_mode: str | None = None,
+    strategy_version: str | None = None,
+    validation_status: str | None = None,
+    error_code: str | None = None,
+    page: int = Query(default=1, ge=1),
+    page_size: int = Query(default=20, ge=1, le=100),
+) -> RunListResponse:
+    return RunListResponse(
+        **_dashboard_service().list_runs(
+            status=status,
+            platform=platform,
+            platform_mode=platform_mode,
+            strategy_version=strategy_version,
+            validation_status=validation_status,
+            error_code=error_code,
+            page=page,
+            page_size=page_size,
+        )
+    )
+
+
 @app.get("/runs/{run_id}", response_model=RunSummaryResponse)
 def get_run(run_id: str) -> RunSummaryResponse:
     _ensure_run_exists(run_id)
     return RunSummaryResponse(**service.get_summary(run_id))
 
 
+@app.get("/runs/{run_id}/dashboard", response_model=DashboardResponse)
+def get_run_dashboard(run_id: str) -> DashboardResponse:
+    _ensure_web_run_exists(run_id)
+    return DashboardResponse(**_dashboard_service().dashboard(run_id))
+
+
+@app.get("/runs/{run_id}/queries", response_model=QueryListResponse)
+def get_run_queries(run_id: str) -> QueryListResponse:
+    _ensure_web_run_exists(run_id)
+    return QueryListResponse(**_dashboard_service().queries(run_id))
+
+
+@app.get("/runs/{run_id}/timeline", response_model=TimelineResponse)
+def get_run_timeline(run_id: str) -> TimelineResponse:
+    _ensure_web_run_exists(run_id)
+    return TimelineResponse(**_dashboard_service().timeline(run_id))
+
+
+@app.get("/runs/{run_id}/content-items", response_model=ContentItemsResponse)
+def get_run_content_items(run_id: str) -> ContentItemsResponse:
+    _ensure_web_run_exists(run_id)
+    return ContentItemsResponse(**_dashboard_service().content_items(run_id))
+
+
+@app.get("/runs/{run_id}/runtime-files", response_model=RuntimeFilesResponse)
+def get_run_runtime_files(run_id: str) -> RuntimeFilesResponse:
+    _ensure_web_run_exists(run_id)
+    return RuntimeFilesResponse(**_dashboard_service().runtime_files(run_id))
+
+
+@app.get("/runs/{run_id}/runtime-files/{filename}", response_model=RuntimeFileResponse)
+def get_run_runtime_file(
+    run_id: str,
+    filename: str,
+    limit: int = Query(default=100, ge=1, le=500),
+    offset: int = Query(default=0, ge=0),
+) -> RuntimeFileResponse:
+    _ensure_web_run_exists(run_id)
+    try:
+        return RuntimeFileResponse(
+            **_dashboard_service().runtime_file(
+                run_id,
+                filename,
+                limit=limit,
+                offset=offset,
+            )
+        )
+    except ValueError:
+        raise HTTPException(
+            status_code=400,
+            detail=error_response(
+                ErrorCode.INVALID_REQUEST,
+                "runtime filename is not allowed",
+                {"filename": filename},
+            ),
+        )
+
+
 @app.get("/runs/{run_id}/discovered-content-items", response_model=RecordsResponse)
 def get_discovered_content_items(run_id: str) -> RecordsResponse:
     return _jsonl_response(run_id, "discovered_content_items.jsonl")
@@ -122,3 +233,19 @@ def _ensure_run_exists(run_id: str) -> None:
                 {"run_id": run_id},
             ),
         )
+
+
+def _ensure_web_run_exists(run_id: str) -> None:
+    if not _dashboard_service().run_exists(run_id):
+        raise HTTPException(
+            status_code=404,
+            detail=error_response(
+                ErrorCode.RUN_NOT_FOUND,
+                "run not found",
+                {"run_id": run_id},
+            ),
+        )
+
+
+def _dashboard_service() -> DashboardService:
+    return DashboardService.from_runtime(service.runtime)

+ 925 - 0
content_agent/dashboard_service.py

@@ -0,0 +1,925 @@
+from __future__ import annotations
+
+from datetime import date, datetime
+from pathlib import Path
+from typing import Any
+
+from content_agent.business_modules.run_record import validate_run
+from content_agent.integrations.composite_runtime import CompositeRuntimeStore
+from content_agent.integrations.database_runtime import DatabaseRuntimeStore
+from content_agent.integrations.database_runtime import RUNTIME_FILE_TABLES
+from content_agent.integrations.runtime_files import RUNTIME_FILENAMES
+from content_agent.interfaces import RuntimeStore
+
+
+DATA_ORIGIN_PRODUCTION_DB = "production_db"
+DATA_ORIGIN_RUNTIME_EXPORT = "runtime_export"
+DATA_ORIGIN_MIXED = "mixed_with_runtime_export"
+
+
+class DashboardService:
+    def __init__(
+        self,
+        runtime: RuntimeStore,
+        export_runtime: RuntimeStore | None = None,
+    ) -> None:
+        self.runtime = runtime
+        self.export_runtime = export_runtime
+
+    @classmethod
+    def from_runtime(cls, runtime: RuntimeStore) -> "DashboardService":
+        if isinstance(runtime, CompositeRuntimeStore):
+            return cls(runtime.primary, runtime.export)
+        return cls(runtime)
+
+    @property
+    def data_origin(self) -> str:
+        if isinstance(self.runtime, DatabaseRuntimeStore):
+            return DATA_ORIGIN_PRODUCTION_DB
+        return DATA_ORIGIN_RUNTIME_EXPORT
+
+    def run_exists(self, run_id: str) -> bool:
+        if isinstance(self.runtime, DatabaseRuntimeStore):
+            row = self.runtime._fetch_one(
+                "SELECT COUNT(*) AS cnt FROM `content_agent_runs` WHERE `run_id` = %s",
+                (run_id,),
+            )
+            return bool(row and row.get("cnt", 0))
+        try:
+            return self.runtime.run_dir(run_id).exists()
+        except Exception:
+            return any(self.runtime.file_status(run_id).values())
+
+    def list_runs(
+        self,
+        *,
+        status: str | None = None,
+        platform: str | None = None,
+        platform_mode: str | None = None,
+        strategy_version: str | None = None,
+        validation_status: str | None = None,
+        error_code: str | None = None,
+        page: int = 1,
+        page_size: int = 20,
+    ) -> dict[str, Any]:
+        page = max(page, 1)
+        page_size = min(max(page_size, 1), 100)
+        if isinstance(self.runtime, DatabaseRuntimeStore):
+            return self._list_db_runs(
+                status=status,
+                platform=platform,
+                platform_mode=platform_mode,
+                strategy_version=strategy_version,
+                validation_status=validation_status,
+                error_code=error_code,
+                page=page,
+                page_size=page_size,
+            )
+        items = [
+            self._local_run_list_item(run_id)
+            for run_id in getattr(self.runtime, "list_runs", lambda: [])()
+        ]
+        items = [
+            item
+            for item in items
+            if _matches(item, "status", status)
+            and _matches(item, "platform", platform)
+            and _matches(item, "platform_mode", platform_mode)
+            and _matches(item, "strategy_version", strategy_version)
+            and _matches(item, "validation_status", validation_status)
+            and _matches(item, "error_code", error_code)
+        ]
+        total = len(items)
+        offset = (page - 1) * page_size
+        return {
+            "items": items[offset:offset + page_size],
+            "page": page,
+            "page_size": page_size,
+            "total": total,
+            "data_origin": self.data_origin,
+        }
+
+    def dashboard(self, run_id: str) -> dict[str, Any]:
+        runtime_files_response = self.runtime_files(run_id)
+        runtime_files = runtime_files_response["files"]
+        file_status = {item["filename"]: item["exists"] for item in runtime_files}
+        record_counts = {item["filename"]: item["record_count"] for item in runtime_files}
+        final_output = self._read_json_optional(run_id, "final_output.json")
+        strategy_review = self._read_json_optional(run_id, "strategy_review.json")
+        source_context = self._read_json_optional(run_id, "source_context.json") or {}
+        validation = self._validate_optional(run_id)
+        run_item = self._db_run_item(run_id) if isinstance(self.runtime, DatabaseRuntimeStore) else self._local_run_list_item(run_id)
+        queries = self._read_jsonl_optional(run_id, "search_queries.jsonl")
+        run_events = self._read_jsonl_optional(run_id, "run_events.jsonl")
+        content_items = self._read_jsonl_optional(run_id, "discovered_content_items.jsonl")
+        decisions = self._read_jsonl_optional(run_id, "rule_decisions.jsonl")
+        walk_actions = self._read_jsonl_optional(run_id, "walk_actions.jsonl")
+        source_paths = self._read_jsonl_optional(run_id, "source_path_records.jsonl")
+        counts = {
+            "queries": record_counts.get("search_queries.jsonl", 0),
+            "discovered_content_items": record_counts.get("discovered_content_items.jsonl", 0),
+            "rule_decisions": record_counts.get("rule_decisions.jsonl", 0),
+            "walk_actions": record_counts.get("walk_actions.jsonl", 0),
+            "source_path_records": record_counts.get("source_path_records.jsonl", 0),
+            "run_events": record_counts.get("run_events.jsonl", 0),
+        }
+        primary_failure_reason = _primary_failure_reason(run_item, run_events)
+        return _json_safe({
+            "run_id": run_id,
+            "summary": run_item,
+            "counts": counts,
+            "files": file_status,
+            "runtime_files": runtime_files,
+            "validation": validation,
+            "final_output_summary": (final_output or {}).get("summary", {}),
+            "strategy_review_status": (strategy_review or {}).get(
+                "review_status",
+                "not_generated",
+            ),
+            "business_summary": _business_summary(
+                run_item,
+                counts,
+                final_output or {},
+                primary_failure_reason,
+            ),
+            "stage_conclusions": _stage_conclusions(
+                file_status,
+                counts,
+                run_item,
+                queries,
+                run_events,
+                decisions,
+                walk_actions,
+                final_output or {},
+                strategy_review or {},
+                source_context,
+            ),
+            "rule_application_summary": _rule_application_summary(decisions, content_items),
+            "walk_graph": _walk_graph(queries, content_items, walk_actions, source_paths),
+            "primary_failure_reason": primary_failure_reason,
+            "technical_refs": {
+                "runtime_files_url": f"/runs/{run_id}/runtime-files",
+                "validation_url": f"/runs/{run_id}/validation",
+                "data_origin": runtime_files_response["data_origin"],
+                "runtime_file_count": len(runtime_files),
+            },
+            "data_origin": runtime_files_response["data_origin"],
+            "links": {
+                "queries": f"/runs/{run_id}/queries",
+                "timeline": f"/runs/{run_id}/timeline",
+                "content_items": f"/runs/{run_id}/content-items",
+                "runtime_files": f"/runs/{run_id}/runtime-files",
+            },
+        })
+
+    def runtime_files(self, run_id: str) -> dict[str, Any]:
+        if isinstance(self.runtime, DatabaseRuntimeStore):
+            counts = self._db_runtime_file_counts(run_id)
+            files = [
+                {
+                    "filename": filename,
+                    "exists": counts.get(filename, 0) > 0,
+                    "record_count": counts.get(filename, 0),
+                    "file_type": "jsonl" if filename.endswith(".jsonl") else "json",
+                    "contract_status": "current",
+                    "data_origin": DATA_ORIGIN_PRODUCTION_DB,
+                }
+                for filename in RUNTIME_FILENAMES
+            ]
+            return {
+                "run_id": run_id,
+                "files": files,
+                "data_origin": DATA_ORIGIN_PRODUCTION_DB,
+            }
+        status = self.runtime.file_status(run_id)
+        files = []
+        for filename in RUNTIME_FILENAMES:
+            exists = bool(status.get(filename))
+            files.append({
+                "filename": filename,
+                "exists": exists,
+                "record_count": self._runtime_record_count(run_id, filename) if exists else 0,
+                "file_type": "jsonl" if filename.endswith(".jsonl") else "json",
+                "contract_status": "current",
+                "data_origin": self._file_origin(run_id, filename),
+            })
+        return {
+            "run_id": run_id,
+            "files": files,
+            "data_origin": self._combined_origin(run_id),
+        }
+
+    def runtime_file(
+        self,
+        run_id: str,
+        filename: str,
+        *,
+        limit: int = 100,
+        offset: int = 0,
+    ) -> dict[str, Any]:
+        self._validate_runtime_filename(filename)
+        limit = min(max(limit, 1), 500)
+        offset = max(offset, 0)
+        if filename.endswith(".jsonl"):
+            records = self._read_jsonl_optional(run_id, filename)
+            return {
+                "run_id": run_id,
+                "filename": filename,
+                "records": _json_safe(records[offset:offset + limit]),
+                "offset": offset,
+                "limit": limit,
+                "total": len(records),
+                "data_origin": self._file_origin(run_id, filename),
+            }
+        data = self._read_json_optional(run_id, filename) or {}
+        return {
+            "run_id": run_id,
+            "filename": filename,
+            "data": _json_safe(data),
+            "data_origin": self._file_origin(run_id, filename),
+        }
+
+    def queries(self, run_id: str) -> dict[str, Any]:
+        queries = self._read_jsonl_optional(run_id, "search_queries.jsonl")
+        clues_by_query = {
+            row.get("search_query_id"): row
+            for row in self._read_jsonl_optional(run_id, "search_clues.jsonl")
+        }
+        decisions = self._read_jsonl_optional(run_id, "rule_decisions.jsonl")
+        decision_counts: dict[str, dict[str, int]] = {}
+        for decision in decisions:
+            query_id = decision.get("search_query_id")
+            if not query_id:
+                continue
+            counts = decision_counts.setdefault(query_id, {})
+            action = decision.get("decision_action") or "unknown"
+            counts[action] = counts.get(action, 0) + 1
+        items = []
+        for query in queries:
+            query_id = query.get("search_query_id")
+            clue = clues_by_query.get(query_id) or {}
+            items.append(_json_safe({
+                **query,
+                "search_clue": clue,
+                "decision_action_counts": decision_counts.get(query_id, {}),
+                "search_query_effect_status": clue.get(
+                    "search_query_effect_status",
+                    query.get("search_query_effect_status"),
+                ),
+                "walk_next_step": clue.get("walk_next_step"),
+                "failure_reason": clue.get("failure_reason")
+                or (clue.get("raw_payload") or {}).get("failure_reason"),
+            }))
+        return {
+            "run_id": run_id,
+            "items": items,
+            "total": len(items),
+            "data_origin": self._combined_origin(run_id),
+        }
+
+    def timeline(self, run_id: str) -> dict[str, Any]:
+        events = [
+            {
+                "source": "run_events.jsonl",
+                "stage": row.get("stage") or row.get("event_type"),
+                "event_type": row.get("event_type"),
+                "status": row.get("status"),
+                "timestamp": row.get("created_at") or row.get("timestamp"),
+                "error_code": row.get("error_code"),
+                "walk_action_id": (row.get("raw_payload") or {}).get("walk_action_id"),
+                "record": row,
+            }
+            for row in self._read_jsonl_optional(run_id, "run_events.jsonl")
+        ]
+        events.extend(
+            {
+                "source": "walk_actions.jsonl",
+                "stage": "walk",
+                "event_type": row.get("edge_type"),
+                "status": row.get("walk_status"),
+                "timestamp": row.get("created_at"),
+                "error_code": None,
+                "walk_action_id": row.get("walk_action_id"),
+                "record": row,
+            }
+            for row in self._read_jsonl_optional(run_id, "walk_actions.jsonl")
+        )
+        events.extend(
+            {
+                "source": "source_path_records.jsonl",
+                "stage": "source_path",
+                "event_type": row.get("source_path_type"),
+                "status": row.get("status"),
+                "timestamp": row.get("created_at"),
+                "error_code": None,
+                "walk_action_id": row.get("walk_action_id")
+                or (row.get("raw_payload") or {}).get("walk_action_id"),
+                "record": row,
+            }
+            for row in self._read_jsonl_optional(run_id, "source_path_records.jsonl")
+        )
+        events.sort(key=lambda item: str(item.get("timestamp") or ""))
+        return {
+            "run_id": run_id,
+            "items": _json_safe(events),
+            "total": len(events),
+            "data_origin": self._combined_origin(run_id),
+        }
+
+    def content_items(self, run_id: str) -> dict[str, Any]:
+        media_by_platform_id = {
+            row.get("platform_content_id"): row
+            for row in self._read_jsonl_optional(run_id, "content_media_records.jsonl")
+        }
+        recall_by_platform_id = {
+            row.get("platform_content_id"): row
+            for row in self._read_jsonl_optional(run_id, "pattern_recall_evidence.jsonl")
+        }
+        decisions_by_target = {
+            row.get("decision_target_id"): row
+            for row in self._read_jsonl_optional(run_id, "rule_decisions.jsonl")
+        }
+        items = []
+        for content in self._read_jsonl_optional(run_id, "discovered_content_items.jsonl"):
+            platform_content_id = content.get("platform_content_id")
+            discovery_id = content.get("content_discovery_id")
+            items.append(_json_safe({
+                **content,
+                "media_record": media_by_platform_id.get(platform_content_id),
+                "pattern_recall_evidence": recall_by_platform_id.get(platform_content_id),
+                "rule_decision": decisions_by_target.get(discovery_id)
+                or decisions_by_target.get(platform_content_id),
+            }))
+        return {
+            "run_id": run_id,
+            "items": items,
+            "total": len(items),
+            "data_origin": self._combined_origin(run_id),
+        }
+
+    def _list_db_runs(self, **filters: Any) -> dict[str, Any]:
+        page = filters.pop("page")
+        page_size = filters.pop("page_size")
+        where_sql, params = _db_run_filters(filters)
+        count_row = self.runtime._fetch_one(
+            f"SELECT COUNT(*) AS cnt FROM `content_agent_runs` r {where_sql}",
+            tuple(params),
+        )
+        rows = self.runtime._fetch_all(
+            "SELECT r.run_id, "
+            "(SELECT p.policy_run_id FROM `content_agent_policy_runs` p "
+            "WHERE p.run_id = r.run_id ORDER BY p.id DESC LIMIT 1) AS policy_run_id, "
+            "r.status, r.current_step, r.platform, r.platform_mode, r.strategy_version, "
+            "r.validation_status, r.error_code, r.started_at, r.completed_at "
+            f"FROM `content_agent_runs` r {where_sql} "
+            "ORDER BY r.started_at DESC, r.id DESC LIMIT %s OFFSET %s",
+            tuple([*params, page_size, (page - 1) * page_size]),
+        )
+        return {
+            "items": [_json_safe(_db_run_row_to_item(row)) for row in rows],
+            "page": page,
+            "page_size": page_size,
+            "total": int((count_row or {}).get("cnt") or 0),
+            "data_origin": self.data_origin,
+        }
+
+    def _db_run_item(self, run_id: str) -> dict[str, Any]:
+        row = self.runtime._fetch_one(
+            "SELECT r.run_id, "
+            "(SELECT p.policy_run_id FROM `content_agent_policy_runs` p "
+            "WHERE p.run_id = r.run_id ORDER BY p.id DESC LIMIT 1) AS policy_run_id, "
+            "r.status, r.current_step, r.platform, r.platform_mode, r.strategy_version, "
+            "r.validation_status, r.error_code, r.started_at, r.completed_at "
+            "FROM `content_agent_runs` r WHERE r.run_id = %s LIMIT 1",
+            (run_id,),
+        )
+        return _json_safe(_db_run_row_to_item(row or {"run_id": run_id, "status": "unknown"}))
+
+    def _local_run_list_item(self, run_id: str) -> dict[str, Any]:
+        final_output = self._read_json_optional(run_id, "final_output.json") or {}
+        run_events = self._read_jsonl_optional(run_id, "run_events.jsonl")
+        lifecycle = [
+            row for row in run_events if str(row.get("event_id", "")).startswith("lifecycle_")
+        ]
+        latest = lifecycle[-1] if lifecycle else {}
+        return _json_safe({
+            "run_id": run_id,
+            "policy_run_id": final_output.get("policy_run_id"),
+            "status": latest.get("status") or ("success" if final_output else "failed"),
+            "current_step": "review_strategy" if final_output else "unknown",
+            "platform": final_output.get("platform"),
+            "platform_mode": final_output.get("platform_mode"),
+            "strategy_version": final_output.get("strategy_version"),
+            "validation_status": final_output.get("validation_status"),
+            "error_code": latest.get("error_code"),
+            "started_at": latest.get("created_at"),
+            "completed_at": None,
+        })
+
+    def _read_json_optional(self, run_id: str, filename: str) -> dict[str, Any] | None:
+        try:
+            return self.runtime.read_json(run_id, filename)
+        except Exception:
+            if self.export_runtime:
+                try:
+                    return self.export_runtime.read_json(run_id, filename)
+                except Exception:
+                    return None
+            return None
+
+    def _read_jsonl_optional(self, run_id: str, filename: str) -> list[dict[str, Any]]:
+        try:
+            return self.runtime.read_jsonl(run_id, filename)
+        except Exception:
+            if self.export_runtime:
+                try:
+                    return self.export_runtime.read_jsonl(run_id, filename)
+                except Exception:
+                    return []
+            return []
+
+    def _validate_optional(self, run_id: str) -> dict[str, Any]:
+        if isinstance(self.runtime, DatabaseRuntimeStore):
+            run_item = self._db_run_item(run_id)
+            status = run_item.get("validation_status")
+            return {
+                "run_id": run_id,
+                "status": status or "unknown",
+                "findings": [] if status == "pass" else [],
+            }
+        try:
+            return validate_run(run_id, self.runtime)
+        except Exception as exc:
+            return {
+                "run_id": run_id,
+                "status": "fail",
+                "findings": [{"severity": "error", "message": str(exc)}],
+            }
+
+    def _runtime_record_count(self, run_id: str, filename: str) -> int:
+        if isinstance(self.runtime, DatabaseRuntimeStore):
+            return self._db_runtime_file_counts(run_id).get(filename, 0)
+        if filename.endswith(".jsonl"):
+            return len(self._read_jsonl_optional(run_id, filename))
+        return 1 if self._read_json_optional(run_id, filename) is not None else 0
+
+    def _combined_origin(self, run_id: str) -> str:
+        if not isinstance(self.runtime, DatabaseRuntimeStore):
+            return DATA_ORIGIN_RUNTIME_EXPORT
+        return DATA_ORIGIN_PRODUCTION_DB
+
+    def _file_origin(self, run_id: str, filename: str) -> str:
+        if not isinstance(self.runtime, DatabaseRuntimeStore):
+            return DATA_ORIGIN_RUNTIME_EXPORT
+        primary_exists = self.runtime.file_status(run_id).get(filename, False)
+        export_exists = bool(self.export_runtime and self.export_runtime.file_status(run_id).get(filename))
+        if primary_exists:
+            return DATA_ORIGIN_PRODUCTION_DB
+        if export_exists:
+            return DATA_ORIGIN_RUNTIME_EXPORT
+        return self.data_origin
+
+    def _validate_runtime_filename(self, filename: str) -> None:
+        if (
+            filename not in RUNTIME_FILENAMES
+            or ".." in filename
+            or "/" in filename
+            or "\\" in filename
+            or Path(filename).is_absolute()
+        ):
+            raise ValueError("runtime filename is not allowed")
+
+    def _db_runtime_file_counts(self, run_id: str) -> dict[str, int]:
+        counts: dict[str, int] = {}
+        with self.runtime._connection_factory() as conn:
+            with conn.cursor() as cur:
+                for filename in RUNTIME_FILENAMES:
+                    table = RUNTIME_FILE_TABLES.get(filename)
+                    if not table:
+                        counts[filename] = 0
+                        continue
+                    cur.execute(
+                        f"SELECT COUNT(*) AS cnt FROM `{table}` WHERE `run_id` = %s",
+                        (run_id,),
+                    )
+                    row = cur.fetchone() or {}
+                    counts[filename] = int(row.get("cnt") or 0)
+        return counts
+
+
+def _db_run_filters(filters: dict[str, Any]) -> tuple[str, list[Any]]:
+    allowed = {
+        "status": "status",
+        "platform": "platform",
+        "platform_mode": "platform_mode",
+        "strategy_version": "strategy_version",
+        "validation_status": "validation_status",
+        "error_code": "error_code",
+    }
+    clauses = []
+    params = []
+    for key, column in allowed.items():
+        value = filters.get(key)
+        if value:
+            clauses.append(f"r.`{column}` = %s")
+            params.append(value)
+    if not clauses:
+        return "", params
+    return "WHERE " + " AND ".join(clauses), params
+
+
+def _db_run_row_to_item(row: dict[str, Any]) -> dict[str, Any]:
+    return {
+        "run_id": row.get("run_id"),
+        "policy_run_id": row.get("policy_run_id"),
+        "status": row.get("status"),
+        "current_step": row.get("current_step"),
+        "platform": row.get("platform"),
+        "platform_mode": row.get("platform_mode"),
+        "strategy_version": row.get("strategy_version"),
+        "validation_status": row.get("validation_status"),
+        "error_code": row.get("error_code"),
+        "started_at": row.get("started_at"),
+        "completed_at": row.get("completed_at"),
+    }
+
+
+def _matches(item: dict[str, Any], key: str, expected: str | None) -> bool:
+    return not expected or item.get(key) == expected
+
+
+def _json_safe(value: Any) -> Any:
+    if isinstance(value, dict):
+        return {key: _json_safe(item) for key, item in value.items()}
+    if isinstance(value, list):
+        return [_json_safe(item) for item in value]
+    if isinstance(value, (datetime, date)):
+        return value.isoformat()
+    return value
+
+
+def _business_summary(
+    run_item: dict[str, Any],
+    counts: dict[str, int],
+    final_output: dict[str, Any],
+    primary_failure_reason: dict[str, Any] | None,
+) -> dict[str, Any]:
+    summary = final_output.get("summary") or {}
+    status = run_item.get("status") or "unknown"
+    if primary_failure_reason:
+        headline = f"本次运行停在{primary_failure_reason.get('stage_label')}:{primary_failure_reason.get('reason_label')}"
+    elif status == "success":
+        headline = "本次运行已完成,可查看内容判断和资产沉淀结果"
+    elif status == "partial_success":
+        headline = "本次运行部分成功,有 query 或平台请求失败"
+    elif status == "running":
+        headline = "本次运行仍在进行中"
+    else:
+        headline = "本次运行未形成完整成功链路"
+    return {
+        "headline": headline,
+        "status": status,
+        "source_label": _source_label(run_item),
+        "query_count": counts.get("queries", 0),
+        "content_count": counts.get("discovered_content_items", 0),
+        "kept_count": int(summary.get("pooled_content_count") or 0),
+        "review_count": int(summary.get("review_content_count") or 0),
+        "rejected_count": int(summary.get("rejected_content_count") or 0),
+        "asset_count": int(summary.get("author_asset_count") or 0),
+        "primary_failure_reason": primary_failure_reason,
+    }
+
+
+def _stage_conclusions(
+    files: dict[str, bool],
+    counts: dict[str, int],
+    run_item: dict[str, Any],
+    queries: list[dict[str, Any]],
+    run_events: list[dict[str, Any]],
+    decisions: list[dict[str, Any]],
+    walk_actions: list[dict[str, Any]],
+    final_output: dict[str, Any],
+    strategy_review: dict[str, Any],
+    source_context: dict[str, Any],
+) -> list[dict[str, Any]]:
+    query_failures = [
+        event for event in run_events
+        if event.get("event_type") == "platform_query_failed"
+        or event.get("status") == "failed" and event.get("search_query_id")
+    ]
+    decision_counts = _effect_status_counts(decisions)
+    walk_counts = _walk_status_counts(walk_actions)
+    final_summary = final_output.get("summary") or {}
+    return [
+        {
+            "stage_id": "source",
+            "label": "数据源",
+            "status": "success" if files.get("source_context.json") else "failed",
+            "headline": "真实需求已读取" if files.get("source_context.json") else "需求来源缺失",
+            "detail": _source_stage_detail(source_context)
+            if files.get("source_context.json")
+            else "未找到 source_context,无法解释输入来源",
+            "metric": "已就绪" if files.get("pattern_seed_pack.json") else "种子缺失",
+        },
+        {
+            "stage_id": "query",
+            "label": "Query",
+            "status": "success" if counts.get("queries") else "failed",
+            "headline": f"生成 {counts.get('queries', 0)} 条搜索意图",
+            "detail": f"{len(query_failures)} 条 query 有失败记录,其余进入平台搜索或后续链路",
+            "metric": _query_generation_label(queries, run_events),
+        },
+        {
+            "stage_id": "platform",
+            "label": "平台 / 内容",
+            "status": "success" if counts.get("discovered_content_items") else "failed",
+            "headline": f"发现 {counts.get('discovered_content_items', 0)} 条内容",
+            "detail": "平台结果已进入内容发现"
+            if counts.get("discovered_content_items")
+            else "未形成发现内容,通常需要先查看平台请求或 query 失败原因",
+            "metric": f"{len(query_failures)} 条平台失败",
+        },
+        {
+            "stage_id": "judge",
+            "label": "判断",
+            "status": "success" if counts.get("rule_decisions") else "pending",
+            "headline": f"{counts.get('rule_decisions', 0)} 条内容完成判断",
+            "detail": _decision_status_label(decision_counts),
+            "metric": f"入池 {decision_counts.get('success', 0)} / 阻断 {decision_counts.get('rule_blocked', 0)}",
+        },
+        {
+            "stage_id": "walk",
+            "label": "游走",
+            "status": "success" if counts.get("walk_actions") else "pending",
+            "headline": f"触发 {counts.get('walk_actions', 0)} 个游走动作",
+            "detail": _walk_status_label(walk_counts),
+            "metric": f"成功 {walk_counts.get('success', 0)} / 失败 {walk_counts.get('failed', 0)}",
+        },
+        {
+            "stage_id": "asset",
+            "label": "资产",
+            "status": "success" if files.get("final_output.json") else "pending",
+            "headline": f"沉淀 {final_summary.get('pooled_content_count', 0) or 0} 条内容资产",
+            "detail": f"待复看 {final_summary.get('review_content_count', 0) or 0},淘汰 {final_summary.get('rejected_content_count', 0) or 0}",
+            "metric": f"作者资产 {final_summary.get('author_asset_count', 0) or 0}",
+        },
+        {
+            "stage_id": "learning",
+            "label": "学习",
+            "status": "success" if strategy_review.get("review_status") == "generated" else "pending",
+            "headline": "策略复盘已生成"
+            if strategy_review.get("review_status") == "generated"
+            else "策略复盘未生成",
+            "detail": "可查看 query / rule / walk 的优化建议"
+            if strategy_review.get("review_status") == "generated"
+            else "当前 run 还没有可展示的策略学习建议",
+            "metric": strategy_review.get("review_status", "not_generated"),
+        },
+    ]
+
+
+def _rule_application_summary(
+    decisions: list[dict[str, Any]],
+    content_items: list[dict[str, Any]],
+) -> list[dict[str, Any]]:
+    content_by_id = {
+        item.get("content_discovery_id") or item.get("platform_content_id"): item
+        for item in content_items
+    }
+    summaries = []
+    for decision in decisions:
+        replay = decision.get("decision_replay_data") or {}
+        target_id = decision.get("decision_target_id")
+        content = content_by_id.get(target_id) or {}
+        triggered_rules = decision.get("triggered_blocking_rules") or []
+        scorecard = decision.get("scorecard") or {}
+        summaries.append({
+            "decision_id": decision.get("decision_id"),
+            "content_title": content.get("title") or decision.get("decision_target_id"),
+            "platform_content_id": content.get("platform_content_id") or decision.get("decision_target_id"),
+            "rule_pack": decision.get("rule_pack_id")
+            or replay.get("rule_pack_id")
+            or "Content Rule Pack V1",
+            "hard_gate_status": "没通过" if triggered_rules else "通过",
+            "score": decision.get("score") or scorecard.get("total_score"),
+            "decision_action": decision.get("decision_action"),
+            "decision_reason_code": decision.get("decision_reason_code"),
+            "content_effect_status": decision.get("content_effect_status"),
+            "primary_reason": _reason_label(decision.get("decision_reason_code")),
+            "technical_ref": {
+                "decision_id": decision.get("decision_id"),
+                "target_id": decision.get("decision_target_id"),
+                "has_replay_data": bool(replay),
+            },
+        })
+    return summaries
+
+
+def _walk_graph(
+    queries: list[dict[str, Any]],
+    content_items: list[dict[str, Any]],
+    walk_actions: list[dict[str, Any]],
+    source_paths: list[dict[str, Any]],
+) -> dict[str, Any]:
+    nodes: dict[str, dict[str, Any]] = {
+        "source": {
+            "id": "source",
+            "type": "source",
+            "label": "需求",
+            "status": "success",
+        }
+    }
+    edges: list[dict[str, Any]] = []
+    for query in queries[:12]:
+        node_id = f"query:{query.get('search_query_id')}"
+        nodes[node_id] = {
+            "id": node_id,
+            "type": "query",
+            "label": query.get("search_query") or query.get("search_query_id"),
+            "status": query.get("search_query_effect_status") or "success",
+        }
+        edges.append({
+            "id": f"source->{node_id}",
+            "source": "source",
+            "target": node_id,
+            "label": query.get("search_query_generation_method") or "query",
+            "status": "success",
+            "rule_pack": None,
+        })
+    for content in content_items[:20]:
+        node_id = f"content:{content.get('platform_content_id') or content.get('content_discovery_id')}"
+        query_id = content.get("search_query_id")
+        source_id = f"query:{query_id}" if query_id else "source"
+        nodes[node_id] = {
+            "id": node_id,
+            "type": "content",
+            "label": content.get("title") or content.get("platform_content_id") or "内容",
+            "status": (content.get("pattern_match_result") or {}).get("recall_status") or "pending",
+        }
+        edges.append({
+            "id": f"{source_id}->{node_id}",
+            "source": source_id if source_id in nodes else "source",
+            "target": node_id,
+            "label": "搜索命中",
+            "status": "success",
+            "rule_pack": "Content Rule Pack V1",
+        })
+    for action in walk_actions:
+        source_id = _walk_node_id(action.get("from_node_type"), action.get("from_node_id"))
+        target_id = _walk_node_id(action.get("to_node_type"), action.get("to_node_id"))
+        nodes.setdefault(source_id, {
+            "id": source_id,
+            "type": action.get("from_node_type") or "node",
+            "label": action.get("from_node_id") or "起点",
+            "status": "success",
+        })
+        nodes.setdefault(target_id, {
+            "id": target_id,
+            "type": action.get("to_node_type") or "node",
+            "label": action.get("to_node_id") or action.get("edge_type") or "下一跳",
+            "status": action.get("walk_status") or "pending",
+        })
+        edges.append({
+            "id": action.get("walk_action_id") or f"{source_id}->{target_id}",
+            "source": source_id,
+            "target": target_id,
+            "label": action.get("edge_id") or action.get("edge_type") or action.get("walk_action"),
+            "status": action.get("walk_status") or "pending",
+            "rule_pack": action.get("rule_pack_id"),
+            "budget_tier": action.get("budget_tier"),
+            "reason_code": action.get("reason_code"),
+        })
+    return {
+        "nodes": list(nodes.values()),
+        "edges": edges,
+        "source_path_count": len(source_paths),
+    }
+
+
+def _primary_failure_reason(
+    run_item: dict[str, Any],
+    run_events: list[dict[str, Any]],
+) -> dict[str, Any] | None:
+    error_code = run_item.get("error_code")
+    if not error_code and run_item.get("status") not in {"failed", "partial_success"}:
+        return None
+    failed_events = [
+        event for event in run_events
+        if event.get("status") == "failed" or event.get("error_code")
+    ]
+    event = failed_events[-1] if failed_events else {}
+    code = error_code or event.get("error_code") or "UNKNOWN"
+    return {
+        "reason_code": code,
+        "reason_label": _reason_label(code),
+        "stage": event.get("stage") or _stage_from_error_code(code),
+        "stage_label": _stage_label(event.get("stage") or _stage_from_error_code(code)),
+        "message": event.get("message") or run_item.get("error_message") or code,
+    }
+
+
+def _source_label(run_item: dict[str, Any]) -> str:
+    demand_id = run_item.get("demand_content_id")
+    if demand_id:
+        return f"真实需求 #{demand_id}"
+    return "真实需求池 / 默认样例"
+
+
+def _source_stage_detail(source_context: dict[str, Any]) -> str:
+    demand_id = source_context.get("demand_content_id") or source_context.get("id")
+    if demand_id:
+        return f"需求池 ID:{demand_id}"
+    raw_demand = source_context.get("raw_demand_content")
+    if isinstance(raw_demand, dict) and raw_demand.get("id"):
+        return f"需求池 ID:{raw_demand.get('id')}"
+    return "需求池 ID:缺失"
+
+
+def _query_generation_label(queries: list[dict[str, Any]], run_events: list[dict[str, Any]]) -> str:
+    generated_count = len(queries)
+    generation_failed = any(
+        event.get("error_code") == "QUERY_GENERATION_FAILED"
+        or (event.get("event_type") == "query_generation_failed")
+        for event in run_events
+    )
+    if generated_count and generation_failed:
+        return f"{generated_count} 条生成成功 / 生成失败"
+    if generated_count:
+        return f"{generated_count} 条生成成功"
+    if generation_failed:
+        return "生成失败"
+    return "0 条生成成功"
+
+
+def _effect_status_counts(decisions: list[dict[str, Any]]) -> dict[str, int]:
+    counts: dict[str, int] = {}
+    for decision in decisions:
+        status = str(decision.get("content_effect_status") or "unknown")
+        counts[status] = counts.get(status, 0) + 1
+    return counts
+
+
+def _walk_status_counts(walk_actions: list[dict[str, Any]]) -> dict[str, int]:
+    counts: dict[str, int] = {}
+    for action in walk_actions:
+        status = str(action.get("walk_status") or "unknown")
+        counts[status] = counts.get(status, 0) + 1
+    return counts
+
+
+def _decision_status_label(counts: dict[str, int]) -> str:
+    if not counts:
+        return "当前没有内容进入规则判断"
+    return ",".join(f"{_reason_label(key)} {value}" for key, value in counts.items())
+
+
+def _walk_status_label(counts: dict[str, int]) -> str:
+    if not counts:
+        return "当前没有触发游走动作"
+    return ",".join(f"{_reason_label(key)} {value}" for key, value in counts.items())
+
+
+def _reason_label(code: Any) -> str:
+    labels = {
+        "success": "成功",
+        "pending": "待复看",
+        "failed": "失败",
+        "rule_blocked": "规则阻断",
+        "PLATFORM_REQUEST_FAILED": "平台请求失败",
+        "QUERY_GENERATION_FAILED": "Query 生成失败",
+        "INVALID_SOURCE": "需求来源无效",
+        "missing_score": "分数缺失",
+        "pattern_recall_failed": "Pattern 回扣失败",
+        "missing_source_evidence": "来源证据缺失",
+        "missing_content_portrait": "画像缺失",
+        "high_risk_content": "高风险内容",
+    }
+    value = str(code or "unknown")
+    return labels.get(value, value)
+
+
+def _stage_from_error_code(code: Any) -> str:
+    value = str(code or "")
+    if "QUERY" in value:
+        return "query"
+    if "PLATFORM" in value:
+        return "platform"
+    if "SOURCE" in value:
+        return "source"
+    return "run"
+
+
+def _stage_label(stage: Any) -> str:
+    labels = {
+        "source": "数据源阶段",
+        "query": "Query 阶段",
+        "platform": "平台搜索阶段",
+        "judge": "规则判断阶段",
+        "walk": "游走阶段",
+        "run": "运行阶段",
+    }
+    return labels.get(str(stage or "run"), str(stage or "运行阶段"))
+
+
+def _walk_node_id(node_type: Any, node_id: Any) -> str:
+    return f"{node_type or 'node'}:{node_id or 'unknown'}"

+ 79 - 0
content_agent/schemas.py

@@ -77,3 +77,82 @@ class ValidationResponse(BaseModel):
     run_id: str
     status: str
     findings: list[dict[str, Any]]
+
+
+class RunListItem(BaseModel):
+    run_id: str
+    policy_run_id: str | None = None
+    status: str | None = None
+    current_step: str | None = None
+    platform: str | None = None
+    platform_mode: str | None = None
+    strategy_version: str | None = None
+    validation_status: str | None = None
+    error_code: str | None = None
+    started_at: str | None = None
+    completed_at: str | None = None
+
+
+class RunListResponse(BaseModel):
+    items: list[RunListItem]
+    page: int
+    page_size: int
+    total: int
+    data_origin: str
+
+
+class DashboardResponse(BaseModel):
+    run_id: str
+    summary: dict[str, Any]
+    counts: dict[str, int]
+    files: dict[str, bool]
+    runtime_files: list[dict[str, Any]]
+    validation: dict[str, Any]
+    final_output_summary: dict[str, Any]
+    strategy_review_status: str
+    business_summary: dict[str, Any]
+    stage_conclusions: list[dict[str, Any]]
+    rule_application_summary: list[dict[str, Any]]
+    walk_graph: dict[str, Any]
+    primary_failure_reason: dict[str, Any] | None = None
+    technical_refs: dict[str, Any]
+    data_origin: str
+    links: dict[str, str]
+
+
+class RuntimeFilesResponse(BaseModel):
+    run_id: str
+    files: list[dict[str, Any]]
+    data_origin: str
+
+
+class RuntimeFileResponse(BaseModel):
+    run_id: str
+    filename: str
+    data_origin: str
+    data: dict[str, Any] | None = None
+    records: list[dict[str, Any]] | None = None
+    offset: int | None = None
+    limit: int | None = None
+    total: int | None = None
+
+
+class QueryListResponse(BaseModel):
+    run_id: str
+    items: list[dict[str, Any]]
+    total: int
+    data_origin: str
+
+
+class TimelineResponse(BaseModel):
+    run_id: str
+    items: list[dict[str, Any]]
+    total: int
+    data_origin: str
+
+
+class ContentItemsResponse(BaseModel):
+    run_id: str
+    items: list[dict[str, Any]]
+    total: int
+    data_origin: str

+ 57 - 0
tests/test_api.py

@@ -49,6 +49,63 @@ def test_api_runs_and_queries_mock_chain(tmp_path, monkeypatch):
     summary = client.get(f"/runs/{run_id}").json()
     assert summary["validation_status"] == "pass"
 
+    run_list = client.get("/runs").json()
+    assert run_list["total"] == 1
+    assert run_list["items"][0]["run_id"] == run_id
+    assert run_list["data_origin"] == "runtime_export"
+
+    dashboard = client.get(f"/runs/{run_id}/dashboard").json()
+    assert dashboard["run_id"] == run_id
+    assert dashboard["data_origin"] == "runtime_export"
+    assert dashboard["counts"]["queries"] >= 1
+    assert dashboard["runtime_files"]
+    assert dashboard["business_summary"]["query_count"] >= 1
+    assert {stage["stage_id"] for stage in dashboard["stage_conclusions"]} >= {
+        "source",
+        "query",
+        "platform",
+        "judge",
+        "walk",
+        "asset",
+        "learning",
+    }
+    query_stage = next(stage for stage in dashboard["stage_conclusions"] if stage["stage_id"] == "query")
+    assert "生成成功" in query_stage["metric"]
+    assert "llm_variant" not in query_stage["metric"]
+    source_stage = next(stage for stage in dashboard["stage_conclusions"] if stage["stage_id"] == "source")
+    assert source_stage["detail"] == "需求池 ID:1"
+    assert isinstance(dashboard["rule_application_summary"], list)
+    assert "nodes" in dashboard["walk_graph"]
+    assert dashboard["technical_refs"]["runtime_files_url"].endswith("/runtime-files")
+
+    queries = client.get(f"/runs/{run_id}/queries").json()
+    assert queries["total"] >= 1
+    assert queries["items"][0]["search_query_id"]
+
+    content_items = client.get(f"/runs/{run_id}/content-items").json()
+    assert content_items["total"] >= 1
+    assert "rule_decision" in content_items["items"][0]
+
+    timeline = client.get(f"/runs/{run_id}/timeline").json()
+    assert timeline["total"] >= 1
+    assert any(item["source"] == "run_events.jsonl" for item in timeline["items"])
+
+    runtime_files = client.get(f"/runs/{run_id}/runtime-files").json()
+    filenames = {item["filename"] for item in runtime_files["files"]}
+    assert "search_queries.jsonl" in filenames
+
+    runtime_file = client.get(f"/runs/{run_id}/runtime-files/search_queries.jsonl").json()
+    assert runtime_file["records"]
+    assert runtime_file["data_origin"] == "runtime_export"
+
+    bad_runtime_file = client.get(f"/runs/{run_id}/runtime-files/not_allowed.jsonl")
+    assert bad_runtime_file.status_code == 400
+    assert bad_runtime_file.json()["detail"]["error_code"] == "INVALID_REQUEST"
+
+    missing_dashboard = client.get("/runs/not-a-run/dashboard")
+    assert missing_dashboard.status_code == 404
+    assert missing_dashboard.json()["detail"]["error_code"] == "RUN_NOT_FOUND"
+
 
 def test_api_defaults_to_real_platform_mode_but_can_select_mock(tmp_path, monkeypatch):
     selected_modes = []