"""HTTP API for Content Agent V1.

Exposes endpoints to start runs, list and inspect runs, read per-run
artifacts, and read configuration files. Run operations are delegated to a
module-level ``RunService``; dashboard views go through ``DashboardService``.
"""

from __future__ import annotations

import os
from pathlib import Path

from fastapi import FastAPI, HTTPException, Query
from fastapi.exceptions import RequestValidationError
from fastapi.encoders import jsonable_encoder
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

from content_agent.dashboard_service import DashboardService
from content_agent.errors import ErrorCode, error_response, sanitize_error_detail
from content_agent.run_service import RunService
from content_agent.schemas import (
    ConfigFileResponse,
    ContentItemsResponse,
    DashboardResponse,
    JsonFileResponse,
    PlatformCatalogResponse,
    PlatformDescriptor,
    QueryListResponse,
    RecordsResponse,
    RunListResponse,
    RunStartRequest,
    RunStartResponse,
    RunSummaryResponse,
    RuntimeFileResponse,
    RuntimeFilesResponse,
    TimelineResponse,
    ValidationResponse,
)

app = FastAPI(title="Content Agent V1")
service = RunService.from_env()

# Allowed CORS origins: comma-separated list taken from the environment, with
# blank entries dropped. Falls back to the local dev origins below when the
# variable is unset.
_cors_origins = [
    origin.strip()
    for origin in os.environ.get(
        "CONTENT_AGENT_WEB_CORS_ORIGINS",
        "http://localhost:3000,http://127.0.0.1:3000,http://localhost:3010,http://127.0.0.1:3010",
    ).split(",")
    if origin.strip()
]
# An explicitly empty variable yields no origins, in which case the CORS
# middleware is not installed at all.
if _cors_origins:
    app.add_middleware(
        CORSMiddleware,
        allow_origins=_cors_origins,
        allow_credentials=False,
        allow_methods=["GET", "POST", "OPTIONS"],
        allow_headers=["*"],
    )


@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request, exc: RequestValidationError):
    """Convert a request validation failure into the project's error envelope.

    Returns a 422 JSON response whose ``detail`` is built by
    ``error_response`` with ``ErrorCode.INVALID_REQUEST`` and the sanitized,
    JSON-encoded validation errors.
    """
    return JSONResponse(
        status_code=422,
        content={
            "detail": error_response(
                ErrorCode.INVALID_REQUEST,
                "invalid request",
                {"errors": jsonable_encoder(sanitize_error_detail(exc.errors()))},
            )
        },
    )


@app.post("/runs", response_model=RunStartResponse)
def start_run(request: RunStartRequest) -> RunStartResponse:
    """Start a run and return its identifiers and output directory.

    Raises:
        HTTPException: when the resulting state's status is neither
            ``"success"`` nor ``"partial_success"``. The status code comes
            from the state's ``http_status_code`` (default 500) and the
            detail from its ``error_*`` fields.
    """
    state = service.start_run(request)
    if state["status"] not in {"success", "partial_success"}:
        detail = error_response(
            state.get("error_code", ErrorCode.RUN_START_FAILED),
            state.get("error_message", "run failed"),
            state.get("error_detail", {"errors": state.get("errors", [])}),
        )
        raise HTTPException(
            status_code=state.get("http_status_code", 500),
            detail=detail,
        )
    run_id = state["run_id"]
    return RunStartResponse(
        run_id=run_id,
        policy_run_id=state["policy_run_id"],
        status=state["status"],
        policy_bundle_id=state["policy_bundle_id"],
        strategy_version=state["strategy_version"],
        platform=state["platform"],
        platform_mode=state["platform_mode"],
        output_dir=str(service.runtime.run_dir(run_id)),
    )


@app.get("/runs", response_model=RunListResponse)
def list_runs(
    status: str | None = None,
    platform: str | None = None,
    platform_mode: str | None = None,
    strategy_version: str | None = None,
    validation_status: str | None = None,
    error_code: str | None = None,
    page: int = Query(default=1, ge=1),
    page_size: int = Query(default=20, ge=1, le=100),
) -> RunListResponse:
    """List runs, forwarding the optional filters and pagination as given.

    ``page`` is 1-based (>= 1) and ``page_size`` is limited to 1..100 by the
    query validators.
    """
    return RunListResponse(
        **_dashboard_service().list_runs(
            status=status,
            platform=platform,
            platform_mode=platform_mode,
            strategy_version=strategy_version,
            validation_status=validation_status,
            error_code=error_code,
            page=page,
            page_size=page_size,
        )
    )


@app.get("/runs/{run_id}", response_model=RunSummaryResponse)
def get_run(run_id: str) -> RunSummaryResponse:
    """Return the summary of one run; 404 if its run directory is missing."""
    _ensure_run_exists(run_id)
    return RunSummaryResponse(**service.get_summary(run_id))


# Config endpoint key -> path of the JSON file served for it. Paths are
# relative, so they resolve against the process working directory.
_CONFIG_FILES = {
    "rule-packs": "product_documents/规则包/douyin_rule_packs.v1.json",
    "walk-strategy": "product_documents/抖音游走策略/douyin_walk_strategy.v1.json",
    "query-prompts": "product_documents/配置/query_prompts.v1.json",
    "walk-policy": "tech_documents/数据接口与来源/walk_policy.json",
}


def _config_file_response(key: str) -> ConfigFileResponse:
    """Load the config file registered under ``key`` in ``_CONFIG_FILES``.

    Raises:
        KeyError: if ``key`` is not registered.
        HTTPException: 404 when the file does not exist on disk.
    """
    # Kept function-local as in the original code (project-local import).
    from content_agent.integrations import config_store

    path = Path(_CONFIG_FILES[key])
    if not path.exists():
        # NOTE(review): reuses RUN_NOT_FOUND for a missing config file —
        # confirm whether a dedicated error code should be used instead.
        raise HTTPException(
            status_code=404,
            detail=error_response(
                ErrorCode.RUN_NOT_FOUND,
                f"config file missing: {key}",
                {"source_file": str(path)},
            ),
        )
    data, _ = config_store.load_json(path)
    return ConfigFileResponse(source_file=str(path), data=data)


@app.get("/config/rule-packs", response_model=ConfigFileResponse)
def get_config_rule_packs() -> ConfigFileResponse:
    """Return the rule-packs config file."""
    return _config_file_response("rule-packs")


@app.get("/config/walk-strategy", response_model=ConfigFileResponse)
def get_config_walk_strategy() -> ConfigFileResponse:
    """Return the walk-strategy config file."""
    return _config_file_response("walk-strategy")


@app.get("/config/query-prompts", response_model=ConfigFileResponse)
def get_config_query_prompts() -> ConfigFileResponse:
    """Return the query-prompts config file."""
    return _config_file_response("query-prompts")


@app.get("/config/walk-policy", response_model=ConfigFileResponse)
def get_config_walk_policy() -> ConfigFileResponse:
    """Return the walk-policy config file as raw JSON."""
    # Return the raw JSON without unwrapping {value, provenance, tbd}: the
    # presentation layer keeps the decision audit trail, and unwrapping is
    # done by the frontend.
    return _config_file_response("walk-policy")


_PLATFORM_PROFILE_DIR = "tech_documents/数据接口与来源/platform_profiles"


@app.get("/config/platforms", response_model=PlatformCatalogResponse)
def get_config_platforms() -> PlatformCatalogResponse:
    """Build the platform catalog from the profile JSON files.

    Profiles that cannot be read or parsed, or whose top-level JSON value is
    not an object, are skipped so that one bad file does not break the whole
    catalog.
    """
    # Platform display catalog: derive each platform's label and the
    # actually-available engagement metrics (heat fields) from
    # platform_profiles/*.json.
    # The frontend renders platform names and engagement data from this —
    # adding a platform only requires editing a profile JSON, with no
    # frontend changes.
    from content_agent.integrations import config_store

    catalog: dict[str, PlatformDescriptor] = {}
    profile_dir = Path(_PLATFORM_PROFILE_DIR)
    for path in sorted(profile_dir.glob("*.json")):
        try:
            data, _ = config_store.load_json(path)
        except (FileNotFoundError, OSError, ValueError):
            continue
        if not isinstance(data, dict):
            # Valid JSON of the wrong shape (e.g. a list): skip it like any
            # other unusable profile instead of failing with AttributeError.
            continue
        platform = str(data.get("platform") or path.stem)
        # A malformed "heat" or "signals" value means "no heat fields".
        heat = data.get("heat")
        signals = heat.get("signals") if isinstance(heat, dict) else None
        heat_fields = [
            str(sig["field"])
            for sig in (signals if isinstance(signals, list) else [])
            if isinstance(sig, dict) and sig.get("field")
        ]
        catalog[platform] = PlatformDescriptor(
            platform=platform,
            label=str(data.get("platform_label") or platform),
            status=data.get("status"),
            heat_fields=heat_fields,
        )
    return PlatformCatalogResponse(platforms=catalog)


@app.get("/runs/{run_id}/dashboard", response_model=DashboardResponse)
def get_run_dashboard(run_id: str) -> DashboardResponse:
    """Return the dashboard view of a run; 404 if the run is unknown."""
    _ensure_web_run_exists(run_id)
    return DashboardResponse(**_dashboard_service().dashboard(run_id))


@app.get("/runs/{run_id}/queries", response_model=QueryListResponse)
def get_run_queries(run_id: str) -> QueryListResponse:
    """Return the queries of a run; 404 if the run is unknown."""
    _ensure_web_run_exists(run_id)
    return QueryListResponse(**_dashboard_service().queries(run_id))


@app.get("/runs/{run_id}/timeline", response_model=TimelineResponse)
def get_run_timeline(run_id: str) -> TimelineResponse:
    """Return the timeline of a run; 404 if the run is unknown."""
    _ensure_web_run_exists(run_id)
    return TimelineResponse(**_dashboard_service().timeline(run_id))


@app.get("/runs/{run_id}/content-items", response_model=ContentItemsResponse)
def get_run_content_items(run_id: str) -> ContentItemsResponse:
    """Return the content items of a run; 404 if the run is unknown."""
    _ensure_web_run_exists(run_id)
    return ContentItemsResponse(**_dashboard_service().content_items(run_id))


@app.get("/runs/{run_id}/runtime-files", response_model=RuntimeFilesResponse)
def get_run_runtime_files(run_id: str) -> RuntimeFilesResponse:
    """Return the runtime-file listing of a run; 404 if the run is unknown."""
    _ensure_web_run_exists(run_id)
    return RuntimeFilesResponse(**_dashboard_service().runtime_files(run_id))


@app.get("/runs/{run_id}/runtime-files/{filename}", response_model=RuntimeFileResponse)
def get_run_runtime_file(
    run_id: str,
    filename: str,
    limit: int = Query(default=100, ge=1, le=500),
    offset: int = Query(default=0, ge=0),
) -> RuntimeFileResponse:
    """Return a window (``limit``/``offset``) of one runtime file of a run.

    Raises:
        HTTPException: 404 if the run is unknown; 400 when the dashboard
            service rejects the request with ``ValueError`` (reported as a
            disallowed filename).
    """
    _ensure_web_run_exists(run_id)
    # Only the service call is guarded: building the response model can also
    # raise a ValueError subclass (pydantic's ValidationError), which must not
    # be misreported as a disallowed filename.
    try:
        payload = _dashboard_service().runtime_file(
            run_id,
            filename,
            limit=limit,
            offset=offset,
        )
    except ValueError as exc:
        raise HTTPException(
            status_code=400,
            detail=error_response(
                ErrorCode.INVALID_REQUEST,
                "runtime filename is not allowed",
                {"filename": filename},
            ),
        ) from exc
    return RuntimeFileResponse(**payload)


@app.get("/runs/{run_id}/discovered-content-items", response_model=RecordsResponse)
def get_discovered_content_items(run_id: str) -> RecordsResponse:
    """Return the records of ``discovered_content_items.jsonl`` for a run."""
    return _jsonl_response(run_id, "discovered_content_items.jsonl")


@app.get("/runs/{run_id}/rule-decisions", response_model=RecordsResponse)
def get_rule_decisions(run_id: str) -> RecordsResponse:
    """Return the records of ``rule_decisions.jsonl`` for a run."""
    return _jsonl_response(run_id, "rule_decisions.jsonl")


@app.get("/runs/{run_id}/source-path-records", response_model=RecordsResponse)
def get_source_path_records(run_id: str) -> RecordsResponse:
    """Return the records of ``source_path_records.jsonl`` for a run."""
    return _jsonl_response(run_id, "source_path_records.jsonl")


@app.get("/runs/{run_id}/final-output", response_model=JsonFileResponse)
def get_final_output(run_id: str) -> JsonFileResponse:
    """Return the contents of ``final_output.json`` for a run."""
    return _json_response(run_id, "final_output.json")


@app.get("/runs/{run_id}/strategy-review", response_model=JsonFileResponse)
def get_strategy_review(run_id: str) -> JsonFileResponse:
    """Return the strategy review of a run; 404 if its directory is missing."""
    _ensure_run_exists(run_id)
    return JsonFileResponse(run_id=run_id, data=service.strategy_review(run_id))


@app.post("/runs/{run_id}/strategy-review/regenerate", response_model=JsonFileResponse)
def regenerate_strategy_review(run_id: str) -> JsonFileResponse:
    """Regenerate and return the strategy review of a run; 404 if missing."""
    _ensure_run_exists(run_id)
    return JsonFileResponse(
        run_id=run_id,
        data=service.regenerate_strategy_review(run_id),
    )


@app.get("/runs/{run_id}/validation", response_model=ValidationResponse)
def get_validation(run_id: str) -> ValidationResponse:
    """Return the validation result of a run; 404 if its directory is missing."""
    _ensure_run_exists(run_id)
    return ValidationResponse(**service.validate_run(run_id))


def _jsonl_response(run_id: str, filename: str) -> RecordsResponse:
    """Read a JSONL artifact of a run through the run service; 404 if no run."""
    _ensure_run_exists(run_id)
    return RecordsResponse(run_id=run_id, records=service.read_jsonl(run_id, filename))


def _json_response(run_id: str, filename: str) -> JsonFileResponse:
    """Read a JSON artifact of a run through the run service; 404 if no run."""
    _ensure_run_exists(run_id)
    return JsonFileResponse(run_id=run_id, data=service.read_json(run_id, filename))


def _ensure_run_exists(run_id: str) -> None:
    """Raise a 404 ``HTTPException`` unless the run's directory exists."""
    if not service.runtime.run_dir(run_id).exists():
        raise HTTPException(
            status_code=404,
            detail=error_response(
                ErrorCode.RUN_NOT_FOUND,
                "run not found",
                {"run_id": run_id},
            ),
        )


def _ensure_web_run_exists(run_id: str) -> None:
    """Raise a 404 ``HTTPException`` unless the dashboard service knows the run."""
    if not _dashboard_service().run_exists(run_id):
        raise HTTPException(
            status_code=404,
            detail=error_response(
                ErrorCode.RUN_NOT_FOUND,
                "run not found",
                {"run_id": run_id},
            ),
        )


def _dashboard_service() -> DashboardService:
    """Build a ``DashboardService`` bound to the run service's runtime.

    A new instance is created on every call.
    """
    return DashboardService.from_runtime(service.runtime)