123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- import subprocess
- from datetime import timedelta
- import requests
- import json
- import random
- import re
- import time
- from pydub import AudioSegment
- from common import Material, Feishu
class TTS:
    """Speech-synthesis and SRT subtitle helpers.

    All methods are classmethods; the class only serves as a namespace.
    """

    # Timeout (seconds) for the synthesis and download HTTP calls, so a stalled
    # connection cannot block the caller forever.
    _REQUEST_TIMEOUT = 120

    # Punctuation is written with \u escapes so that both the ASCII and the
    # full-width forms survive any re-encoding of this file:
    #   \uff0c full-width comma, \u3002 ideographic full stop,
    #   \uff01 full-width '!',   \uff1f full-width '?',
    #   \uff1b full-width ';',   \u3001 ideographic comma.
    _SENTENCE_SPLIT_RE = re.compile(r'([,\uff0c\u3002!\uff01?\uff1f])')
    _TRAILING_PUNCT_RE = re.compile(r'[,\uff0c\u3002!\uff01?\uff1f;\uff1b\u3001]$')
    _INDEX_LINE_RE = re.compile(r'^\d+$')
    _TIMECODE_LINE_RE = re.compile(
        r'^(\d{2}:\d{2}:\d{2}[.,]\d{1,3})\s*-->\s*(\d{2}:\d{2}:\d{2}[.,]\d{1,3})$'
    )

    @staticmethod
    def _format_ms(total_ms):
        """Render a non-negative millisecond count as ``HH:MM:SS,mmm``."""
        hours, remainder = divmod(int(total_ms), 3600000)
        minutes, remainder = divmod(remainder, 60000)
        seconds, millis = divmod(remainder, 1000)
        return f"{hours:02}:{minutes:02}:{seconds:02},{millis:03}"

    @classmethod
    def get_pw_zm(cls, text, voice):
        """Request speech synthesis for ``text`` using ``voice``.

        The request is attempted up to three times, each preceded by a random
        1-10 second pause.

        Returns:
            The ``data`` field of the first response whose ``code`` is 0, or
            ``None`` after all attempts failed (a Feishu notification is sent
            once in that case).
        """
        url = "http://api.piaoquantv.com/produce-center/speechSynthesis"
        payload = json.dumps({
            "params": {
                "text": text,
                "voice": voice,
                "format": "pcm",
                "volume": 90,
                "speechRate": 80,
                "pitchRate": 0
            }
        })
        headers = {
            'Content-Type': 'application/json'
        }
        max_retries = 3
        for _ in range(max_retries):
            time.sleep(random.uniform(1, 10))
            try:
                response = requests.request(
                    "POST", url, headers=headers, data=payload,
                    timeout=cls._REQUEST_TIMEOUT,
                )
                body = response.json()
                if body["code"] == 0:
                    return body["data"]
            except (requests.RequestException, ValueError, KeyError, TypeError):
                # Network failure or malformed response: try again.
                continue
        Feishu.bot("zhangyong", '机器自动改造消息通知', '获取字幕音频失败,请关注', '张勇')
        return None

    @classmethod
    def download_mp3(cls, video_file, video_path_url, pw_random_id):
        """Download the audio at URL ``video_file`` to a local file.

        The target path is ``video_path_url + str(pw_random_id) + 'pw_video.mp3'``.
        Up to three attempts are made; connection errors and non-200 responses
        both count as a failed attempt.

        Returns:
            The local file path on success, or ``''`` if every attempt failed.
        """
        pw_mp3_path = video_path_url + str(pw_random_id) + 'pw_video.mp3'
        for _ in range(3):
            try:
                response = requests.get(video_file, timeout=cls._REQUEST_TIMEOUT)
            except requests.RequestException:
                # A dropped connection should consume one retry, not abort.
                continue
            if response.status_code == 200:
                with open(pw_mp3_path, "wb") as file:
                    file.write(response.content)
                return pw_mp3_path
        return ''

    @classmethod
    def get_srt_format(cls, pw_srt_text, pw_url_sec):
        """Build SRT text by spreading the sentences of ``pw_srt_text`` evenly
        over ``int(pw_url_sec) + 1`` seconds.

        The text is cut after each comma, full stop, exclamation mark or
        question mark (ASCII or full-width); text after the last mark is kept
        as a final segment.

        Returns:
            The SRT content (each cue followed by a blank line), or ``''`` when
            the text contains no segments. The result is also printed.
        """
        # With one capture group, split() yields text, mark, text, mark, ..., tail.
        parts = cls._SENTENCE_SPLIT_RE.split(pw_srt_text or '')
        segments = [parts[i] + parts[i + 1] for i in range(0, len(parts) - 1, 2)]
        if parts[-1].strip():
            segments.append(parts[-1])
        if not segments:
            return ''

        total_ms = (int(float(pw_url_sec)) + 1) * 1000
        count = len(segments)
        cues = []
        for index, segment in enumerate(segments):
            # Integer boundaries avoid drift from repeatedly adding floats.
            start_ms = total_ms * index // count
            end_ms = total_ms * (index + 1) // count
            cues.append(
                f"{index + 1}\n"
                f"{cls._format_ms(start_ms)} --> {cls._format_ms(end_ms)}\n"
                f"{segment.strip()}\n\n"
            )
        srt_content = ''.join(cues)
        print(srt_content)
        return srt_content

    @classmethod
    def process_srt(cls, srt):
        """Normalise raw subtitle text into index / timecode / text / blank groups.

        * digit-only lines are kept as cue indices;
        * timecode lines get exactly one space on each side of ``-->``;
        * any other line is treated as cue text: one trailing punctuation mark
          is removed and a newline is appended so a blank line follows it;
        * blank input lines are dropped so they cannot break the grouping.
        """
        processed_lines = []
        for raw_line in srt.strip().split('\n'):
            line = raw_line.strip()  # also removes a trailing '\r'
            if not line:
                continue
            if cls._INDEX_LINE_RE.match(line):
                processed_lines.append(line)
                continue
            timecode = cls._TIMECODE_LINE_RE.match(line)
            if timecode:
                processed_lines.append(f"{timecode.group(1)} --> {timecode.group(2)}")
                continue
            processed_lines.append(cls._TRAILING_PUNCT_RE.sub('', line) + '\n')
        return '\n'.join(processed_lines)

    @classmethod
    def parse_timecode(cls, timecode):
        """Convert ``HH:MM:SS,mmm`` (or ``HH:MM:SS.mmm``) into a ``timedelta``.

        Raises:
            ValueError: if the string does not have three numeric fields.
        """
        h, m, s = map(float, timecode.strip().replace(',', '.').split(':'))
        return timedelta(hours=h, minutes=m, seconds=s)

    @classmethod
    def format_timecode(cls, delta):
        """Format a ``timedelta`` as ``HH:MM:SS,mmm``.

        The value is rounded to whole milliseconds before it is split into
        fields, so the seconds field can never be rendered as ``60``.
        """
        return cls._format_ms(round(delta.total_seconds() * 1000))

    @classmethod
    def split_subtitle(cls, subtitle_string):
        """Split SRT cues into chunks of at most 14 characters.

        Cues are expected to be separated by blank lines, each made of an
        index line, a ``start --> end`` line and the text. Latin letters are
        removed from the text, and each chunk receives a share of the cue's
        duration proportional to its length.

        Cues that are malformed or have no text left are skipped.

        Returns:
            A list of ``(sub_id, start, end, text)`` tuples where ``start`` and
            ``end`` are ``timedelta`` objects.
        """
        max_len = 14
        subtitles = []
        next_id = 1
        for block in re.split(r'\n\s*\n', subtitle_string.strip()):
            lines = [line.strip() for line in block.split('\n')]
            if len(lines) < 3 or not cls._INDEX_LINE_RE.match(lines[0]):
                continue
            start_time, arrow, end_time = lines[1].partition('-->')
            if not arrow:
                continue
            try:
                start_delta = cls.parse_timecode(start_time)
                end_delta = cls.parse_timecode(end_time)
            except ValueError:
                continue

            text = re.sub(r'[a-zA-Z]', '', ''.join(lines[2:])).strip()
            if not text:
                # Nothing to display (and would otherwise divide by zero).
                continue

            char_duration = (end_delta - start_delta).total_seconds() / len(text)
            # Keep the cue's own id unless an earlier split already used it.
            sub_id = max(int(lines[0]), next_id)
            current_start = start_delta
            for offset in range(0, len(text), max_len):
                segment = text[offset:offset + max_len]
                current_end = current_start + timedelta(seconds=char_duration * len(segment))
                subtitles.append((sub_id, current_start, current_end, segment))
                current_start = current_end
                sub_id += 1
            next_id = sub_id
        return subtitles

    @classmethod
    def generate_srt(cls, subtitles):
        """Render ``(id, start, end, text)`` tuples as SRT text.

        Cues are renumbered from 1; the id stored in each tuple is ignored.
        Leading and trailing whitespace of the result is stripped.
        """
        cues = [
            f"{idx}\n"
            f"{cls.format_timecode(sub[1])} --> {cls.format_timecode(sub[2])}\n"
            f"{sub[3]}"
            for idx, sub in enumerate(subtitles, start=1)
        ]
        return '\n\n'.join(cues).strip()

    @classmethod
    def getSrt(cls, mp3_id):
        """Fetch the subtitles for ``mp3_id`` and return them as SRT text.

        ``mp3_id`` is sent as the ``videoPath`` request parameter. The request
        is attempted up to three times.

        Returns:
            SRT content re-split into short cues, or ``None`` if no usable
            subtitles were obtained.
        """
        url = "http://api-internal.piaoquantv.com/produce-center/srt/get/content"
        payload = json.dumps({
            "params": {
                "resourceChannel": "outer",
                "videoPath": mp3_id
            }
        })
        headers = {
            'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
            'Content-Type': 'application/json',
            'Accept': '*/*',
            'Host': 'api-internal.piaoquantv.com',
            'Connection': 'keep-alive'
        }
        for _ in range(3):
            try:
                response = requests.request("POST", url, headers=headers, data=payload, timeout=30)
                time.sleep(1)
                data_list = response.json()
                if data_list["code"] != 0:
                    continue
                srt = data_list["data"]
                if not srt:
                    continue
                # NOTE(review): the literal two characters "/n" are treated as
                # line breaks - confirm this matches what the service returns.
                srt = srt.replace("/n", "\n")
                srt_content = cls.generate_srt(cls.split_subtitle(cls.process_srt(srt)))
                if srt_content:
                    return srt_content
            except Exception:
                # Best effort: any network or parsing failure counts as one
                # failed attempt; callers only see None when all attempts fail.
                continue
        return None
if __name__ == '__main__':
    # Manual smoke test: fetch the subtitles for a sample audio URL and print
    # the resulting SRT (or None) so the run has a visible outcome.
    video_file = 'http://clipres.yishihui.com/longvideo/crawler/voice/pre/20240821/37fbb8cfc7f1439b8d8a032a1d01d37f1724219959925.mp3'
    print(TTS.getSrt(video_file))
|