-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathframework.py
634 lines (499 loc) · 16.9 KB
/
framework.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
"""
The framework for stepping and scanning axis and measuring the sensor values
"""
import os
import time
import csv
from abc import ABC, abstractmethod
from enum import Enum
from PyQt5.QtCore import QTimer
class AxisType(Enum):
    """
    The kinds of axis that can be used for movement.

    Each member's value is the human-readable label for that kind of axis.
    """

    # Primary positioning axes
    X_Axis = "X Axis"
    Y_Axis = "Y Axis"

    # Any additional axis that is neither X nor Y
    Auxiliary_Axis = "Aux Axis"
class ControlAxis(ABC):
    """
    Controls a single axis used for calibration

    An Axis is anything that gets changed while scanning.
    For example, the X/Y Axis of an area, the power of the laser, etc

    Each Axis has a list of points it goes to
    There should be the same number of points in every axis

    Must be subclassed for each actual axis being used
    Any subclasses that are imported into main.py will show up in the drop downs

    The _write_value(self, value) method must be overridden in the subclass
    to actually move the axis
    Everything else can be overridden if needed
    """

    # Class-level defaults; instances shadow them as values are assigned.
    points = None
    _value = 0
    _string_value = "0.0"
    _name = ""
    _type = None
    _max = 0
    _min = 0
    _norm_min = 0
    _norm_max = 1
    # Only ever rebound (see update_events), never mutated in place.
    _saved_points = {}

    def __init__(self, name):
        """
        Creates the axis with the given display name and an empty point list
        """
        self._name = name
        self.points = []

    # This method **must** be overridden in subclasses
    @abstractmethod
    def _write_value(self, value):
        """
        Write the value to the physical device
        Returns whether the write was successful
        """
        pass

    # These can be overridden if needed
    def update(self):
        """
        Gets called very quickly repeatedly while scanning
        """
        pass

    def is_done(self):
        """
        Returns whether the axis is done moving. Also gets called quickly while scanning
        """
        return True

    def get_custom_config(self):
        """
        Gets a custom pywidgets BaseWidget to display in the axis configuration
        area of the gui when this axis is selected
        """
        return None

    def goto_home(self):
        """
        Homes the axis. The default implementation simply goes to value 0
        """
        self.goto_value(0)

    def update_events(self, events):
        """
        Call with new events when available

        If the events dict has a 'saved_points' entry it replaces the saved
        points used by resolve_point
        """
        if 'saved_points' in events:
            self._saved_points = events['saved_points']

    # These methods should not be overridden unless absolutely required
    def set_min(self, min_value):
        """
        Sets the min value
        """
        self._min = min_value

    def set_max(self, max_value):
        """
        Sets the max value
        """
        self._max = max_value

    def get_min(self):
        """
        Gets the min value
        """
        return self._min

    def get_max(self):
        """
        Gets the max value
        """
        return self._max

    def set_norm_min(self, min_value):
        """
        Sets the norm_min value (the value that 0% maps to)
        """
        self._norm_min = min_value

    def set_norm_max(self, max_value):
        """
        Sets the norm_max value (the value that 100% maps to)
        """
        self._norm_max = max_value

    def get_norm_min(self):
        """
        Gets the norm_min value
        """
        return self._norm_min

    def get_norm_max(self):
        """
        Gets the norm_max value
        """
        return self._norm_max

    def get_value(self):
        """
        Gets the target value
        """
        return self._value

    def get_current_value(self):
        """
        Gets the current value (may not be the target)
        """
        return self._value

    def get_string_value(self):
        """
        Gets the current value as it was set (keep saved point names and percents)
        """
        return self._string_value

    def goto_value(self, value):
        """
        Goes to a specified value, clipping to the min and max

        Name of saved point => resolve value of saved point
        Number (54) => go to value with world units
        Percent (27%) => Go to normalized value

        Returns if successful. A value that cannot be resolved is reported as
        a failure (False) and the axis state is left untouched.
        """
        resolved = self.resolve_point(value)
        if resolved is None:
            # resolve_point signals an unparsable value with None; unpacking
            # it would raise TypeError, so report the failure instead.
            return False

        done_value, is_string = resolved
        self._value = done_value
        # Keep the text the user entered (saved point name / percent) when
        # there was one, otherwise the resolved number.
        self._string_value = str(value) if is_string else str(done_value)
        return self._write_value(done_value)

    def resolve_point(self, value):
        """
        Resolve a point in the form of a number, percent, or saved point into an actual point value

        Returns a tuple (resolved_value, is_string) where is_string tells
        whether the input was not directly a number (saved point name or
        percent). The resolved value is clamped to the min and max.
        Returns None when the value cannot be resolved.
        """
        working_value = str(value)
        try:
            # Try to convert to number immediately
            done_value = float(working_value)
            is_string = False
        except ValueError:
            is_string = True

            # Resolve saved points to values. The stored value is coerced to
            # text so numeric saved points go through the same parsing below.
            if working_value in self._saved_points:
                working_value = str(self._saved_points[working_value])

            try:
                if working_value.endswith('%'):
                    # Percent value: map onto the normalized range
                    percent = float(working_value[:-1])
                    done_value = (percent / 100) * \
                        (self._norm_max - self._norm_min) + self._norm_min
                else:
                    done_value = float(working_value)
            except ValueError:
                # Neither a number, a percent, nor a resolvable saved point
                return None

        # Clamp the actual value sent to device
        if done_value < self._min:
            done_value = self._min
        if done_value > self._max:
            done_value = self._max

        return done_value, is_string

    def get_name(self):
        """
        Gets the name of this axis
        """
        return self._name

    def set_name(self, name):
        """
        Sets the name of this axis
        """
        self._name = name

    def get_units(self):
        """
        Return a string of the units used for this axis
        """
        return ""
class LightSource(ABC):
    """
    The device that gets switched on while a measurement is taken

    Subclass this once per lightsource device; set_enabled is the only
    method that has to be provided.
    """

    @abstractmethod
    def set_enabled(self, enable=True):
        """
        Switch the lightsource on (enable=True) or off (enable=False)
        """

    def get_enabled(self):
        """
        Report whether the lightsource is enabled; this base version always
        reports False
        """
        return False

    def get_custom_config(self):
        """
        Return the GUI config for this lightsource device; this base version
        has none
        """
        return None
class Sensor:
    """
    The thing that is being calibrated

    Subclass this for each sensor. Every method here is a harmless default,
    so a subclass only overrides what it needs.
    """

    def get_custom_config(self):
        """
        Return the GUI config for this sensor; this base version has none
        """
        return None

    def update(self):
        """
        Called repeatedly while measuring

        Should return the most recent measurements as a list
        """
        return []

    def begin_measuring(self, save_dir):
        """
        Called right before a measurement begins

        save_dir is handed over by the caller; begin_live_data passes None
        """

    def finish_measuring(self):
        """
        Called after a measurement ends
        """

    def begin_live_data(self):
        """
        Start producing live data for viewing (a measurement without save_dir)
        """
        self.begin_measuring(None)

    def get_live_data(self):
        """
        Return the live data for viewing, i.e. the latest update()
        """
        latest = self.update()
        return latest

    def stop_live_data(self):
        """
        Stop producing live data for viewing
        """
        self.finish_measuring()

    def get_live_headers(self):
        """
        Return the headers used for live data (same as get_headers)
        """
        headers = self.get_headers()
        return headers

    def get_headers(self):
        """
        Return the headers for the data returned by update
        """
        return []

    def update_events(self, events):
        """
        Receive any events sent out; ignored by default
        """
class AxisControllerState(Enum):
    """
    The states the AxisController steps through while scanning

    Each phase has a BEGIN_* state that starts it and a WAIT_* state that
    polls until it is over.
    """

    # Moving the axes to the points of the current step
    BEGIN_STEP = 'beginstep'
    WAIT_STEP = 'waitstep'

    # Measuring
    BEGIN_ENABLE = 'beginsensor'
    WAIT_ENABLE = 'waitsensor'

    # Delay before the measurement
    BEGIN_PRE_DELAY = 'beginpredelay'
    WAIT_PRE_DELAY = 'waitpredelay'

    # Delay after the measurement
    BEGIN_POST_DELAY = 'beginpostdelay'
    WAIT_POST_DELAY = 'waitpostdelay'

    # Saving the step and advancing to the following one
    NEXT_STEP = 'nextstep'

    # Nothing (left) to do
    DONE = 'done'
class AxisController:
    """
    Controls many ControlAxis to scan a grid

    A scan is a state machine (see AxisControllerState) driven by a QTimer:
    every timer tick calls _scan, which performs the work of the current
    state and moves on to the next one when that work is finished. For each
    step the axes are moved, a pre delay is waited, the measurement is taken,
    a post delay is waited and the step data is saved.
    """

    # Class-level defaults. Mutable containers and the timer are created per
    # instance in __init__ so that controllers never share them.
    _sensor = None
    _lightsource = None
    _state = AxisControllerState.DONE

    _pre_delay = 0
    _measure_delay = 0
    _post_delay = 0
    _scan_frequency = 0

    _delay_start_time = 0

    _saved_points = None

    _measuring = False

    _outfile = None
    _step_file = None

    _update_function = None

    _step = 0
    _total_steps = 0

    _timer = None

    def __init__(self, control_axis, sensor, lightsource, pre_delay, measure_delay, post_delay,
                 scan_frequency, saved_points=None, outfile=None, update_function=None):
        """
        Creates a new Axis Controller with a list of ControlAxis to control

        :param control_axis: a list of ControlAxis in the order that they should be controlled
        :param sensor: the Sensor to measure with, or None
        :param lightsource: the LightSource enabled while measuring, or None
        :param pre_delay: seconds to wait after moving, before measuring
        :param measure_delay: seconds the measurement lasts
        :param post_delay: seconds to wait after measuring
        :param scan_frequency: how often _scan runs per second; 0 starts the
            timer without an explicit interval
        :param saved_points: stored on the controller (not used by the controller itself)
        :param outfile: directory the csv data is written to; None or '' disables saving
        :param update_function: called with {'scan': (state, step, total_steps)}
            whenever the state changes
        """
        self._axis = control_axis
        self._sensor = sensor
        self._lightsource = lightsource
        self._pre_delay = pre_delay
        self._measure_delay = measure_delay
        self._post_delay = post_delay
        self._scan_frequency = scan_frequency
        self._saved_points = saved_points
        self._outfile = outfile
        self._update_function = update_function

        self._data = []
        self._step_data = []

        # One timer per controller, connected exactly once. Connecting in
        # begin() would add another connection on every scan and make _scan
        # run several times per tick.
        self._timer = QTimer()
        self._timer.timeout.connect(self._scan)

    def begin(self):
        """
        Starts scanning
        """
        if self._lightsource is not None:
            self._lightsource.set_enabled(False)

        self._step = 0
        # The first axis defines the number of steps; no axes means no steps
        self._total_steps = len(self._axis[0].points) if self._axis else 0

        headers = ["Time"]
        headers.extend(axis.get_name() for axis in self._axis
                       if isinstance(axis, ControlAxis))
        headers.append("Folder Number")
        self._data = [headers]

        self._set_state(AxisControllerState.BEGIN_STEP)

        if self._scan_frequency == 0:
            self._timer.start()
        else:
            # QTimer.start takes whole milliseconds
            self._timer.start(int(1000 / self._scan_frequency))

    def stop(self):
        """
        Stops the current scan
        """
        self._timer.stop()

        if self._state == AxisControllerState.BEGIN_ENABLE \
                or self._state == AxisControllerState.WAIT_ENABLE:
            if isinstance(self._sensor, Sensor):
                self._sensor.finish_measuring()

        self._set_state(AxisControllerState.DONE)

        if self._lightsource is not None:
            self._lightsource.set_enabled(False)

    def get_state(self):
        """
        Gets the current state
        """
        return self._state

    def _set_state(self, state):
        """
        Changes the state and reports it through the update function, if any
        """
        self._state = state
        if self._update_function is not None:
            self._update_function(
                {'scan': (self._state, self._step, self._total_steps)})

    def _scan(self):
        """
        Scans through all of the axis given in the constructor in order

        Called on every timer tick; records a data row when in a delay or
        measuring state and then runs the handler of the current state
        :return: Nothing
        """
        sampling_states = (
            AxisControllerState.BEGIN_PRE_DELAY,
            AxisControllerState.WAIT_PRE_DELAY,
            AxisControllerState.BEGIN_ENABLE,
            AxisControllerState.WAIT_ENABLE,
            AxisControllerState.BEGIN_POST_DELAY,
            AxisControllerState.WAIT_POST_DELAY,
        )
        if self._state in sampling_states:
            self._record_sample()

        handlers = {
            AxisControllerState.BEGIN_STEP: self._begin_step,
            AxisControllerState.WAIT_STEP: self._wait_step,
            AxisControllerState.BEGIN_PRE_DELAY: self._begin_pre_delay,
            AxisControllerState.WAIT_PRE_DELAY: self._wait_pre_delay,
            AxisControllerState.BEGIN_ENABLE: self._begin_enable,
            AxisControllerState.WAIT_ENABLE: self._wait_enable,
            AxisControllerState.BEGIN_POST_DELAY: self._begin_post_delay,
            AxisControllerState.WAIT_POST_DELAY: self._wait_post_delay,
            AxisControllerState.NEXT_STEP: self._next_step,
            AxisControllerState.DONE: self._finish_scan,
        }
        handler = handlers.get(self._state)
        if handler is not None:
            handler()

    def _record_sample(self):
        """
        Appends one row (time, sensor values, lightsource flag) to the step data
        """
        datarow = [float(time.time())]
        if self._sensor is not None:
            datarow += self._sensor.update()
        if self._lightsource is not None:
            datarow += [1.0] if self._lightsource.get_enabled() else [0.0]
        self._step_data.append(datarow)

    def _begin_step(self):
        """
        Sends every axis to its point for the current step, or finishes the
        scan when no axis has a point left
        """
        print("Moving to step:", self._step)
        moved = False
        for axis in self._axis:
            if len(axis.points) > self._step:
                axis.goto_value(axis.points[self._step])
                moved = True

        if not moved:
            self._set_state(AxisControllerState.DONE)
            return

        self._set_state(AxisControllerState.WAIT_STEP)

        # Reset the per step data
        headers = ["Time"]
        if isinstance(self._sensor, Sensor):
            headers.extend(self._sensor.get_headers())
        if isinstance(self._lightsource, LightSource):
            headers.append("Light Source Enabled")
        self._step_data = [headers]

        # Deal with directories
        if self._outfile is not None and self._outfile != '':
            self._step_file = "{}/{}".format(self._outfile, self._step)
            # Make the directory for this step
            if not os.path.exists(self._step_file):
                print("Making dir for step")
                print(self._step_file)
                os.makedirs(self._step_file)
        else:
            self._step_file = None

    def _wait_step(self):
        """
        Waits until every axis reports that it is done moving
        """
        print('.', end='')
        # A list (not a generator) so is_done is polled on every axis
        if all([axis.is_done() for axis in self._axis]):
            print()
            self._set_state(AxisControllerState.BEGIN_PRE_DELAY)

    def _begin_delay(self, label, wait_state):
        """
        Starts a timed phase: announces it, notes the start time, enters wait_state
        """
        print(label)
        self._delay_start_time = time.time()
        self._set_state(wait_state)

    def _wait_delay(self, duration, next_state):
        """
        Enters next_state once more than duration seconds passed since the
        phase started. Returns whether the phase is over
        """
        print('.', end='')
        if time.time() - self._delay_start_time > duration:
            print()
            self._set_state(next_state)
            return True
        return False

    def _begin_pre_delay(self):
        """
        Starts the delay before the measurement
        """
        self._begin_delay("Pre Delay", AxisControllerState.WAIT_PRE_DELAY)

    def _wait_pre_delay(self):
        """
        Waits out the delay before the measurement
        """
        self._wait_delay(self._pre_delay, AxisControllerState.BEGIN_ENABLE)

    def _begin_enable(self):
        """
        Starts the measurement: lightsource on, sensor measuring
        """
        print("Taking measurement")
        if self._lightsource is not None:
            self._lightsource.set_enabled(True)
        if self._sensor is not None:
            self._sensor.begin_measuring(self._step_file)
        self._delay_start_time = time.time()
        self._set_state(AxisControllerState.WAIT_ENABLE)

    def _wait_enable(self):
        """
        Waits out the measurement, then switches the lightsource off
        """
        finished = self._wait_delay(
            self._measure_delay, AxisControllerState.BEGIN_POST_DELAY)
        if finished and self._lightsource is not None:
            self._lightsource.set_enabled(False)
        # NOTE(review): the sensor's finish_measuring() is only called from
        # stop(); a scan that runs to completion never calls it. Confirm
        # whether it should be called here or when the step is saved.

    def _begin_post_delay(self):
        """
        Starts the delay after the measurement
        """
        self._begin_delay("Post Delay", AxisControllerState.WAIT_POST_DELAY)

    def _wait_post_delay(self):
        """
        Waits out the delay after the measurement
        """
        self._wait_delay(self._post_delay, AxisControllerState.NEXT_STEP)

    def _next_step(self):
        """
        Saves the data of the finished step and advances to the next step
        """
        print("Saving Data")
        if self._step_file is not None and self._step_file != '':
            # Write the step data to csv
            self._write_csv(self._step_file, self._step_data)

        # Put data in the per scan data
        step_meta = [float(time.time())]
        step_meta.extend(axis.get_current_value() for axis in self._axis)
        step_meta.append(self._step)
        self._data.append(step_meta)

        # Increase the step and move on; going through _set_state keeps the
        # update function informed like every other transition
        self._step += 1
        self._set_state(AxisControllerState.BEGIN_STEP)

    def _finish_scan(self):
        """
        Writes the per scan data and stops the timer
        """
        print("Done.")
        try:
            if self._outfile is not None and self._outfile != '':
                # The directory may not exist yet when the scan had no steps
                os.makedirs(self._outfile, exist_ok=True)
                self._write_csv(self._outfile, self._data)
        finally:
            # Always stop, otherwise a failing write would repeat every tick
            self._timer.stop()

    @staticmethod
    def _write_csv(directory, rows):
        """
        Writes rows to data.csv inside directory
        """
        with open("{}/data.csv".format(directory), 'w', newline='') as csvfile:
            csvwriter = csv.writer(csvfile, quoting=csv.QUOTE_MINIMAL)
            csvwriter.writerows(rows)