"""Text-to-speech provider built on DashScope, with upload of the result.

The module synthesizes speech into a local audio file (wav or mp3) through
``SpeechSynthesizer.call`` and then posts that file to the HTTP endpoint
configured as ``settings.upload_path``.
"""

import json
import os
import re
from pathlib import Path
from typing import Optional
from wave import open as wave_open

import dashscope
import requests
from dashscope.api_entities.dashscope_response import SpeechSynthesisResponse
from dashscope.audio.tts import ResultCallback, SpeechSynthesizer, SpeechSynthesisResult

from ..core.config import get_settings
from ..schemas.speech import TextToSpeechResponse, DataPayload

settings = get_settings()

# Configure DashScope API key from env/.env
dashscope.api_key = settings.dashscope_api_key or ""
UPLOAD_PATH = settings.upload_path or ""


def _safe_filename(name: str) -> str:
    """Return *name* reduced to characters that are safe in a file name.

    Runs of characters other than word characters, dashes and CJK ideographs
    (U+4E00..U+9FFF) collapse into a single ``_``; leading and trailing
    underscores are stripped. Falls back to ``"output"`` when nothing is left.
    """
    return re.sub(r"[^\w\-\u4e00-\u9fff]+", "_", name).strip("_") or "output"


class SpeechProvider:
    """Synthesizes speech with DashScope and uploads the resulting audio."""

    def text_to_speech(
        self,
        volume: int,
        pitch: float,
        rate: float,
        filename: str,
        text: str,
        *,
        model: Optional[str] = None,
        format: Optional[str] = None,
    ) -> TextToSpeechResponse:
        """Synthesize *text* to an audio file and upload it.

        Args:
            volume: Forwarded unchanged to ``SpeechSynthesizer.call``.
            pitch: Forwarded unchanged to ``SpeechSynthesizer.call``.
            rate: Forwarded unchanged to ``SpeechSynthesizer.call``.
            filename: Base name of the output file; it is sanitized and the
                extension is appended according to the chosen format.
            text: Text to synthesize.
            model: Model name; defaults to ``'sambert-zhifei-v1'``.
            format: ``"wav"`` or ``"mp3"`` (case-insensitive). Anything else,
                including ``None``, selects mp3.

        Returns:
            ``code=0`` with the uploaded audio URL on success, or ``code=1``
            with ``msg='error'`` when synthesis reported an error or the
            upload failed.
        """
        # Resolve output path under project-root/temp and ensure directory exists
        project_root = Path(__file__).resolve().parents[2]  # repo root
        audio_dir = project_root / "temp"
        audio_dir.mkdir(parents=True, exist_ok=True)

        # Determine desired output format (default mp3 for smaller size)
        audio_format = (format or 'mp3').lower()
        if audio_format not in {"wav", "mp3"}:
            audio_format = "mp3"

        # Choose extension and sample rate
        ext = "wav" if audio_format == "wav" else "mp3"
        sample_rate = 48000 if audio_format == "wav" else 24000
        filename = f"{_safe_filename(filename)}.{ext}"
        out_path = audio_dir / filename

        # Prepare callback with audio params
        callback = Callback(
            out_path=str(out_path),
            sample_rate=sample_rate,
            channels=1,
            sampwidth=2,
            audio_format=audio_format,
        )
        try:
            SpeechSynthesizer.call(
                model=(model or 'sambert-zhifei-v1'),
                volume=volume,
                text=text,
                pitch=pitch,
                rate=rate,
                format=audio_format,
                sample_rate=sample_rate,
                callback=callback,
                word_timestamp_enabled=True,
                phoneme_timestamp_enabled=True,
            )
        finally:
            # Make sure the output file is flushed and closed before it is
            # read back for upload, whichever callbacks actually fired.
            callback.close_files()

        if callback.error is not None:
            # Synthesis failed: the file on disk is empty or partial, so do
            # not upload it and do not report success.
            try:
                out_path.unlink(missing_ok=True)
            except OSError as del_err:
                print(f"[warn] Failed to delete local audio {out_path}: {del_err}")
            return TextToSpeechResponse(code=1, msg='error')

        # After synthesis completes, upload the file to OSS
        try:
            url = _upload_file(UPLOAD_PATH, out_path)
            # Upload succeeded; remove local audio file to save space
            try:
                out_path.unlink(missing_ok=True)
            except OSError as del_err:
                print(f"[warn] Failed to delete local audio {out_path}: {del_err}")
            return TextToSpeechResponse(
                code=0,
                data=DataPayload(audio_url=url),
                msg='success'
            )
        except Exception as e:
            # Upload failed: report an error response instead of raising.
            # The local file is left in place.
            print(f"[warn] Upload failed: {e}")
            return TextToSpeechResponse(code=1, msg='error')


class Callback(ResultCallback):
    """Streams synthesized audio frames into a file on disk.

    For ``audio_format == "wav"`` the frames are written through the ``wave``
    module with the configured channel count, sample width and frame rate;
    for any other format the frames are written as raw bytes.
    """

    def __init__(self, out_path: str, sample_rate: int = 16000, channels: int = 1,
                 sampwidth: int = 2, audio_format: str = "mp3"):
        self.out_path = out_path
        self.sample_rate = sample_rate
        self.channels = channels
        self.sampwidth = sampwidth
        self.wav_file = None
        self._fh = None
        self.audio_format = audio_format
        # Response passed to on_error, or None when no error was reported.
        self.error: Optional[SpeechSynthesisResponse] = None

    def close_files(self) -> None:
        """Close whichever output handle is open; safe to call repeatedly."""
        if self.wav_file:
            self.wav_file.close()
            self.wav_file = None
        if self._fh:
            self._fh.close()
            self._fh = None

    def on_open(self):
        print('Speech synthesizer is opened.')
        # Ensure parent directory exists (in case not created earlier)
        Path(self.out_path).parent.mkdir(parents=True, exist_ok=True)
        if self.audio_format == "wav":
            self.wav_file = wave_open(self.out_path, 'wb')
            self.wav_file.setnchannels(self.channels)
            self.wav_file.setsampwidth(self.sampwidth)
            self.wav_file.setframerate(self.sample_rate)
        else:
            # For mp3 (and other compressed formats), write raw bytes
            self._fh = open(self.out_path, 'wb')

    def on_complete(self):
        print('Speech synthesizer is completed.')
        self.close_files()

    def on_error(self, response: SpeechSynthesisResponse):
        print('Speech synthesizer failed, response is %s' % (str(response)))
        # Remember the failure so the caller can tell the output is unusable,
        # and release the file handle that on_complete will not get to close.
        self.error = response
        self.close_files()

    def on_close(self):
        print('Speech synthesizer is closed.')
        self.close_files()

    def on_event(self, result: SpeechSynthesisResult):
        frame = result.get_audio_frame()
        if not frame:
            return
        if self.wav_file:
            self.wav_file.writeframes(frame)
        elif self._fh:
            self._fh.write(frame)


def _extract_url_from_response(resp_json: dict) -> Optional[str]:
    """Return the first URL-looking string found in an upload response.

    Probes these key paths in order and returns the first value that is a
    string starting with ``"http"``: ``data.fileUrl``, ``data``, ``fileUrl``,
    ``result.fileUrl``, ``payload.fileUrl``. Returns ``None`` when no path
    yields such a string.
    """
    try_keys = [
        ("data", "fileUrl"),
        ("data",),
        ("fileUrl",),
        ("result", "fileUrl"),
        ("payload", "fileUrl"),
    ]
    for path in try_keys:
        cur = resp_json
        ok = True
        for k in path:
            if isinstance(cur, dict) and k in cur:
                cur = cur[k]
            else:
                ok = False
                break
        if ok and isinstance(cur, str) and cur.startswith("http"):
            return cur
    return None


def _upload_file(upload_url: str, file_path: Path) -> str:
    """POST *file_path* to *upload_url* as multipart form data; return its URL.

    The request carries the file under the ``file`` field and a ``fileType``
    form field with the value ``"VOICE"``.

    Raises:
        ValueError: *upload_url* is empty.
        FileNotFoundError: *file_path* does not exist.
        requests.HTTPError: the server answered with an error status
            (raised by ``raise_for_status``).
        RuntimeError: the request succeeded but no URL could be found in the
            response body.
    """
    if not upload_url:
        raise ValueError("upload_url is empty")
    if not Path(file_path).exists():
        raise FileNotFoundError(str(file_path))

    filename = Path(file_path).name
    # Guess content type
    content_type = "audio/mpeg" if filename.lower().endswith(".mp3") else "audio/wav"
    with open(file_path, "rb") as f:
        files = {
            "file": (filename, f, content_type),
            "fileType": (None, "VOICE")
        }
        resp = requests.post(upload_url, files=files, timeout=30)
    resp.raise_for_status()

    # Try to parse JSON for a URL; fallback to raw text if JSON invalid
    url: Optional[str] = None
    try:
        data = resp.json()
    except ValueError:
        # Body is not JSON; the raw-text fallback below handles it.
        data = None
    if data is not None:
        url = _extract_url_from_response(data)

    if not url:
        # As a last resort, if the response text looks like a URL, use it
        txt = (resp.text or "").strip()
        if txt.startswith("http"):
            url = txt
    if not url:
        raise RuntimeError("Upload succeeded but no URL found in response")
    return url