Skip to content

Commit

Permalink
Merge branch 'master' into benc-cctools-7.7.2
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford authored Nov 6, 2023
2 parents 7a81cc0 + 74e63ec commit 5b2add3
Show file tree
Hide file tree
Showing 23 changed files with 112 additions and 122 deletions.
3 changes: 1 addition & 2 deletions .flake8
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
[flake8]
# E124: closing bracket does not match visual indentation
# E126: continuation line over-indented for hanging indent
# This one is bad. Sometimes ordering matters, conditional imports
# setting env vars necessary etc.
Expand All @@ -8,7 +7,7 @@
# https://github.com/PyCQA/pycodestyle/issues/386
# W504: line break after binary operator
# (Raised by flake8 even when it is followed)
ignore = E124, E126, E402, E129, W504
ignore = E126, E402, E129, W504
max-line-length = 158
exclude = test_import_fail.py,
parsl/executors/workqueue/parsl_coprocess.py
Expand Down
11 changes: 2 additions & 9 deletions mypy.ini
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
[mypy]
plugins = sqlalchemy.ext.mypy.plugin

# globally disabled error codes:
# str-bytes-safe warns that a byte string is formatted into a string.
# which is commonly done with manager IDs in the parsl
# codebase.
disable_error_code = str-bytes-safe
enable_error_code = ignore-without-code
no_implicit_reexport = True
warn_redundant_casts = True
Expand Down Expand Up @@ -78,10 +73,8 @@ disallow_untyped_defs = True
disallow_any_expr = True

[mypy-parsl.executors.high_throughput.interchange.*]
check_untyped_defs = True

[mypy-parsl.executors.extreme_scale.*]
ignore_errors = True
disallow_untyped_defs = True
warn_unreachable = True

[mypy-parsl.monitoring.*]
disallow_untyped_decorators = True
Expand Down
36 changes: 12 additions & 24 deletions parsl/channels/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,21 @@


class Channel(metaclass=ABCMeta):
"""For certain resources such as campus clusters or supercomputers at
"""Channels are abstractions that enable ExecutionProviders to talk to
resource managers of remote compute facilities.
For certain resources such as campus clusters or supercomputers at
research laboratories, resource requirements may require authentication.
For instance some resources may allow access to their job schedulers from
only their login-nodes which require you to authenticate on through SSH,
GSI-SSH and sometimes even require two factor authentication. Channels are
simple abstractions that enable the ExecutionProvider component to talk to
the resource managers of compute facilities. The simplest Channel,
*LocalChannel*, simply executes commands locally on a shell, while the
*SshChannel* authenticates you to remote systems.
Channels are usually called via the execute_wait function.
For channels that execute remotely, a push_file function allows you to copy over files.
.. code:: python
+------------------
|
cmd, wtime ------->| execute_wait
(ec, stdout, stderr)<-|---+
|
src, dst_dir ------->| push_file
dst_path <--------|----+
|
dst_script_dir <------| script_dir
|
+-------------------
only their login-nodes which require you to authenticate through SSH, or
require two factor authentication.
The simplest Channel, *LocalChannel*, executes commands locally in a
shell, while the *SSHChannel* authenticates you to remote systems.
Channels provide the ability to execute commands remotely, using the
execute_wait method, and manipulate the remote file system using methods
such as push_file, pull_file and makedirs.
Channels should ensure that each launched command runs in a new process
group, so that providers (such as AdHocProvider and LocalProvider) which
Expand Down
4 changes: 2 additions & 2 deletions parsl/configs/ad_hoc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
{'username': 'YOUR_USERNAME',
'script_dir': 'YOUR_SCRIPT_DIR',
'remote_hostnames': ['REMOTE_HOST_URL_1', 'REMOTE_HOST_URL_2']
}
}
}


config = Config(
Expand All @@ -26,7 +26,7 @@
channels=[SSHChannel(hostname=m,
username=user_opts['adhoc']['username'],
script_dir=user_opts['adhoc']['script_dir'],
) for m in user_opts['adhoc']['remote_hostnames']]
) for m in user_opts['adhoc']['remote_hostnames']]
)
)
],
Expand Down
3 changes: 1 addition & 2 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,7 @@ def submit(self,
'joins': None,
'try_id': 0,
'id': task_id,
'task_launch_lock': threading.Lock(),
'time_invoked': datetime.datetime.now(),
'time_returned': None,
'try_time_launched': None,
Expand Down Expand Up @@ -1029,8 +1030,6 @@ def submit(self,
task_record['func_name'],
waiting_message))

task_record['task_launch_lock'] = threading.Lock()

app_fu.add_done_callback(partial(self.handle_app_update, task_record))
self.update_task_state(task_record, States.pending)
logger.debug("Task {} set to pending state with AppFuture: {}".format(task_id, task_record['app_fu']))
Expand Down
4 changes: 2 additions & 2 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,10 +479,10 @@ def _start_local_interchange_process(self):
"heartbeat_threshold": self.heartbeat_threshold,
"poll_period": self.poll_period,
"logging_level": logging.DEBUG if self.worker_debug else logging.INFO
},
},
daemon=True,
name="HTEX-Interchange"
)
)
self.interchange_proc.start()
try:
(self.worker_task_port, self.worker_result_port) = comm_q.get(block=True, timeout=120)
Expand Down
Loading

0 comments on commit 5b2add3

Please sign in to comment.