| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330 |
- from __future__ import annotations
- import os
- from fastapi import FastAPI, HTTPException, Query
- from fastapi.exceptions import RequestValidationError
- from fastapi.encoders import jsonable_encoder
- from fastapi.middleware.cors import CORSMiddleware
- from fastapi.responses import JSONResponse
- from content_agent.dashboard_service import DashboardService
- from content_agent.errors import ErrorCode, error_response, sanitize_error_detail
- from content_agent.run_service import RunService
- from content_agent.schemas import (
- ConfigFileResponse,
- ContentItemsResponse,
- DashboardResponse,
- JsonFileResponse,
- PlatformCatalogResponse,
- PlatformDescriptor,
- QueryListResponse,
- RecordsResponse,
- RunListResponse,
- RunStartRequest,
- RunStartResponse,
- RunSummaryResponse,
- RuntimeFileResponse,
- RuntimeFilesResponse,
- TimelineResponse,
- ValidationResponse,
- )
- app = FastAPI(title="Content Agent V1")
- service = RunService.from_env()
- _cors_origins = [
- origin.strip()
- for origin in os.environ.get(
- "CONTENT_AGENT_WEB_CORS_ORIGINS",
- "http://localhost:3000,http://127.0.0.1:3000,http://localhost:3010,http://127.0.0.1:3010",
- ).split(",")
- if origin.strip()
- ]
- if _cors_origins:
- app.add_middleware(
- CORSMiddleware,
- allow_origins=_cors_origins,
- allow_credentials=False,
- allow_methods=["GET", "POST", "OPTIONS"],
- allow_headers=["*"],
- )
- @app.exception_handler(RequestValidationError)
- async def validation_exception_handler(request, exc: RequestValidationError):
- return JSONResponse(
- status_code=422,
- content={
- "detail": error_response(
- ErrorCode.INVALID_REQUEST,
- "invalid request",
- {"errors": jsonable_encoder(sanitize_error_detail(exc.errors()))},
- )
- },
- )
- @app.post("/runs", response_model=RunStartResponse)
- def start_run(request: RunStartRequest) -> RunStartResponse:
- state = service.start_run(request)
- if state["status"] not in {"success", "partial_success"}:
- detail = error_response(
- state.get("error_code", ErrorCode.RUN_START_FAILED),
- state.get("error_message", "run failed"),
- state.get("error_detail", {"errors": state.get("errors", [])}),
- )
- raise HTTPException(status_code=state.get("http_status_code", 500), detail=detail)
- run_id = state["run_id"]
- return RunStartResponse(
- run_id=run_id,
- policy_run_id=state["policy_run_id"],
- status=state["status"],
- policy_bundle_id=state["policy_bundle_id"],
- strategy_version=state["strategy_version"],
- platform=state["platform"],
- platform_mode=state["platform_mode"],
- output_dir=str(service.runtime.run_dir(run_id)),
- )
- @app.get("/runs", response_model=RunListResponse)
- def list_runs(
- status: str | None = None,
- platform: str | None = None,
- platform_mode: str | None = None,
- strategy_version: str | None = None,
- validation_status: str | None = None,
- error_code: str | None = None,
- page: int = Query(default=1, ge=1),
- page_size: int = Query(default=20, ge=1, le=100),
- ) -> RunListResponse:
- return RunListResponse(
- **_dashboard_service().list_runs(
- status=status,
- platform=platform,
- platform_mode=platform_mode,
- strategy_version=strategy_version,
- validation_status=validation_status,
- error_code=error_code,
- page=page,
- page_size=page_size,
- )
- )
- @app.get("/runs/{run_id}", response_model=RunSummaryResponse)
- def get_run(run_id: str) -> RunSummaryResponse:
- _ensure_run_exists(run_id)
- return RunSummaryResponse(**service.get_summary(run_id))
- _CONFIG_FILES = {
- "rule-packs": "product_documents/规则包/douyin_rule_packs.v1.json",
- "walk-strategy": "product_documents/抖音游走策略/douyin_walk_strategy.v1.json",
- "query-prompts": "product_documents/配置/query_prompts.v1.json",
- "walk-policy": "tech_documents/数据接口与来源/walk_policy.json",
- }
- def _config_file_response(key: str) -> ConfigFileResponse:
- from pathlib import Path
- from content_agent.integrations import config_store
- path = Path(_CONFIG_FILES[key])
- if not path.exists():
- raise HTTPException(status_code=404, detail=error_response(
- ErrorCode.RUN_NOT_FOUND, f"config file missing: {key}", {"source_file": str(path)}
- ))
- data, _ = config_store.load_json(path)
- return ConfigFileResponse(source_file=str(path), data=data)
- @app.get("/config/rule-packs", response_model=ConfigFileResponse)
- def get_config_rule_packs() -> ConfigFileResponse:
- return _config_file_response("rule-packs")
- @app.get("/config/walk-strategy", response_model=ConfigFileResponse)
- def get_config_walk_strategy() -> ConfigFileResponse:
- return _config_file_response("walk-strategy")
- @app.get("/config/query-prompts", response_model=ConfigFileResponse)
- def get_config_query_prompts() -> ConfigFileResponse:
- return _config_file_response("query-prompts")
- @app.get("/config/walk-policy", response_model=ConfigFileResponse)
- def get_config_walk_policy() -> ConfigFileResponse:
- # 返回原始 JSON 不解包 {value,provenance,tbd}:展示层保留拍板留痕,解包由前端做。
- return _config_file_response("walk-policy")
- _PLATFORM_PROFILE_DIR = "tech_documents/数据接口与来源/platform_profiles"
- @app.get("/config/platforms", response_model=PlatformCatalogResponse)
- def get_config_platforms() -> PlatformCatalogResponse:
- # 平台展示目录:从 platform_profiles/*.json 派生每平台的 label + 真实可得互动指标(heat 字段)。
- # 前端按此渲染平台名与互动数据——加平台只改 profile JSON,前端零改动。
- from pathlib import Path
- from content_agent.integrations import config_store
- catalog: dict[str, PlatformDescriptor] = {}
- profile_dir = Path(_PLATFORM_PROFILE_DIR)
- for path in sorted(profile_dir.glob("*.json")):
- try:
- data, _ = config_store.load_json(path)
- except (FileNotFoundError, OSError, ValueError):
- continue
- platform = str(data.get("platform") or path.stem)
- heat_fields = [
- str(sig["field"])
- for sig in ((data.get("heat") or {}).get("signals") or [])
- if isinstance(sig, dict) and sig.get("field")
- ]
- catalog[platform] = PlatformDescriptor(
- platform=platform,
- label=str(data.get("platform_label") or platform),
- status=data.get("status"),
- heat_fields=heat_fields,
- )
- return PlatformCatalogResponse(platforms=catalog)
- @app.get("/runs/{run_id}/dashboard", response_model=DashboardResponse)
- def get_run_dashboard(run_id: str) -> DashboardResponse:
- _ensure_web_run_exists(run_id)
- return DashboardResponse(**_dashboard_service().dashboard(run_id))
- @app.get("/runs/{run_id}/queries", response_model=QueryListResponse)
- def get_run_queries(run_id: str) -> QueryListResponse:
- _ensure_web_run_exists(run_id)
- return QueryListResponse(**_dashboard_service().queries(run_id))
- @app.get("/runs/{run_id}/timeline", response_model=TimelineResponse)
- def get_run_timeline(run_id: str) -> TimelineResponse:
- _ensure_web_run_exists(run_id)
- return TimelineResponse(**_dashboard_service().timeline(run_id))
- @app.get("/runs/{run_id}/content-items", response_model=ContentItemsResponse)
- def get_run_content_items(run_id: str) -> ContentItemsResponse:
- _ensure_web_run_exists(run_id)
- return ContentItemsResponse(**_dashboard_service().content_items(run_id))
- @app.get("/runs/{run_id}/runtime-files", response_model=RuntimeFilesResponse)
- def get_run_runtime_files(run_id: str) -> RuntimeFilesResponse:
- _ensure_web_run_exists(run_id)
- return RuntimeFilesResponse(**_dashboard_service().runtime_files(run_id))
- @app.get("/runs/{run_id}/runtime-files/{filename}", response_model=RuntimeFileResponse)
- def get_run_runtime_file(
- run_id: str,
- filename: str,
- limit: int = Query(default=100, ge=1, le=500),
- offset: int = Query(default=0, ge=0),
- ) -> RuntimeFileResponse:
- _ensure_web_run_exists(run_id)
- try:
- return RuntimeFileResponse(
- **_dashboard_service().runtime_file(
- run_id,
- filename,
- limit=limit,
- offset=offset,
- )
- )
- except ValueError:
- raise HTTPException(
- status_code=400,
- detail=error_response(
- ErrorCode.INVALID_REQUEST,
- "runtime filename is not allowed",
- {"filename": filename},
- ),
- )
- @app.get("/runs/{run_id}/discovered-content-items", response_model=RecordsResponse)
- def get_discovered_content_items(run_id: str) -> RecordsResponse:
- return _jsonl_response(run_id, "discovered_content_items.jsonl")
- @app.get("/runs/{run_id}/rule-decisions", response_model=RecordsResponse)
- def get_rule_decisions(run_id: str) -> RecordsResponse:
- return _jsonl_response(run_id, "rule_decisions.jsonl")
- @app.get("/runs/{run_id}/source-path-records", response_model=RecordsResponse)
- def get_source_path_records(run_id: str) -> RecordsResponse:
- return _jsonl_response(run_id, "source_path_records.jsonl")
- @app.get("/runs/{run_id}/final-output", response_model=JsonFileResponse)
- def get_final_output(run_id: str) -> JsonFileResponse:
- return _json_response(run_id, "final_output.json")
- @app.get("/runs/{run_id}/strategy-review", response_model=JsonFileResponse)
- def get_strategy_review(run_id: str) -> JsonFileResponse:
- _ensure_run_exists(run_id)
- return JsonFileResponse(run_id=run_id, data=service.strategy_review(run_id))
- @app.post("/runs/{run_id}/strategy-review/regenerate", response_model=JsonFileResponse)
- def regenerate_strategy_review(run_id: str) -> JsonFileResponse:
- _ensure_run_exists(run_id)
- return JsonFileResponse(run_id=run_id, data=service.regenerate_strategy_review(run_id))
- @app.get("/runs/{run_id}/validation", response_model=ValidationResponse)
- def get_validation(run_id: str) -> ValidationResponse:
- _ensure_run_exists(run_id)
- return ValidationResponse(**service.validate_run(run_id))
- def _jsonl_response(run_id: str, filename: str) -> RecordsResponse:
- _ensure_run_exists(run_id)
- return RecordsResponse(run_id=run_id, records=service.read_jsonl(run_id, filename))
- def _json_response(run_id: str, filename: str) -> JsonFileResponse:
- _ensure_run_exists(run_id)
- return JsonFileResponse(run_id=run_id, data=service.read_json(run_id, filename))
- def _ensure_run_exists(run_id: str) -> None:
- if not service.runtime.run_dir(run_id).exists():
- raise HTTPException(
- status_code=404,
- detail=error_response(
- ErrorCode.RUN_NOT_FOUND,
- "run not found",
- {"run_id": run_id},
- ),
- )
- def _ensure_web_run_exists(run_id: str) -> None:
- if not _dashboard_service().run_exists(run_id):
- raise HTTPException(
- status_code=404,
- detail=error_response(
- ErrorCode.RUN_NOT_FOUND,
- "run not found",
- {"run_id": run_id},
- ),
- )
- def _dashboard_service() -> DashboardService:
- return DashboardService.from_runtime(service.runtime)
|