| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- from fastapi import FastAPI, HTTPException, Request
- from fastapi.responses import JSONResponse
- from fastapi.middleware.cors import CORSMiddleware
- from starlette.responses import Response, StreamingResponse
- from utils.params import DecodeContentParam, PatternContentParam, TopicSearchParam
- from dotenv import load_dotenv, find_dotenv
- from typing import Any, Dict, List, Optional
- from tasks.decode import begin_decode_task
- from tasks.detail import get_decode_detail_by_task_id
- from tasks.pattern import begin_pattern_task
- from tasks.topic_search import search_topics
- from loguru import logger
- import sys
# Register an additional stderr sink that only receives ERROR-and-above
# records, rendered with extended tracebacks.
# NOTE(review): loguru's documentation advises turning `diagnose` off in
# production because it prints variable values -- confirm this is intended.
logger.add(sys.stderr, level="ERROR", backtrace=True, diagnose=True)
- # 接口访问日志(与仅 ERROR 的 sink 并存,默认 stderr 仍会输出 INFO)
- _MAX_ACCESS_LOG_BYTES = 8192
- def _preview_bytes(data: bytes) -> str:
- if not data:
- return ""
- text = data.decode("utf-8", errors="replace")
- if len(text) > _MAX_ACCESS_LOG_BYTES:
- return text[:_MAX_ACCESS_LOG_BYTES] + f"...<truncated len={len(text)}>"
- return text
# Response message mapping: business code -> text placed in the "msg" field.
# Codes missing from this table are reported as "failed" by the callers.
RESPONSE_MSG_MAP: Dict[int, str] = {
    0: "success",
    1002: "视频不存在",
    2001: "解构/聚类任务创建失败",
    -1: "failed",
    404: "任务不存在",
}
# Load settings from the discovered .env file; override=False leaves variables
# that are already set in the process environment untouched.
load_dotenv(find_dotenv(), override=False)

app = FastAPI()

# Fully permissive CORS policy: any origin, method and header.
# NOTE(review): wildcard origins combined with allow_credentials=True lets any
# site make credentialed requests -- confirm this is intended, or list the
# trusted origins explicitly.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.middleware("http")
async def api_access_log_middleware(request: Request, call_next):
    """Log each request (method, path, query, body) and response (status, body).

    The request body is read up front so it can be logged, then replayed to
    the downstream application through a wrapped ``Request``. Non-streaming
    response bodies are buffered for logging and then replayed on the
    *original* response object, so its status code and headers (including
    repeated ones such as multiple ``Set-Cookie``) reach the client unchanged.

    NOTE(review): bodies are written to the log verbatim, capped only by the
    preview limit -- confirm no endpoint carries secrets in its payload.

    Args:
        request: Incoming request.
        call_next: Callable that forwards the request to the rest of the app.

    Returns:
        The downstream response.
    """
    # Reading the body consumes the receive stream; give the downstream app a
    # receive() that hands back the cached bytes.
    body_bytes = await request.body()

    async def receive() -> dict:
        return {"type": "http.request", "body": body_bytes, "more_body": False}

    wrapped = Request(request.scope, receive)
    logger.info(
        "api_request method={} path={} query={} body={}",
        request.method,
        request.url.path,
        str(request.query_params),
        _preview_bytes(body_bytes) or "<empty>",
    )

    response = await call_next(wrapped)

    # Streaming bodies are passed through untouched and not logged.
    # NOTE(review): whether call_next() returns a StreamingResponse instance
    # depends on the installed Starlette version -- confirm this branch is
    # taken only for the responses it is meant for.
    if isinstance(response, StreamingResponse):
        logger.info(
            "api_response status={} path={} body=<streaming skipped>",
            response.status_code,
            request.url.path,
        )
        return response

    resp_chunks: List[bytes] = [chunk async for chunk in response.body_iterator]
    resp_body = b"".join(resp_chunks)
    logger.info(
        "api_response status={} path={} body={}",
        response.status_code,
        request.url.path,
        _preview_bytes(resp_body) or "<empty>",
    )

    async def replay_body():
        # Re-emit the chunks consumed above, in their original order.
        for chunk in resp_chunks:
            yield chunk

    # Reuse the original response instead of rebuilding it from
    # dict(response.headers), which would keep only one value per header name.
    response.body_iterator = replay_body()
    return response
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Render an HTTPException in the same envelope the other endpoints use.

    The HTTP status sent to the client is always 200; the exception's status
    code is reported in the body's ``code`` field, with ``msg`` looked up in
    ``RESPONSE_MSG_MAP`` (``"failed"`` when the code is not listed).

    Args:
        request: The request being handled (unused).
        exc: The raised HTTPException.

    Returns:
        A JSONResponse with ``code``, ``msg``, ``data`` (always ``None``) and,
        when the exception has a truthy detail, ``reason``.
    """
    status = exc.status_code
    payload = {
        "code": status,
        "msg": RESPONSE_MSG_MAP.get(status, "failed"),
        "data": None,
    }
    # Expose the exception detail as "reason" to make troubleshooting easier.
    detail = exc.detail
    if detail:
        payload["reason"] = str(detail)
    return JSONResponse(content=payload, status_code=200)
def _build_api_response(
    code: int,
    data: Any = None,
    reason: Optional[str] = None
) -> JSONResponse:
    """Build the unified API response envelope.

    Args:
        code: Business result code; ``0`` means success.
        data: Payload placed in the ``data`` field.
        reason: Failure explanation; included only when ``code`` is non-zero
            and ``reason`` is truthy.

    Returns:
        A JSONResponse with HTTP status 200 whose body holds ``code``, ``msg``
        (from ``RESPONSE_MSG_MAP``, defaulting to ``"failed"``), ``data`` and
        optionally ``reason``.
    """
    body = {
        "code": code,
        "msg": RESPONSE_MSG_MAP.get(code, "failed"),
        "data": data,
    }

    # Only failures carry a "reason" field.
    is_failure = code != 0
    if is_failure and reason:
        body["reason"] = reason

    return JSONResponse(content=body, status_code=200)
@app.post("/api/v1/content/tasks/decode")
def decode_content(param: DecodeContentParam):
    """Create a decode (content deconstruction) task.

    Passes ``param`` to ``begin_decode_task`` and reads ``code``, ``task_id``
    and ``reason`` from its result, defaulting ``code`` to ``-1`` and
    ``reason`` to ``""`` when absent.

    Returns:
        The unified API response; ``data`` is ``{"task_id": ...}`` when a
        truthy task id was returned, otherwise ``None``.
    """
    outcome = begin_decode_task(param)
    code = outcome.get("code", -1)
    task_id = outcome.get("task_id")
    failure_reason = outcome.get("reason", "")

    payload = None
    if task_id:
        payload = {"task_id": task_id}

    return _build_api_response(code=code, data=payload, reason=failure_reason)
@app.get("/api/v1/content/tasks/{taskId}")
def get_task_detail(taskId: str):
    """Return the detail of a task identified by ``taskId``.

    Returns:
        The value from ``get_decode_detail_by_task_id`` as the JSON body when
        one is found; otherwise the unified response with code 404.
    """
    detail = get_decode_detail_by_task_id(taskId)

    if detail is not None:
        # Returned as-is; presumably it already contains code, msg, data and
        # reason -- verify against get_decode_detail_by_task_id.
        return JSONResponse(content=detail, status_code=200)

    # The task does not exist.
    return _build_api_response(code=404, data=None)
@app.post("/api/v1/content/tasks/pattern")
def pattern_content(param: PatternContentParam):
    """Create a pattern (clustering) task.

    Passes ``param`` to ``begin_pattern_task`` and reads ``code``, ``task_id``
    and ``reason`` from its result, defaulting ``code`` to ``-1`` and
    ``reason`` to ``""`` when absent.

    Returns:
        The unified API response; ``data`` is ``{"task_id": ...}`` when a
        truthy task id was returned, otherwise ``None``.
    """
    outcome = begin_pattern_task(param)
    code = outcome.get("code", -1)
    task_id = outcome.get("task_id")
    failure_reason = outcome.get("reason", "")

    payload = None
    if task_id:
        payload = {"task_id": task_id}

    return _build_api_response(code=code, data=payload, reason=failure_reason)
@app.post("/api/v1/content/topics/search")
def search_content_topics(param: TopicSearchParam):
    """Search video topics and return the matches in the unified envelope.

    The matching itself is delegated to ``search_topics``; per the original
    description it matches keywords against decode results and returns the
    five best matches -- verify against ``search_topics``.

    Returns:
        The unified API response with code 0 and the search results as data.
    """
    matches = search_topics(param)
    return _build_api_response(data=matches, code=0)
|