123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- import base64
- import hashlib
- import hmac
- import json
- import time
- import uuid
- from datetime import datetime
- from urllib import parse
- import requests
- from loguru import logger
- from utils.config import OssConfig
class AccessToken:
    """Request an access token from the Alibaba Cloud NLS meta service.

    The request is a GET whose query string is signed with HMAC-SHA1, as
    built step by step in :meth:`create_token`.
    """

    @staticmethod
    def _encode_text(text):
        """Percent-encode ``text`` for use in the signed request.

        ``quote_plus`` output is post-processed so that a space becomes
        ``%20`` (not ``+``), ``*`` becomes ``%2A`` and ``~`` stays literal.
        """
        encoded_text = parse.quote_plus(text)
        return encoded_text.replace('+', '%20').replace('*', '%2A').replace('%7E', '~')

    @staticmethod
    def _encode_dict(dic):
        """Return ``dic`` as a query string with keys in sorted order.

        The same character substitutions as :meth:`_encode_text` are applied
        to the ``urlencode`` output.
        """
        # Keys are unique, so sorting the items orders them by key only.
        encoded_text = parse.urlencode(sorted(dic.items()))
        return encoded_text.replace('+', '%20').replace('*', '%2A').replace('%7E', '~')

    @staticmethod
    def create_token(access_key_id, access_key_secret, timeout=10):
        """Generate an access token.

        Args:
            access_key_id: Access key id sent as the ``AccessKeyId`` parameter.
            access_key_secret: Secret used (with a trailing ``&``) as the
                HMAC-SHA1 key.
            timeout: Seconds to wait for the HTTP request before giving up.

        Returns:
            ``(token, expire_time)`` taken from the ``Token`` object of the
            JSON response, or ``(None, None)`` when the request fails or the
            response carries no ``Token``.
        """
        parameters = {
            'AccessKeyId': access_key_id,
            'Action': 'CreateToken',
            'Format': 'JSON',
            'RegionId': 'cn-shanghai',
            'SignatureMethod': 'HMAC-SHA1',
            'SignatureNonce': str(uuid.uuid1()),
            'SignatureVersion': '1.0',
            # Timestamp is expressed in UTC.
            'Timestamp': time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            'Version': '2019-02-28',
        }
        query_string = AccessToken._encode_dict(parameters)
        string_to_sign = '&'.join((
            'GET',
            AccessToken._encode_text('/'),
            AccessToken._encode_text(query_string),
        ))
        digest = hmac.new(
            (access_key_secret + '&').encode('utf-8'),
            string_to_sign.encode('utf-8'),
            hashlib.sha1,
        ).digest()
        signature = AccessToken._encode_text(base64.b64encode(digest).decode('utf-8'))
        # NOTE(review): the signed request goes over plain HTTP; confirm
        # whether the endpoint can be switched to HTTPS.
        full_url = 'http://nls-meta.cn-shanghai.aliyuncs.com/?Signature=%s&%s' % (signature, query_string)

        # Bound up front so the failure logging below never touches an
        # unassigned name when requests.get() itself raises.
        response = None
        try:
            response = requests.get(full_url, timeout=timeout)
            response.raise_for_status()
            root_obj = response.json()
            if 'Token' in root_obj:
                token_info = root_obj['Token']
                return token_info['Id'], token_info['ExpireTime']
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers an invalid JSON body on requests versions
            # whose JSON error is not a RequestException.
            logger.error(f"获取Token失败: {e}")
        if response is not None:
            logger.error(f"获取Token失败: {response.text}")
        return None, None
class TtsHeader:
    """Header section of a TTS request: the app key and the access token."""

    def __init__(self, appkey, token):
        self.appkey, self.token = appkey, token

    def to_dict(self):
        """Return the header as a plain dict (keys ``appkey``, ``token``)."""
        return dict(appkey=self.appkey, token=self.token)
class TtsContext:
    """Context section of a TTS request, carrying the device id."""

    def __init__(self, device_id):
        self.device_id = device_id

    def to_dict(self):
        """Return the context as a plain dict with the single key ``device_id``."""
        return dict(device_id=self.device_id)
class TtsRequest:
    """Synthesis parameters of a TTS request (voice, audio format, text)."""

    def __init__(self, voice, sample_rate, format, enable_subtitle, text):
        self.voice, self.sample_rate, self.format = voice, sample_rate, format
        self.enable_subtitle, self.text = enable_subtitle, text

    def to_dict(self):
        """Return the parameters as a plain dict, one key per attribute."""
        field_names = ('voice', 'sample_rate', 'format', 'enable_subtitle', 'text')
        return {field: getattr(self, field) for field in field_names}
class TtsPayload:
    """Payload section of a TTS request: notification settings plus the synthesis request."""

    def __init__(self, enable_notify, notify_url, tts_request):
        self.enable_notify, self.notify_url = enable_notify, notify_url
        self.tts_request = tts_request

    def to_dict(self):
        """Return the payload as a dict, with ``tts_request`` expanded via its own ``to_dict()``."""
        payload = {'enable_notify': self.enable_notify, 'notify_url': self.notify_url}
        payload['tts_request'] = self.tts_request.to_dict()
        return payload
class TtsBody:
    """Complete TTS request body made of header, context and payload sections."""

    def __init__(self, tts_header, tts_context, tts_payload):
        self.tts_header, self.tts_context, self.tts_payload = tts_header, tts_context, tts_payload

    def to_dict(self):
        """Return the body as a dict with keys ``header``, ``context`` and ``payload``.

        Each value is the corresponding section's own ``to_dict()`` result.
        """
        sections = (
            ('header', self.tts_header),
            ('context', self.tts_context),
            ('payload', self.tts_payload),
        )
        return {key: section.to_dict() for key, section in sections}
class AliyunTTS:
    """Long-text speech synthesis client that caches its access token.

    The access key id, access key secret and app key are read from
    ``OssConfig`` when the client is constructed.
    """

    def __init__(self):
        self.access_key_id = OssConfig["OSS_ACCESS_KEY_ID"]
        self.access_key_secret = OssConfig["OSS_ACCESS_KEY_SECRET"]
        self.app_key = OssConfig["APP_KEY"]
        # Token cache, filled lazily by get_token().
        self.token = None
        self.expire_time = None

    def get_token(self):
        """Fetch and cache the access token.

        A new token is requested when none is cached, when no expiry is
        known, or when the cached token expires within the next 60 seconds.

        Returns:
            The token, or ``None`` when a new token could not be obtained.
        """
        needs_refresh = (
            not self.token
            or self.expire_time is None
            or time.time() + 60 > self.expire_time
        )
        if needs_refresh:
            self.token, self.expire_time = AccessToken.create_token(
                self.access_key_id, self.access_key_secret
            )
            if self.token:
                logger.info(f"获取Token成功,有效期至: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.expire_time))}")
            else:
                logger.error("获取Token失败")
        return self.token

    def synthesize(self, text, voice="xiaoyun", format="mp3", sample_rate=16000, use_polling=True,
                   notify_url="http://your-public-server.com/tts-callback"):
        """Run long-text speech synthesis.

        With ``use_polling=True`` the request is submitted with notification
        disabled and the task is polled for its result. With
        ``use_polling=False`` callback mode is used instead: the service is
        asked to notify ``notify_url``, which must be a publicly reachable
        URL, once synthesis has finished.

        Args:
            text: Text to synthesize.
            voice: Voice name placed in the request.
            format: Audio format placed in the request.
            sample_rate: Sample rate placed in the request.
            use_polling: Poll for the result instead of using a callback.
            notify_url: Callback URL, used only when ``use_polling`` is
                false. The default is a placeholder and must be replaced
                for callback mode to work.

        Returns:
            ``None`` when no access token is available, otherwise whatever
            ``request_long_tts`` returns.
        """
        token = self.get_token()
        if not token:
            return None

        # Notification is the opposite of polling: enable it only in
        # callback mode, and only then send a callback URL.
        enable_notify = not use_polling
        callback_url = notify_url if enable_notify else ""

        tts_request = TtsRequest(voice, sample_rate, format, False, text)
        tts_body = TtsBody(
            TtsHeader(self.app_key, token),
            TtsContext("mydevice"),
            TtsPayload(enable_notify, callback_url, tts_request),
        )
        body = json.dumps(tts_body.to_dict())
        polling_url = "https://nls-gateway.cn-shanghai.aliyuncs.com/rest/v1/tts/async"
        return request_long_tts(body, self.app_key, token, use_polling, polling_url)
def request_long_tts(tts_body, appkey, token, use_polling=True, polling_url=None, timeout=30):
    """Submit a long-text speech synthesis request.

    Args:
        tts_body: JSON-encoded request body to POST.
        appkey: App key, forwarded to the polling helper.
        token: Access token, forwarded to the polling helper.
        use_polling: When true (and ``polling_url`` is set), wait for the
            task to finish by polling.
        polling_url: URL to poll for the task status.
        timeout: Seconds to wait for the submit request.

    Returns:
        Polling mode (``use_polling`` and ``polling_url`` both truthy): the
        result of ``wait_loop_for_complete`` on success, ``None`` on failure.
        Otherwise: ``(task_id, request_id)`` on success, ``(None, None)`` on
        failure.
    """
    url = 'https://nls-gateway.cn-shanghai.aliyuncs.com/rest/v1/tts/async'
    headers = {'Content-Type': 'application/json'}

    wait_for_result = bool(use_polling and polling_url)
    # Polling callers receive a single value, so failure must be a falsy
    # None there; a (None, None) tuple is truthy and would defeat a plain
    # `if not result` check.
    failure = None if wait_for_result else (None, None)

    try:
        response = requests.post(url, data=tts_body, headers=headers, timeout=timeout)
        response.raise_for_status()
        json_data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers an invalid JSON body on requests versions whose
        # JSON error is not a RequestException.
        logger.error(f'请求异常: {e}')
        return failure

    if not isinstance(json_data, dict) or json_data.get("error_code") != 20000000:
        logger.error(f"请求失败: {json_data}")
        return failure

    try:
        task_id = json_data['data']['task_id']
        request_id = json_data['request_id']
    except (KeyError, TypeError):
        # Success code without the expected ids: treat as a failed request.
        logger.error(f"请求失败: {json_data}")
        return failure

    logger.info(f"语音合成任务已提交,task_id: {task_id}")
    if wait_for_result:
        return wait_loop_for_complete(polling_url, appkey, token, task_id, request_id)
    return task_id, request_id
def wait_loop_for_complete(url, appkey, token, task_id, request_id, max_retries=30, interval=10, timeout=30):
    """Poll until synthesis completes.

    Args:
        url: Status endpoint to query.
        appkey: Sent as the ``appkey`` query parameter.
        token: Sent as the ``token`` query parameter.
        task_id: Sent as the ``task_id`` query parameter.
        request_id: Sent as the ``request_id`` query parameter.
        max_retries: Maximum number of polling attempts.
        interval: Seconds to sleep between attempts.
        timeout: Seconds to wait for each polling request.

    Returns:
        The ``audio_address`` from the response once it is non-empty, or
        ``None`` when the service reports an error code other than 20000000
        or the attempts are exhausted.
    """
    # Passed via `params` so requests URL-encodes each value.
    query_params = {
        'appkey': appkey,
        'task_id': task_id,
        'token': token,
        'request_id': request_id,
    }
    logger.info(f"开始轮询任务状态: {task_id}")

    for attempt in range(1, max_retries + 1):
        try:
            response = requests.get(url, params=query_params, timeout=timeout)
            response.raise_for_status()
            json_data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # A failed poll is not fatal; try again on the next round.
            logger.warning(f"轮询请求异常: {e}")
        else:
            # Tolerate a missing or null "data" member.
            data = json_data.get("data") or {}
            if "audio_address" in data:
                audio_address = data["audio_address"]
                if audio_address:
                    logger.info(f"合成完成! audio_address = {audio_address}")
                    return audio_address
                logger.info(f"第 {attempt}/{max_retries} 次轮询: 合成中...")
            elif "error_code" in json_data and json_data["error_code"] != 20000000:
                logger.error(f"合成失败: {json_data.get('error_message', '未知错误')}")
                return None

        # No point waiting after the final attempt.
        if attempt < max_retries:
            time.sleep(interval)

    logger.warning(f"已达到最大轮询次数({max_retries}),任务可能仍在处理中")
    return None
if __name__ == "__main__":
    # Text to synthesize.
    tts_text = """生活中总有一些故事能让我们感受到温暖和智慧,赵元任的传奇经历就是这样一个值得分享的好故事..."""
    tts_client = AliyunTTS()

    # Run the synthesis.
    logger.info("开始语音合成...")
    mp3_url = tts_client.synthesize(tts_text)

    # Failure may come back as None or as a (None, None) tuple; the tuple is
    # truthy, so accept only a non-empty string as a successful result.
    if isinstance(mp3_url, str) and mp3_url:
        logger.info(f"合成的url: {mp3_url}")
    else:
        logger.error("语音合成失败,程序退出")
        # Exit with a non-zero status so callers of the script see the failure.
        raise SystemExit(1)
|