| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234 |
- from ast import main
- import json
- import uuid
- from fastapi import FastAPI, HTTPException, Request
- from fastapi.responses import JSONResponse
- from pydantic import BaseModel
- from utils.params import TaskStatusParam, DecodeListParam,TopicListParam,EvaluateListParam,EvaluateStatusParam,DecodeWorkflowParam
- from dotenv import load_dotenv, find_dotenv
- from decode_task.decodeTask import get_decode_result_by_id as get_decode_result_by_id_db
- from decode_task.evaluateTask import get_evaluate_result_by_id as get_evaluate_result_by_id_db
- from decode_task.topicTask import get_topic_result_by_id as get_topic_result_by_id_db
- from decode_task.topicTask import update_topic_result_by_id as update_topic_result_by_id_db
- from typing import List
- from models.decode_record import DecodeRecord
- from models.evaluate_record import EvaluateRecord
- from models.decode_workflow import DecodeWorkflow
- from task_schedule import TaskScheduler
- from loguru import logger
- import sys
- logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
- load_dotenv(find_dotenv(), override=False)
- app = FastAPI()
- scheduler = TaskScheduler()
- @app.exception_handler(HTTPException)
- async def http_exception_handler(request: Request, exc: HTTPException):
- return JSONResponse(
- status_code=200,
- content={"code": exc.status_code, "message": exc.message, "data": None}
- )
- @app.on_event("startup")
- def startup_event():
- scheduler.start()
- @app.post("/decodeWorkflow/create")
- def decode_topic(param:TopicListParam):
- video_list = param.video_list
- logger.info(f"数据池数据 = {video_list}")
- data_list = []
- for video in video_list:
- video_id = video.channel_content_id
- video_url = video.video
- video_title = video.title
- task_id = str(uuid.uuid4())
- DecodeWorkflow(
- task_id=task_id,
- video_id=video_id,
- video_url=video_url,
- title=video_title,
- type=param.type,
- result=None,
- task_status = 0
- ).save()
- data_list.append({
- "task_id": task_id,
- "video_id": video_id,
- "video_url": video_url,
- "title": video_title,
-
- })
- return {
- "code": 0,
- "message": "success",
- "data":{
- "type": param.type,
- "tasks": data_list
- }
- }
- @app.post("/decodeWorkflow/update")
- def update_topic_result(param: DecodeWorkflowParam):
- db_res = update_topic_result_by_id_db(param)
- logger.info(f"\n查询结构结果的task_id = {param.task_id}")
- if not db_res:
- return {
- "code": -1,
- "message": '任务不存在',
- "data": None
- }
- result, status,error_reason,video_url,title = db_res
- return {
- "code": 0,
- "message": status == 2 and "success" or error_reason,
- "data": {
- "result": result,
- "status": status,
- "error":error_reason,
- "video_url":video_url,
- "title":title,
- }
- }
-
- @app.post("/decodeWorkflow/result")
- def get_topic_result(param: TaskStatusParam):
- db_res = get_topic_result_by_id_db(param.task_id)
- logger.info(f"\n查询结构结果的task_id = {param.task_id}")
- if not db_res:
- return {
- "code": -1,
- "message": '任务不存在',
- "data": None
- }
- result, status,error_reason = db_res
- return {
- "code": 0,
- "message": status == 2 and "success" or error_reason,
- "data": {
- "result": result,
- "status": status,
- "error":error_reason,
- }
- }
-
- @app.post("/decodeVideo/create")
- def decode_video(param:DecodeListParam):
- video_list = param.video_list
- logger.info(f"数据池数据 = {video_list}")
- data_list = []
- for video in video_list:
- video_id = video.channel_content_id
- video_url = video.video
- task_id = str(uuid.uuid4())
- DecodeRecord(
- task_id=task_id,
- video_id=video_id,
- video_url=video_url,
- task_status = 0
- ).save()
- data_list.append({
- "task_id": task_id,
- "video_id": video_id,
- })
- return {
- "code": 0,
- "message": "success",
- "data": data_list
- }
- @app.post("/decode/result")
- def get_decode_result(param: TaskStatusParam):
- db_res = get_decode_result_by_id_db(param.task_id)
- logger.info(f"\n查询结构结果的task_id = {param.task_id}")
- if not db_res:
- return {
- "code": -1,
- "message": '任务不存在',
- "data": None
- }
- result, status,error_reason,search_keywords = db_res
- return {
- "code": 0,
- "message": status == 2 and "success" or error_reason,
- "data": {
- "result": result,
- "status": status,
- "error":error_reason,
- "searchKeyword":search_keywords
- }
- }
-
- @app.post("/evaluate/create")
- def evaluate_video(param:EvaluateListParam):
- evaluate_list = param.evaluate_list
-
- logger.info(f"创建评估数据 = {evaluate_list}")
- data_list = []
- for evaluate in evaluate_list:
- evaluate_id = str(uuid.uuid4())
- task_id = evaluate.task_id
- channel_content_id = evaluate.channel_content_id
- EvaluateRecord(
- evaluate_id=evaluate_id,
- task_id=task_id,
- search_result= json.dumps(evaluate.result),
- evaluate_result=None,
- error_reason=None,
- status = 0
- ).save()
- data_list.append({
- "task_id": task_id,
- "evaluate_id": evaluate_id,
- "channel_content_id":channel_content_id
- })
- return {
- "code": 0,
- "message": "success",
- "data": data_list
- }
- @app.post("/evaluate/result")
- def get_evaluate_result(param: EvaluateStatusParam):
- db_res = get_evaluate_result_by_id_db(param.evaluate_id)
- logger.info(f"\n查询评估结果的evaluate_id = {param.evaluate_id}")
- if not db_res:
- return {
- "code": -1,
- "message": '评估不存在',
- "data": None
- }
- result, status,error_reason = db_res
- return {
- "code": 0,
- "message": status == 2 and "success" or error_reason,
- "data": {
- "result": result,
- "status": status,
- "error":error_reason,
- }
- }
|