"""Text-to-speech provider.

Synthesizes audio with DashScope's ``SpeechSynthesizer``, streams the frames
into a local file under ``<parents[2] of this file>/temp`` and then uploads
that file to the endpoint configured as ``settings.upload_path``.
"""
from wave import open as wave_open
from pathlib import Path
import re
import os
import json
from typing import Optional

import dashscope
from dashscope.api_entities.dashscope_response import SpeechSynthesisResponse
from dashscope.audio.tts import ResultCallback, SpeechSynthesizer, SpeechSynthesisResult
import requests

from ..schemas.speech import TextToSpeechResponse, DataPayload
from ..core.config import get_settings
from ..core.logger import get_logger

settings = get_settings()

# Configure DashScope API key from env/.env
dashscope.api_key = settings.dashscope_api_key or ""
UPLOAD_PATH = settings.upload_path or ""

# module logger
logger = get_logger("speech_provider")


def _safe_filename(name: str) -> str:
    """Return *name* reduced to characters that are safe in a file name.

    Every run of characters that is not a word character, a dash or a CJK
    ideograph in U+4E00..U+9FFF is replaced with a single ``_``; leading and
    trailing underscores are stripped. Falls back to ``"output"`` when nothing
    is left. Note that ``\\w`` is Unicode-aware for ``str`` patterns, so
    letters and digits from other scripts are kept as well.
    """
    return re.sub(r"[^\w\-\u4e00-\u9fff]+", "_", name).strip("_") or "output"


def _looks_like_url(value: object) -> bool:
    """Return True when *value* is a string starting with ``http://`` or ``https://``."""
    return isinstance(value, str) and value.startswith(("http://", "https://"))


class SpeechProvider:
    """Turns text into an audio file via DashScope and publishes it through the upload endpoint."""

    def text_to_speech(
        self,
        volume: int,
        pitch: float,
        rate: float,
        filename: str,
        text: str,
        *,
        model: Optional[str] = None,
        format: Optional[str] = None,
    ) -> TextToSpeechResponse:
        """Synthesize *text*, upload the audio and return its URL.

        Args:
            volume: Passed through to ``SpeechSynthesizer.call``.
            pitch: Passed through to ``SpeechSynthesizer.call``.
            rate: Passed through to ``SpeechSynthesizer.call``.
            filename: Base name of the output file; sanitized with
                ``_safe_filename`` and suffixed with the format extension.
            text: Text to synthesize; must be a non-blank string.
            model: DashScope model name; defaults to ``'sambert-zhifei-v1'``.
            format: ``"wav"`` or ``"mp3"`` (case-insensitive). Anything else,
                including ``None``, falls back to ``"mp3"``.

        Returns:
            ``TextToSpeechResponse`` with ``code=0`` and the uploaded audio URL
            on success, or ``code=1`` and an explanatory ``msg`` on any
            failure. Failures are reported in the response, not raised.
        """
        # Resolve output path under project-root/temp and ensure directory exists
        project_root = Path(__file__).resolve().parents[2]  # repo root
        audio_dir = project_root / "temp"
        try:
            audio_dir.mkdir(parents=True, exist_ok=True)
        except Exception as e:
            logger.error("Failed to create audio directory %s: %s", audio_dir, e, exc_info=True)
            return TextToSpeechResponse(code=1, data=None, msg=f"create audio dir failed: {e}")

        # Basic input validation
        if not isinstance(text, str) or not text.strip():
            msg = "text is required"
            logger.error(msg)
            return TextToSpeechResponse(code=1, data=None, msg=msg)
        if not isinstance(filename, str) or not filename.strip():
            msg = "filename is required"
            logger.error(msg)
            return TextToSpeechResponse(code=1, data=None, msg=msg)
        if not dashscope.api_key:
            msg = "DASHSCOPE_API_KEY is missing"
            logger.error(msg)
            return TextToSpeechResponse(code=1, data=None, msg=msg)

        # Determine desired output format (default mp3 for smaller size)
        audio_format = (format or 'mp3').lower()
        if audio_format not in {"wav", "mp3"}:
            logger.info("unsupported format '%s', fallback to mp3", audio_format)
            audio_format = "mp3"

        # Choose extension and sample rate
        ext = "wav" if audio_format == "wav" else "mp3"
        sample_rate = 48000 if audio_format == "wav" else 24000

        filename = f"{_safe_filename(filename)}.{ext}"
        out_path = audio_dir / filename

        # Prepare callback with audio params
        callback = Callback(
            out_path=str(out_path),
            sample_rate=sample_rate,
            channels=1,
            sampwidth=2,
            audio_format=audio_format,
        )

        # Run TTS with robust error handling
        try:
            SpeechSynthesizer.call(
                model=(model or 'sambert-zhifei-v1'),
                volume=volume,
                text=text,
                pitch=pitch,
                rate=rate,
                format=audio_format,
                sample_rate=sample_rate,
                callback=callback,
                word_timestamp_enabled=True,
                phoneme_timestamp_enabled=True,
            )
        except Exception as e:
            logger.error("TTS call failed", exc_info=True)
            return TextToSpeechResponse(code=1, data=None, msg=str(e))
        finally:
            # Idempotent and non-raising: guarantees the output file is closed
            # before it is inspected or uploaded, whichever path we leave by.
            callback._close_output()

        if callback.had_error:
            # TTS reported an error via callback
            base_msg = callback.error_message or "speech synthesis failed"
            # Enrich message with model error code/status when available
            if callback.error_code or callback.status_code is not None:
                msg = f"[{callback.error_code or 'Error'}] {base_msg} (status={callback.status_code})"
            else:
                msg = base_msg
            logger.error("TTS callback error: %s", msg)
            return TextToSpeechResponse(code=1, data=None, msg=msg)

        # After synthesis completes, upload the file to OSS
        try:
            url = _upload_file(UPLOAD_PATH, out_path)
            # Upload succeeded; remove local audio file to save space
            try:
                Path(out_path).unlink(missing_ok=True)
            except Exception as del_err:
                logger.warning("Failed to delete local audio %s: %s", out_path, del_err)
            return TextToSpeechResponse(
                code=0,
                data=DataPayload(audio_url=url),
                msg='success'
            )
        except Exception as e:
            # Keep local file for inspection; report error message
            logger.error("Upload failed", exc_info=True)
            return TextToSpeechResponse(code=1, data=None, msg=str(e))


class Callback(ResultCallback):
    """DashScope result callback that writes received audio frames to a file.

    For ``audio_format == "wav"`` frames are written through the ``wave``
    module with the configured channel count, sample width and frame rate;
    for any other format the frames are written verbatim as raw bytes.

    Failures are never raised out of the callback methods. They are recorded
    on the instance (``had_error``, ``error_message``, ``error_code``,
    ``status_code``) for the caller to inspect after synthesis returns.
    """

    def __init__(self, out_path: str, sample_rate: int = 16000, channels: int = 1, sampwidth: int = 2, audio_format: str = "mp3"):
        self.out_path = out_path
        self.sample_rate = sample_rate
        self.channels = channels
        self.sampwidth = sampwidth
        self.wav_file = None  # wave writer, set only for "wav" output
        self._fh = None  # raw binary file handle, set for every other format
        self.audio_format = audio_format
        self.had_error = False
        self.error_message: Optional[str] = None
        self.error_code: Optional[str] = None
        self.status_code: Optional[int] = None

    def _record_error(self, message: str) -> None:
        """Flag a local failure, keeping the first recorded message.

        Later symptoms (e.g. "audio handle not initialized" for every frame
        after a failed open) must not hide the root cause.
        """
        self.had_error = True
        if self.error_message is None:
            self.error_message = message

    def _close_output(self) -> None:
        """Close whichever output handle is open; safe to call repeatedly.

        Never raises: a failing ``close()`` is logged and recorded as an
        error, because it can mean buffered audio was not written out.
        """
        for attr in ("wav_file", "_fh"):
            handle = getattr(self, attr)
            if handle is None:
                continue
            # Clear the attribute first so a failing close() is not retried.
            setattr(self, attr, None)
            try:
                handle.close()
            except Exception as e:
                self._record_error(f"close output failed: {e}")
                logger.error("Failed to close output file %s: %s", self.out_path, e, exc_info=True)

    def on_open(self):
        """Open the output file in the mode matching ``audio_format``."""
        logger.info('Speech synthesizer opened')
        try:
            # Ensure parent directory exists (in case not created earlier)
            Path(self.out_path).parent.mkdir(parents=True, exist_ok=True)
            if self.audio_format == "wav":
                self.wav_file = wave_open(self.out_path, 'wb')
                self.wav_file.setnchannels(self.channels)
                self.wav_file.setsampwidth(self.sampwidth)
                self.wav_file.setframerate(self.sample_rate)
            else:
                # For mp3 (and other compressed formats), write raw bytes
                self._fh = open(self.out_path, 'wb')
        except Exception as e:
            self._record_error(f"open output failed: {e}")
            logger.error("Failed to open output file %s: %s", self.out_path, e, exc_info=True)

    def on_complete(self):
        """Close the output file once synthesis has finished."""
        logger.info('Speech synthesizer completed')
        self._close_output()

    def on_error(self, response: SpeechSynthesisResponse):
        """Record the service-reported error and release the output file."""
        # Capture error and mark state for upstream handling
        code, detail, status = _extract_dashscope_error(response)
        self.had_error = True
        self.error_message = detail
        self.error_code = code
        self.status_code = status
        # Log with structured context
        if code or status is not None:
            logger.error('Speech synthesizer failed: code=%s status=%s msg=%s', code, status, detail)
        else:
            logger.error('Speech synthesizer failed: %s', detail)
        # Ensure file handles are closed even on error
        self._close_output()

    def on_close(self):
        """Log that the synthesizer connection was closed."""
        logger.info('Speech synthesizer closed')

    def on_event(self, result: SpeechSynthesisResult):
        """Append the audio frame carried by *result*, if any, to the output file."""
        frame = result.get_audio_frame()
        if not frame:
            return
        try:
            if self.wav_file:
                self.wav_file.writeframes(frame)
            elif self._fh:
                self._fh.write(frame)
            elif not self.had_error:
                # No open handle and no earlier failure explaining why:
                # mark error to surface upstream.
                self._record_error("audio handle not initialized")
                logger.error("Audio handle not initialized when receiving frame")
            # else: the handle is missing because of an error that is already
            # recorded; keep that root cause and drop the frame quietly.
        except Exception as e:
            self._record_error(f"write frame failed: {e}")
            logger.error("Failed writing audio frame: %s", e, exc_info=True)


def _extract_url_from_response(resp_json: dict) -> Optional[str]:
    """Return the first http(s) URL found at a known location in *resp_json*.

    Locations tried, in order: ``data.fileUrl``, ``data`` (when it is itself a
    string), ``fileUrl``, ``result.fileUrl``, ``payload.fileUrl``. Returns
    ``None`` when none of them holds a string starting with ``http://`` or
    ``https://``.
    """
    candidate_paths = (
        ("data", "fileUrl"),
        ("data",),
        ("fileUrl",),
        ("result", "fileUrl"),
        ("payload", "fileUrl"),
    )
    for path in candidate_paths:
        cur = resp_json
        for key in path:
            if isinstance(cur, dict) and key in cur:
                cur = cur[key]
            else:
                break
        else:
            # Every key on the path resolved; accept only a real URL string.
            if _looks_like_url(cur):
                return cur
    return None


def _error_fields_from_mapping(
    data: dict,
    code: Optional[str],
    msg: str,
    status: Optional[int],
) -> tuple[Optional[str], str, Optional[int]]:
    """Overlay ``code`` / ``message`` / ``status_code`` from *data* onto the given fallbacks."""
    raw_code = data.get("code")
    if raw_code is not None:
        code = str(raw_code)
    raw_status = data.get("status_code")
    if isinstance(raw_status, int):
        status = raw_status
    msg = str(data.get("message") or msg)
    return code, msg, status


def _extract_dashscope_error(resp: object) -> tuple[Optional[str], str, Optional[int]]:
    """Best-effort extraction of (code, message, http_status) from DashScope response.

    Compatible with SpeechSynthesisResponse or dict-like payloads. Anything
    that cannot be determined keeps its default: ``None`` for code and status,
    ``"speech synthesis failed"`` for the message. Never raises.
    """
    code: Optional[str] = None
    msg: str = "speech synthesis failed"
    status: Optional[int] = None

    # If it looks like a dict
    if isinstance(resp, dict):
        return _error_fields_from_mapping(resp, code, msg, status)

    # Try attribute-style access; each lookup is guarded separately so one
    # misbehaving attribute does not prevent reading the others.
    try:
        status_attr = getattr(resp, "status_code", None)
        if isinstance(status_attr, int):
            status = status_attr
    except Exception:
        pass
    try:
        code_attr = getattr(resp, "code", None)
        if code_attr is not None:
            code = str(code_attr)
    except Exception:
        pass
    try:
        msg_attr = getattr(resp, "message", None)
        if msg_attr:
            msg = str(msg_attr)
    except Exception:
        pass

    # As a last resort, try to parse JSON from str(resp)
    try:
        s = str(resp)
        if s and s.strip().startswith("{"):
            data = json.loads(s)
            if isinstance(data, dict):
                code, msg, status = _error_fields_from_mapping(data, code, msg, status)
    except Exception:
        # Deliberate best effort: fall back to whatever was gathered above.
        pass

    return code, msg, status


def _upload_file(upload_url: str, file_path: Path) -> str:
    """Upload *file_path* as multipart form data to *upload_url* and return the file's URL.

    The form carries the file under ``file`` and a ``fileType`` field set to
    ``VOICE``. The URL is taken from the JSON response (see
    ``_extract_url_from_response``) or, failing that, from the raw response
    body when that body is itself an http(s) URL.

    Raises:
        ValueError: *upload_url* is empty.
        FileNotFoundError: *file_path* does not exist.
        requests.RequestException: the request failed or returned an error status.
        RuntimeError: the upload succeeded but no URL could be found in the response.
    """
    if not upload_url:
        logger.error("upload_url is empty")
        raise ValueError("upload_url is empty")
    if not Path(file_path).exists():
        logger.error("audio file not found: %s", file_path)
        raise FileNotFoundError(str(file_path))

    filename = Path(file_path).name
    # Guess content type
    content_type = "audio/mpeg" if filename.lower().endswith(".mp3") else "audio/wav"

    with open(file_path, "rb") as f:
        files = {
            "file": (filename, f, content_type),
            "fileType": (None, "VOICE")
        }
        resp = requests.post(upload_url, files=files, timeout=30)
        resp.raise_for_status()

    # Try to parse JSON for a URL; fallback to raw text if JSON invalid
    url: Optional[str] = None
    try:
        url = _extract_url_from_response(resp.json())
    except ValueError:
        # JSON decode errors raised by Response.json() are ValueError subclasses.
        logger.warning("Upload response is not valid JSON")

    if not url:
        # As a last resort, if the response text looks like a URL, use it
        txt = (resp.text or "").strip()
        if _looks_like_url(txt):
            url = txt

    if not url:
        logger.error("upload succeeded but no URL found in response")
        raise RuntimeError("upload succeeded but no URL found in response")
    return url