diff --git a/.flake8 b/.flake8 index f2ac7f4e9a..951ea6fe5f 100644 --- a/.flake8 +++ b/.flake8 @@ -1,5 +1,4 @@ [flake8] -# E124: closing bracket does not match visual indentation # E126: continuation line over-indented for hanging indent # This one is bad. Sometimes ordering matters, conditional imports # setting env vars necessary etc. @@ -8,7 +7,7 @@ # https://github.com/PyCQA/pycodestyle/issues/386 # W504: line break after binary operator # (Raised by flake8 even when it is followed) -ignore = E124, E126, E402, E129, W504 +ignore = E126, E402, E129, W504 max-line-length = 158 exclude = test_import_fail.py, parsl/executors/workqueue/parsl_coprocess.py diff --git a/parsl/channels/base.py b/parsl/channels/base.py index 0069ba34ff..8c50a6efcf 100644 --- a/parsl/channels/base.py +++ b/parsl/channels/base.py @@ -4,33 +4,21 @@ class Channel(metaclass=ABCMeta): - """For certain resources such as campus clusters or supercomputers at + """Channels are abstractions that enable ExecutionProviders to talk to + resource managers of remote compute facilities. + + For certain resources such as campus clusters or supercomputers at research laboratories, resource requirements may require authentication. For instance some resources may allow access to their job schedulers from - only their login-nodes which require you to authenticate on through SSH, - GSI-SSH and sometimes even require two factor authentication. Channels are - simple abstractions that enable the ExecutionProvider component to talk to - the resource managers of compute facilities. The simplest Channel, - *LocalChannel*, simply executes commands locally on a shell, while the - *SshChannel* authenticates you to remote systems. - - Channels are usually called via the execute_wait function. - For channels that execute remotely, a push_file function allows you to copy over files. - - .. code:: python - - +------------------ - | - cmd, wtime ------->| execute_wait - (ec, stdout, stderr)<-|---+ - | - src, dst_dir ------->| push_file - dst_path <--------|----+ - | - dst_script_dir <------| script_dir - | - +------------------- + only their login-nodes which require you to authenticate through SSH, or + require two factor authentication. + + The simplest Channel, *LocalChannel*, executes commands locally in a + shell, while the *SSHChannel* authenticates you to remote systems. + Channels provide the ability to execute commands remotely, using the + execute_wait method, and manipulate the remote file system using methods + such as push_file, pull_file and makedirs. Channels should ensure that each launched command runs in a new process group, so that providers (such as AdHocProvider and LocalProvider) which diff --git a/parsl/configs/ad_hoc.py b/parsl/configs/ad_hoc.py index f92768f7f4..a289fb9457 100644 --- a/parsl/configs/ad_hoc.py +++ b/parsl/configs/ad_hoc.py @@ -9,8 +9,8 @@ {'username': 'YOUR_USERNAME', 'script_dir': 'YOUR_SCRIPT_DIR', 'remote_hostnames': ['REMOTE_HOST_URL_1', 'REMOTE_HOST_URL_2'] + } } -} config = Config( @@ -26,7 +26,7 @@ channels=[SSHChannel(hostname=m, username=user_opts['adhoc']['username'], script_dir=user_opts['adhoc']['script_dir'], - ) for m in user_opts['adhoc']['remote_hostnames']] + ) for m in user_opts['adhoc']['remote_hostnames']] ) ) ], diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 2ae6dab314..94d0f135a4 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -479,10 +479,10 @@ def _start_local_interchange_process(self): "heartbeat_threshold": self.heartbeat_threshold, "poll_period": self.poll_period, "logging_level": logging.DEBUG if self.worker_debug else logging.INFO - }, + }, daemon=True, name="HTEX-Interchange" - ) + ) self.interchange_proc.start() try: (self.worker_task_port, self.worker_result_port) = comm_q.get(block=True, timeout=120) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 6c4ca961ec..f8f1da0f56 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -78,7 +78,7 @@ def __init__(self, logdir=".", logging_level=logging.INFO, poll_period=10, - ) -> None: + ) -> None: """ Parameters ---------- @@ -425,7 +425,7 @@ def process_task_outgoing_incoming(self, interesting_managers, hub_channel, kill self.current_platform['parsl_v']), "py.v={} parsl.v={}".format(msg['python_v'].rsplit(".", 1)[0], msg['parsl_v']) - ) + ) result_package = {'type': 'result', 'task_id': -1, 'exception': serialize_object(e)} pkl_package = pickle.dumps(result_package) self.results_outgoing.send(pkl_package) diff --git a/parsl/executors/high_throughput/process_worker_pool.py b/parsl/executors/high_throughput/process_worker_pool.py index f65ec560e5..b95a979221 100755 --- a/parsl/executors/high_throughput/process_worker_pool.py +++ b/parsl/executors/high_throughput/process_worker_pool.py @@ -234,7 +234,7 @@ def create_reg_message(self): 'dir': os.getcwd(), 'cpu_count': psutil.cpu_count(logical=False), 'total_memory': psutil.virtual_memory().total, - } + } b_msg = json.dumps(msg).encode('utf-8') return b_msg @@ -608,7 +608,7 @@ def worker(worker_id, pool_id, pool_size, task_queue, result_queue, worker_queue logger.exception("Caught exception while trying to pickle the result package") pkl_package = pickle.dumps({'type': 'result', 'task_id': tid, 'exception': serialize(RemoteExceptionWrapper(*sys.exc_info())) - }) + }) result_queue.put(pkl_package) tasks_in_progress.pop(worker_id) diff --git a/parsl/executors/taskvine/factory.py b/parsl/executors/taskvine/factory.py index 614413705d..484877d1bd 100644 --- a/parsl/executors/taskvine/factory.py +++ b/parsl/executors/taskvine/factory.py @@ -30,7 +30,7 @@ def _taskvine_factory(should_stop, factory_config): else: factory = Factory(batch_type=factory_config.batch_type, manager_host_port=f"{factory_config._project_address}:{factory_config._project_port}", - ) + ) except Exception as e: raise TaskVineFactoryFailure(f'Cannot create factory with exception {e}') diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 783ee8d098..94d33c27d1 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -194,10 +194,10 @@ def start(self, run_id: str, run_dir: str) -> int: "logdir": self.logdir, "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO, "run_id": run_id - }, + }, name="Monitoring-Router-Process", daemon=True, - ) + ) self.router_proc.start() self.dbm_proc = ForkProcess(target=dbm_starter, @@ -205,10 +205,10 @@ def start(self, run_id: str, run_dir: str) -> int: kwargs={"logdir": self.logdir, "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO, "db_url": self.logging_endpoint, - }, + }, name="Monitoring-DBM-Process", daemon=True, - ) + ) self.dbm_proc.start() self.logger.info("Started the router process {} and DBM process {}".format(self.router_proc.pid, self.dbm_proc.pid)) @@ -216,7 +216,7 @@ def start(self, run_id: str, run_dir: str) -> int: args=(self.logdir, self.resource_msgs, run_dir), name="Monitoring-Filesystem-Process", daemon=True - ) + ) self.filesystem_proc.start() self.logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}") @@ -359,7 +359,7 @@ def __init__(self, run_id: str, logging_level: int = logging.INFO, atexit_timeout: int = 3 # in seconds - ): + ): """ Initializes a monitoring configuration class. Parameters diff --git a/parsl/monitoring/remote.py b/parsl/monitoring/remote.py index d42e0079b4..7861891d74 100644 --- a/parsl/monitoring/remote.py +++ b/parsl/monitoring/remote.py @@ -143,7 +143,7 @@ def send_first_last_message(try_id: int, 'first_msg': not is_last, 'last_msg': is_last, 'timestamp': datetime.datetime.now() - }) + }) radio.send(msg) return diff --git a/parsl/monitoring/visualization/plots/default/workflow_plots.py b/parsl/monitoring/visualization/plots/default/workflow_plots.py index 4cf876b188..ac5ae47285 100644 --- a/parsl/monitoring/visualization/plots/default/workflow_plots.py +++ b/parsl/monitoring/visualization/plots/default/workflow_plots.py @@ -22,7 +22,7 @@ 'exec_done': 'rgb(0, 200, 0)', 'memo_done': 'rgb(64, 200, 64)', 'fail_retryable': 'rgb(200, 128,128)' - } + } def task_gantt_plot(df_task, df_status, time_completed=None): @@ -50,7 +50,7 @@ def task_gantt_plot(df_task, df_status, time_completed=None): 'Start': last_status['timestamp'], 'Finish': status['timestamp'], 'Resource': last_status['task_status_name'] - } + } parsl_tasks.extend([last_status_bar]) last_status = status @@ -60,7 +60,7 @@ def task_gantt_plot(df_task, df_status, time_completed=None): 'Start': last_status['timestamp'], 'Finish': time_completed, 'Resource': last_status['task_status_name'] - } + } parsl_tasks.extend([last_status_bar]) fig = ff.create_gantt(parsl_tasks, @@ -205,7 +205,7 @@ def y_axis_setup(value): "fail_retryable": (8, 'rgb(200, 128,128)'), "joining": (9, 'rgb(128, 128, 255)'), "running_ended": (10, 'rgb(64, 64, 255)') - } + } def workflow_dag_plot(df_tasks, group_by_apps=True): diff --git a/parsl/monitoring/visualization/plots/default/workflow_resource_plots.py b/parsl/monitoring/visualization/plots/default/workflow_resource_plots.py index 44e7d31a22..44ffe8ed98 100644 --- a/parsl/monitoring/visualization/plots/default/workflow_resource_plots.py +++ b/parsl/monitoring/visualization/plots/default/workflow_resource_plots.py @@ -164,7 +164,7 @@ def worker_efficiency(task, node): y=[total_workers] * (end - start + 1), name='Total of workers in whole run', ) - ], + ], layout=go.Layout(xaxis=dict(autorange=True, title='Time (seconds)'), yaxis=dict(title='Number of workers'), @@ -230,7 +230,7 @@ def resource_efficiency(resource, node, label): y=[total] * (end - start + 1), name=name2, ) - ], + ], layout=go.Layout(xaxis=dict(autorange=True, title='Time (seconds)'), yaxis=dict(title=yaxis), diff --git a/parsl/tests/configs/ad_hoc_cluster_htex.py b/parsl/tests/configs/ad_hoc_cluster_htex.py index 83df8e2991..5c90d27918 100644 --- a/parsl/tests/configs/ad_hoc_cluster_htex.py +++ b/parsl/tests/configs/ad_hoc_cluster_htex.py @@ -9,8 +9,8 @@ {'username': 'YOUR_USERNAME', 'script_dir': 'YOUR_SCRIPT_DIR', 'remote_hostnames': ['REMOTE_HOST_URL_1', 'REMOTE_HOST_URL_2'] - } -} # type: Dict[str, Dict[str, Any]] + } + } # type: Dict[str, Dict[str, Any]] config = Config( executors=[ @@ -25,7 +25,7 @@ channels=[SSHChannel(hostname=m, username=user_opts['adhoc']['username'], script_dir=user_opts['adhoc']['script_dir'], - ) for m in user_opts['adhoc']['remote_hostnames']] + ) for m in user_opts['adhoc']['remote_hostnames']] ) ) ], diff --git a/parsl/tests/configs/htex_ad_hoc_cluster.py b/parsl/tests/configs/htex_ad_hoc_cluster.py index 2e319ed541..80949f1d1e 100644 --- a/parsl/tests/configs/htex_ad_hoc_cluster.py +++ b/parsl/tests/configs/htex_ad_hoc_cluster.py @@ -20,7 +20,7 @@ channels=[SSHChannel(hostname=m, username=user_opts['adhoc']['username'], script_dir=user_opts['adhoc']['script_dir'], - ) for m in user_opts['adhoc']['remote_hostnames']] + ) for m in user_opts['adhoc']['remote_hostnames']] ) ) ], diff --git a/parsl/tests/configs/local_threads_monitoring.py b/parsl/tests/configs/local_threads_monitoring.py index 3ab4305c74..81b9095285 100644 --- a/parsl/tests/configs/local_threads_monitoring.py +++ b/parsl/tests/configs/local_threads_monitoring.py @@ -8,4 +8,4 @@ hub_port=55055, resource_monitoring_interval=3, ) - ) + ) diff --git a/parsl/tests/conftest.py b/parsl/tests/conftest.py index a35493810e..fcfd2a594e 100644 --- a/parsl/tests/conftest.py +++ b/parsl/tests/conftest.py @@ -243,7 +243,7 @@ def setup_data(tmpd_cwd): @pytest.fixture(autouse=True, scope='function') -def wait_for_task_completion(pytestconfig): +def assert_no_outstanding_tasks(pytestconfig): """If we're in a config-file based mode, wait for task completion between each test. This will detect early on (by hanging) if particular test tasks are not finishing, rather than silently falling off the end of @@ -254,7 +254,11 @@ def wait_for_task_completion(pytestconfig): config = pytestconfig.getoption('config')[0] yield if config != 'local': - parsl.dfk().wait_for_current_tasks() + logger.info("Checking no outstanding tasks") + for task_record in parsl.dfk().tasks.values(): + fut = task_record['app_fu'] + assert fut.done(), f"Incomplete task found, task id {task_record['id']}" + logger.info("No outstanding tasks found") def pytest_make_collect_report(collector): diff --git a/parsl/tests/scaling_tests/vineex_condor.py b/parsl/tests/scaling_tests/vineex_condor.py index 62277374d3..59f8e7b07d 100644 --- a/parsl/tests/scaling_tests/vineex_condor.py +++ b/parsl/tests/scaling_tests/vineex_condor.py @@ -6,5 +6,5 @@ config = Config( executors=[TaskVineExecutor(manager_config=TaskVineManagerConfig(port=50055), provider=CondorProvider(), - )] + )] ) diff --git a/parsl/tests/scaling_tests/vineex_local.py b/parsl/tests/scaling_tests/vineex_local.py index e3fc9bba2f..4ade645f31 100644 --- a/parsl/tests/scaling_tests/vineex_local.py +++ b/parsl/tests/scaling_tests/vineex_local.py @@ -7,5 +7,5 @@ executors=[TaskVineExecutor(label='VineExec', worker_launch_method='factory', manager_config=TaskVineManagerConfig(port=50055), - )] + )] ) diff --git a/parsl/tests/scaling_tests/wqex_condor.py b/parsl/tests/scaling_tests/wqex_condor.py index 91ec66e1ba..67123e7713 100644 --- a/parsl/tests/scaling_tests/wqex_condor.py +++ b/parsl/tests/scaling_tests/wqex_condor.py @@ -8,5 +8,5 @@ provider=CondorProvider(), # init_command='source /home/yadu/src/wq_parsl/setup_parsl_env.sh; # echo "Ran at $date" > /home/yadu/src/wq_parsl/parsl/tests/workqueue_tests/ran.log', - )] + )] ) diff --git a/parsl/tests/scaling_tests/wqex_local.py b/parsl/tests/scaling_tests/wqex_local.py index 320fedcded..4cf154e993 100644 --- a/parsl/tests/scaling_tests/wqex_local.py +++ b/parsl/tests/scaling_tests/wqex_local.py @@ -8,5 +8,5 @@ provider=LocalProvider(), # init_command='source /home/yadu/src/wq_parsl/setup_parsl_env.sh; # echo "Ran at $date" > /home/yadu/src/wq_parsl/parsl/tests/workqueue_tests/ran.log', - )] + )] ) diff --git a/parsl/tests/test_python_apps/test_lifted.py b/parsl/tests/test_python_apps/test_lifted.py index 52fba7943f..7d22530a08 100644 --- a/parsl/tests/test_python_apps/test_lifted.py +++ b/parsl/tests/test_python_apps/test_lifted.py @@ -89,8 +89,10 @@ def test_returns_a_class_instance(): def test_returns_a_class_instance_no_underscores(): # test that _underscore attribute references are not lifted + f = returns_a_class_instance() with pytest.raises(AttributeError): - returns_a_class_instance()._nosuchattribute.result() + f._nosuchattribute.result() + f.exception() # wait for f to complete before the test ends def test_returns_a_class():