123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- from wave import open as wave_open
- from pathlib import Path
- import re
- import os
- import json
- from typing import Optional
- import dashscope
- from dashscope.api_entities.dashscope_response import SpeechSynthesisResponse
- from dashscope.audio.tts import ResultCallback, SpeechSynthesizer, SpeechSynthesisResult
- import requests
- from ..schemas.speech import TextToSpeechResponse, DataPayload
- from ..core.config import get_settings
# Application settings are resolved once, when this module is imported.
settings = get_settings()

# Upload endpoint passed to _upload_file(); an unset value becomes "".
UPLOAD_PATH = settings.upload_path or ""

# Configure the DashScope API key from settings; an unset value becomes "".
dashscope.api_key = settings.dashscope_api_key or ""
def _safe_filename(name: str) -> str:
    """Turn an arbitrary string into a conservative file-name stem.

    Every run of characters other than word characters, ``-`` and the CJK
    range U+4E00-U+9FFF collapses into a single ``_``; underscores at either
    end are then trimmed. When nothing is left, ``"output"`` is returned.
    """
    collapsed = re.sub(r"[^\w\-\u4e00-\u9fff]+", "_", name)
    stem = collapsed.strip("_")
    if not stem:
        return "output"
    return stem
class SpeechProvider:
    """Synthesize speech with DashScope and publish the audio via the upload endpoint."""

    def text_to_speech(
        self,
        volume: int,
        pitch: float,
        rate: float,
        filename: str,
        text: str,
        *,
        model: Optional[str] = None,
        format: Optional[str] = None,
    ) -> TextToSpeechResponse:
        """Synthesize ``text`` to an audio file, upload it and return its URL.

        Args:
            volume: Forwarded to ``SpeechSynthesizer.call`` as ``volume``.
            pitch: Forwarded to ``SpeechSynthesizer.call`` as ``pitch``.
            rate: Forwarded to ``SpeechSynthesizer.call`` as ``rate``.
            filename: Desired file-name stem; sanitized with ``_safe_filename``.
            text: Text to synthesize.
            model: DashScope model name; ``'sambert-zhifei-v1'`` when omitted.
            format: ``"wav"`` or ``"mp3"`` (case-insensitive). Any other value,
                including ``None``, falls back to ``"mp3"``.

        Returns:
            ``code=0`` with ``data.audio_url`` on success; ``code=1`` with
            ``msg='error'`` when synthesis produced no audio or the upload
            failed.

        Raises:
            Whatever ``SpeechSynthesizer.call`` raises; that call is not
            wrapped in a ``try``.
        """
        # Resolve output path under <project-root>/temp and ensure it exists.
        project_root = Path(__file__).resolve().parents[2]  # repo root
        audio_dir = project_root / "temp"
        audio_dir.mkdir(parents=True, exist_ok=True)

        # Desired output format (default mp3 for smaller size).
        audio_format = (format or 'mp3').lower()
        if audio_format not in {"wav", "mp3"}:
            audio_format = "mp3"
        # After validation the format doubles as the file extension.
        sample_rate = 48000 if audio_format == "wav" else 24000
        out_path = audio_dir / f"{_safe_filename(filename)}.{audio_format}"

        # A file left behind by an earlier failed run must not be mistaken for
        # the result of this synthesis.
        try:
            out_path.unlink(missing_ok=True)
        except OSError as stale_err:
            print(f"[warn] Failed to remove stale audio {out_path}: {stale_err}")

        # The callback streams audio frames into out_path as they arrive.
        callback = Callback(
            out_path=str(out_path),
            sample_rate=sample_rate,
            channels=1,
            sampwidth=2,
            audio_format=audio_format,
        )
        SpeechSynthesizer.call(
            model=(model or 'sambert-zhifei-v1'),
            volume=volume,
            text=text,
            pitch=pitch,
            rate=rate,
            format=audio_format,
            sample_rate=sample_rate,
            callback=callback,
            word_timestamp_enabled=True,
            phoneme_timestamp_enabled=True,
        )

        # Failures are reported to the callback rather than returned here, so
        # verify that audio was actually written before uploading; otherwise
        # an empty file would be uploaded and reported as success.
        try:
            has_audio = out_path.stat().st_size > 0
        except OSError:
            has_audio = False
        if not has_audio:
            print(f"[warn] Synthesis produced no audio at {out_path}")
            return TextToSpeechResponse(code=1, msg='error')

        # Synthesis produced a file; upload it to the configured endpoint.
        try:
            url = _upload_file(UPLOAD_PATH, out_path)
            # Upload succeeded; remove local audio file to save space.
            try:
                out_path.unlink(missing_ok=True)
            except Exception as del_err:
                print(f"[warn] Failed to delete local audio {out_path}: {del_err}")
            return TextToSpeechResponse(
                code=0,
                data=DataPayload(audio_url=url),
                msg='success'
            )
        except Exception as e:
            # Upload failed: report an error response. The local file is kept
            # on disk, but its path is not returned to the caller.
            print(f"[warn] Upload failed: {e}")
            return TextToSpeechResponse(code=1, msg='error')
class Callback(ResultCallback):
    """Synthesis callback that streams received audio frames into a local file.

    For ``audio_format == "wav"`` frames are written through the ``wave``
    module using the configured channel count, sample width and frame rate;
    for any other format the frames are written verbatim as raw bytes.
    """

    def __init__(self, out_path: str, sample_rate: int = 16000, channels: int = 1, sampwidth: int = 2, audio_format: str = "mp3"):
        self.out_path = out_path
        self.sample_rate = sample_rate
        self.channels = channels
        self.sampwidth = sampwidth
        # Exactly one of these is set while a file is open (see on_open).
        self.wav_file = None
        self._fh = None
        self.audio_format = audio_format

    def _close_outputs(self) -> None:
        """Close whichever output handle is open; safe to call repeatedly."""
        if self.wav_file:
            self.wav_file.close()
            self.wav_file = None
        if self._fh:
            self._fh.close()
            self._fh = None

    def on_open(self):
        """Create the output file (and its parent directory) for writing."""
        print('Speech synthesizer is opened.')
        # Ensure parent directory exists (in case not created earlier)
        Path(self.out_path).parent.mkdir(parents=True, exist_ok=True)
        if self.audio_format == "wav":
            self.wav_file = wave_open(self.out_path, 'wb')
            self.wav_file.setnchannels(self.channels)
            self.wav_file.setsampwidth(self.sampwidth)
            self.wav_file.setframerate(self.sample_rate)
        else:
            # For mp3 (and other compressed formats), write raw bytes
            self._fh = open(self.out_path, 'wb')

    def on_complete(self):
        """Finalize the output file after a successful synthesis."""
        print('Speech synthesizer is completed.')
        self._close_outputs()

    def on_error(self, response: SpeechSynthesisResponse):
        """Log the failure, release the file handle and drop the partial file."""
        print('Speech synthesizer failed, response is %s' % (str(response)))
        # Without this the handle opened in on_open would stay open, since
        # on_complete is the only other place that closes it.
        self._close_outputs()
        # A truncated file from a failed synthesis must not be picked up later.
        try:
            Path(self.out_path).unlink(missing_ok=True)
        except OSError as del_err:
            print(f"[warn] Failed to delete partial audio {self.out_path}: {del_err}")

    def on_close(self):
        """Release any handle still open when the synthesizer closes."""
        print('Speech synthesizer is closed.')
        self._close_outputs()

    def on_event(self, result: SpeechSynthesisResult):
        """Append the audio frame carried by ``result``, if any, to the file."""
        frame = result.get_audio_frame()
        if not frame:
            # Events without audio data are ignored.
            return
        if self.wav_file:
            self.wav_file.writeframes(frame)
        elif self._fh:
            self._fh.write(frame)
def _extract_url_from_response(resp_json: dict) -> Optional[str]:
    """Find the uploaded file's URL in a decoded JSON response.

    The following key paths are tried in order and the first one that
    resolves to a string starting with ``"http"`` wins:
    ``data.fileUrl``, ``data``, ``fileUrl``, ``result.fileUrl``,
    ``payload.fileUrl``.

    Args:
        resp_json: Decoded JSON body. Anything that is not a dict simply
            yields ``None``.

    Returns:
        The URL string, or ``None`` when no candidate path matches.
    """
    candidate_paths = (
        ("data", "fileUrl"),
        ("data",),
        ("fileUrl",),
        ("result", "fileUrl"),
        ("payload", "fileUrl"),
    )
    for path in candidate_paths:
        node = resp_json
        for key in path:
            if not isinstance(node, dict) or key not in node:
                break
            node = node[key]
        else:
            # Every key on the path resolved; accept only URL-looking strings.
            if isinstance(node, str) and node.startswith("http"):
                return node
    return None
def _upload_file(upload_url: str, file_path: Path) -> str:
    """POST an audio file to ``upload_url`` and return the URL it was stored at.

    The file is sent as multipart form data in the ``file`` field, together
    with a ``fileType`` field set to ``"VOICE"``. The request times out after
    30 seconds.

    Args:
        upload_url: Endpoint to post to; must be non-empty.
        file_path: Path of the local file to upload.

    Returns:
        The URL found in the JSON response (see
        ``_extract_url_from_response``) or, failing that, the raw response
        text when it starts with ``"http"``.

    Raises:
        ValueError: ``upload_url`` is empty.
        FileNotFoundError: ``file_path`` does not exist.
        requests.HTTPError: The server answered with an error status.
        RuntimeError: The upload succeeded but no URL could be found.
    """
    if not upload_url:
        raise ValueError("upload_url is empty")
    file_path = Path(file_path)
    if not file_path.exists():
        raise FileNotFoundError(str(file_path))

    filename = file_path.name
    # Guess content type: anything that is not .mp3 is sent as WAV.
    content_type = "audio/mpeg" if filename.lower().endswith(".mp3") else "audio/wav"
    with open(file_path, "rb") as f:
        files = {
            "file": (filename, f, content_type),
            "fileType": (None, "VOICE"),
        }
        resp = requests.post(upload_url, files=files, timeout=30)
        resp.raise_for_status()

    # Prefer a URL from the JSON body. A body that is not valid JSON is an
    # expected case (handled below), so only the decode error is caught.
    url: Optional[str] = None
    try:
        url = _extract_url_from_response(resp.json())
    except ValueError:
        url = None

    if not url:
        # As a last resort, if the response text looks like a URL, use it
        txt = (resp.text or "").strip()
        if txt.startswith("http"):
            url = txt
    if not url:
        raise RuntimeError("Upload succeeded but no URL found in response")
    return url
|