import subprocess from datetime import timedelta import requests import json import random import re import time from pydub import AudioSegment from common import Material, Feishu class TTS: @classmethod def get_pw_zm(cls, text, voice): max_retries = 3 for attempt in range(max_retries): url = "http://api.piaoquantv.com/produce-center/speechSynthesis" payload = json.dumps({ "params": { "text": text, "voice": voice, # "vocie": "zhiyuan", "format": "pcm", "volume": 90, "speechRate": 80, "pitchRate": 0 } }) headers = { 'Content-Type': 'application/json' } wait_time = random.uniform(1, 10) time.sleep(wait_time) try: response = requests.request("POST", url, headers=headers, data=payload) response = response.json() code = response["code"] if code == 0: mp3 = response["data"] return mp3 else: if attempt == max_retries - 1: Feishu.bot("zhangyong", '机器自动改造消息通知', f'获取字幕音频失败,请关注', '张勇') return None except Exception: if attempt == max_retries - 1: Feishu.bot("zhangyong", '机器自动改造消息通知', f'获取字幕音频失败,请关注', '张勇') return None Feishu.bot("zhangyong", '机器自动改造消息通知', f'获取字幕音频失败,请关注', '张勇') # @classmethod # def get_pw_zm(cls, text): # max_retries = 3 # for attempt in range(max_retries): # token = Material.get_cookie_data("KsoMsyP2ghleM9tzBfmcEEXBnXg", "U1gySe", "硅语") # url = "https://zh.api.guiji.cn/avatar2c/tool/sec_tts" # # payload = json.dumps({ # "text": text, # "speaker_id": "160" # }) # headers = { # 'accept': 'application/json, text/plain, */*', # 'content-type': 'application/json', # 'cookie': 'anylangIsLogin=true', # 'origin': 'https://app.guiji.cn', # 'pragma': 'no-cache', # 'referer': 'https://app.guiji.cn/', # 'token': token # } # wait_time = random.uniform(5, 20) # time.sleep(wait_time) # try: # proxies = { # "http": "http://t10952018781111:1ap37oc3@d844.kdltps.com:15818", # "https": "http://t10952018781111:1ap37oc3@d844.kdltps.com:15818" # } # response = requests.request("POST", url, headers=headers, data=payload, proxies=proxies) # response = response.json() # code = response["code"] # if code == 200: # mp3 = response["data"] # return mp3 # else: # if attempt == max_retries - 1: # Feishu.bot("zhangyong", '机器自动改造消息通知', f'硅语错误请排查', '张勇') # return None # except Exception: # if attempt == max_retries - 1: # Feishu.bot("zhangyong", '机器自动改造消息通知', f'硅语错误请排查', '张勇') # return None # Feishu.bot("zhangyong", '机器自动改造消息通知', f'硅语cookie过期,请及时更换', '张勇') """ 音频下载到本地 """ @classmethod def download_mp3(cls, video_file, video_path_url, pw_random_id): pw_mp3_path = video_path_url + str(pw_random_id) +'pw_video.mp3' for i in range(3): payload = {} headers = {} response = requests.request("GET", video_file, headers=headers, data=payload) if response.status_code == 200: # 以二进制写入模式打开文件 with open(f"{pw_mp3_path}", "wb") as file: # 将响应内容写入文件 file.write(response.content) # 增加音频音量 # audio = AudioSegment.from_file(pw_mp3_path) # louder_audio = audio + 22 # louder_audio.export(pw_mp3_path, format="mp3") # time.sleep(5) return pw_mp3_path return '' @classmethod def get_srt_format(cls, pw_srt_text, pw_url_sec): segments = re.split(r'(,|。|!|?)', pw_srt_text) segments = [segments[i] + segments[i + 1] for i in range(0, len(segments) - 1, 2)] pw_url_sec = int(pw_url_sec) + 1 # 确定每段显示时间 num_segments = len(segments) duration_per_segment = pw_url_sec / num_segments srt_content = "" start_time = 0.0 for i, segment in enumerate(segments): end_time = start_time + duration_per_segment srt_content += f"{i + 1}\n" srt_content += f"{int(start_time // 3600):02}:{int((start_time % 3600) // 60):02}:{int(start_time % 60):02},{int((start_time % 1) * 1000):03} --> " srt_content += f"{int(end_time // 3600):02}:{int((end_time % 3600) // 60):02}:{int(end_time % 60):02},{int((end_time % 1) * 1000):03}\n" srt_content += f"{segment.strip()}\n\n" start_time = end_time print(srt_content) return srt_content @classmethod def process_srt(cls, srt): lines = srt.strip().split('\n') processed_lines = [] for line in lines: if re.match(r'^\d+$', line): processed_lines.append(line) elif re.match(r'^\d{2}:\d{2}:\d{2}\.\d{1,3}-->\d{2}:\d{2}:\d{2}\.\d{1,3}$', line): processed_lines.append(line.replace('-->', ' --> ')) else: line = re.sub(r'[,。!?;、]$', '', line) # 添加换行符 processed_lines.append(line + '\n') return '\n'.join(processed_lines) @classmethod def parse_timecode(cls, timecode): h, m, s = map(float, timecode.replace(',', '.').split(':')) return timedelta(hours=h, minutes=m, seconds=s) @classmethod def format_timecode(cls, delta): total_seconds = delta.total_seconds() hours, remainder = divmod(total_seconds, 3600) minutes, seconds = divmod(remainder, 60) return f"{int(hours):02}:{int(minutes):02}:{seconds:06.3f}".replace('.', ',') @classmethod def split_subtitle(cls, subtitle_string): max_len = 14 lines = subtitle_string.strip().split('\n') subtitles = [] for i in range(0, len(lines), 4): sub_id = int(lines[i].strip()) timecode_line = lines[i + 1].strip() start_time, end_time = timecode_line.split(' --> ') text = lines[i + 2].strip() if re.search(r'[a-zA-Z]', text): text = re.sub(r'[a-zA-Z]', '', text) start_delta = cls.parse_timecode(start_time) end_delta = cls.parse_timecode(end_time) total_duration = (end_delta - start_delta).total_seconds() char_duration = total_duration / len(text) current_start = start_delta for j in range(0, len(text), max_len): segment = text[j:j + max_len] current_end = current_start + timedelta(seconds=char_duration * len(segment)) subtitles.append((sub_id, current_start, current_end, segment)) current_start = current_end sub_id += 1 return subtitles @classmethod def generate_srt(cls, subtitles): srt_content = '' for idx, sub in enumerate(subtitles, start=1): srt_content += f"{idx}\n" srt_content += f"{cls.format_timecode(sub[1])} --> {cls.format_timecode(sub[2])}\n" srt_content += f"{sub[3]}\n\n" return srt_content.strip() @classmethod def getSrt(cls, mp3_id): url = "http://api-internal.piaoquantv.com/produce-center/srt/get/content" payload = json.dumps({ "params": { "resourceChannel": "outer", "videoPath": mp3_id } }) headers = { 'User-Agent': 'Apifox/1.0.0 (https://apifox.com)', 'Content-Type': 'application/json', 'Accept': '*/*', 'Host': 'api-internal.piaoquantv.com', 'Connection': 'keep-alive' } for i in range(3): try: response = requests.request("POST", url, headers=headers, data=payload, timeout=30) time.sleep(1) data_list = response.json() code = data_list["code"] if code == 0: srt = data_list["data"] if srt: srt = srt.replace("/n", "\n") # srt = re.sub(r'(\w+)([,。!?])', r'\n\n', srt) new_srt = cls.process_srt(srt) result = cls.split_subtitle(new_srt) # 生成SRT格式内容 srt_content = cls.generate_srt(result) return srt_content except Exception as e: continue return None if __name__ == '__main__': # text = "真是太实用了,分享给身边的准妈妈们吧!这些孕期禁忌一定要记住,赶紧转发给更多人,帮助更多的宝妈们。一起为宝宝的健康加油!" # mp3 = TTS.get_pw_zm(text) # print(mp3) # command = [ # 'ffmpeg', # '-i', mp3, # '-q:a', '0', # '-map', 'a', # # '-codec:a', 'libmp3lame', # 指定 MP3 编码器 # "/Users/tzld/Desktop/video_rewriting/path/pw_video.mp3" # ] # subprocess.run(command) # print("完成") video_file = 'http://clipres.yishihui.com/longvideo/crawler/voice/pre/20240821/37fbb8cfc7f1439b8d8a032a1d01d37f1724219959925.mp3' TTS.getSrt(video_file) # result = subprocess.run( # ["ffprobe", "-v", "error", "-show_entries", "format=duration", # "-of", "default=noprint_wrappers=1:nokey=1", video_file], # capture_output=True, text=True) # print(float(result.stdout))