main.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. from fastapi import FastAPI, HTTPException, Request
  2. from fastapi.responses import JSONResponse
  3. from fastapi.middleware.cors import CORSMiddleware
  4. from starlette.responses import Response, StreamingResponse
  5. from utils.params import DecodeContentParam, PatternContentParam, TopicSearchParam
  6. from dotenv import load_dotenv, find_dotenv
  7. from typing import Any, Dict, List, Optional
  8. from tasks.decode import begin_decode_task
  9. from tasks.detail import get_decode_detail_by_task_id
  10. from tasks.pattern import begin_pattern_task
  11. from tasks.topic_search import search_topics
  12. from scheduler.bootstrap import run_dispatch_once, start_scheduler, stop_scheduler
  13. from loguru import logger
  14. import sys
  15. logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
  16. # 接口访问日志(与仅 ERROR 的 sink 并存,默认 stderr 仍会输出 INFO)
  17. _MAX_ACCESS_LOG_BYTES = 8192
  18. def _preview_bytes(data: bytes) -> str:
  19. if not data:
  20. return ""
  21. text = data.decode("utf-8", errors="replace")
  22. if len(text) > _MAX_ACCESS_LOG_BYTES:
  23. return text[:_MAX_ACCESS_LOG_BYTES] + f"...<truncated len={len(text)}>"
  24. return text
  25. # 响应消息映射
  26. RESPONSE_MSG_MAP = {
  27. 0: "success",
  28. 1002: "视频不存在",
  29. 2001: "解构/聚类任务创建失败",
  30. -1: "failed",
  31. 404: "任务不存在"
  32. }
  33. load_dotenv(find_dotenv(), override=False)
  34. app = FastAPI()
  35. app.add_middleware(
  36. CORSMiddleware,
  37. allow_origins=["*"],
  38. allow_credentials=True,
  39. allow_methods=["*"],
  40. allow_headers=["*"],
  41. )
  42. @app.on_event("startup")
  43. async def on_startup() -> None:
  44. start_scheduler()
  45. @app.on_event("shutdown")
  46. async def on_shutdown() -> None:
  47. stop_scheduler()
  48. @app.middleware("http")
  49. async def api_access_log_middleware(request: Request, call_next):
  50. """记录每个接口的请求(路径、查询、body)与响应(状态码、body)。"""
  51. body_bytes = await request.body()
  52. async def receive() -> dict:
  53. return {"type": "http.request", "body": body_bytes, "more_body": False}
  54. wrapped = Request(request.scope, receive)
  55. req_body_preview = _preview_bytes(body_bytes) if body_bytes else ""
  56. logger.info(
  57. "api_request method={} path={} query={} body={}",
  58. request.method,
  59. request.url.path,
  60. str(request.query_params),
  61. req_body_preview if req_body_preview else "<empty>",
  62. )
  63. response = await call_next(wrapped)
  64. if isinstance(response, StreamingResponse):
  65. logger.info(
  66. "api_response status={} path={} body=<streaming skipped>",
  67. response.status_code,
  68. request.url.path,
  69. )
  70. return response
  71. resp_chunks: List[bytes] = []
  72. async for chunk in response.body_iterator:
  73. resp_chunks.append(chunk)
  74. resp_body = b"".join(resp_chunks)
  75. resp_preview = _preview_bytes(resp_body) if resp_body else ""
  76. logger.info(
  77. "api_response status={} path={} body={}",
  78. response.status_code,
  79. request.url.path,
  80. resp_preview if resp_preview else "<empty>",
  81. )
  82. return Response(
  83. content=resp_body,
  84. status_code=response.status_code,
  85. headers=dict(response.headers),
  86. media_type=response.media_type,
  87. )
  88. @app.exception_handler(HTTPException)
  89. async def http_exception_handler(request: Request, exc: HTTPException):
  90. """统一处理 HTTPException,保证返回结构与其它接口一致"""
  91. msg = RESPONSE_MSG_MAP.get(exc.status_code, "failed")
  92. content = {
  93. "code": exc.status_code,
  94. "msg": msg,
  95. "data": None,
  96. }
  97. # 将异常 detail 写入 reason,便于排查问题
  98. if exc.detail:
  99. content["reason"] = str(exc.detail)
  100. return JSONResponse(status_code=200, content=content)
  101. def _build_api_response(
  102. code: int,
  103. data: Any = None,
  104. reason: Optional[str] = None
  105. ) -> JSONResponse:
  106. """构建统一的API响应"""
  107. msg = RESPONSE_MSG_MAP.get(code, "failed")
  108. content = {
  109. "code": code,
  110. "msg": msg,
  111. "data": data
  112. }
  113. # 失败时添加 reason 字段
  114. if code != 0 and reason:
  115. content["reason"] = reason
  116. return JSONResponse(status_code=200, content=content)
  117. @app.post("/api/v1/content/tasks/decode")
  118. def decode_content(param: DecodeContentParam):
  119. """创建解构任务"""
  120. res = begin_decode_task(param)
  121. code = res.get("code", -1)
  122. task_id = res.get("task_id")
  123. reason = res.get("reason", "")
  124. return _build_api_response(
  125. code=code,
  126. data={"task_id": task_id} if task_id else None,
  127. reason=reason
  128. )
  129. @app.get("/api/v1/content/tasks/{taskId}")
  130. def get_task_detail(taskId: str):
  131. """获取任务详情"""
  132. result = get_decode_detail_by_task_id(taskId)
  133. # 任务不存在
  134. if result is None:
  135. return _build_api_response(code=404, data=None)
  136. # 直接返回结果(已经包含 code、msg、data、reason)
  137. return JSONResponse(status_code=200, content=result)
  138. @app.post("/api/v1/content/tasks/pattern")
  139. def pattern_content(param: PatternContentParam):
  140. """创建聚类任务"""
  141. res = begin_pattern_task(param)
  142. code = res.get("code", -1)
  143. task_id = res.get("task_id")
  144. reason = res.get("reason", "")
  145. return _build_api_response(
  146. code=code,
  147. data={"task_id": task_id} if task_id else None,
  148. reason=reason
  149. )
  150. @app.post("/api/v1/content/topics/search")
  151. def search_content_topics(param: TopicSearchParam):
  152. """视频选题检索:根据关键词在解构结果中匹配,返回匹配度最高的 top5"""
  153. results = search_topics(param)
  154. return _build_api_response(code=0, data=results)
  155. @app.post("/api/v1/content/tasks/scheduler/decode/run-once")
  156. def run_decode_scheduler_once():
  157. """手动触发一次17点解构调度任务,便于验证。"""
  158. run_dispatch_once()
  159. return _build_api_response(code=0, data={"triggered": True})