-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserial_connect.py
128 lines (100 loc) · 3.39 KB
/
serial_connect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import serial
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import time
import struct
import numpy as np
from scipy.fft import rfft, rfftfreq
from scipy.signal import find_peaks
import csv
# CHANGE COM PORT AS NECESSARY
COM = "COM9"
BAUD = 115200
def determine_freq():
"""
Perform a Fast Fourier Transform on acquired data
Plot FFT results and print frequency (Hz) of detected peaks
"""
ser = serial.Serial(COM, BAUD)
if ser.is_open is True:
print(f"Serial port successfully configured: {COM} COM at {BAUD} BAUD.")
data = []
times = []
start_time = time.time()
while True:
if time.time() - start_time < 10:
# Read serial data from controller
received = ser.read(4)
decoded_data = struct.unpack('f', received)[0]
data.append(decoded_data)
times.append(time.time() - start_time)
else:
break
N = len(times)
sample_rate = len(times) / (times[-1]-times[0]) # Hz - should be close to 200
print(f'Sampled at {sample_rate} Hz\n')
yf = rfft(np.array(data))
xf = rfftfreq(N, 1 / sample_rate)
pks, _ = find_peaks(np.abs(yf[1:]))
print("Peak Frequencies in Hz:")
for i in pks:
print(xf[i], end=" ")
plt.plot(xf[1:], np.abs(yf[1:]))
plt.xlabel("Hz")
plt.ylabel("Magnitude")
plt.plot(xf[pks], np.abs(yf)[pks], 'rx') # plot detected peaks
plt.show()
def main():
"""
Begin logging data over serial
Plot data in real-time
"""
ser = serial.Serial(COM, BAUD)
if ser.is_open is True:
print(f"Serial port successfully configured: {COM} COM at {BAUD} BAUD.")
# Create figure for plotting
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
index_A = [0] # store trials here (n)
index_B = [0]
data_A = [0] # store mouth data
data_B = [0] # store nose here
def animate(i, idx_A, idx_B, d_A, d_B):
# Read serial data from controller
try:
received = ser.read(4)
decoded_data = struct.unpack('f', received)[0]
except:
print("saving...")
with open(f'{time.time()}.csv', 'w') as f:
writer = csv.writer(f)
writer.writerows(zip(data_A, data_B))
# Update stored data
if decoded_data > 0:
idx_A.append(i)
d_A.append(decoded_data)
else: # sensor B data is always sent as negative from Arduino
idx_B.append(i)
d_B.append(-1.0*decoded_data)
# Limit x and y lists to most recent 20 items
idx_A = idx_A[-20:]
idx_B = idx_B[-20:]
plot_A = d_A[-20:]
plot_B = d_B[-20:]
# Draw
ax.clear()
ax.plot(idx_A, plot_A, label="Mouth")
ax.plot(idx_B, plot_B, label="Nose")
ax.set_xlabel("Index")
ax.set_ylabel("Voltage")
ax.set_title(f"Mouth: {plot_A[-1]:.3f} | Nose: {plot_B[-1]:.3f}")
ax.legend()
# ax.set_ylim(0.85, 1.15)
ax.set_ylim(2.25, 2.75)
# Set up plot to call animate() function periodically
# NOTE: animation frequency MUST be greater than the sampling frequency to compensate for transmission lag
ani = animation.FuncAnimation(fig, animate, fargs=(index_A, index_B, data_A, data_B), interval=50)
plt.show()
if __name__ == "__main__":
# determine_freq()
main()