run.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. import audioop
  2. import base64
  3. import numpy as np
  4. import soundfile as sf
  5. from fastapi import FastAPI, WebSocket
  6. from fastapi.responses import Response
  7. from loguru import logger
  8. from stream_service import FishAgentPipeline
  app = FastAPI()

  # The ngrok tunnel host changes between sessions; TWILIO_STREAM_URL overrides
  # it without editing code. The default is the previously hard-coded URL.
  DEFAULT_STREAM_URL = "wss://2427-24-4-31-213.ngrok-free.app/connection"


  @app.post("/incoming")
  async def handle_incoming():
      """Answer an incoming-call webhook with TwiML that opens a media stream.

      The stream target is read from the TWILIO_STREAM_URL environment
      variable, falling back to DEFAULT_STREAM_URL.

      Returns:
          Response: a ``text/xml`` body of the form
          ``<Response><Connect><Stream url="..." /></Connect></Response>``.
      """
      import os
      from xml.sax.saxutils import quoteattr

      stream_url = os.environ.get("TWILIO_STREAM_URL", DEFAULT_STREAM_URL)
      # quoteattr adds the surrounding double quotes and escapes &, < and >,
      # so a URL carrying a query string still produces well-formed XML.
      twiml = (
          "<Response>\n"
          "<Connect>\n"
          f"<Stream url={quoteattr(stream_url)} />\n"
          "</Connect>\n"
          "</Response>"
      )
      logger.info("Incoming call received")
      return Response(media_type="text/xml", content=twiml)
  async def send_audio(ws, audio, stream_sid=""):
      """Send one outbound "media" message over the websocket.

      Args:
          ws: websocket object exposing an awaitable ``send_json``.
          audio: string placed in ``media.payload`` (callers in this file
              pass base64-encoded mu-law audio).
          stream_sid: value written to the ``streamSid`` field.
      """
      # Keyword order fixes the key order of the serialized JSON message.
      media_frame = dict(
          streamSid=stream_sid,
          event="media",
          media=dict(payload=audio),
      )
      await ws.send_json(media_frame)
  def decode_mu_law(data):
      """Decode 8-bit mu-law bytes into float32 samples in [-1.0, 1.0).

      Args:
          data: bytes-like object, one mu-law byte per sample.

      Returns:
          numpy.ndarray: float32 array, one element per input byte, equal to
          the 16-bit linear PCM value divided by 32768.
      """
      # NOTE: audioop is deprecated since Python 3.11 and removed in 3.13 (PEP 594).
      pcm16 = np.frombuffer(audioop.ulaw2lin(data, 2), dtype=np.int16)
      return pcm16.astype(np.float32) / 32768.0
  def encode_mu_law(data):
      """Encode float samples to 8-bit mu-law bytes.

      Args:
          data: array-like of float samples nominally in [-1.0, 1.0]; values
              outside that range are clipped.

      Returns:
          bytes: one mu-law byte per input sample, produced from a 16-bit
          linear PCM intermediate.
      """
      samples = np.clip(data, -1.0, 1.0)
      # Scale by 32768 to mirror decode_mu_law, but clamp into the int16 range:
      # 1.0 * 32768 does not fit in int16, and casting it directly wraps to
      # -32768, turning a full-scale positive peak into a full-scale negative one.
      pcm16 = np.clip(samples * 32768, -32768, 32767).astype(np.int16)
      # NOTE: audioop is deprecated since Python 3.11 and removed in 3.13 (PEP 594).
      return audioop.lin2ulaw(pcm16.tobytes(), 2)
  # Module-level busy flag: only one call is served at a time, since the single
  # shared `pipe` is reset at the start of every connection.
  is_working = False


  @app.websocket("/connection")
  async def handle_connection(websocket: WebSocket):
      """Bridge one media-stream websocket to the agent pipeline.

      Incoming "media" frames carry base64 mu-law audio. Each frame is decoded
      and fed to ``pipe.add_chunk`` at 8000 Hz. Every chunk the pipeline
      yields is mu-law encoded and sent back on the same stream.

      Only one connection is served at a time: a second concurrent connection
      is accepted and immediately closed. The busy flag is cleared in
      ``finally``, so a client disconnect or a pipeline error does not leave
      the server rejecting every later call.
      """
      global is_working
      from fastapi import WebSocketDisconnect

      await websocket.accept()
      logger.info("Connection established")
      stream_sid = None
      call_sid = None
      if is_working:
          logger.info("Already working, closing connection")
          await websocket.close()
          return
      is_working = True
      try:
          # NOTE(review): `pipe` is only created in the `__main__` block, so
          # serving this module via `uvicorn run:app` raises NameError here.
          # Confirm the intended launch mode.
          pipe.reset()
          while True:
              data = await websocket.receive_json()
              event = data["event"]
              if event == "connected":
                  logger.info("Connected message received")
              elif event == "start":
                  stream_sid = data["start"]["streamSid"]
                  call_sid = data["start"]["callSid"]
                  logger.info(f"Start media streaming: {stream_sid} - {call_sid}")
              elif event == "media":
                  chunk = base64.b64decode(data["media"]["payload"])
                  samples = decode_mu_law(chunk)
                  # Send back every output chunk the pipeline yields for this input.
                  for out_chunk in pipe.add_chunk(samples, sr=8000):
                      payload = base64.b64encode(encode_mu_law(out_chunk)).decode()
                      await send_audio(websocket, payload, stream_sid)
              elif event == "closed":
                  logger.info("Connection closed")
                  await websocket.close()
                  break
              elif event == "stop":
                  logger.info("Stop media streaming")
                  await websocket.close()
                  break
              else:
                  logger.info(f"Unknown event: {data}")
      except WebSocketDisconnect:
          # The peer went away without sending a "stop" or "closed" event.
          logger.info("Client disconnected")
      finally:
          is_working = False
  if __name__ == "__main__":
      from uvicorn import run as serve_app

      # Assigned at module scope so the websocket handler can find the
      # global `pipe`; warm it up before the server accepts calls.
      pipe = FishAgentPipeline()
      pipe.warmup()

      bind_host, bind_port = "localhost", 5000
      logger.info("Starting server")
      serve_app(app, host=bind_host, port=bind_port)