from ast import main import json import uuid from fastapi import FastAPI, HTTPException, Request from fastapi.responses import JSONResponse from pydantic import BaseModel from utils.params import TaskStatusParam, DecodeListParam,EvaluateListParam,EvaluateStatusParam from dotenv import load_dotenv, find_dotenv from decode_task.decodeTask import get_decode_result_by_id as get_decode_result_by_id_db from decode_task.evaluateTask import get_evaluate_result_by_id as get_evaluate_result_by_id_db from typing import List from models.decode_record import DecodeRecord from models.evaluate_record import EvaluateRecord from task_schedule import TaskScheduler from loguru import logger import sys logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True) load_dotenv(find_dotenv(), override=False) app = FastAPI() scheduler = TaskScheduler() @app.exception_handler(HTTPException) async def http_exception_handler(request: Request, exc: HTTPException): return JSONResponse( status_code=200, content={"code": exc.status_code, "message": exc.message, "data": None} ) @app.on_event("startup") def startup_event(): scheduler.start() @app.post("/decodeVideo/create") def decode_video(param:DecodeListParam): video_list = param.video_list logger.info(f"数据池数据 = {video_list}") data_list = [] for video in video_list: video_id = video.channel_content_id video_url = video.video task_id = str(uuid.uuid4()) DecodeRecord( task_id=task_id, video_id=video_id, video_url=video_url, task_status = 0 ).save() data_list.append({ "task_id": task_id, "video_id": video_id, }) return { "code": 0, "message": "success", "data": data_list } @app.post("/decode/result") def get_decode_result(param: TaskStatusParam): db_res = get_decode_result_by_id_db(param.task_id) logger.info(f"\n查询结构结果的task_id = {param.task_id}") if not db_res: return { "code": -1, "message": '任务不存在', "data": None } result, status,error_reason,search_keywords = db_res return { "code": 0, "message": status == 2 and "success" or error_reason, "data": { "result": result, "status": status, "error":error_reason, "searchKeyword":search_keywords } } @app.post("/evaluate/create") def evaluate_video(param:EvaluateListParam): evaluate_list = param.evaluate_list logger.info(f"创建评估数据 = {evaluate_list}") data_list = [] for evaluate in evaluate_list: evaluate_id = str(uuid.uuid4()) task_id = evaluate.task_id channel_content_id = evaluate.channel_content_id EvaluateRecord( evaluate_id=evaluate_id, task_id=task_id, search_result= json.dumps(evaluate.result), evaluate_result=None, error_reason=None, status = 0 ).save() data_list.append({ "task_id": task_id, "evaluate_id": evaluate_id, "channel_content_id":channel_content_id }) return { "code": 0, "message": "success", "data": data_list } @app.post("/evaluate/result") def get_evaluate_result(param: EvaluateStatusParam): db_res = get_evaluate_result_by_id_db(param.evaluate_id) logger.info(f"\n查询评估结果的evaluate_id = {param.evaluate_id}") if not db_res: return { "code": -1, "message": '评估不存在', "data": None } result, status,error_reason = db_res return { "code": 0, "message": status == 2 and "success" or error_reason, "data": { "result": result, "status": status, "error":error_reason, } }