123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- from datetime import timedelta
- import requests
- import json
- import random
- import re
- import time
- class TTS:
- @classmethod
- def get_pw_zm(cls, text, voice):
- max_retries = 3
- for attempt in range(max_retries):
- url = "http://api.piaoquantv.com/produce-center/speechSynthesis"
- payload = json.dumps({
- "params": {
- "text": text,
- "voice": voice,
- # "vocie": "zhiyuan",
- "format": "pcm",
- "volume": 90,
- "speechRate": 80,
- "pitchRate": 0
- }
- })
- headers = {
- 'Content-Type': 'application/json'
- }
- wait_time = random.uniform(1, 10)
- time.sleep(wait_time)
- try:
- response = requests.request("POST", url, headers=headers, data=payload, timeout=60)
- response = response.json()
- code = response["code"]
- if code == 0:
- mp3 = response["data"]
- return mp3
- except Exception:
- if attempt == max_retries - 1:
- return None
- return None
- """
- 音频下载到本地
- """
- @classmethod
- def download_mp3(cls, pw_url, file_path):
- pw_mp3_path = file_path +'pw_video.mp3'
- for i in range(3):
- payload = {}
- headers = {}
- response = requests.request("GET", pw_url, headers=headers, data=payload, timeout= 30)
- if response.status_code == 200:
- # 以二进制写入模式打开文件
- with open(f"{pw_mp3_path}", "wb") as file:
- # 将响应内容写入文件
- file.write(response.content)
- return pw_mp3_path
- return None
- @classmethod
- def get_srt_format(cls, pw_srt_text, pw_url_sec):
- segments = re.split(r'(,|。|!|?)', pw_srt_text)
- segments = [segments[i] + segments[i + 1] for i in range(0, len(segments) - 1, 2)]
- pw_url_sec = int(pw_url_sec) + 1
- # 确定每段显示时间
- num_segments = len(segments)
- duration_per_segment = pw_url_sec / num_segments
- srt_content = ""
- start_time = 0.0
- for i, segment in enumerate(segments):
- end_time = start_time + duration_per_segment
- srt_content += f"{i + 1}\n"
- srt_content += f"{int(start_time // 3600):02}:{int((start_time % 3600) // 60):02}:{int(start_time % 60):02},{int((start_time % 1) * 1000):03} --> "
- srt_content += f"{int(end_time // 3600):02}:{int((end_time % 3600) // 60):02}:{int(end_time % 60):02},{int((end_time % 1) * 1000):03}\n"
- srt_content += f"{segment.strip()}\n\n"
- start_time = end_time
- print(srt_content)
- return srt_content
- @classmethod
- def process_srt(cls, srt):
- lines = srt.strip().split('\n')
- processed_lines = []
- for line in lines:
- if re.match(r'^\d+$', line):
- processed_lines.append(line)
- elif re.match(r'^\d{2}:\d{2}:\d{2}\.\d{1,3}-->\d{2}:\d{2}:\d{2}\.\d{1,3}$', line):
- processed_lines.append(line.replace('-->', ' --> '))
- else:
- line = re.sub(r'[,。!?;、]$', '', line)
- # 添加换行符
- processed_lines.append(line + '\n')
- return '\n'.join(processed_lines)
- @classmethod
- def parse_timecode(cls, timecode):
- h, m, s = map(float, timecode.replace(',', '.').split(':'))
- return timedelta(hours=h, minutes=m, seconds=s)
- @classmethod
- def format_timecode(cls, delta):
- total_seconds = delta.total_seconds()
- hours, remainder = divmod(total_seconds, 3600)
- minutes, seconds = divmod(remainder, 60)
- return f"{int(hours):02}:{int(minutes):02}:{seconds:06.3f}".replace('.', ',')
- @classmethod
- def split_subtitle(cls, subtitle_string):
- max_len = 14
- lines = subtitle_string.strip().split('\n')
- subtitles = []
- for i in range(0, len(lines), 4):
- sub_id = int(lines[i].strip())
- timecode_line = lines[i + 1].strip()
- start_time, end_time = timecode_line.split(' --> ')
- text = lines[i + 2].strip()
- if re.search(r'[a-zA-Z]', text):
- text = re.sub(r'[a-zA-Z]', '', text)
- start_delta = cls.parse_timecode(start_time)
- end_delta = cls.parse_timecode(end_time)
- total_duration = (end_delta - start_delta).total_seconds()
- char_duration = total_duration / len(text)
- current_start = start_delta
- for j in range(0, len(text), max_len):
- segment = text[j:j + max_len]
- current_end = current_start + timedelta(seconds=char_duration * len(segment))
- subtitles.append((sub_id, current_start, current_end, segment))
- current_start = current_end
- sub_id += 1
- return subtitles
- @classmethod
- def generate_srt(cls, subtitles):
- srt_content = ''
- for idx, sub in enumerate(subtitles, start=1):
- srt_content += f"{idx}\n"
- srt_content += f"{cls.format_timecode(sub[1])} --> {cls.format_timecode(sub[2])}\n"
- srt_content += f"{sub[3]}\n\n"
- return srt_content.strip()
- @classmethod
- def getSrt(cls, mp3_id):
- url = "http://api.piaoquantv.com/produce-center/srt/get/content"
- payload = json.dumps({
- "params": {
- "resourceChannel": "outer",
- "videoPath": mp3_id
- }
- })
- headers = {
- 'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
- 'Content-Type': 'application/json',
- 'Accept': '*/*',
- 'Host': 'api-internal.piaoquantv.com',
- 'Connection': 'keep-alive'
- }
- response = requests.request("POST", url, headers=headers, data=payload, timeout=30)
- time.sleep(1)
- data_list = response.json()
- code = data_list["code"]
- if code == 0:
- srt = data_list["data"]
- if srt:
- srt = srt.replace("/n", "\n")
- new_srt = cls.process_srt(srt)
- result = cls.split_subtitle(new_srt)
- # 生成SRT格式内容
- srt_content = cls.generate_srt(result)
- return srt_content
- else:
- return None
- else:
- return None
- if __name__ == '__main__':
- # text = "真是太实用了,分享给身边的准妈妈们吧!这些孕期禁忌一定要记住,赶紧转发给更多人,帮助更多的宝妈们。一起为宝宝的健康加油!"
- # mp3 = TTS.get_pw_zm(text)
- # print(mp3)
- # command = [
- # 'ffmpeg',
- # '-i', mp3,
- # '-q:a', '0',
- # '-map', 'a',
- # # '-codec:a', 'libmp3lame', # 指定 MP3 编码器
- # "/Users/tzld/Desktop/video_rewriting/path/pw_video.mp3"
- # ]
- # subprocess.run(command)
- # print("完成")
- video_file = 'http://clipres.yishihui.com/longvideo/crawler/voice/pre/20240821/37fbb8cfc7f1439b8d8a032a1d01d37f1724219959925.mp3'
- TTS.getSrt(video_file)
- # result = subprocess.run(
- # ["ffprobe", "-v", "error", "-show_entries", "format=duration",
- # "-of", "default=noprint_wrappers=1:nokey=1", video_file],
- # capture_output=True, text=True)
- # print(float(result.stdout))
|