from ast import main import json import uuid from fastapi import FastAPI, HTTPException, Request from fastapi.responses import JSONResponse from pydantic import BaseModel from utils.params import TaskStatusParam, DecodeListParam from dotenv import load_dotenv from decode_task.decodeTask import get_decode_result_by_id from decode_task.scriptTask import get_script_result_by_id from typing import List from models.decode_record import DecodeRecord from task_schedule import TaskScheduler from loguru import logger import sys logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True) load_dotenv() app = FastAPI() scheduler = TaskScheduler() @app.exception_handler(HTTPException) async def http_exception_handler(request: Request, exc: HTTPException): return JSONResponse( status_code=200, content={"code": exc.status_code, "message": exc.message, "data": None} ) @app.on_event("startup") def startup_event(): scheduler.start() @app.post("/decodeVideo/create") def decode_video(param:DecodeListParam): video_list = param.video_list data_list = [] for video in video_list: video_id = video.channel_content_id video_url = video.video task_id = str(uuid.uuid4()) DecodeRecord( task_id=task_id, video_id=video_id, video_url=video_url, task_status = 0 ).save() data_list.append({ "task_id": task_id, "video_id": video_id, }) return { "code": 0, "message": "success", "data": data_list } @app.post("/decode/result") def get_script_result(param: TaskStatusParam): decode_result,script_result, task_status = get_script_result_by_id(param.task_id) if script_result: return { "code": 0, "message": "success", "data": { "script_result": script_result, "decode_result": decode_result, "task_status": task_status } } else: return { "code": -1, "message": "error", "data": None }