-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathanemometer_test.py
290 lines (244 loc) · 10.8 KB
/
anemometer_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
"""
COPYRIGHT © 2018 BY DATAQ INSTRUMENTS, INC.
!!!!!!!! VERY IMPORTANT !!!!!!!!
!!!!!!!! READ THIS FIRST !!!!!!!!
This program works only with the following models:
DI-1120, -2108, -4108, -4208, AND -4718B
Any other instrument model should be disconnected from the PC
to prevent the program from detecting a device with a DATAQ
Instruments VID and attempting to use it.
Such attempts will fail.
You'll need to uncomment the appropriate
slist
analog_ranges
tuples for the instrument model you will use with this program.
Prototypes are provided for each supported model, so it's a
simple matter of commenting the ones that don't apply, and
uncommenting the one that does. In its as-delivered state
the program assumes that model DI-2108 is connected.
Instruments used with this program MUST be placed in their
CDC communication mode.
Follow this link for guidance:
https://www.dataq.com/blog/data-acquisition/usb-daq-products-support-libusb-cdc/
Any specific instrument's protocol document can be downloaded from the instrument's
product page: Once there, click the DETAILS tab.
"""
import serial
import serial.tools.list_ports
import keyboard
import time
import time
import csv
"""
Uncomment the slist tuple depending upon the hardware model you're using.
You can modify the example tuples to change the measurement configuration
as needed. Refer to the instrument's protocol for details. Note that
only one slist tuple can be enabled at a time.
slist for model DI-2108
0x0000 = Analog channel 0, ±10 V range
0x0001 = Analog channel 1, ±10 V range
0x0709 = Rate input, 0-500 Hz range
0x000A = Counter input
"""
# Scan list for model DI-2108, in acquisition order:
#   0x0000 = analog channel 0, ±10 V range
#   0x0001 = analog channel 1, ±10 V range
slist = [
    0x0000,
    0x0001,
]
"""
Uncomment an analog_ranges tuple depending upon the hardware model you're using.
Note that only one can be enabled at a time.
The first item in the tuple is the lowest gain code (e.g. ±100 V range = gain code 0)
for the DI-4208. Some instrument models do not support programmable gain, so
their tuples contain only one value (e.g. model DI-2108.)
"""
# Analog ranges for model DI-2108 (fixed ±10 V measurement range).
# config_scn_lst() indexes this list with the gain code taken from the upper
# bits of each analog slist entry (item >> 8), so entry 0 is gain code 0.
analog_ranges = [10]
"""
Define a tuple that contains an ordered list of rate measurement ranges supported by the hardware.
The first item in the list is the lowest gain code (e.g. 50 kHz range = gain code 1).
"""
# Rate measurement ranges, ordered by gain code (gain code 1 comes first).
rate_ranges = (
    50000, 20000, 10000, 5000, 2000, 1000,
    500, 200, 100, 50, 20, 10,
)

# Analog and rate ranges to apply, kept in slist order.
# Filled in by config_scn_lst().
range_table = []

# Serial port object shared by every function in this script; it is
# configured and opened by discovery().
ser = serial.Serial()

# Flag to indicate whether acquiring is active
acquiring = False
""" Discover DATAQ Instruments devices and models. Note that if multiple devices are connected, only the
device discovered first is used. We leave it to you to ensure that it's the desired device model."""
def discovery():
    """Find the first DATAQ Instruments device and open its serial port.

    Scans the active COM ports for a hardware ID containing the DATAQ
    vendor ID string ``VID:PID=0683``. The first matching port is assigned
    to the module-level ``ser`` object, which is then opened. When no
    device is found, the user is asked to connect one and press ENTER.

    Returns:
        bool: True when the port is open on return, False when no device
        was detected.
    """
    # A previous successful call already opened the port. Opening it again
    # would fail, so simply report success.
    if ser.is_open:
        return True

    # Device name of the first port whose hardware ID carries the DATAQ
    # vendor ID, or "" if no such port exists.
    hooked_port = next(
        (p.device for p in serial.tools.list_ports.comports()
         if "VID:PID=0683" in p.hwid),
        "",
    )

    if not hooked_port:
        # Get here if no DATAQ Instruments devices are detected
        print("Please connect a DATAQ Instruments device")
        input("Press ENTER to try again...")
        return False

    print("Found a DATAQ Instruments device on", hooked_port)
    ser.timeout = 0
    ser.port = hooked_port
    # Baud rate is a number, not a string
    ser.baudrate = 115200
    ser.open()
    return True
# Sends a passed command string after appending <cr>
def send_cmd(command):
    """Send a command to the instrument and, when idle, print its reply.

    A carriage return is appended to ``command`` before it is written to
    the module-level ``ser`` port. Unless the module-level ``acquiring``
    flag is set, the function then blocks until a non-empty line has been
    read from the port and prints that line.

    Args:
        command (str): Command text without the terminating carriage return.
    """
    ser.write((command + '\r').encode())
    time.sleep(.1)

    # Echo commands only if not acquiring
    if acquiring:
        return

    while True:
        if ser.inWaiting() <= 0:
            continue
        try:
            s = ser.readline().decode()
        except UnicodeDecodeError:
            # Undecodable bytes are skipped. Only this error is caught so
            # that Ctrl-C and serial port failures are not swallowed.
            continue
        s = s.strip('\n')
        s = s.strip('\r')
        s = s.strip(chr(0))
        if s != "":
            print(s)
            break
# Configure the instrument's scan list
def config_scn_lst():
    """Send the scan list to the instrument and fill ``range_table``.

    Every entry of ``slist`` is sent with an ``slist <position> <value>``
    command, positions starting at 0 and increasing by one. For each entry
    one value is appended to ``range_table`` so both stay aligned:

    * function code below 8 (analog channel): ``analog_ranges[gain code]``
    * function code 9 (rate channel): ``rate_ranges[gain code - 1]``
    * anything else (digital in, counter): 0, as these have no range
    """
    for position, item in enumerate(slist):
        send_cmd("slist {} {}".format(position, item))

        # The four LSBs select the measurement function; the bits from
        # bit 8 upwards carry the gain code.
        function = item & 0xf
        gain_code = item >> 8

        if function < 8:
            # Analog channel. Refer to the slist prototype for your
            # instrument as defined in the instrument protocol.
            full_scale = analog_ranges[gain_code]
        elif function == 9:
            # Rate channel. Rate gain codes begin with 1, so subtract 1 to
            # get a zero-based index into rate_ranges.
            full_scale = rate_ranges[gain_code - 1]
        else:
            # Digital input (8) or count channel: no measurement range.
            # Any value will do; it only keeps range_table aligned.
            full_scale = 0

        range_table.append(full_scale)
# Sensor output span: readings at or below MIN_VOLTAGE map to zero and
# readings at or above MAX_VOLTAGE map to full scale.
MIN_VOLTAGE = 1.32         # V
MAX_VOLTAGE = 6.6          # V
MAX_WIND_SPEED = 40        # m/s reported at MAX_VOLTAGE
FULL_CIRCLE = 360          # degrees reported at MAX_VOLTAGE
DIRECTION_OFFSET = 8.2     # degrees added to every direction reading
LOG_INTERVAL = 1.0         # minimum number of seconds between CSV rows
POLL_INTERVAL = 0.01       # idle time per pass of the main loop, in seconds


def _scale_voltage(volts, full_scale):
    """Map a voltage linearly onto 0..full_scale, clamping at both ends."""
    if volts <= MIN_VOLTAGE:
        return 0
    if volts >= MAX_VOLTAGE:
        return full_scale
    return (volts - MIN_VOLTAGE) / (MAX_VOLTAGE - MIN_VOLTAGE) * full_scale


# The CSV file stays open for the whole session and is closed when the
# "with" block is left, whether normally or through an exception.
with open('anemometer.csv', mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['Time', 'Wind speed', 'Wind direction: North(180)/South(360/0)/East(270)/West(90)'])
    start_time = time.time()

    # discovery() prompts the user itself, so call it exactly once per pass.
    # (Calling it again inside the loop body could open the port and then
    # have the loop condition try to open it a second time.)
    while not discovery():
        pass

    try:
        # Stop in case Device was left running
        send_cmd("stop")
        # Define binary output mode
        send_cmd("encode 0")
        # Keep the packet size small for responsiveness
        send_cmd("ps 0")
        # Configure the instrument's scan list
        config_scn_lst()
        # Define sample rate = 10 Hz:
        # 60,000,000/(srate * dec) = 60,000,000/(11718 * 512) = 10 Hz
        send_cmd("dec 512")
        send_cmd("srate 11718")
        print("")
        print("Ready to acquire...")
        print("")
        print("Press <g> to go, <s> to stop, <r> to reset counter, and <q> to quit:")

        # Elapsed time of the most recent CSV row; None until one is written
        last_logged = None

        while True:
            # NOTE: the former `'g' or 'G'` arguments always evaluated to the
            # lowercase letter, so only that letter is passed here.

            # If key 'g' start scanning
            if keyboard.is_pressed('g'):
                keyboard.read_key()
                acquiring = True
                send_cmd("start")

            # If key 'r' reset counter
            if keyboard.is_pressed('r'):
                keyboard.read_key()
                send_cmd("reset 1")

            # If key 's' stop scanning
            if keyboard.is_pressed('s'):
                keyboard.read_key()
                send_cmd("stop")
                time.sleep(1)
                ser.flushInput()
                print("")
                print("stopped")
                acquiring = False

            # If key 'q' exit
            if keyboard.is_pressed('q'):
                keyboard.read_key()
                send_cmd("stop")
                ser.flushInput()
                break

            # Each scan holds two bytes per slist entry. Consume complete
            # scans for as long as more than one scan's worth is buffered.
            # The instrument produces 10 scans per second, so this loop must
            # never sleep: it would fall behind and the buffer would fill up.
            while ser.inWaiting() > (2 * len(slist)):
                fields = []
                for slist_pointer, item in enumerate(slist):
                    # The four LSBs of slist determine measurement function
                    function = item & 0xf
                    # Always two bytes per sample...read them
                    raw = ser.read(2)

                    if function < 8:
                        # Analog input channel: convert counts to volts
                        result = range_table[slist_pointer] * int.from_bytes(raw, byteorder='little', signed=True) / 32768

                        if slist_pointer == 0:
                            # First slist entry: wind speed
                            wind_speed = _scale_voltage(result, MAX_WIND_SPEED)
                            fields.append("Wind Speed: {: 3.2f} m/s".format(wind_speed))
                        elif slist_pointer == 1:
                            # Second slist entry: wind direction. The offset
                            # is applied to every reading so the result stays
                            # continuous at the clamp limits.
                            wind_direction = _scale_voltage(result, FULL_CIRCLE)
                            wind_direction = (wind_direction + DIRECTION_OFFSET) % 360
                            fields.append("Wind Direction: {: 3.2f}°".format(wind_direction))

                            # Log at most one row per LOG_INTERVAL seconds
                            elapsed_time = time.time() - start_time
                            if last_logged is None or elapsed_time - last_logged >= LOG_INTERVAL:
                                writer.writerow([elapsed_time, wind_speed, wind_direction])
                                last_logged = elapsed_time

                    elif function == 8:
                        # Digital input channel (if needed)
                        result = (int.from_bytes(raw, byteorder='big', signed=False)) & (0x007f)
                        fields.append("{: 3d}".format(result))

                    elif function == 9:
                        # Rate input channel (if needed)
                        result = (int.from_bytes(raw, byteorder='little', signed=True) + 32768) / 65535 * (range_table[slist_pointer])
                        fields.append("{: 3.1f}".format(result))

                    else:
                        # Counter input channel (if needed)
                        result = (int.from_bytes(raw, byteorder='little', signed=True)) + 32768
                        fields.append("{: 1d}".format(result))

                # End of a pass through slist items: overwrite the status line
                print(", ".join(fields) + " ", end="\r")

            # Yield the CPU briefly instead of spinning at full speed
            time.sleep(POLL_INTERVAL)
    finally:
        # Release the port even if an error interrupted the session. The
        # script ends here; the former bare `SystemExit` expression did
        # nothing and has been dropped.
        ser.close()