-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbody.py
100 lines (81 loc) · 4.21 KB
/
body.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import time
from collections import deque

from flask import Flask, request, jsonify
from naoqi import ALProxy, ALModule, ALBroker
# Flask application that exposes the robot's speech and audio capture over HTTP
app = Flask(__name__)
# NOTE(review): not read anywhere in the visible code — presumably a leftover "robot present" switch; confirm before removing
nao = True
# host name and port used for every NAOqi proxy and for the broker connection below
nao_IP = "nao.local"
nao_port = 9559
# pause (in seconds) between two polls of the audio buffer in the /get_audio_chunk endpoint
sleep_time = 0.01
# text-to-speech proxy; only used here to set the volume
tts = ALProxy("ALTextToSpeech", nao_IP, nao_port)
tts.setVolume(1.0) # set the speech volume of the robot
# animated-speech proxy used by the /talk endpoint
animatedSpeech = ALProxy("ALAnimatedSpeech", nao_IP, nao_port)
class AudioCaptureModule(ALModule):  # NAOqi module for capturing audio
    """
    Custom NAOqi module that buffers audio captured from NAO's microphones.

    It extends the ALModule class from the NAOqi framework and provides methods to start and stop audio capture, to receive audio data from the robot, and to hand out the buffered audio chunks one at a time (oldest first).
    """

    def __init__(self, name):
        """
        Register the module under `name` and create the proxy to the robot's audio device.

        name: module name passed to ALModule; the same name is used to subscribe to ALAudioDevice.
        """
        ALModule.__init__(self, name)
        self.audio_device = ALProxy("ALAudioDevice", nao_IP, nao_port)
        self.is_listening = False
        # FIFO of received audio chunks; a deque removes the oldest entry in O(1) where list.pop(0) is O(n)
        self.buffers = deque()

    def start_listening(self):
        """
        Subscribe this module to the audio device and start buffering incoming audio.
        """
        # sample rate of 16000 Hz, channelparam 3 (front channel), and a deinterleaving flag of 0 (only relevant for channelparam 0 (all channels))
        self.audio_device.setClientPreferences(self.getName(), 16000, 3, 0)
        self.audio_device.subscribe(self.getName())
        self.is_listening = True

    def stop_listening(self):
        """
        Unsubscribe from the audio device and stop buffering; chunks already buffered are kept.
        """
        self.audio_device.unsubscribe(self.getName())
        self.is_listening = False

    def processRemote(self, nbOfChannels, nbOfSamplesByChannel, timeStamp, inputBuffer):
        """
        Callback that is triggered whenever new audio data is available.

        Logs the parameters of the received chunk and, while listening, appends `inputBuffer` to the buffer.
        """
        print(
            "received audio data from NAO with the following parameters: nbOfChannels = {}, nbOfSamplesByChannel = {}, timeStamp = {} sec {} musec, length of inputBuffer = {}".format(
                nbOfChannels, nbOfSamplesByChannel, timeStamp[0], timeStamp[1], len(inputBuffer)
            )
        )
        if self.is_listening:
            self.buffers.append(inputBuffer)

    def get_audio_chunk(self):
        """
        Remove and return the oldest buffered audio chunk, or None when the buffer is empty.
        """
        try:
            # pop directly instead of checking first: a separate emptiness check could race with another consumer
            return self.buffers.popleft()  # return the oldest audio chunk
        except IndexError:
            print("no audio data available")
            return None
# Connect to the NAOqi runtime and create the audio capture module; the server cannot work without them.
try:
    # broker connection: essential for communicating between the module and the NAOqi runtime
    pythonBroker = ALBroker("pythonBroker", "0.0.0.0", 0, nao_IP, nao_port)
    # module-level instance shared by all endpoints (no `global` needed at module scope)
    AudioCapture = AudioCaptureModule("AudioCapture")
    print("AudioCapture module initialized")
except RuntimeError as error:
    print("Error initializing broker!")
    print(error)  # show the underlying reason instead of discarding it
    # same exit status as exit(1), without relying on the helper injected by the `site` module
    raise SystemExit(1)
# server endpoints ====================================================================================================
@app.route("/talk", methods=["POST"])
def talk():
    """
    Make the robot say the "message" field of the posted JSON body using animated speech.

    Returns {"success": true} after the speech call returned, or HTTP 400 with {"success": false, "error": ...} when the body is not a JSON object or contains no "message".
    """
    print("Received a request to talk")
    # silent=True yields None instead of raising when the body is missing or is not valid JSON
    payload = request.get_json(silent=True)
    message = payload.get("message") if isinstance(payload, dict) else None
    if message is None:
        # without this check a missing body raised AttributeError and a missing field made the robot say "None"
        return jsonify(success=False, error="expected a JSON object with a 'message' field"), 400
    try:
        text = str(message)
    except UnicodeEncodeError:
        # Python 2 only: str() of a unicode string with non-ASCII characters fails, so hand over UTF-8 bytes
        # NOTE(review): assumes NAOqi expects UTF-8 encoded text — confirm against the ALTextToSpeech docs
        text = message.encode("utf-8")
    animatedSpeech.say(text)
    return jsonify(success=True)
@app.route("/start_listening", methods=["POST"])
def start_listening():
    """
    Start the audio capture on the robot and confirm with {"success": true}.
    """
    buffered_chunks = len(AudioCapture.buffers)  # logged only, the buffer itself is not modified here
    print("Received a request to start listening, current length of server buffer:", buffered_chunks)
    AudioCapture.start_listening()
    return jsonify(success=True)
@app.route("/stop_listening", methods=["POST"])
def stop_listening():
    """
    Stop the audio capture on the robot and confirm with {"success": true}.
    """
    buffered_chunks = len(AudioCapture.buffers)  # logged only, the buffer itself is not modified here
    print("Received a request to stop listening, current length of server buffer:", buffered_chunks)
    AudioCapture.stop_listening()
    return jsonify(success=True)
@app.route("/get_audio_chunk", methods=["GET"])
def get_audio_chunk():
    """
    Return the oldest buffered audio chunk as the response body.

    When the buffer is empty the request waits, polling every `sleep_time` seconds, for as long as the capture is running. If the capture is not running (or is stopped while waiting) and no chunk is available, an empty HTTP 204 response is returned instead of blocking forever.
    """
    print("Received a request to get an audio chunk, current length of server buffer:", len(AudioCapture.buffers))
    audio_data = AudioCapture.get_audio_chunk()
    if audio_data is None:
        print("Server buffer is empty, waiting for audio data...")
        # only wait while new audio can actually arrive; otherwise this loop would never end and,
        # on a single-threaded server, would also keep /start_listening from ever being handled
        while audio_data is None and AudioCapture.is_listening:
            time.sleep(sleep_time)  # sleep before polling again, not after a chunk was obtained
            audio_data = AudioCapture.get_audio_chunk()
    if audio_data is None:
        return "", 204  # nothing buffered and capture is stopped
    return audio_data  # send the audio data as a response
@app.route("/get_server_buffer_length", methods=["GET"])
def get_server_buffer_length():
    """
    Report how many audio chunks are currently buffered, as {"length": <count>}.
    """
    print("Received a request to print the length of the server buffer, current length of server buffer:", len(AudioCapture.buffers))
    # the length is read again for the response, exactly as it is read for the log line above
    response = jsonify(length=len(AudioCapture.buffers))
    return response
if __name__ == "__main__":
    # start the Flask server on port 5004, bound to all network interfaces
    app.run(port=5004, host="0.0.0.0")