From c2fd322a23f0cf3ebac19a64bab625f0459fa7b4 Mon Sep 17 00:00:00 2001 From: Dennis-van-Gils Date: Tue, 2 Apr 2024 20:41:03 +0200 Subject: [PATCH] All f-strings. Type checking via `isinstance`. All tests passed. --- src/dvg_qdeviceio.py | 184 +++++++++++++++++++++---------------------- 1 file changed, 91 insertions(+), 93 deletions(-) diff --git a/src/dvg_qdeviceio.py b/src/dvg_qdeviceio.py index 82bc72d..b748fe4 100644 --- a/src/dvg_qdeviceio.py +++ b/src/dvg_qdeviceio.py @@ -6,9 +6,9 @@ __author__ = "Dennis van Gils" __authoremail__ = "vangils.dennis@gmail.com" __url__ = "https://github.com/Dennis-van-Gils/python-dvg-qdeviceio" -__date__ = "27-02-2023" +__date__ = "02-04-2024" __version__ = "1.2.0" -# pylint: disable=protected-access +# pylint: disable=protected-access, wrong-import-position import os import sys @@ -101,7 +101,7 @@ def _coverage_resolve_trace(fn): @wraps(fn) def wrapped(*args, **kwargs): if running_coverage: - sys.settrace(threading._trace_hook) + sys.settrace(threading._trace_hook) # type: ignore fn(*args, **kwargs) return wrapped @@ -365,12 +365,12 @@ def attach_device(self, dev): if not hasattr(dev, "is_alive"): dev.is_alive = True # Assume the device is alive from the start - if type(self.dev) == self._NoDevice: + if isinstance(self.dev, self._NoDevice): self.dev = dev else: pft( - "Device can be attached only once. Already attached to '%s'." - % self.dev.name + "Device can be attached only once. Already attached to " + f"'{self.dev.name}'." ) sys.exit(22) @@ -387,7 +387,7 @@ def create_worker_DAQ(self, **kwargs): Will be passed directly to :class:`Worker_DAQ` as initialization parameters, :ref:`see here `. """ - if type(self.dev) == self._NoDevice: + if isinstance(self.dev, self._NoDevice): pft( "Can't create worker_DAQ, because there is no device attached." " Did you forget to call 'attach_device()' first?" @@ -400,7 +400,7 @@ def create_worker_DAQ(self, **kwargs): self._request_worker_DAQ_unpause.connect(self.worker_DAQ.unpause) self._thread_DAQ = QtCore.QThread() - self._thread_DAQ.setObjectName("%s_DAQ" % self.dev.name) + self._thread_DAQ.setObjectName(f"{self.dev.name}_DAQ") self._thread_DAQ.started.connect(self.worker_DAQ._do_work) self.worker_DAQ.moveToThread(self._thread_DAQ) @@ -416,7 +416,7 @@ def create_worker_jobs(self, **kwargs): Will be passed directly to :class:`Worker_jobs` as initialization parameters, :ref:`see here `. """ - if type(self.dev) == self._NoDevice: + if isinstance(self.dev, self._NoDevice): pft( "Can't create worker_jobs, because there is no device attached." " Did you forget to call 'attach_device()' first?" @@ -426,7 +426,7 @@ def create_worker_jobs(self, **kwargs): self.worker_jobs = Worker_jobs(qdev=self, **kwargs) self._thread_jobs = QtCore.QThread() - self._thread_jobs.setObjectName("%s_jobs" % self.dev.name) + self._thread_jobs.setObjectName(f"{self.dev.name}_jobs") self._thread_jobs.started.connect(self.worker_jobs._do_work) self.worker_jobs.moveToThread(self._thread_jobs) @@ -484,23 +484,23 @@ def start_worker_DAQ( """ if self._thread_DAQ is None: pft( - "Worker_DAQ %s: Can't start thread, because it does not exist. " - "Did you forget to call 'create_worker_DAQ()' first?" - % self.dev.name + f"Worker_DAQ {self.dev.name}: Can't start thread, because it " + "does not exist. Did you forget to call 'create_worker_DAQ()' " + "first?" ) sys.exit(404) # --> leaving elif not self.dev.is_alive: dprint( - "Worker_DAQ %s: WARNING - Device is not alive.\n" - % self.dev.name, + f"Worker_DAQ {self.dev.name}: " + "WARNING - Device is not alive.\n", ANSI.RED, ) return False # --> leaving if self.worker_DAQ.debug: tprint( - "Worker_DAQ %s: start requested..." % self.dev.name, + f"Worker_DAQ {self.dev.name}: start requested...", ANSI.WHITE, ) @@ -549,23 +549,23 @@ def start_worker_jobs( """ if self._thread_jobs is None: pft( - "Worker_jobs %s: Can't start thread because it does not exist. " - "Did you forget to call 'create_worker_jobs()' first?" - % self.dev.name + f"Worker_jobs {self.dev.name}: Can't start thread because it " + "does not exist. Did you forget to call 'create_worker_jobs()' " + "first?" ) sys.exit(404) # --> leaving elif not self.dev.is_alive: dprint( - "Worker_jobs %s: WARNING - Device is not alive.\n" - % self.dev.name, + f"Worker_jobs {self.dev.name}: " + "WARNING - Device is not alive.\n", ANSI.RED, ) return False # --> leaving if self.worker_jobs.debug: tprint( - "Worker_jobs %s: start requested..." % self.dev.name, + f"Worker_jobs {self.dev.name}: start requested...", ANSI.WHITE, ) @@ -616,15 +616,15 @@ def quit_worker_DAQ(self) -> bool: # CASE: Device has had a 'connection_lost' event during run-time, # which already stopped and closed the thread. print( - "Closing thread %s already closed." - % "{:.<16}".format(self._thread_DAQ.objectName()) + "Closing thread " + f"{self._thread_DAQ.objectName():.<16} already closed." ) return True if not self.worker_DAQ._has_stopped: if self.worker_DAQ.debug: tprint( - "Worker_DAQ %s: stop requested..." % self.dev.name, + f"Worker_DAQ {self.dev.name}: stop requested...", ANSI.WHITE, ) @@ -657,17 +657,13 @@ def quit_worker_DAQ(self) -> bool: locker_wait.unlock() self._thread_DAQ.quit() - print( - "Closing thread %s " - % "{:.<16}".format(self._thread_DAQ.objectName()), - end="", - ) + print(f"Closing thread {self._thread_DAQ.objectName():.<16} ", end="") if self._thread_DAQ.wait(2000): print("done.\n", end="") return True - else: - print("FAILED.\n", end="") # pragma: no cover - return False # pragma: no cover + + print("FAILED.\n", end="") # pragma: no cover + return False # pragma: no cover def quit_worker_jobs(self) -> bool: """Stop :attr:`worker_jobs` and close its thread. @@ -683,15 +679,15 @@ def quit_worker_jobs(self) -> bool: # CASE: Device has had a 'connection_lost' event during run-time, # which already stopped the worker and closed the thread. print( - "Closing thread %s already closed." - % "{:.<16}".format(self._thread_jobs.objectName()) + "Closing thread " + f"{self._thread_jobs.objectName():.<16} already closed." ) return True if not self.worker_jobs._has_stopped: if self.worker_jobs.debug: tprint( - "Worker_jobs %s: stop requested..." % self.dev.name, + f"Worker_jobs {self.dev.name}: stop requested...", ANSI.WHITE, ) @@ -709,17 +705,13 @@ def quit_worker_jobs(self) -> bool: locker_wait.unlock() self._thread_jobs.quit() - print( - "Closing thread %s " - % "{:.<16}".format(self._thread_jobs.objectName()), - end="", - ) + print(f"Closing thread {self._thread_jobs.objectName():.<16} ", end="") if self._thread_jobs.wait(2000): print("done.\n", end="") return True - else: - print("FAILED.\n", end="") # pragma: no cover - return False # pragma: no cover + + print("FAILED.\n", end="") # pragma: no cover + return False # pragma: no cover # -------------------------------------------------------------------------- # worker_DAQ related @@ -1024,13 +1016,19 @@ def __init__( # and trigger a 'signal_DAQ_paused' PyQt signal elif self._DAQ_trigger == DAQ_TRIGGER.CONTINUOUS: self._running = True - self._pause = None # Will be set at init of '_do_work()' when 'start_worker_DAQ()' is called - self._paused = None # Will be set at init of '_do_work()' when 'start_worker_DAQ()' is called + + self._pause = None + """Will be set at init of '_do_work()' when 'start_worker_DAQ()' is + called""" + + self._paused = None + """Will be set at init of '_do_work()' when 'start_worker_DAQ()' is + called""" if self.debug: tprint( - "Worker_DAQ %s: init @ thread %s" - % (self.dev.name, _cur_thread_name()), + f"Worker_DAQ {self.dev.name}: " + f"init @ thread {_cur_thread_name()}", self.debug_color, ) @@ -1058,7 +1056,7 @@ def confirm_has_started(self): if self.debug: tprint( - "Worker_DAQ %s: has started" % self.dev.name, + f"Worker_DAQ {self.dev.name}: has started", self.debug_color, ) @@ -1068,8 +1066,8 @@ def confirm_has_started(self): if self.debug: tprint( - "Worker_DAQ %s: starting @ thread %s" - % (self.dev.name, _cur_thread_name()), + f"Worker_DAQ {self.dev.name}: " + f"starting @ thread {_cur_thread_name()}", self.debug_color, ) @@ -1088,8 +1086,8 @@ def confirm_has_started(self): if self.debug: tprint( - "Worker_DAQ %s: waiting for wake-up trigger" - % self.dev.name, + f"Worker_DAQ {self.dev.name}: " + "waiting for wake-up trigger", self.debug_color, ) @@ -1101,7 +1099,7 @@ def confirm_has_started(self): if self.debug: tprint( - "Worker_DAQ %s: has woken up" % self.dev.name, + f"Worker_DAQ {self.dev.name}: has woken up", self.debug_color, ) @@ -1114,7 +1112,7 @@ def confirm_has_started(self): if self.debug: tprint( - "Worker_DAQ %s: has stopped" % self.dev.name, + f"Worker_DAQ {self.dev.name}: has stopped", self.debug_color, ) @@ -1137,8 +1135,7 @@ def confirm_has_started(self): if self.debug: tprint( - "Worker_DAQ %s: starting up paused" - % self.dev.name, + f"Worker_DAQ {self.dev.name}: starting up paused", self.debug_color, ) @@ -1151,7 +1148,7 @@ def confirm_has_started(self): if self._pause != self._paused: if self.debug and not init: tprint( - "Worker_DAQ %s: has paused" % self.dev.name, + f"Worker_DAQ {self.dev.name}: has paused", self.debug_color, ) self.qdev.signal_DAQ_paused.emit() @@ -1163,7 +1160,7 @@ def confirm_has_started(self): if self._pause != self._paused: if self.debug: tprint( - "Worker_DAQ %s: has unpaused" % self.dev.name, + f"Worker_DAQ {self.dev.name}: has unpaused", self.debug_color, ) self._paused = False @@ -1172,7 +1169,7 @@ def confirm_has_started(self): if self.debug: tprint( - "Worker_DAQ %s: has stopped" % self.dev.name, + f"Worker_DAQ {self.dev.name}: has stopped", self.debug_color, ) @@ -1193,8 +1190,8 @@ def _perform_DAQ(self): if self.debug: tprint( - "Worker_DAQ %s: lock # %i" - % (self.dev.name, self.qdev.update_counter_DAQ), + f"Worker_DAQ {self.dev.name}: " + f"lock # {self.qdev.update_counter_DAQ}", self.debug_color, ) @@ -1231,7 +1228,7 @@ def _perform_DAQ(self): except Exception as err: # pylint: disable=broad-except pft(err) dprint( - "@ Worker_DAQ %s\n" % self.dev.name, + f"@ Worker_DAQ {self.dev.name}\n", ANSI.RED, ) else: @@ -1249,8 +1246,8 @@ def _perform_DAQ(self): if self.debug: tprint( - "Worker_DAQ %s: unlock # %i" - % (self.dev.name, self.qdev.update_counter_DAQ), + f"Worker_DAQ {self.dev.name}: " + f"unlock # {self.qdev.update_counter_DAQ}", self.debug_color, ) @@ -1259,7 +1256,7 @@ def _perform_DAQ(self): # Check the not alive counter if self.qdev.not_alive_counter_DAQ >= self.critical_not_alive_count: dprint( - "Worker_DAQ %s: Lost connection to device." % self.dev.name, + f"Worker_DAQ {self.dev.name}: " f"Lost connection to device.", ANSI.RED, ) self.dev.is_alive = False @@ -1273,7 +1270,7 @@ def _perform_DAQ(self): def _stop(self): """Stop the worker to prepare for quitting the worker thread.""" if self.debug: - tprint("Worker_DAQ %s: stopping" % self.dev.name, self.debug_color) + tprint(f"Worker_DAQ {self.dev.name}: stopping", self.debug_color) if self._DAQ_trigger == DAQ_TRIGGER.INTERNAL_TIMER: # NOTE: The timer /must/ be stopped from the worker_DAQ thread! @@ -1281,7 +1278,7 @@ def _stop(self): if self.debug: tprint( - "Worker_DAQ %s: has stopped" % self.dev.name, + f"Worker_DAQ {self.dev.name}: has stopped", self.debug_color, ) @@ -1316,7 +1313,7 @@ def pause(self): if self._DAQ_trigger == DAQ_TRIGGER.CONTINUOUS: if self.debug: tprint( - "Worker_DAQ %s: pause requested..." % self.dev.name, + f"Worker_DAQ {self.dev.name}: pause requested...", ANSI.WHITE, ) @@ -1338,7 +1335,7 @@ def unpause(self): if self._DAQ_trigger == DAQ_TRIGGER.CONTINUOUS: if self.debug: tprint( - "Worker_DAQ %s: unpause requested..." % self.dev.name, + f"Worker_DAQ {self.dev.name}: unpause requested...", ANSI.WHITE, ) @@ -1361,7 +1358,7 @@ def wake_up(self): if self._DAQ_trigger == DAQ_TRIGGER.SINGLE_SHOT_WAKE_UP: if self.debug: tprint( - "Worker_DAQ %s: wake-up requested..." % self.dev.name, + f"Worker_DAQ {self.dev.name}: wake-up requested...", ANSI.WHITE, ) @@ -1516,8 +1513,8 @@ def __init__( if self.debug: tprint( - "Worker_jobs %s: init @ thread %s" - % (self.dev.name, _cur_thread_name()), + f"Worker_jobs {self.dev.name}: " + f"init @ thread {_cur_thread_name()}", self.debug_color, ) @@ -1545,7 +1542,7 @@ def confirm_has_started(self): if self.debug: tprint( - "Worker_jobs %s: has started" % self.dev.name, + f"Worker_jobs {self.dev.name}: has started", self.debug_color, ) @@ -1555,8 +1552,8 @@ def confirm_has_started(self): if self.debug: tprint( - "Worker_jobs %s: starting @ thread %s" - % (self.dev.name, _cur_thread_name()), + f"Worker_jobs {self.dev.name}: " + f"starting @ thread {_cur_thread_name()}", self.debug_color, ) @@ -1568,8 +1565,7 @@ def confirm_has_started(self): if self.debug: tprint( - "Worker_jobs %s: waiting for wake-up trigger" - % self.dev.name, + f"Worker_jobs {self.dev.name}: waiting for wake-up trigger", self.debug_color, ) @@ -1581,7 +1577,7 @@ def confirm_has_started(self): if self.debug: tprint( - "Worker_jobs %s: has woken up" % self.dev.name, + f"Worker_jobs {self.dev.name}: has woken up", self.debug_color, ) @@ -1594,7 +1590,7 @@ def confirm_has_started(self): if self.debug: tprint( - "Worker_jobs %s: has stopped" % self.dev.name, + f"Worker_jobs {self.dev.name}: has stopped", self.debug_color, ) @@ -1614,8 +1610,8 @@ def _perform_jobs(self): if self.debug: tprint( - "Worker_jobs %s: lock # %i" - % (self.dev.name, self.qdev.update_counter_jobs), + f"Worker_jobs {self.dev.name}: " + f"lock # {self.qdev.update_counter_jobs}", self.debug_color, ) @@ -1629,16 +1625,18 @@ def _perform_jobs(self): args = job[1:] if self.debug: - if type(func) == str: + if isinstance(func, str): tprint( - "Worker_jobs %s: %s %s" - % (self.dev.name, func, args), + f"Worker_jobs {self.dev.name}: " + f"{func} " + f"{args}", self.debug_color, ) else: tprint( - "Worker_jobs %s: %s %s" - % (self.dev.name, func.__name__, args), + f"Worker_jobs {self.dev.name}: " + f"{func.__name__} " + f"{args}", self.debug_color, ) @@ -1650,7 +1648,7 @@ def _perform_jobs(self): except Exception as err: # pylint: disable=broad-except pft(err) dprint( - "@ Worker_jobs %s\n" % self.dev.name, + f"@ Worker_jobs {self.dev.name}\n", ANSI.RED, ) else: @@ -1662,8 +1660,8 @@ def _perform_jobs(self): if self.debug: tprint( - "Worker_jobs %s: unlock # %i" - % (self.dev.name, self.qdev.update_counter_jobs), + f"Worker_jobs {self.dev.name}: " + f"unlock # {self.qdev.update_counter_jobs}", self.debug_color, ) @@ -1675,7 +1673,7 @@ def _stop(self): """Stop the worker to prepare for quitting the worker thread""" if self.debug: tprint( - "Worker_jobs %s: stopping" % self.dev.name, + f"Worker_jobs {self.dev.name}: stopping", self.debug_color, ) @@ -1703,7 +1701,7 @@ def add_to_queue(self, instruction, pass_args=()): This method can be called from another thread. """ - if type(pass_args) is not tuple: + if not isinstance(pass_args, tuple): pass_args = (pass_args,) self._queue.put((instruction, *pass_args)) @@ -1718,7 +1716,7 @@ def process_queue(self): """ if self.debug: tprint( - "Worker_jobs %s: wake-up requested..." % self.dev.name, + f"Worker_jobs {self.dev.name}: wake-up requested...", ANSI.WHITE, )