from fastapi import FastAPI, HTTPException, Request from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware from starlette.responses import Response, StreamingResponse from utils.params import DecodeContentParam, PatternContentParam, TopicSearchParam from dotenv import load_dotenv, find_dotenv from typing import Any, Dict, List, Optional from tasks.decode import begin_decode_task from tasks.detail import get_decode_detail_by_task_id from tasks.pattern import begin_pattern_task from tasks.topic_search import search_topics from loguru import logger import sys logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True) # 接口访问日志(与仅 ERROR 的 sink 并存,默认 stderr 仍会输出 INFO) _MAX_ACCESS_LOG_BYTES = 8192 def _preview_bytes(data: bytes) -> str: if not data: return "" text = data.decode("utf-8", errors="replace") if len(text) > _MAX_ACCESS_LOG_BYTES: return text[:_MAX_ACCESS_LOG_BYTES] + f"..." return text # 响应消息映射 RESPONSE_MSG_MAP = { 0: "success", 1002: "视频不存在", 2001: "解构/聚类任务创建失败", -1: "failed", 404: "任务不存在" } load_dotenv(find_dotenv(), override=False) app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.middleware("http") async def api_access_log_middleware(request: Request, call_next): """记录每个接口的请求(路径、查询、body)与响应(状态码、body)。""" body_bytes = await request.body() async def receive() -> dict: return {"type": "http.request", "body": body_bytes, "more_body": False} wrapped = Request(request.scope, receive) req_body_preview = _preview_bytes(body_bytes) if body_bytes else "" logger.info( "api_request method={} path={} query={} body={}", request.method, request.url.path, str(request.query_params), req_body_preview if req_body_preview else "", ) response = await call_next(wrapped) if isinstance(response, StreamingResponse): logger.info( "api_response status={} path={} body=", response.status_code, request.url.path, ) return response resp_chunks: List[bytes] = [] async for chunk in response.body_iterator: resp_chunks.append(chunk) resp_body = b"".join(resp_chunks) resp_preview = _preview_bytes(resp_body) if resp_body else "" logger.info( "api_response status={} path={} body={}", response.status_code, request.url.path, resp_preview if resp_preview else "", ) return Response( content=resp_body, status_code=response.status_code, headers=dict(response.headers), media_type=response.media_type, ) @app.exception_handler(HTTPException) async def http_exception_handler(request: Request, exc: HTTPException): """统一处理 HTTPException,保证返回结构与其它接口一致""" msg = RESPONSE_MSG_MAP.get(exc.status_code, "failed") content = { "code": exc.status_code, "msg": msg, "data": None, } # 将异常 detail 写入 reason,便于排查问题 if exc.detail: content["reason"] = str(exc.detail) return JSONResponse(status_code=200, content=content) def _build_api_response( code: int, data: Any = None, reason: Optional[str] = None ) -> JSONResponse: """构建统一的API响应""" msg = RESPONSE_MSG_MAP.get(code, "failed") content = { "code": code, "msg": msg, "data": data } # 失败时添加 reason 字段 if code != 0 and reason: content["reason"] = reason return JSONResponse(status_code=200, content=content) @app.post("/api/v1/content/tasks/decode") def decode_content(param: DecodeContentParam): """创建解构任务""" res = begin_decode_task(param) code = res.get("code", -1) task_id = res.get("task_id") reason = res.get("reason", "") return _build_api_response( code=code, data={"task_id": task_id} if task_id else None, reason=reason ) @app.get("/api/v1/content/tasks/{taskId}") def get_task_detail(taskId: str): """获取任务详情""" result = get_decode_detail_by_task_id(taskId) # 任务不存在 if result is None: return _build_api_response(code=404, data=None) # 直接返回结果(已经包含 code、msg、data、reason) return JSONResponse(status_code=200, content=result) @app.post("/api/v1/content/tasks/pattern") def pattern_content(param: PatternContentParam): """创建聚类任务""" res = begin_pattern_task(param) code = res.get("code", -1) task_id = res.get("task_id") reason = res.get("reason", "") return _build_api_response( code=code, data={"task_id": task_id} if task_id else None, reason=reason ) @app.post("/api/v1/content/topics/search") def search_content_topics(param: TopicSearchParam): """视频选题检索:根据关键词在解构结果中匹配,返回匹配度最高的 top5""" results = search_topics(param) return _build_api_response(code=0, data=results)