"""HTTP API for creating and querying decode, topic-workflow and evaluate tasks.

Every endpoint answers with the same JSON envelope::

    {"code": <int>, "message": <str>, "data": <payload or None>}

The create endpoints only persist a record per submitted item and return the
generated ids; the result endpoints read those records back through the
``decode_task`` helpers.
"""

# NOTE(review): `main` from `ast` is not referenced anywhere in this module and
# looks like an accidental auto-import; kept in case something outside this
# view relies on it.
from ast import main
import json
import sys
from typing import List, Optional
import uuid

from dotenv import load_dotenv, find_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from loguru import logger
from pydantic import BaseModel

from utils.params import (
    TaskStatusParam,
    DecodeListParam,
    TopicListParam,
    EvaluateListParam,
    EvaluateStatusParam,
    DecodeWorkflowParam,
)
from decode_task.decodeTask import get_decode_result_by_id as get_decode_result_by_id_db
from decode_task.evaluateTask import get_evaluate_result_by_id as get_evaluate_result_by_id_db
from decode_task.topicTask import get_topic_result_by_id as get_topic_result_by_id_db
from decode_task.topicTask import update_topic_result_by_id as update_topic_result_by_id_db
from decode_task.topicTask import search_topic_list as search_topic_list_db
from models.decode_record import DecodeRecord
from models.evaluate_record import EvaluateRecord
from models.decode_workflow import DecodeWorkflow
from task_schedule import TaskScheduler

# Extra stderr sink that records ERROR and above with full tracebacks.
logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)

# override=False: variables already present in the environment win over .env.
load_dotenv(find_dotenv(), override=False)

app = FastAPI()
scheduler = TaskScheduler()

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive -- confirm this is intended for the deployment environment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Status value for which the result endpoints report "success".
_STATUS_SUCCESS = 2


def _status_message(status, error_reason):
    """Return "success" when *status* is the success value, else *error_reason*."""
    return "success" if status == _STATUS_SUCCESS else error_reason


def _not_found(message):
    """Build the envelope returned when a requested record does not exist."""
    return {"code": -1, "message": message, "data": None}


@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException) -> JSONResponse:
    """Wrap an ``HTTPException`` in the standard envelope.

    The HTTP status is always 200; the exception's own status code is carried
    in the ``code`` field of the body instead.
    """
    return JSONResponse(
        status_code=200,
        # HTTPException exposes its payload as `detail`; it has no `message`
        # attribute, so reading `exc.message` raised AttributeError here.
        content={"code": exc.status_code, "message": exc.detail, "data": None},
    )


@app.on_event("startup")
def startup_event():
    """Start the task scheduler when the application starts."""
    scheduler.start()


@app.post("/decodeWorkflow/create")
def decode_topic(param: TopicListParam):
    """Save one ``DecodeWorkflow`` record per submitted video.

    Each video gets a fresh UUID4 ``task_id``. The response lists the created
    tasks together with the request's ``type``.
    """
    video_list = param.video_list
    logger.info(f"数据池数据 = {video_list}")

    data_list = []
    for video in video_list:
        task_id = str(uuid.uuid4())
        DecodeWorkflow(
            task_id=task_id,
            video_id=video.video_id,
            video_url=video.video_url,
            title=video.title,
            type=param.type,
            result=None,
        ).save()
        data_list.append({
            "task_id": task_id,
            "video_id": video.video_id,
            "video_url": video.video_url,
            "title": video.title,
            # Reported as 0 for every new task; the record itself is created
            # without an explicit status (presumably a model default -- verify).
            "task_status": 0,
            "type": param.type,
        })

    return {
        "code": 0,
        "message": "success",
        "data": {
            "type": param.type,
            "tasks": data_list,
        },
    }


@app.post("/decodeWorkflow/update")
def update_topic_result(param: DecodeWorkflowParam):
    """Apply an update to a workflow task and return its resulting state.

    Returns the "not found" envelope when the update helper yields a falsy
    value.
    """
    db_res = update_topic_result_by_id_db(param)
    logger.info(f"\n查询结构结果的task_id = {param.task_id}")
    if not db_res:
        return _not_found('任务不存在')

    result, status, error_reason, video_url, title = db_res
    return {
        "code": 0,
        "message": _status_message(status, error_reason),
        "data": {
            "result": result,
            "status": status,
            "error": error_reason,
            "video_url": video_url,
            "title": title,
        },
    }


@app.get("/decodeWorkflow/result")
def get_topic_result(task_id: str):
    """Return the stored workflow record for *task_id*.

    Unlike the other result endpoints, ``code`` here mirrors the record's
    ``task_status`` rather than being a fixed 0.
    """
    result = get_topic_result_by_id_db(task_id)
    logger.info(f"\n查询结构结果的task_id = {task_id}")
    if not result:
        return _not_found('任务不存在')

    status = result["task_status"]
    error_reason = result["error_reason"]
    return {
        "code": status,
        "message": _status_message(status, error_reason),
        "data": result,
    }


@app.get("/decodeWorkflow/list")
def search_topic_list(
    video_id: Optional[str] = None,
    video_url: Optional[str] = None,
    title: Optional[str] = None,
    task_status: Optional[int] = None,
):
    """Search workflow tasks by any combination of the optional filters.

    Empty or missing string filters are ignored. ``task_status`` is applied
    whenever it is supplied, including 0. With no filters at all the search
    helper is called with ``None``.
    """
    params = {}
    if video_id:
        params["video_id"] = video_id
    if video_url:
        params["video_url"] = video_url
    if title:
        params["title"] = title
    # Compared against None so that a status of 0 is still a valid filter.
    if task_status is not None:
        params["task_status"] = task_status

    tasks = search_topic_list_db(params or None)
    if not tasks:
        return {"code": -1, "message": "暂无数据", "data": []}

    return {"code": 0, "message": "success", "data": tasks}


@app.post("/decodeVideo/create")
def decode_video(param: DecodeListParam):
    """Save one ``DecodeRecord`` (``task_status`` 0) per submitted video.

    Returns the generated ``task_id`` alongside each ``video_id``.
    """
    video_list = param.video_list
    logger.info(f"数据池数据 = {video_list}")

    data_list = []
    for video in video_list:
        video_id = video.channel_content_id
        task_id = str(uuid.uuid4())
        DecodeRecord(
            task_id=task_id,
            video_id=video_id,
            video_url=video.video,
            task_status=0,
        ).save()
        data_list.append({
            "task_id": task_id,
            "video_id": video_id,
        })

    return {"code": 0, "message": "success", "data": data_list}


@app.post("/decode/result")
def get_decode_result(param: TaskStatusParam):
    """Return the decode result, status, error and search keywords of a task.

    Returns the "not found" envelope when the lookup yields a falsy value.
    """
    db_res = get_decode_result_by_id_db(param.task_id)
    logger.info(f"\n查询结构结果的task_id = {param.task_id}")
    if not db_res:
        return _not_found('任务不存在')

    result, status, error_reason, search_keywords = db_res
    return {
        "code": 0,
        "message": _status_message(status, error_reason),
        "data": {
            "result": result,
            "status": status,
            "error": error_reason,
            "searchKeyword": search_keywords,
        },
    }


@app.post("/evaluate/create")
def evaluate_video(param: EvaluateListParam):
    """Save one ``EvaluateRecord`` (``status`` 0) per submitted evaluation.

    The item's ``result`` is stored JSON-encoded as ``search_result``.
    ``channel_content_id`` is only echoed back in the response; it is not
    passed to the record.
    """
    evaluate_list = param.evaluate_list
    logger.info(f"创建评估数据 = {evaluate_list}")

    data_list = []
    for evaluate in evaluate_list:
        evaluate_id = str(uuid.uuid4())
        EvaluateRecord(
            evaluate_id=evaluate_id,
            task_id=evaluate.task_id,
            search_result=json.dumps(evaluate.result),
            evaluate_result=None,
            error_reason=None,
            status=0,
        ).save()
        data_list.append({
            "task_id": evaluate.task_id,
            "evaluate_id": evaluate_id,
            "channel_content_id": evaluate.channel_content_id,
        })

    return {"code": 0, "message": "success", "data": data_list}


@app.post("/evaluate/result")
def get_evaluate_result(param: EvaluateStatusParam):
    """Return the result, status and error of an evaluation.

    Returns the "not found" envelope when the lookup yields a falsy value.
    """
    db_res = get_evaluate_result_by_id_db(param.evaluate_id)
    logger.info(f"\n查询评估结果的evaluate_id = {param.evaluate_id}")
    if not db_res:
        return _not_found('评估不存在')

    result, status, error_reason = db_res
    return {
        "code": 0,
        "message": _status_message(status, error_reason),
        "data": {
            "result": result,
            "status": status,
            "error": error_reason,
        },
    }