-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathudp_switch_modes.py
143 lines (124 loc) · 6.62 KB
/
udp_switch_modes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import time,socket,datetime,json
from threading import Event, Thread
from gpiozero import PWMOutputDevice
# Set the to be loaded slots. Has to be full paths or else it won't start on boot!
play1 = "/home/pi/Desktop/whos_afraid/slot_2.json"
# Set the UPD port here
UDP_PORT = 6006
# Set the debug level, 0 = no debug messages, 1 = UDP messages, 2 = inverter messages, 3 = both
DEBUG = 3
# Set the gpio out and in.
# Check pinout.xyz for the black pin numbers aka the board numbers.
# BCM numbering is also possible and are usable as integers.
inv_1 = PWMOutputDevice("BOARD37")
inv_2 = PWMOutputDevice("BOARD35")
inv_3 = PWMOutputDevice("BOARD33")
inv_4 = PWMOutputDevice("BOARD31")
inv_5 = PWMOutputDevice("BOARD29")
inv_6 = PWMOutputDevice("BOARD27")
# Declare variables
data = '' # empty var for incoming data
y = [] # list for the recording to json
rec = 0 # to check for if recording is in progress
global exit # make the exit var global
exit = Event() # link exit to the Event class
playing = False # to check if something is playing
# UDP setup for listening
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setblocking(1)
sock.bind(('', UDP_PORT))
#Set console color for easier UDP income reading
CRED = '\033[91m'
CEND = '\033[0m'
#load composition 1
print(f"{datetime.datetime.now().time()}: opening {play1}")
f = open(play1, "r")
recording = json.loads(f.read())
rec_dict1 = {entry["time"]:entry["values"] for entry in recording}
if DEBUG == 2 or DEBUG == 3:
print(rec_dict1)
last_time1 = list(recording)[-1]["time"]
print(f"{datetime.datetime.now().time()}: {play1} is last for {last_time1} seconds")
# Define a function for the composition playback
def player (dict, last_entry, slot):
global playing
playing = True
print(f"{datetime.datetime.now().time()}: Starting composition from {slot} and will be playing for: {last_entry}")
t0 = time.time()
while not exit.is_set():
t1 = time.time() - t0
t_check = round(t1, 3)
values = dict.get(t_check, None)
if values:
if DEBUG == 2 or DEBUG == 3:
print(f"{datetime.datetime.now().time()} time: {t_check} value: {values}")
inv_1.value = values[0]
inv_2.value = values[1]
inv_3.value = values[2]
inv_4.value = values[3]
inv_5.value = values[4]
inv_6.value = values[5]
if t1 >= last_entry:
print(f"{datetime.datetime.now().time()} done playing {slot}")
t0 = 0
t1 = 0
playing = False
return
exit.wait(0.0001)
while True:
data_raw, addr = sock.recvfrom(1024)
data = data_raw.decode() # My test message is encoded
if DEBUG == 1 or DEBUG == 3:
print(f"{CRED}{datetime.datetime.now().time()} UDP MESSAGE: {data}{CEND}")
if data: # only do something when there's data
decode_list = data.split() # byte decode the incoming list and split in two
if decode_list[0].startswith("PLAY") and not playing: # if the first part of the list starts with "PLAY" and we're not playing already
exit.clear() # clear the Event() so the while loop becomes True
try:
Thread(target=player(rec_dict1, last_time1, play1)).start() # start the player thread
except:
print ("Error: unable to start runner thread. Exit.")
quit()
if decode_list[0].startswith("REC") and not rec: # if the first part of the list starts with "rec" and not recording already
print(decode_list[0],decode_list[1]) # for debug purposes
t0 = time.time() # start the timer
f = open(decode_list[1], 'w') # open or new file with the chosen file in the Max4Live patch
rec = 1
elif decode_list[0].startswith("VALUES"): # if the first part of the list starts with "VALUES"
inv_1.value = float(decode_list[1]) # send the values to the right gpio pin
inv_2.value = float(decode_list[2])
inv_3.value = float(decode_list[3])
inv_4.value = float(decode_list[4])
inv_5.value = float(decode_list[5])
inv_6.value = float(decode_list[6])
if rec: # if recording we're also writing all the values to a dict
t1 = time.time() - t0
x = { # build a dict with the info from UDP
"time": round(t1, 3),
"values":[
float(decode_list[1]),
float(decode_list[2]),
float(decode_list[3]),
float(decode_list[4]),
float(decode_list[5]),
float(decode_list[6])
]
}
y.append(x) # append the dict to the list
elif decode_list[0].startswith("STOP"): # if the list starts with "stop"
if rec: # if we're recording then write everything to a json file
rec = 0 # make sure to be able to record again
json_dump = json.dumps(y, sort_keys=True, ensure_ascii=False) #transfer the list of dicts into a json format
f.write(json_dump) # write it to the file opened in "rec"
f.close() # close the file
print("done writing file")
print(decode_list[0],decode_list[1]) # debug purposes
elif playing: # if we're playing then stop the player
exit.set() # set the while loop to False
print(f"{datetime.datetime.now().time()} stopped playing {play1}")
t0 = 0
t1 = 0
playing = False
elif decode_list[0].startswith("EXIT"): # if the list starts with "exit"
sock.close() # close the socket
exit()