From 0cbf96db28e0ee6bd67ab5b34d5d26e5325d7d82 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 1 Nov 2023 13:40:57 -0500 Subject: [PATCH 1/3] Make pytest fail when tasks are running after a test (#2927) When pressing ctrl-c in pytest, prior to this PR, it would hang as wait_for_task_completion would be called in the unwinding of the fixture stack. However, because ctrl-c was pressed, tasks wouldn't be expected to all complete. This PR changes that to assert that all tasks are completed, rather than *waiting* for all tasks to complete. A non-finished task now gives a test error - which is arguably better anyway because it more aggressively flushes out tests that do not perform a complete shutdown. This means that pressing ctrl-c in pytest now leads to an assertion error, where previously it led to a hang. One recently introduced test is fixed to comply. This is part of safe-shutdown work. --- parsl/tests/conftest.py | 8 ++++++-- parsl/tests/test_python_apps/test_lifted.py | 4 +++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/parsl/tests/conftest.py b/parsl/tests/conftest.py index a35493810e..fcfd2a594e 100644 --- a/parsl/tests/conftest.py +++ b/parsl/tests/conftest.py @@ -243,7 +243,7 @@ def setup_data(tmpd_cwd): @pytest.fixture(autouse=True, scope='function') -def wait_for_task_completion(pytestconfig): +def assert_no_outstanding_tasks(pytestconfig): """If we're in a config-file based mode, wait for task completion between each test. 
This will detect early on (by hanging) if particular test tasks are not finishing, rather than silently falling off the end of @@ -254,7 +254,11 @@ def wait_for_task_completion(pytestconfig): config = pytestconfig.getoption('config')[0] yield if config != 'local': - parsl.dfk().wait_for_current_tasks() + logger.info("Checking no outstanding tasks") + for task_record in parsl.dfk().tasks.values(): + fut = task_record['app_fu'] + assert fut.done(), f"Incomplete task found, task id {task_record['id']}" + logger.info("No outstanding tasks found") def pytest_make_collect_report(collector): diff --git a/parsl/tests/test_python_apps/test_lifted.py b/parsl/tests/test_python_apps/test_lifted.py index 52fba7943f..7d22530a08 100644 --- a/parsl/tests/test_python_apps/test_lifted.py +++ b/parsl/tests/test_python_apps/test_lifted.py @@ -89,8 +89,10 @@ def test_returns_a_class_instance(): def test_returns_a_class_instance_no_underscores(): # test that _underscore attribute references are not lifted + f = returns_a_class_instance() with pytest.raises(AttributeError): - returns_a_class_instance()._nosuchattribute.result() + f._nosuchattribute.result() + f.exception() # wait for f to complete before the test ends def test_returns_a_class(): From d7bc4b9139e1a42188f27495e1127ef413a57539 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 1 Nov 2023 14:13:45 -0500 Subject: [PATCH 2/3] Fix all instances of flake8 E124 indentation warnings and re-enable (#2937) --- .flake8 | 3 +-- parsl/configs/ad_hoc.py | 4 ++-- parsl/executors/high_throughput/executor.py | 4 ++-- parsl/executors/high_throughput/interchange.py | 4 ++-- .../executors/high_throughput/process_worker_pool.py | 4 ++-- parsl/executors/taskvine/factory.py | 2 +- parsl/monitoring/monitoring.py | 12 ++++++------ parsl/monitoring/remote.py | 2 +- .../visualization/plots/default/workflow_plots.py | 8 ++++---- .../plots/default/workflow_resource_plots.py | 4 ++-- parsl/tests/configs/ad_hoc_cluster_htex.py | 6 +++--- 
parsl/tests/configs/htex_ad_hoc_cluster.py | 2 +- parsl/tests/configs/local_threads_monitoring.py | 2 +- parsl/tests/scaling_tests/vineex_condor.py | 2 +- parsl/tests/scaling_tests/vineex_local.py | 2 +- parsl/tests/scaling_tests/wqex_condor.py | 2 +- parsl/tests/scaling_tests/wqex_local.py | 2 +- 17 files changed, 32 insertions(+), 33 deletions(-) diff --git a/.flake8 b/.flake8 index f2ac7f4e9a..951ea6fe5f 100644 --- a/.flake8 +++ b/.flake8 @@ -1,5 +1,4 @@ [flake8] -# E124: closing bracket does not match visual indentation # E126: continuation line over-indented for hanging indent # This one is bad. Sometimes ordering matters, conditional imports # setting env vars necessary etc. @@ -8,7 +7,7 @@ # https://github.com/PyCQA/pycodestyle/issues/386 # W504: line break after binary operator # (Raised by flake8 even when it is followed) -ignore = E124, E126, E402, E129, W504 +ignore = E126, E402, E129, W504 max-line-length = 158 exclude = test_import_fail.py, parsl/executors/workqueue/parsl_coprocess.py diff --git a/parsl/configs/ad_hoc.py b/parsl/configs/ad_hoc.py index f92768f7f4..a289fb9457 100644 --- a/parsl/configs/ad_hoc.py +++ b/parsl/configs/ad_hoc.py @@ -9,8 +9,8 @@ {'username': 'YOUR_USERNAME', 'script_dir': 'YOUR_SCRIPT_DIR', 'remote_hostnames': ['REMOTE_HOST_URL_1', 'REMOTE_HOST_URL_2'] + } } -} config = Config( @@ -26,7 +26,7 @@ channels=[SSHChannel(hostname=m, username=user_opts['adhoc']['username'], script_dir=user_opts['adhoc']['script_dir'], - ) for m in user_opts['adhoc']['remote_hostnames']] + ) for m in user_opts['adhoc']['remote_hostnames']] ) ) ], diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 2ae6dab314..94d0f135a4 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -479,10 +479,10 @@ def _start_local_interchange_process(self): "heartbeat_threshold": self.heartbeat_threshold, "poll_period": self.poll_period, "logging_level": 
logging.DEBUG if self.worker_debug else logging.INFO - }, + }, daemon=True, name="HTEX-Interchange" - ) + ) self.interchange_proc.start() try: (self.worker_task_port, self.worker_result_port) = comm_q.get(block=True, timeout=120) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 6c4ca961ec..f8f1da0f56 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -78,7 +78,7 @@ def __init__(self, logdir=".", logging_level=logging.INFO, poll_period=10, - ) -> None: + ) -> None: """ Parameters ---------- @@ -425,7 +425,7 @@ def process_task_outgoing_incoming(self, interesting_managers, hub_channel, kill self.current_platform['parsl_v']), "py.v={} parsl.v={}".format(msg['python_v'].rsplit(".", 1)[0], msg['parsl_v']) - ) + ) result_package = {'type': 'result', 'task_id': -1, 'exception': serialize_object(e)} pkl_package = pickle.dumps(result_package) self.results_outgoing.send(pkl_package) diff --git a/parsl/executors/high_throughput/process_worker_pool.py b/parsl/executors/high_throughput/process_worker_pool.py index f65ec560e5..b95a979221 100755 --- a/parsl/executors/high_throughput/process_worker_pool.py +++ b/parsl/executors/high_throughput/process_worker_pool.py @@ -234,7 +234,7 @@ def create_reg_message(self): 'dir': os.getcwd(), 'cpu_count': psutil.cpu_count(logical=False), 'total_memory': psutil.virtual_memory().total, - } + } b_msg = json.dumps(msg).encode('utf-8') return b_msg @@ -608,7 +608,7 @@ def worker(worker_id, pool_id, pool_size, task_queue, result_queue, worker_queue logger.exception("Caught exception while trying to pickle the result package") pkl_package = pickle.dumps({'type': 'result', 'task_id': tid, 'exception': serialize(RemoteExceptionWrapper(*sys.exc_info())) - }) + }) result_queue.put(pkl_package) tasks_in_progress.pop(worker_id) diff --git a/parsl/executors/taskvine/factory.py b/parsl/executors/taskvine/factory.py index 
614413705d..484877d1bd 100644 --- a/parsl/executors/taskvine/factory.py +++ b/parsl/executors/taskvine/factory.py @@ -30,7 +30,7 @@ def _taskvine_factory(should_stop, factory_config): else: factory = Factory(batch_type=factory_config.batch_type, manager_host_port=f"{factory_config._project_address}:{factory_config._project_port}", - ) + ) except Exception as e: raise TaskVineFactoryFailure(f'Cannot create factory with exception {e}') diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 783ee8d098..94d33c27d1 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -194,10 +194,10 @@ def start(self, run_id: str, run_dir: str) -> int: "logdir": self.logdir, "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO, "run_id": run_id - }, + }, name="Monitoring-Router-Process", daemon=True, - ) + ) self.router_proc.start() self.dbm_proc = ForkProcess(target=dbm_starter, @@ -205,10 +205,10 @@ def start(self, run_id: str, run_dir: str) -> int: kwargs={"logdir": self.logdir, "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO, "db_url": self.logging_endpoint, - }, + }, name="Monitoring-DBM-Process", daemon=True, - ) + ) self.dbm_proc.start() self.logger.info("Started the router process {} and DBM process {}".format(self.router_proc.pid, self.dbm_proc.pid)) @@ -216,7 +216,7 @@ def start(self, run_id: str, run_dir: str) -> int: args=(self.logdir, self.resource_msgs, run_dir), name="Monitoring-Filesystem-Process", daemon=True - ) + ) self.filesystem_proc.start() self.logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}") @@ -359,7 +359,7 @@ def __init__(self, run_id: str, logging_level: int = logging.INFO, atexit_timeout: int = 3 # in seconds - ): + ): """ Initializes a monitoring configuration class. 
Parameters diff --git a/parsl/monitoring/remote.py b/parsl/monitoring/remote.py index d42e0079b4..7861891d74 100644 --- a/parsl/monitoring/remote.py +++ b/parsl/monitoring/remote.py @@ -143,7 +143,7 @@ def send_first_last_message(try_id: int, 'first_msg': not is_last, 'last_msg': is_last, 'timestamp': datetime.datetime.now() - }) + }) radio.send(msg) return diff --git a/parsl/monitoring/visualization/plots/default/workflow_plots.py b/parsl/monitoring/visualization/plots/default/workflow_plots.py index 4cf876b188..ac5ae47285 100644 --- a/parsl/monitoring/visualization/plots/default/workflow_plots.py +++ b/parsl/monitoring/visualization/plots/default/workflow_plots.py @@ -22,7 +22,7 @@ 'exec_done': 'rgb(0, 200, 0)', 'memo_done': 'rgb(64, 200, 64)', 'fail_retryable': 'rgb(200, 128,128)' - } + } def task_gantt_plot(df_task, df_status, time_completed=None): @@ -50,7 +50,7 @@ def task_gantt_plot(df_task, df_status, time_completed=None): 'Start': last_status['timestamp'], 'Finish': status['timestamp'], 'Resource': last_status['task_status_name'] - } + } parsl_tasks.extend([last_status_bar]) last_status = status @@ -60,7 +60,7 @@ def task_gantt_plot(df_task, df_status, time_completed=None): 'Start': last_status['timestamp'], 'Finish': time_completed, 'Resource': last_status['task_status_name'] - } + } parsl_tasks.extend([last_status_bar]) fig = ff.create_gantt(parsl_tasks, @@ -205,7 +205,7 @@ def y_axis_setup(value): "fail_retryable": (8, 'rgb(200, 128,128)'), "joining": (9, 'rgb(128, 128, 255)'), "running_ended": (10, 'rgb(64, 64, 255)') - } + } def workflow_dag_plot(df_tasks, group_by_apps=True): diff --git a/parsl/monitoring/visualization/plots/default/workflow_resource_plots.py b/parsl/monitoring/visualization/plots/default/workflow_resource_plots.py index 44e7d31a22..44ffe8ed98 100644 --- a/parsl/monitoring/visualization/plots/default/workflow_resource_plots.py +++ b/parsl/monitoring/visualization/plots/default/workflow_resource_plots.py @@ -164,7 +164,7 @@ def 
worker_efficiency(task, node): y=[total_workers] * (end - start + 1), name='Total of workers in whole run', ) - ], + ], layout=go.Layout(xaxis=dict(autorange=True, title='Time (seconds)'), yaxis=dict(title='Number of workers'), @@ -230,7 +230,7 @@ def resource_efficiency(resource, node, label): y=[total] * (end - start + 1), name=name2, ) - ], + ], layout=go.Layout(xaxis=dict(autorange=True, title='Time (seconds)'), yaxis=dict(title=yaxis), diff --git a/parsl/tests/configs/ad_hoc_cluster_htex.py b/parsl/tests/configs/ad_hoc_cluster_htex.py index 83df8e2991..5c90d27918 100644 --- a/parsl/tests/configs/ad_hoc_cluster_htex.py +++ b/parsl/tests/configs/ad_hoc_cluster_htex.py @@ -9,8 +9,8 @@ {'username': 'YOUR_USERNAME', 'script_dir': 'YOUR_SCRIPT_DIR', 'remote_hostnames': ['REMOTE_HOST_URL_1', 'REMOTE_HOST_URL_2'] - } -} # type: Dict[str, Dict[str, Any]] + } + } # type: Dict[str, Dict[str, Any]] config = Config( executors=[ @@ -25,7 +25,7 @@ channels=[SSHChannel(hostname=m, username=user_opts['adhoc']['username'], script_dir=user_opts['adhoc']['script_dir'], - ) for m in user_opts['adhoc']['remote_hostnames']] + ) for m in user_opts['adhoc']['remote_hostnames']] ) ) ], diff --git a/parsl/tests/configs/htex_ad_hoc_cluster.py b/parsl/tests/configs/htex_ad_hoc_cluster.py index 2e319ed541..80949f1d1e 100644 --- a/parsl/tests/configs/htex_ad_hoc_cluster.py +++ b/parsl/tests/configs/htex_ad_hoc_cluster.py @@ -20,7 +20,7 @@ channels=[SSHChannel(hostname=m, username=user_opts['adhoc']['username'], script_dir=user_opts['adhoc']['script_dir'], - ) for m in user_opts['adhoc']['remote_hostnames']] + ) for m in user_opts['adhoc']['remote_hostnames']] ) ) ], diff --git a/parsl/tests/configs/local_threads_monitoring.py b/parsl/tests/configs/local_threads_monitoring.py index 3ab4305c74..81b9095285 100644 --- a/parsl/tests/configs/local_threads_monitoring.py +++ b/parsl/tests/configs/local_threads_monitoring.py @@ -8,4 +8,4 @@ hub_port=55055, resource_monitoring_interval=3, ) - ) + ) 
diff --git a/parsl/tests/scaling_tests/vineex_condor.py b/parsl/tests/scaling_tests/vineex_condor.py index 62277374d3..59f8e7b07d 100644 --- a/parsl/tests/scaling_tests/vineex_condor.py +++ b/parsl/tests/scaling_tests/vineex_condor.py @@ -6,5 +6,5 @@ config = Config( executors=[TaskVineExecutor(manager_config=TaskVineManagerConfig(port=50055), provider=CondorProvider(), - )] + )] ) diff --git a/parsl/tests/scaling_tests/vineex_local.py b/parsl/tests/scaling_tests/vineex_local.py index e3fc9bba2f..4ade645f31 100644 --- a/parsl/tests/scaling_tests/vineex_local.py +++ b/parsl/tests/scaling_tests/vineex_local.py @@ -7,5 +7,5 @@ executors=[TaskVineExecutor(label='VineExec', worker_launch_method='factory', manager_config=TaskVineManagerConfig(port=50055), - )] + )] ) diff --git a/parsl/tests/scaling_tests/wqex_condor.py b/parsl/tests/scaling_tests/wqex_condor.py index 91ec66e1ba..67123e7713 100644 --- a/parsl/tests/scaling_tests/wqex_condor.py +++ b/parsl/tests/scaling_tests/wqex_condor.py @@ -8,5 +8,5 @@ provider=CondorProvider(), # init_command='source /home/yadu/src/wq_parsl/setup_parsl_env.sh; # echo "Ran at $date" > /home/yadu/src/wq_parsl/parsl/tests/workqueue_tests/ran.log', - )] + )] ) diff --git a/parsl/tests/scaling_tests/wqex_local.py b/parsl/tests/scaling_tests/wqex_local.py index 320fedcded..4cf154e993 100644 --- a/parsl/tests/scaling_tests/wqex_local.py +++ b/parsl/tests/scaling_tests/wqex_local.py @@ -8,5 +8,5 @@ provider=LocalProvider(), # init_command='source /home/yadu/src/wq_parsl/setup_parsl_env.sh; # echo "Ran at $date" > /home/yadu/src/wq_parsl/parsl/tests/workqueue_tests/ran.log', - )] + )] ) From f8b18366269c308138e2eeadbc00a4dceb63bf01 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 1 Nov 2023 14:50:05 -0500 Subject: [PATCH 3/3] Refresh docstring for base Channel class (#2931) This removes many uses of the word "simple" and makes the first sentence summarise the class (see PEP-257). 
--- parsl/channels/base.py | 36 ++++++++++++------------------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/parsl/channels/base.py b/parsl/channels/base.py index 0069ba34ff..8c50a6efcf 100644 --- a/parsl/channels/base.py +++ b/parsl/channels/base.py @@ -4,33 +4,21 @@ class Channel(metaclass=ABCMeta): - """For certain resources such as campus clusters or supercomputers at + """Channels are abstractions that enable ExecutionProviders to talk to + resource managers of remote compute facilities. + + For certain resources such as campus clusters or supercomputers at research laboratories, resource requirements may require authentication. For instance some resources may allow access to their job schedulers from - only their login-nodes which require you to authenticate on through SSH, - GSI-SSH and sometimes even require two factor authentication. Channels are - simple abstractions that enable the ExecutionProvider component to talk to - the resource managers of compute facilities. The simplest Channel, - *LocalChannel*, simply executes commands locally on a shell, while the - *SshChannel* authenticates you to remote systems. - - Channels are usually called via the execute_wait function. - For channels that execute remotely, a push_file function allows you to copy over files. - - .. code:: python - - +------------------ - | - cmd, wtime ------->| execute_wait - (ec, stdout, stderr)<-|---+ - | - src, dst_dir ------->| push_file - dst_path <--------|----+ - | - dst_script_dir <------| script_dir - | - +------------------- + only their login-nodes which require you to authenticate through SSH, or + require two factor authentication. + + The simplest Channel, *LocalChannel*, executes commands locally in a + shell, while the *SSHChannel* authenticates you to remote systems. 
+ Channels provide the ability to execute commands remotely, using the + execute_wait method, and manipulate the remote file system using methods + such as push_file, pull_file and makedirs. Channels should ensure that each launched command runs in a new process group, so that providers (such as AdHocProvider and LocalProvider) which