tts_help.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. import subprocess
  2. from datetime import timedelta
  3. import requests
  4. import json
  5. import random
  6. import re
  7. import time
  8. from pydub import AudioSegment
  9. from common import Material, Feishu
  10. class TTS:
  11. @classmethod
  12. def get_pw_zm(cls, text, voice):
  13. max_retries = 3
  14. for attempt in range(max_retries):
  15. url = "http://api.piaoquantv.com/produce-center/speechSynthesis"
  16. payload = json.dumps({
  17. "params": {
  18. "text": text,
  19. "voice": voice,
  20. # "vocie": "zhiyuan",
  21. "format": "pcm",
  22. "volume": 90,
  23. "speechRate": 80,
  24. "pitchRate": 0
  25. }
  26. })
  27. headers = {
  28. 'Content-Type': 'application/json'
  29. }
  30. wait_time = random.uniform(1, 10)
  31. time.sleep(wait_time)
  32. try:
  33. response = requests.request("POST", url, headers=headers, data=payload)
  34. response = response.json()
  35. code = response["code"]
  36. if code == 0:
  37. mp3 = response["data"]
  38. return mp3
  39. else:
  40. if attempt == max_retries - 1:
  41. Feishu.bot("zhangyong", '机器自动改造消息通知', f'获取字幕音频失败,请关注', '张勇')
  42. return None
  43. except Exception:
  44. if attempt == max_retries - 1:
  45. Feishu.bot("zhangyong", '机器自动改造消息通知', f'获取字幕音频失败,请关注', '张勇')
  46. return None
  47. Feishu.bot("zhangyong", '机器自动改造消息通知', f'获取字幕音频失败,请关注', '张勇')
  48. # @classmethod
  49. # def get_pw_zm(cls, text):
  50. # max_retries = 3
  51. # for attempt in range(max_retries):
  52. # token = Material.get_cookie_data("KsoMsyP2ghleM9tzBfmcEEXBnXg", "U1gySe", "硅语")
  53. # url = "https://zh.api.guiji.cn/avatar2c/tool/sec_tts"
  54. #
  55. # payload = json.dumps({
  56. # "text": text,
  57. # "speaker_id": "160"
  58. # })
  59. # headers = {
  60. # 'accept': 'application/json, text/plain, */*',
  61. # 'content-type': 'application/json',
  62. # 'cookie': 'anylangIsLogin=true',
  63. # 'origin': 'https://app.guiji.cn',
  64. # 'pragma': 'no-cache',
  65. # 'referer': 'https://app.guiji.cn/',
  66. # 'token': token
  67. # }
  68. # wait_time = random.uniform(5, 20)
  69. # time.sleep(wait_time)
  70. # try:
  71. # proxies = {
  72. # "http": "http://t10952018781111:1ap37oc3@d844.kdltps.com:15818",
  73. # "https": "http://t10952018781111:1ap37oc3@d844.kdltps.com:15818"
  74. # }
  75. # response = requests.request("POST", url, headers=headers, data=payload, proxies=proxies)
  76. # response = response.json()
  77. # code = response["code"]
  78. # if code == 200:
  79. # mp3 = response["data"]
  80. # return mp3
  81. # else:
  82. # if attempt == max_retries - 1:
  83. # Feishu.bot("zhangyong", '机器自动改造消息通知', f'硅语错误请排查', '张勇')
  84. # return None
  85. # except Exception:
  86. # if attempt == max_retries - 1:
  87. # Feishu.bot("zhangyong", '机器自动改造消息通知', f'硅语错误请排查', '张勇')
  88. # return None
  89. # Feishu.bot("zhangyong", '机器自动改造消息通知', f'硅语cookie过期,请及时更换', '张勇')
  90. """
  91. 音频下载到本地
  92. """
  93. @classmethod
  94. def download_mp3(cls, video_file, video_path_url, pw_random_id):
  95. pw_mp3_path = video_path_url + str(pw_random_id) +'pw_video.mp3'
  96. for i in range(3):
  97. payload = {}
  98. headers = {}
  99. response = requests.request("GET", video_file, headers=headers, data=payload)
  100. if response.status_code == 200:
  101. # 以二进制写入模式打开文件
  102. with open(f"{pw_mp3_path}", "wb") as file:
  103. # 将响应内容写入文件
  104. file.write(response.content)
  105. # 增加音频音量
  106. # audio = AudioSegment.from_file(pw_mp3_path)
  107. # louder_audio = audio + 22
  108. # louder_audio.export(pw_mp3_path, format="mp3")
  109. # time.sleep(5)
  110. return pw_mp3_path
  111. return ''
  112. @classmethod
  113. def get_srt_format(cls, pw_srt_text, pw_url_sec):
  114. segments = re.split(r'(,|。|!|?)', pw_srt_text)
  115. segments = [segments[i] + segments[i + 1] for i in range(0, len(segments) - 1, 2)]
  116. pw_url_sec = int(pw_url_sec) + 1
  117. # 确定每段显示时间
  118. num_segments = len(segments)
  119. duration_per_segment = pw_url_sec / num_segments
  120. srt_content = ""
  121. start_time = 0.0
  122. for i, segment in enumerate(segments):
  123. end_time = start_time + duration_per_segment
  124. srt_content += f"{i + 1}\n"
  125. srt_content += f"{int(start_time // 3600):02}:{int((start_time % 3600) // 60):02}:{int(start_time % 60):02},{int((start_time % 1) * 1000):03} --> "
  126. srt_content += f"{int(end_time // 3600):02}:{int((end_time % 3600) // 60):02}:{int(end_time % 60):02},{int((end_time % 1) * 1000):03}\n"
  127. srt_content += f"{segment.strip()}\n\n"
  128. start_time = end_time
  129. print(srt_content)
  130. return srt_content
  131. @classmethod
  132. def process_srt(cls, srt):
  133. lines = srt.strip().split('\n')
  134. processed_lines = []
  135. for line in lines:
  136. if re.match(r'^\d+$', line):
  137. processed_lines.append(line)
  138. elif re.match(r'^\d{2}:\d{2}:\d{2}\.\d{1,3}-->\d{2}:\d{2}:\d{2}\.\d{1,3}$', line):
  139. processed_lines.append(line.replace('-->', ' --> '))
  140. else:
  141. line = re.sub(r'[,。!?;、]$', '', line)
  142. # 添加换行符
  143. processed_lines.append(line + '\n')
  144. return '\n'.join(processed_lines)
  145. @classmethod
  146. def parse_timecode(cls, timecode):
  147. h, m, s = map(float, timecode.replace(',', '.').split(':'))
  148. return timedelta(hours=h, minutes=m, seconds=s)
  149. @classmethod
  150. def format_timecode(cls, delta):
  151. total_seconds = delta.total_seconds()
  152. hours, remainder = divmod(total_seconds, 3600)
  153. minutes, seconds = divmod(remainder, 60)
  154. return f"{int(hours):02}:{int(minutes):02}:{seconds:06.3f}".replace('.', ',')
  155. @classmethod
  156. def split_subtitle(cls, subtitle_string):
  157. max_len = 14
  158. lines = subtitle_string.strip().split('\n')
  159. subtitles = []
  160. for i in range(0, len(lines), 4):
  161. sub_id = int(lines[i].strip())
  162. timecode_line = lines[i + 1].strip()
  163. start_time, end_time = timecode_line.split(' --> ')
  164. text = lines[i + 2].strip()
  165. if re.search(r'[a-zA-Z]', text):
  166. text = re.sub(r'[a-zA-Z]', '', text)
  167. start_delta = cls.parse_timecode(start_time)
  168. end_delta = cls.parse_timecode(end_time)
  169. total_duration = (end_delta - start_delta).total_seconds()
  170. char_duration = total_duration / len(text)
  171. current_start = start_delta
  172. for j in range(0, len(text), max_len):
  173. segment = text[j:j + max_len]
  174. current_end = current_start + timedelta(seconds=char_duration * len(segment))
  175. subtitles.append((sub_id, current_start, current_end, segment))
  176. current_start = current_end
  177. sub_id += 1
  178. return subtitles
  179. @classmethod
  180. def generate_srt(cls, subtitles):
  181. srt_content = ''
  182. for idx, sub in enumerate(subtitles, start=1):
  183. srt_content += f"{idx}\n"
  184. srt_content += f"{cls.format_timecode(sub[1])} --> {cls.format_timecode(sub[2])}\n"
  185. srt_content += f"{sub[3]}\n\n"
  186. return srt_content.strip()
  187. @classmethod
  188. def getSrt(cls, mp3_id):
  189. url = "http://api-internal.piaoquantv.com/produce-center/srt/get/content"
  190. payload = json.dumps({
  191. "params": {
  192. "resourceChannel": "outer",
  193. "videoPath": mp3_id
  194. }
  195. })
  196. headers = {
  197. 'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
  198. 'Content-Type': 'application/json',
  199. 'Accept': '*/*',
  200. 'Host': 'api-internal.piaoquantv.com',
  201. 'Connection': 'keep-alive'
  202. }
  203. for i in range(3):
  204. try:
  205. response = requests.request("POST", url, headers=headers, data=payload, timeout=30)
  206. time.sleep(1)
  207. data_list = response.json()
  208. code = data_list["code"]
  209. if code == 0:
  210. srt = data_list["data"]
  211. if srt:
  212. srt = srt.replace("/n", "\n")
  213. # srt = re.sub(r'(\w+)([,。!?])', r'\n\n', srt)
  214. new_srt = cls.process_srt(srt)
  215. result = cls.split_subtitle(new_srt)
  216. # 生成SRT格式内容
  217. srt_content = cls.generate_srt(result)
  218. return srt_content
  219. except Exception as e:
  220. continue
  221. return None
  222. if __name__ == '__main__':
  223. # text = "真是太实用了,分享给身边的准妈妈们吧!这些孕期禁忌一定要记住,赶紧转发给更多人,帮助更多的宝妈们。一起为宝宝的健康加油!"
  224. # mp3 = TTS.get_pw_zm(text)
  225. # print(mp3)
  226. # command = [
  227. # 'ffmpeg',
  228. # '-i', mp3,
  229. # '-q:a', '0',
  230. # '-map', 'a',
  231. # # '-codec:a', 'libmp3lame', # 指定 MP3 编码器
  232. # "/Users/tzld/Desktop/video_rewriting/path/pw_video.mp3"
  233. # ]
  234. # subprocess.run(command)
  235. # print("完成")
  236. video_file = 'http://clipres.yishihui.com/longvideo/crawler/voice/pre/20240821/37fbb8cfc7f1439b8d8a032a1d01d37f1724219959925.mp3'
  237. TTS.getSrt(video_file)
  238. # result = subprocess.run(
  239. # ["ffprobe", "-v", "error", "-show_entries", "format=duration",
  240. # "-of", "default=noprint_wrappers=1:nokey=1", video_file],
  241. # capture_output=True, text=True)
  242. # print(float(result.stdout))