Skip to content

Commit

Permalink
Merge branch 'master' into inputs-outputs-arg-defaults
Browse files Browse the repository at this point in the history
  • Loading branch information
maddenp-noaa committed Jan 13, 2025
2 parents e22b8fa + c3091a0 commit 482ed57
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 50 deletions.
12 changes: 12 additions & 0 deletions docs/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ Configuration
parsl.addresses.address_by_interface
parsl.addresses.address_by_query
parsl.addresses.address_by_route
parsl.addresses.get_all_addresses
parsl.addresses.get_any_address
parsl.utils.get_all_checkpoints
parsl.utils.get_last_checkpoint

Expand Down Expand Up @@ -193,3 +195,13 @@ Internal
parsl.jobs.job_status_poller.JobStatusPoller
parsl.jobs.strategy.Strategy
parsl.utils.Timer

Task Vine configuration
=======================

.. autosummary::
:toctree: stubs
:nosignatures:

parsl.executors.taskvine.TaskVineManagerConfig
parsl.executors.taskvine.TaskVineFactoryConfig
9 changes: 3 additions & 6 deletions parsl/app/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ def __init__(self, reason: str, outputs: List[File]) -> None:
self.reason = reason
self.outputs = outputs

def __repr__(self) -> str:
return "Missing Outputs: {0}, Reason:{1}".format(self.outputs, self.reason)
def __str__(self) -> str:
return "Missing Outputs: {0}, Reason: {1}".format(self.outputs, self.reason)


class BadStdStreamFile(ParslError):
Expand All @@ -85,11 +85,8 @@ def __init__(self, reason: str) -> None:
super().__init__(reason)
self._reason = reason

def __repr__(self) -> str:
return "Bad Stream File: {}".format(self._reason)

def __str__(self) -> str:
return self.__repr__()
return "Bad Stream File: {}".format(self._reason)


class RemoteExceptionWrapper:
Expand Down
14 changes: 6 additions & 8 deletions parsl/app/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
"""
import logging
from concurrent.futures import Future
from typing import Optional

import typeguard

Expand Down Expand Up @@ -39,24 +38,23 @@ def parent_callback(self, parent_fu):
self.set_result(self.file_obj)

@typeguard.typechecked
def __init__(self, fut: Future, file_obj: File, tid: Optional[int] = None) -> None:
def __init__(self, fut: Future, file_obj: File, tid: int) -> None:
"""Construct the DataFuture object.
If the file_obj is a string convert to a File.
Args:
- fut (AppFuture) : AppFuture that this DataFuture will track
- file_obj (string/File obj) : Something representing file(s)
- fut (Future) : Future that this DataFuture will track.
Completion of ``fut`` indicates that the data is
ready.
- file_obj (File) : File that this DataFuture represents the availability of
Kwargs:
- tid (task_id) : Task id that this DataFuture tracks
"""
super().__init__()
self._tid = tid
if isinstance(file_obj, File):
self.file_obj = file_obj
else:
raise ValueError("DataFuture must be initialized with a File, not {}".format(type(file_obj)))
self.file_obj = file_obj
self.parent = fut

self.parent.add_done_callback(self.parent_callback)
Expand Down
5 changes: 1 addition & 4 deletions parsl/dataflow/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,8 @@ class BadCheckpoint(DataFlowException):
def __init__(self, reason: str) -> None:
self.reason = reason

def __repr__(self) -> str:
return self.reason

def __str__(self) -> str:
return self.__repr__()
return self.reason


class DependencyError(DataFlowException):
Expand Down
5 changes: 1 addition & 4 deletions parsl/executors/high_throughput/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,8 @@ def __init__(self, worker_id, hostname):
self.worker_id = worker_id
self.hostname = hostname

def __repr__(self):
return "Task failure due to loss of worker {} on host {}".format(self.worker_id, self.hostname)

def __str__(self):
return self.__repr__()
return "Task failure due to loss of worker {} on host {}".format(self.worker_id, self.hostname)


class CommandClientTimeoutError(Exception):
Expand Down
2 changes: 1 addition & 1 deletion parsl/executors/taskvine/factory_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class TaskVineFactoryConfig:
worker_options: Optional[str]
Additional options to pass to workers. Run
`vine_worker --help` for more details.
``vine_worker --help`` for more details.
Default is None.
worker_executable: str
Expand Down
4 changes: 2 additions & 2 deletions parsl/executors/taskvine/manager_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class TaskVineManagerConfig:
address: Optional[str]
Address of the local machine.
If None, socket.gethostname() will be used to determine the address.
If None, :py:func:`parsl.addresses.get_any_address` will be used to determine the address.
project_name: Optional[str]
If given, TaskVine will periodically report its status and performance
Expand Down Expand Up @@ -56,7 +56,7 @@ class TaskVineManagerConfig:
environment name is given, TaskVine will package the conda
environment in a tarball and send it along with tasks to be
executed in a replicated conda environment.
If a tarball of packages (*.tar.gz) is given, TaskVine
If a tarball of packages (``*.tar.gz``) is given, TaskVine
skips the packaging step and sends the tarball along with
tasks to be executed in a replicated conda environment.
Expand Down
25 changes: 0 additions & 25 deletions parsl/tests/test_python_apps/test_fail.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,28 +41,3 @@ def test_fail_sequence(fail_probs):

with pytest.raises(DependencyError):
t_final.result()


def test_deps(width=3):
"""Random failures in branches of Map -> Map -> reduce"""
# App1 App2 ... AppN
futs = [random_fail(fail_prob=0.4) for _ in range(width)]

# App1 App2 ... AppN
# | | |
# V V V
# App1 App2 ... AppN

futs = [random_fail(fail_prob=0.8, inputs=[f]) for f in futs]

# App1 App2 ... AppN
# | | |
# V V V
# App1 App2 ... AppN
# \ | /
# \ | /
# App_Final
try:
random_fail(fail_prob=0, inputs=futs).result()
except DependencyError:
pass

0 comments on commit 482ed57

Please sign in to comment.