"""Small FastAPI service for uploading audio files and looking them up by MD5.

Uploaded files are written under ``./data`` and served back through the
``/data`` static mount; a ``model.CryptoAudio`` row records each upload's
MD5 digest, stored file name and upload time.
"""
import hashlib
import os
import time
from typing import Optional

from fastapi import FastAPI, UploadFile, Depends, Request
from fastapi.staticfiles import StaticFiles
from sqlalchemy.orm import Session

import schema
from database import SessionLocal, engine
import model

# URL prefix under which stored files are exposed (matches the mount below).
base_host = '/data/'

# Create the tables declared on model.Base if they do not exist yet.
model.Base.metadata.create_all(bind=engine)

app = FastAPI()
app.mount("/data", StaticFiles(directory="data"), name="data")


def get_database_session():
    """Yield a database session for one request and always close it.

    Intended for use as a FastAPI dependency (``Depends``).
    """
    # Create the session outside the ``try`` so that a failure here is
    # reported as-is instead of being masked by an UnboundLocalError
    # raised from the ``finally`` clause.
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


# @app.get("/")
# def read_root():
#     return {"Hello": "World"}
#
#
# @app.get("/items/{item_id}")
# def read_item(item_id: int, q: Optional[str] = None):
#     return {"item_id": item_id, "q": q}


def _safe_filename(filename: Optional[str]) -> str:
    """Return the final path component of *filename*, or "" if unusable.

    The name is supplied by the client, so directory components (with
    either slash style) are stripped to keep the file inside ``./data``.
    """
    name = os.path.basename((filename or "").replace("\\", "/"))
    if name in ("", ".", ".."):
        return ""
    return name


@app.post("/uploadfile/")
def create_upload_file(file: Optional[UploadFile] = None,
                       db: Session = Depends(get_database_session)):
    """Store an uploaded file and record it in the database.

    Args:
        file: The uploaded file, or ``None`` when the request carried none.
        db: Database session injected by FastAPI.

    Returns:
        A dict with string ``code`` and ``msg`` fields. On success it also
        carries ``data.md5``, the hex MD5 digest of the uploaded bytes.
        ``code`` is ``"-1"`` when the file name is unusable or when the
        database insert or the file write fails.
    """
    if not file:
        # NOTE(review): a missing file is reported with the same code ("0")
        # as success; kept as-is because clients may depend on it.
        return {"code": "0", "msg": "no file"}

    safe_name = _safe_filename(file.filename)
    if not safe_name:
        return {"code": "-1", "msg": "invalid filename"}

    # Read the upload exactly once: a second read() on the same stream
    # returns b"" and would leave an empty file on disk.
    data = file.file.read()
    md5 = hashlib.md5(data).hexdigest()

    # Insert the record before touching the filesystem so that a failed
    # insert cannot truncate an already stored file of the same name.
    try:
        audio = model.CryptoAudio(md5=md5, name=safe_name, tm=time.time())
        db.add(audio)
        db.commit()
        db.refresh(audio)
    except Exception as e:
        db.rollback()
        return {"code": "-1", "msg": str(e)}

    file_location = f"./data/{safe_name}"
    try:
        with open(file_location, "wb") as file_object:
            file_object.write(data)
    except OSError as e:
        return {"code": "-1", "msg": str(e)}

    return {"code": "0", "msg": "ok", 'data': {'md5': md5}}


@app.get("/get_audio_file/{md5}")
def get_audio_file(md5: str, db: Session = Depends(get_database_session)):
    """Look up a stored file by MD5 digest and return its download URL.

    Args:
        md5: Hex MD5 digest taken from the URL path.
        db: Database session injected by FastAPI.

    Returns:
        A dict with string ``code`` and ``msg`` fields. When a matching
        record exists it also carries ``data.url``, built from
        ``base_host`` and the stored file name; otherwise ``code`` is
        ``"-1"``.
    """
    try:
        # Compare the table column with the requested digest. Comparing the
        # parameter with itself is always true and matches every row.
        item = (
            db.query(model.CryptoAudio)
            .filter(model.CryptoAudio.md5 == md5)
            .first()
        )
    except Exception as e:
        return {"code": "-1", "msg": str(e)}

    if item:
        return {"code": "0", "msg": "ok",
                'data': {'url': f"{base_host}{item.name}"}}
    return {"code": "-1", "msg": "no exist"}