Browse Source

长文本语音合成

zhangliang 2 weeks ago
parent
commit
8875bfada0
6 changed files with 353 additions and 5 deletions
  1. 53 0
      utils/aliyun_oss.py
  2. 9 0
      utils/config.py
  3. 2 0
      utils/gpt4o_mini_help.py
  4. 250 0
      utils/long_tts_client.py
  5. 36 5
      utils/tts_help.py
  6. 3 0
      workers/consumption_work.py

+ 53 - 0
utils/aliyun_oss.py

@@ -5,6 +5,9 @@ import uuid
 from datetime import datetime
 from typing import Dict, Any,  Optional
 import oss2
+from oss2 import Auth, Bucket, exceptions
+from loguru import logger
+from utils.config import OssConfig
 import requests
 OSS_ACCESS_KEY_ID = "LTAIP6x1l3DXfSxm"
 OSS_ACCESS_KEY_SECRET = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
@@ -129,6 +132,56 @@ class Oss():
         time.sleep(5)
         return video_path
 
+    @classmethod
+    def upload_to_aliyun(cls, local_path):
+        """
+        Upload a local file to Aliyun OSS under a freshly generated object key.
+
+        The object key is produced by generate_oss_path(); callers do not
+        choose it.
+
+        Args:
+            local_path (str): Path of the local file to upload.
+
+        Returns:
+            str | bool: OssConfig["BUCKETNAME_HOST"] concatenated with the
+            generated object key when the upload responds with status 200;
+            False for any other status or when an exception is raised.
+        """
+        try:
+            oss_path = generate_oss_path()
+            logger.info(f"开始上传到阿里云 OSS: {oss_path}")
+
+            # Build the credentials and the bucket handle from the shared config.
+            auth = Auth(OssConfig["OSS_ACCESS_KEY_ID"], OssConfig["OSS_ACCESS_KEY_SECRET"])
+            bucket = Bucket(auth, OssConfig["ENDPOINT"], OssConfig["BUCKETNAME"])
+
+            result = bucket.put_object_from_file(oss_path, local_path)
+
+            # Anything other than HTTP 200 is reported as a failed upload.
+            if result.status != 200:
+                logger.error(f"上传失败,状态码: {result.status}")
+                return False
+
+            logger.info(f"上传成功,OSS 路径: {oss_path}")
+            return OssConfig["BUCKETNAME_HOST"]+oss_path
+
+        except exceptions.OssError as e:
+            logger.error(f"阿里云 OSS 错误: {e}")
+            return False
+        except Exception as e:
+            # Best-effort boundary: any other failure is logged and reported as False.
+            logger.error(f"上传异常: {e}")
+            return False
+
+def generate_oss_path():
+    """Build an OSS object key shaped like ``<OSS_BASE_DIR>/<YYYYMMDD>/<uuid hex>.mp3``."""
+    # Directory component: the current local date.
+    day_folder = datetime.now().strftime("%Y%m%d")
+    # File component: a random UUID rendered as hex, always with an .mp3 suffix.
+    object_name = uuid.uuid4().hex + ".mp3"
+    return "/".join((OssConfig["OSS_BASE_DIR"], day_folder, object_name))
+
 
 if __name__ == '__main__':
     Oss.download_sph_ls('channel/video/sph/14374775553517295881.jpg','asa','1')

+ 9 - 0
utils/config.py

@@ -0,0 +1,9 @@
+import os
+
+# Shared settings for the OSS upload helper and the long-text TTS client.
+#
+# SECURITY NOTE(review): the credential literals below are committed to the
+# repository. They are kept only as fallbacks so existing deployments keep
+# working; prefer supplying OSS_ACCESS_KEY_ID / OSS_ACCESS_KEY_SECRET /
+# APP_KEY through the environment, and consider rotating the committed values.
+OssConfig = {
+        "OSS_ACCESS_KEY_ID": os.environ.get("OSS_ACCESS_KEY_ID", "LTAIP6x1l3DXfSxm"),
+        "OSS_ACCESS_KEY_SECRET": os.environ.get("OSS_ACCESS_KEY_SECRET", "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"),
+        "APP_KEY": os.environ.get("APP_KEY", "bCRfnYllFl1aTEEH"),
+        "ENDPOINT": "oss-cn-hangzhou.aliyuncs.com",
+        "BUCKETNAME": "clipres",
+        # Public host prefix that upload results are concatenated onto.
+        "BUCKETNAME_HOST": "http://clipres.yishihui.com/",
+        "OSS_BASE_DIR": "longvideo/crawler/longvoice/prod"
+}

+ 2 - 0
utils/gpt4o_mini_help.py

@@ -306,6 +306,8 @@ class GPT4oMini:
                 data = json.loads(response_data.get('data', '{}'))
                 new_pw = data["新片尾"]
                 if new_pw:
+                    # 去除换行符和多余空格
+                    new_pw = ' '.join(new_pw.split())
                     logger.info(f"[+] 生成片尾引导:{new_pw}")
                     return new_pw
                 else:

+ 250 - 0
utils/long_tts_client.py

@@ -0,0 +1,250 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import base64
+import hashlib
+import hmac
+import json
+import time
+import uuid
+from datetime import datetime
+from urllib import parse
+import requests
+from loguru import logger
+from utils.config import OssConfig
+
+
+class AccessToken:
+    """Builds and sends the signed CreateToken request used to obtain an access token."""
+
+    @staticmethod
+    def _encode_text(text):
+        """Percent-encode ``text``, then rewrite '+', '*' and '%7E' the way the signature string needs."""
+        encoded_text = parse.quote_plus(text)
+        return encoded_text.replace('+', '%20').replace('*', '%2A').replace('%7E', '~')
+
+    @staticmethod
+    def _encode_dict(dic):
+        """URL-encode ``dic`` as a query string with keys in sorted order, using the same rewrites as _encode_text."""
+        dic_sorted = [(key, dic[key]) for key in sorted(dic)]
+        encoded_text = parse.urlencode(dic_sorted)
+        return encoded_text.replace('+', '%20').replace('*', '%2A').replace('%7E', '~')
+
+    @staticmethod
+    def create_token(access_key_id, access_key_secret):
+        """
+        Request an access token.
+
+        Args:
+            access_key_id: Access key id placed in the request parameters.
+            access_key_secret: Secret used as the HMAC-SHA1 signing key.
+
+        Returns:
+            tuple: ``(token, expire_time)`` taken from the response's ``Token``
+            object, or ``(None, None)`` when the request fails or the response
+            does not carry a usable ``Token``.
+        """
+        parameters = {
+            'AccessKeyId': access_key_id,
+            'Action': 'CreateToken',
+            'Format': 'JSON',
+            'RegionId': 'cn-shanghai',
+            'SignatureMethod': 'HMAC-SHA1',
+            'SignatureNonce': str(uuid.uuid1()),
+            'SignatureVersion': '1.0',
+            'Timestamp': time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
+            'Version': '2019-02-28'
+        }
+
+        # String to sign: METHOD & encoded("/") & encoded(sorted query string).
+        query_string = AccessToken._encode_dict(parameters)
+        string_to_sign = 'GET' + '&' + AccessToken._encode_text('/') + '&' + AccessToken._encode_text(query_string)
+
+        # The signing key is the secret followed by '&'.
+        secreted_string = hmac.new(
+            bytes(access_key_secret + '&', encoding='utf-8'),
+            bytes(string_to_sign, encoding='utf-8'),
+            hashlib.sha1
+        ).digest()
+
+        signature = base64.b64encode(secreted_string).decode('utf-8')
+        signature = AccessToken._encode_text(signature)
+
+        full_url = 'http://nls-meta.cn-shanghai.aliyuncs.com/?Signature=%s&%s' % (signature, query_string)
+
+        try:
+            # A timeout keeps a stalled connection from blocking the caller forever.
+            response = requests.get(full_url, timeout=30)
+            response.raise_for_status()
+            root_obj = response.json()
+        except (requests.exceptions.RequestException, ValueError) as e:
+            # Return here: `response` may not exist when the request itself failed.
+            logger.error(f"获取Token失败: {e}")
+            return None, None
+
+        token_info = root_obj.get('Token') if isinstance(root_obj, dict) else None
+        if isinstance(token_info, dict) and 'Id' in token_info and 'ExpireTime' in token_info:
+            return token_info['Id'], token_info['ExpireTime']
+
+        logger.error(f"获取Token失败: {response.text}")
+        return None, None
+
+
+class TtsHeader:
+    """Holds the app key and token that form the ``header`` part of a TTS request body."""
+
+    def __init__(self, appkey, token):
+        self.appkey, self.token = appkey, token
+
+    def to_dict(self):
+        """Return the two fields as a plain dict."""
+        return dict(appkey=self.appkey, token=self.token)
+
+
+class TtsContext:
+    """Holds the device id that forms the ``context`` part of a TTS request body."""
+
+    def __init__(self, device_id):
+        self.device_id = device_id
+
+    def to_dict(self):
+        """Return the device id wrapped in a plain dict."""
+        return dict(device_id=self.device_id)
+
+
+class TtsRequest:
+    """Synthesis parameters: voice, sample rate, audio format, subtitle flag and the text itself."""
+
+    # Field order used when serialising; matches the constructor's parameter order.
+    _FIELDS = ('voice', 'sample_rate', 'format', 'enable_subtitle', 'text')
+
+    def __init__(self, voice, sample_rate, format, enable_subtitle, text):
+        self.voice, self.sample_rate, self.format = voice, sample_rate, format
+        self.enable_subtitle, self.text = enable_subtitle, text
+
+    def to_dict(self):
+        """Return all five fields as a plain dict keyed by field name."""
+        return {field: getattr(self, field) for field in self._FIELDS}
+
+
+class TtsPayload:
+    """Combines the notify settings with the synthesis request for the ``payload`` part of the body."""
+
+    def __init__(self, enable_notify, notify_url, tts_request):
+        self.enable_notify, self.notify_url = enable_notify, notify_url
+        self.tts_request = tts_request
+
+    def to_dict(self):
+        """Return the payload as a plain dict; the nested request is serialised via its own to_dict()."""
+        serialised = dict(enable_notify=self.enable_notify, notify_url=self.notify_url)
+        serialised['tts_request'] = self.tts_request.to_dict()
+        return serialised
+
+
+class TtsBody:
+    """Top-level request body assembled from a header, a context and a payload object."""
+
+    def __init__(self, tts_header, tts_context, tts_payload):
+        self.tts_header, self.tts_context, self.tts_payload = tts_header, tts_context, tts_payload
+
+    def to_dict(self):
+        """Return ``{'header': ..., 'context': ..., 'payload': ...}`` built from each part's to_dict()."""
+        parts = (
+            ('header', self.tts_header),
+            ('context', self.tts_context),
+            ('payload', self.tts_payload),
+        )
+        return {key: part.to_dict() for key, part in parts}
+
+
+class AliyunTTS:
+    """Long-text speech synthesis client that caches its access token between calls."""
+
+    def __init__(self):
+        # Credentials and the app key all come from the shared OssConfig mapping.
+        self.access_key_id = OssConfig["OSS_ACCESS_KEY_ID"]
+        self.access_key_secret = OssConfig["OSS_ACCESS_KEY_SECRET"]
+        self.app_key = OssConfig["APP_KEY"]
+        self.token = None
+        self.expire_time = None
+
+    def get_token(self):
+        """
+        Return the cached token, requesting a new one when none is cached or
+        the cached one expires within the next 60 seconds.
+
+        Returns:
+            The token, or None when the token request failed.
+        """
+        needs_refresh = (
+            not self.token
+            or self.expire_time is None
+            or time.time() + 60 > self.expire_time
+        )
+        if needs_refresh:
+            self.token, self.expire_time = AccessToken.create_token(
+                self.access_key_id, self.access_key_secret
+            )
+            if self.token:
+                logger.info(f"获取Token成功,有效期至: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.expire_time))}")
+            else:
+                logger.error("获取Token失败")
+        return self.token
+
+    def synthesize(self, text, voice="xiaoyun", format="mp3", sample_rate=16000, use_polling=True):
+        """
+        Submit a long-text speech synthesis request.
+
+        With ``use_polling=True`` (the default) the task is submitted without
+        callback notification and this call polls until the audio address is
+        available. With ``use_polling=False`` callback mode is used instead:
+        the service is asked to notify a publicly reachable URL when the
+        synthesis finishes.
+
+        Args:
+            text: Text to synthesize.
+            voice: Voice name sent in the request.
+            format: Audio format sent in the request.
+            sample_rate: Sample rate sent in the request.
+            use_polling: Poll for the result (True) or rely on the callback (False).
+
+        Returns:
+            None when no token could be obtained; otherwise whatever
+            request_long_tts() returns for the submitted body.
+        """
+        token = self.get_token()
+        if not token:
+            return None
+
+        th = TtsHeader(self.app_key, token)
+        tc = TtsContext("mydevice")
+        tr = TtsRequest(voice, sample_rate, format, False, text)
+
+        # Callback notification is the opposite of polling: enable it only when
+        # the caller opted out of polling.
+        enable_notify = not use_polling
+        # NOTE(review): the callback URL is a placeholder; callback mode needs a real endpoint.
+        notify_url = "http://your-public-server.com/tts-callback" if enable_notify else ""
+        tp = TtsPayload(enable_notify, notify_url, tr)
+        tb = TtsBody(th, tc, tp)
+
+        body = json.dumps(tb.to_dict())
+        polling_url = "https://nls-gateway.cn-shanghai.aliyuncs.com/rest/v1/tts/async"
+
+        return request_long_tts(body, self.app_key, token, use_polling, polling_url)
+
+
+def request_long_tts(tts_body, appkey, token, use_polling=True, polling_url=None):
+    """
+    Submit a long-text synthesis request and optionally wait for the result.
+
+    Args:
+        tts_body: Serialized JSON request body.
+        appkey: App key, forwarded to the polling call.
+        token: Access token, forwarded to the polling call.
+        use_polling: When true (and ``polling_url`` is given), poll until the
+            task completes.
+        polling_url: Endpoint to poll for the task state.
+
+    Returns:
+        Polling mode (``use_polling`` and ``polling_url`` both truthy): the
+        audio address returned by wait_loop_for_complete(), or None on any
+        failure.
+        Otherwise: ``(task_id, request_id)`` on success, ``(None, None)`` on
+        failure.
+    """
+    url = 'https://nls-gateway.cn-shanghai.aliyuncs.com/rest/v1/tts/async'
+    headers = {'Content-Type': 'application/json'}
+
+    polling = bool(use_polling and polling_url)
+    # Polling callers get a single value back and test it for truthiness, so a
+    # failure there must be falsy; a (None, None) tuple would read as success.
+    failure = None if polling else (None, None)
+
+    try:
+        # A timeout keeps a stalled connection from blocking the worker forever.
+        response = requests.post(url, data=tts_body, headers=headers, timeout=30)
+        response.raise_for_status()
+        json_data = response.json()
+    except (requests.exceptions.RequestException, ValueError) as e:
+        logger.error(f'请求异常: {e}')
+        return failure
+
+    # 20000000 is the only error_code value treated as an accepted submission.
+    if not isinstance(json_data, dict) or json_data.get("error_code") != 20000000:
+        logger.error(f"请求失败: {json_data}")
+        return failure
+
+    try:
+        task_id = json_data['data']['task_id']
+        request_id = json_data['request_id']
+    except (KeyError, TypeError):
+        # Accepted status but missing identifiers: treat as a failed request.
+        logger.error(f"请求失败: {json_data}")
+        return failure
+
+    logger.info(f"语音合成任务已提交,task_id: {task_id}")
+
+    if polling:
+        return wait_loop_for_complete(polling_url, appkey, token, task_id, request_id)
+
+    return task_id, request_id
+
+
+def wait_loop_for_complete(url, appkey, token, task_id, request_id, max_retries=30, poll_interval=10):
+    """
+    Poll the task-state endpoint until the synthesized audio address is available.
+
+    Args:
+        url: Endpoint to poll.
+        appkey: App key sent as a query parameter.
+        token: Access token sent as a query parameter.
+        task_id: Task id returned when the job was submitted.
+        request_id: Request id returned when the job was submitted.
+        max_retries: Maximum number of polling attempts.
+        poll_interval: Seconds to wait between attempts.
+
+    Returns:
+        The audio address once the response carries a non-empty one; None when
+        the response reports an error code other than 20000000 or when
+        ``max_retries`` attempts pass without a result.
+    """
+    # Passed via ``params`` so the values are URL-encoded by requests.
+    query = {
+        "appkey": appkey,
+        "task_id": task_id,
+        "token": token,
+        "request_id": request_id,
+    }
+    logger.info(f"开始轮询任务状态: {task_id}")
+
+    for retries in range(max_retries):
+        try:
+            response = requests.get(url, params=query, timeout=30)
+            response.raise_for_status()
+            json_data = response.json()
+        except (requests.exceptions.RequestException, ValueError) as e:
+            # Transient failures only cost one attempt; keep polling.
+            logger.warning(f"轮询请求异常: {e}")
+        else:
+            if not isinstance(json_data, dict):
+                json_data = {}
+            data = json_data.get("data")
+            # ``data`` may be absent or null; only a dict can carry the address.
+            if isinstance(data, dict) and "audio_address" in data:
+                audio_address = data["audio_address"]
+                if audio_address:
+                    logger.info(f"合成完成! audio_address = {audio_address}")
+                    return audio_address
+                logger.info(f"第 {retries + 1}/{max_retries} 次轮询: 合成中...")
+            elif "error_code" in json_data and json_data["error_code"] != 20000000:
+                logger.error(f"合成失败: {json_data.get('error_message', '未知错误')}")
+                return None
+
+        # No point sleeping after the final attempt.
+        if retries < max_retries - 1:
+            time.sleep(poll_interval)
+
+    logger.warning(f"已达到最大轮询次数({max_retries}),任务可能仍在处理中")
+    return None
+
+
+
+if __name__ == "__main__":
+    # Manual smoke test: synthesize one sample sentence and log the outcome.
+    sample_text = """生活中总有一些故事能让我们感受到温暖和智慧,赵元任的传奇经历就是这样一个值得分享的好故事..."""
+    client = AliyunTTS()
+
+    logger.info("开始语音合成...")
+    result_url = client.synthesize(sample_text)
+    logger.info(f"合成的url:  {result_url}")
+    # A falsy result means the synthesis did not produce an audio address.
+    if not result_url:
+        logger.error("语音合成失败,程序退出")

+ 36 - 5
utils/tts_help.py

@@ -5,8 +5,9 @@ import json
 import random
 import re
 import time
-
-
+from utils.long_tts_client import AliyunTTS
+from utils.aliyun_oss import Oss
+from loguru import logger
 
 class TTS:
     @classmethod
@@ -34,6 +35,7 @@ class TTS:
             try:
                 response = requests.request("POST", url, headers=headers, data=payload, timeout=60)
                 response = response.json()
+                print(response)
                 code = response["code"]
                 if code == 0:
                     mp3 = response["data"]
@@ -184,6 +186,18 @@ class TTS:
             return None
 
 
+    @classmethod
+    def get_lone_pw_zm(cls, text, voice, file_path):
+        """
+        Generate speech for a long text, download it, and upload it to OSS.
+
+        Args:
+            text: Text to synthesize.
+            voice: Voice name passed to the synthesizer.
+            file_path: Passed through to TTS.download_mp3 together with the audio URL.
+
+        Returns:
+            The value returned by Oss.upload_to_aliyun() for the downloaded
+            file, or None when synthesis or the download did not produce a
+            usable result.
+        """
+        mps_url = AliyunTTS().synthesize(text, voice)
+        # A failed submit can come back as a (None, None) tuple, which is
+        # truthy; only a non-empty string is a usable audio URL.
+        if not mps_url or not isinstance(mps_url, str):
+            return None
+        pw_mp3_path = TTS.download_mp3(mps_url, file_path)
+        if not pw_mp3_path:
+            return None
+        return Oss.upload_to_aliyun(pw_mp3_path)
+
 if __name__ == '__main__':
     # text = "真是太实用了,分享给身边的准妈妈们吧!这些孕期禁忌一定要记住,赶紧转发给更多人,帮助更多的宝妈们。一起为宝宝的健康加油!"
     # mp3 = TTS.get_pw_zm(text)
@@ -198,10 +212,27 @@ if __name__ == '__main__':
     # ]
     # subprocess.run(command)
     # print("完成")
-    video_file = 'http://clipres.yishihui.com/longvideo/crawler/voice/pre/20240821/37fbb8cfc7f1439b8d8a032a1d01d37f1724219959925.mp3'
-    TTS.getSrt(video_file)
+    # video_file = 'http://clipres.yishihui.com/longvideo/crawler/voice/pre/20240821/37fbb8cfc7f1439b8d8a032a1d01d37f1724219959925.mp3'
+    # TTS.getSrt(video_file)
     # result = subprocess.run(
     #     ["ffprobe", "-v", "error", "-show_entries", "format=duration",
     #      "-of", "default=noprint_wrappers=1:nokey=1", video_file],
     #     capture_output=True, text=True)
-    # print(float(result.stdout))
+    # print(float(result.stdout))
+    voice = "zhifeng_emo"
+    localpath = "/Users/zhangliang/Downloads/47f26a"
+    # pw_srt_text = "讲述张鹏程校长爷爷的故事,不仅让人感动,更让我们看到教育的力量和责任。这样的故事值得让更多人知道,您可以通过下方黄色按钮,把这段感人的视频分享至您的家庭群、老乡群、朋友群,让身边的人一同感受这份温暖和感动。每一次转发,都是在传递这份爱与责任。也请您点击下方绿色按钮,发给三五个好友,尤其是那些有孩子的朋友们,让他们也了解到这位校长爷爷的无私奉献。张鹏程用自己的行动告诉我们,教育不仅仅是知识的传授,更是心灵的关怀,让孩子们在爱的环境中健康成长。您分享一次,可能会启发更多人,让我们共同传递这份爱心,让更多孩子在爱的阳光下茁壮成长。请立即行动起来,分享这个故事,让更多的人加入到关心教育、关爱孩子的行列中来。每一次转发,都是在为孩子们的未来贡献一份力量。让我们一起把这份美好传播开去!"
+    # pw_srt_text = "生活中总有一些故事能让我们感受到温暖和智慧,赵元任的传奇经历就是这样一个值得分享的好故事。各位朋友,看到这里请您立即点击下方的黄色按钮,把这个视频分享至您所在的家庭群、邻里群和兴趣群,让更多人了解这位天才的事迹,激励我们每一个人追求更好的生活。同时,也请您点击绿色按钮,直接发给您身边的朋友们,让他们一起感受这份智慧。每一次分享,都是在传播正能量,鼓励更多人去探索和学习。赵元任不仅是一位语言天才,更是教育的先行者,他用自己的方式告诉我们,学习不仅要有趣,更要通过实践来获得真正的成长。您觉得这条视频内容对您有帮助吗?如果有,请不要犹豫,赶紧分享给更多的朋友和群组,让这些感人的故事温暖每一个人的心。您的每一次分享,都是在让更多人了解到知识的力量和爱的深沉。让我们一起在这个温暖的秋季,传播智慧,分享爱,携手共进,创造更美好的未来!"
+    # pw_srt_text = "在这个充满温情与希望的故事中,张鹏程老师用自己的行动改变了无数孩子的命运,您是否也想为这样的善举点赞呢?请您现在就点击下方黄色按钮,分享到您的家庭群、同学群和社区群,让更多人看到张老师的感人故事,感受到教育的力量。同时,别忘了点击绿色按钮,将这份感动分享给您的好友,让他们也能一起传递这份爱与责任。每一次转发,都是对张鹏程老师的支持,也是对无数孩子未来的关注。您知道吗?这样的故事值得让更多人听到,让更多人感受到教育的温暖和希望。每一个孩子都值得被关爱,他们的未来需要我们共同的努力和关心。转发这个视频,或许就能激励更多人加入到关爱贫困儿童的行列中来,赶紧分享给更多的朋友和群组"
+    pw_srt_text="每一句老话都蕴含着深深的智慧,想必大家看完这个视频后也有很多感触。请您马上点击下方黄色按钮,把这段珍贵的内容分享给您的微信群,让更多的人一起感受这些老祖宗的智慧,让我们的社区更加和谐。也别忘了点击绿色按钮,选几个亲密的朋友,单独发给他们,让大家都来思考一下这些道理。生活中,有些真理是我们需要时刻铭记的,尤其是对于我们的晚年生活来说,能帮助我们更好地面对生活的挑战。每一个分享,都是在传递这份智慧,让更多的人受益。您想想,如果大家都能明白这些道理,生活会变得多么美好。现在就动手,让这段视频在您的朋友圈传开,转发到家庭群、邻里群、老友群,让每个人都能感受到这份智慧的力量。您转发一次,就是在为大家的生活添砖加瓦,让我们共同努力,让这份智慧传播得更远。"
+    # TTS.get_pw_zm(pw_srt_text, voice)
+    pw_srt = TTS.get_lone_pw_zm(pw_srt_text, voice,localpath)
+    print(pw_srt)
+    # print(len(pw_srt_text))
+    # {'code': 0, 'msg': 'success',
+    #  'data': 'http://clipres.yishihui.com/longvideo/crawler/voice/prod/20250516/7bb5e293a0af43b38701419e28a5e3c11747364158315.mp3',
+    #  'redirect': None, 'success': True}
+
+    # mp3_id = "http://clipres.yishihui.com/longvideo/crawler/voice/prod/20250516/7bb5e293a0af43b38701419e28a5e3c11747364158315.mp3"
+    # # mp3_id = "http://nls-cloud-cn-shanghai.oss-cn-shanghai.aliyuncs.com/jupiter-flow/tmp/f02e0751b96b4f4ea03d877e11fee4ae.wav?Expires=1747970050&OSSAccessKeyId=LTAI4G588hXC7P47wauY5e2K&Signature=u8Cn2WKpSv7xlmMWDy4Vzos1nV0%3D"
+    # print(TTS.getSrt(mp3_id))

+ 3 - 0
workers/consumption_work.py

@@ -270,6 +270,9 @@ class ConsumptionRecommend(object):
                     voice = random.choice(voices)
                 else:
                     voice = "zhifeng_emo"
+                # Texts longer than 200 characters go through the long-text
+                # synthesis path; shorter ones keep using the regular one. The
+                # else branch matters: without it the long-text result is
+                # immediately overwritten by the regular call.
+                # NOTE(review): file_path is assumed to be defined earlier in this method — confirm.
+                if len(pw_srt_text) > 200:
+                    pw_url = TTS.get_lone_pw_zm(pw_srt_text, voice,file_path)
-                pw_url = TTS.get_pw_zm(pw_srt_text, voice)
+                else:
+                    pw_url = TTS.get_pw_zm(pw_srt_text, voice)
                 if not pw_url:
                     logger.error(f"[处理] 数据片尾获取失败")