This repository has been archived by the owner on Jul 17, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathtts_media.py
181 lines (172 loc) · 5.97 KB
/
tts_media.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# Library for TheNewTTS script for Streamlabs Chatbot
# Copyright (C) 2020 Luis Sanchez
import os
import clr
import time
import threading
# Required to download the audio files
clr.AddReference("System.Web")
from System.Web import HttpUtility
from System.Net import WebClient
# Required to run cmd commands without a window and wait for the result
from System.Diagnostics import Process, ProcessStartInfo, ProcessWindowStyle
# Required to play audio files
clr.AddReference("NAudio")
import NAudio
from NAudio.Wave import AudioFileReader, WaveOutEvent, PlaybackState
# Shutdown handshake flags shared by Media_Manager and the two worker threads.
# Each "*_closed" flag is set to True by its worker thread right before the
# thread terminates.
player_closed = False
downloader_closed = False
# Each "close_*" flag is set to True by Media_Manager.close() to ask the
# corresponding worker thread to stop.
close_player = False
close_downloader = False
class Media_Manager:
    """Owns the download worker and the playback worker and controls their lifetime.

    settings is the dict handed to Media_Downloader and Media_Player; it is
    kept so that reload() can rebuild both workers with it.
    """

    def __init__(self, settings):
        self._settings = settings
        self._start_workers()

    def _start_workers(self):
        # Create the downloader first: the player consumes the downloader's queue.
        self._downloader = Media_Downloader(self._settings)
        self._player = Media_Player(self._downloader, self._settings)

    def skip(self):
        """Ask the player to stop the clip that is currently playing."""
        self._player.skip_current = True

    def append(self, text):
        """Queue a text to be downloaded and then played."""
        self._downloader.append(text)

    def reload(self):
        """Stop both workers and start fresh ones with the stored settings."""
        self.close()
        self._start_workers()

    def close(self):
        """Signal both worker threads to stop and wait until they have ended."""
        global player_closed, downloader_closed
        global close_player, close_downloader
        close_player = True
        close_downloader = True
        # join() only returns once the thread has terminated, so no extra
        # polling of the "*_closed" flags is needed afterwards.
        self._player._thread.join()
        self._downloader._thread.join()
        # Reset the handshake flags so that new workers can be started later.
        close_player = False
        close_downloader = False
        player_closed = False
        downloader_closed = False
class Media_Player:
    """Plays the audio files produced by a Media_Downloader on a background thread.

    media_downloader: object whose "queue" list holds paths of files ready to play.
    settings: dict; settings["length"] is the maximum number of seconds a
    single clip may play, measured from the moment it is taken from the queue.
    """

    def __init__(self, media_downloader, settings):
        self.media_downloader = media_downloader
        self._settings = settings
        # All state read by the worker must exist before the thread starts.
        self.skip_current = False
        self._exit = False
        self._thread = threading.Thread(target=self._play_loop)
        self._thread.start()

    def close(self):
        """Stop the playback thread and wait for it to end."""
        self._exit = True
        self._thread.join()

    def _should_stop(self):
        # Stop on either this instance's close() or the module-wide flag that
        # Media_Manager.close() sets.
        return self._exit or close_player

    def _play_loop(self):
        """Thread body: play queued files until asked to stop."""
        global player_closed
        try:
            while not self._should_stop():
                if not self.media_downloader.queue:
                    time.sleep(0.150)
                    continue
                next_tts_path = self.media_downloader.queue.pop(0)
                try:
                    self._play_file(next_tts_path)
                except Exception as e:
                    # One unplayable file must not end playback for good.
                    get_parent().Log("Media_Player", "There was an error playing this file\n" + str(e))
                finally:
                    # The reader is closed by now, so the cached file can be
                    # removed, also on the error and shutdown paths.
                    run_cmd('del "{0}"'.format(next_tts_path))
        finally:
            # Tell whoever requested the shutdown that this thread is done.
            player_closed = True

    def _play_file(self, path):
        """Play one file until it ends, is skipped, times out, or a stop is requested."""
        started_playing = time.time()
        with AudioFileReader(path) as reader:
            with WaveOutEvent() as device:
                device.Init(reader)
                device.Play()
                # A skip requested before this clip started does not apply to it.
                self.skip_current = False
                while device.PlaybackState == PlaybackState.Playing:
                    if self._should_stop():
                        break
                    if self.skip_current:
                        self.skip_current = False
                        break
                    elapsed = time.time() - started_playing
                    if elapsed >= self._settings["length"]:
                        break
                    time.sleep(0.1)
class Media_Downloader:
    """Downloads and processes queued texts into audio files on a background thread.

    settings: dict; settings["_cache"] is the directory the audio files are
    written to. The dict is also passed on to download_tts and process_tts.

    Texts wait in self._queue; paths of finished files are appended to
    self.queue, which Media_Player consumes.
    """

    def __init__(self, settings):
        self._settings = settings
        self._queue = []
        self.queue = []
        self._count = 0
        # All state read by the worker must exist before the thread starts.
        self._exit = False
        self._thread = threading.Thread(target=self._download_async)
        self._thread.start()

    def close(self):
        """Stop the download thread and wait for it to end."""
        self._exit = True
        self._thread.join()

    def append(self, text):
        """Queue a text for download."""
        self._queue.append(text)

    def _should_stop(self):
        # Stop on either this instance's close() or the module-wide flag that
        # Media_Manager.close() sets.
        return self._exit or close_downloader

    def _download_async(self):
        """Thread body: process queued texts until asked to stop."""
        global downloader_closed
        try:
            while not self._should_stop():
                if self._queue:
                    self._download_next()
                time.sleep(0.150)
        finally:
            # Tell whoever requested the shutdown that this thread is done.
            downloader_closed = True

    def _download_next(self):
        """Download and process the oldest queued text; log and drop it on failure."""
        text = self._queue.pop(0)
        file_path = os.path.join(self._settings["_cache"], str(self._count) + ".mp3")
        try:
            download_tts(file_path, text, self._settings)
            if self._should_stop():
                # Shutdown was requested during the download: skip processing
                # without reporting it as a download error.
                return
            process_tts(file_path, self._settings)
            self.queue.append(file_path)
            self._count += 1
        except Exception as e:
            get_parent().Log("Media_Downloader", "There was an error downloading this text\n" + str(e))
# [TheNewTTS] Download from Google Translate voice generator using defined TTS language #
def download_tts(file_path, text, settings):
    """Download the spoken version of text from Google Translate into file_path.

    file_path: destination path of the downloaded audio file.
    text: text to speak; it is URL-encoded before being put in the query.
    settings: dict; settings["lang"] is used as the "tl" query parameter.
    """
    encoded_text = HttpUtility.UrlEncode(text)
    url = "https://translate.google.com/translate_tts?ie=UTF-8&tl={1}&client=tw-ob&q={0}".format(
        encoded_text, settings["lang"]
    )
    with WebClient() as client:
        client.Headers["Referer"] = "http://translate.google.com/"
        client.Headers["User-Agent"] = "stagefright/1.2 (Linux;Android 5.0)"
        client.DownloadFile(url, file_path)
# [TheNewTTS] Changes the pitch, speed and volume of downloaded mp3 #
def process_tts(file_path, settings):
    """Apply pitch, speed, volume and a length limit to the mp3 at file_path, in place.

    file_path: path of the downloaded mp3; it is replaced by the processed file.
    settings: dict with
        "_path":   directory that cmd changes into before running ffmpeg.exe,
        "length":  value passed to ffmpeg's -t option,
        "pitch":   factor applied to the 24000 sample rate (asetrate),
        "speed":   tempo factor, divided by the pitch factor (atempo),
        "volume":  value passed to ffmpeg's volume filter.

    ffmpeg writes to "processing.mp3" next to file_path; only when ffmpeg
    succeeds is the original deleted and the processed file moved into its
    place, so a failed run leaves the unprocessed download where it was.
    """
    temp_mp3 = os.path.join(os.path.dirname(file_path), "processing.mp3")
    # "/d" makes cd switch the drive as well when "_path" is on another drive.
    change_dir = 'cd /d "{0}"'.format(settings["_path"])
    convert = 'ffmpeg.exe -t {0} -i "{1}" -af asetrate=24000*{2},atempo={3}/{2},aresample=48000,volume={4} "{5}" -y'.format(
        settings["length"],
        file_path,
        settings["pitch"],
        settings["speed"],
        settings["volume"],
        temp_mp3
    )
    delete_original = 'del "{0}"'.format(file_path)
    move_processed = 'move "{0}" "{1}"'.format(temp_mp3, file_path)
    # "&&" binds tighter than "&": the delete and move only run after a
    # successful ffmpeg, while ffmpeg itself is still attempted after cd.
    replace_chain = " && ".join([convert, delete_original, move_processed])
    run_cmd(change_dir + " & " + replace_chain)
def run_cmd(command):
    """Run command through "cmd.exe /C" in a hidden window and wait for it to exit.

    command: the command line text handed to cmd.exe.
    """
    start_info = ProcessStartInfo()
    start_info.FileName = "cmd.exe"
    start_info.WindowStyle = ProcessWindowStyle.Hidden
    start_info.Arguments = "/C" + command
    # Process is disposable; the with-block releases its handle once the
    # command has finished instead of leaving that to the garbage collector.
    with Process.Start(start_info) as process:
        process.WaitForExit()
import System
clr.AddReference([asbly for asbly in System.AppDomain.CurrentDomain.GetAssemblies() if "AnkhBotR2" in str(asbly)][0])
import AnkhBotR2
def get_parent():
    """Return a newly created AnkhBotR2.Managers.PythonManager.

    This module calls Log(...) on the returned object to report errors.
    """
    manager = AnkhBotR2.Managers.PythonManager()
    return manager