main.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. from ast import main
  2. import json
  3. import uuid
  4. from fastapi import FastAPI, HTTPException, Request
  5. from fastapi.responses import JSONResponse
  6. from pydantic import BaseModel
  7. from utils.params import TaskStatusParam, DecodeListParam,TopicListParam,EvaluateListParam,EvaluateStatusParam,DecodeWorkflowParam
  8. from dotenv import load_dotenv, find_dotenv
  9. from decode_task.decodeTask import get_decode_result_by_id as get_decode_result_by_id_db
  10. from decode_task.evaluateTask import get_evaluate_result_by_id as get_evaluate_result_by_id_db
  11. from decode_task.topicTask import get_topic_result_by_id as get_topic_result_by_id_db
  12. from decode_task.topicTask import update_topic_result_by_id as update_topic_result_by_id_db
  13. from typing import List
  14. from models.decode_record import DecodeRecord
  15. from models.evaluate_record import EvaluateRecord
  16. from models.decode_workflow import DecodeWorkflow
  17. from task_schedule import TaskScheduler
  18. from loguru import logger
  19. import sys
  20. logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
  21. load_dotenv(find_dotenv(), override=False)
  22. app = FastAPI()
  23. scheduler = TaskScheduler()
  24. @app.exception_handler(HTTPException)
  25. async def http_exception_handler(request: Request, exc: HTTPException):
  26. return JSONResponse(
  27. status_code=200,
  28. content={"code": exc.status_code, "message": exc.message, "data": None}
  29. )
  30. @app.on_event("startup")
  31. def startup_event():
  32. scheduler.start()
  33. @app.post("/decodeWorkflow/create")
  34. def decode_topic(param:TopicListParam):
  35. video_list = param.video_list
  36. logger.info(f"数据池数据 = {video_list}")
  37. data_list = []
  38. for video in video_list:
  39. video_id = video.channel_content_id
  40. video_url = video.video
  41. video_title = video.title
  42. task_id = str(uuid.uuid4())
  43. DecodeWorkflow(
  44. task_id=task_id,
  45. video_id=video_id,
  46. video_url=video_url,
  47. title=video_title,
  48. type=param.type,
  49. result=None,
  50. task_status = 0
  51. ).save()
  52. data_list.append({
  53. "task_id": task_id,
  54. "video_id": video_id,
  55. "video_url": video_url,
  56. "title": video_title,
  57. })
  58. return {
  59. "code": 0,
  60. "message": "success",
  61. "data":{
  62. "type": param.type,
  63. "tasks": data_list
  64. }
  65. }
  66. @app.post("/decodeWorkflow/update")
  67. def update_topic_result(param: DecodeWorkflowParam):
  68. db_res = update_topic_result_by_id_db(param)
  69. logger.info(f"\n查询结构结果的task_id = {param.task_id}")
  70. if not db_res:
  71. return {
  72. "code": -1,
  73. "message": '任务不存在',
  74. "data": None
  75. }
  76. result, status,error_reason,video_url,title = db_res
  77. return {
  78. "code": 0,
  79. "message": status == 2 and "success" or error_reason,
  80. "data": {
  81. "result": result,
  82. "status": status,
  83. "error":error_reason,
  84. "video_url":video_url,
  85. "title":title,
  86. }
  87. }
  88. @app.post("/decodeWorkflow/result")
  89. def get_topic_result(param: TaskStatusParam):
  90. db_res = get_topic_result_by_id_db(param.task_id)
  91. logger.info(f"\n查询结构结果的task_id = {param.task_id}")
  92. if not db_res:
  93. return {
  94. "code": -1,
  95. "message": '任务不存在',
  96. "data": None
  97. }
  98. result, status,error_reason = db_res
  99. return {
  100. "code": 0,
  101. "message": status == 2 and "success" or error_reason,
  102. "data": {
  103. "result": result,
  104. "status": status,
  105. "error":error_reason,
  106. }
  107. }
  108. @app.post("/decodeVideo/create")
  109. def decode_video(param:DecodeListParam):
  110. video_list = param.video_list
  111. logger.info(f"数据池数据 = {video_list}")
  112. data_list = []
  113. for video in video_list:
  114. video_id = video.channel_content_id
  115. video_url = video.video
  116. task_id = str(uuid.uuid4())
  117. DecodeRecord(
  118. task_id=task_id,
  119. video_id=video_id,
  120. video_url=video_url,
  121. task_status = 0
  122. ).save()
  123. data_list.append({
  124. "task_id": task_id,
  125. "video_id": video_id,
  126. })
  127. return {
  128. "code": 0,
  129. "message": "success",
  130. "data": data_list
  131. }
  132. @app.post("/decode/result")
  133. def get_decode_result(param: TaskStatusParam):
  134. db_res = get_decode_result_by_id_db(param.task_id)
  135. logger.info(f"\n查询结构结果的task_id = {param.task_id}")
  136. if not db_res:
  137. return {
  138. "code": -1,
  139. "message": '任务不存在',
  140. "data": None
  141. }
  142. result, status,error_reason,search_keywords = db_res
  143. return {
  144. "code": 0,
  145. "message": status == 2 and "success" or error_reason,
  146. "data": {
  147. "result": result,
  148. "status": status,
  149. "error":error_reason,
  150. "searchKeyword":search_keywords
  151. }
  152. }
  153. @app.post("/evaluate/create")
  154. def evaluate_video(param:EvaluateListParam):
  155. evaluate_list = param.evaluate_list
  156. logger.info(f"创建评估数据 = {evaluate_list}")
  157. data_list = []
  158. for evaluate in evaluate_list:
  159. evaluate_id = str(uuid.uuid4())
  160. task_id = evaluate.task_id
  161. channel_content_id = evaluate.channel_content_id
  162. EvaluateRecord(
  163. evaluate_id=evaluate_id,
  164. task_id=task_id,
  165. search_result= json.dumps(evaluate.result),
  166. evaluate_result=None,
  167. error_reason=None,
  168. status = 0
  169. ).save()
  170. data_list.append({
  171. "task_id": task_id,
  172. "evaluate_id": evaluate_id,
  173. "channel_content_id":channel_content_id
  174. })
  175. return {
  176. "code": 0,
  177. "message": "success",
  178. "data": data_list
  179. }
  180. @app.post("/evaluate/result")
  181. def get_evaluate_result(param: EvaluateStatusParam):
  182. db_res = get_evaluate_result_by_id_db(param.evaluate_id)
  183. logger.info(f"\n查询评估结果的evaluate_id = {param.evaluate_id}")
  184. if not db_res:
  185. return {
  186. "code": -1,
  187. "message": '评估不存在',
  188. "data": None
  189. }
  190. result, status,error_reason = db_res
  191. return {
  192. "code": 0,
  193. "message": status == 2 and "success" or error_reason,
  194. "data": {
  195. "result": result,
  196. "status": status,
  197. "error":error_reason,
  198. }
  199. }