Skip to content

Commit

Permalink
Refactor 4 different renderings of task description (#3743)
Browse files Browse the repository at this point in the history
Prior to this PR, there were 4 variant ways to render a future (which is
usually but not always a Parsl task) as a text string:

1. describing failure of inner future in a join_app with 1 future to
join

- rendered task ID if the inner future has a task record attribute,
without checking it is a parsl.dataflow.taskrecord.TaskRecord or that
that attribute contains a task id entry.
 - otherwise render as None

2. describing failure of inner future in a join_app with a list of
futures to join

- is meant to do the same as case 1, but is buggy and always renders as
None (the wrong object is checked for having a task_record attribute)

3. describing failure of a dependency future

 - rendered task ID if there was a task_record, and the
   future is from the same DFK as is rendering the code
   (so that a task number from a different DFK is not
   rendered, in the corner case of a task from one DFK
   being used as a dependency for a task in another DFK)

 - otherwise, renders the repr of the future

4. describing which dependencies will be waited on when submitting a
task to the DFK

- rendered task ID if the future is an instance of AppFuture or
DataFuture
 - otherwise renders the repr of the future

This PR makes a single render method. It is a member of the DFK, because
rendering is done in the context of the DFK: two parsl task AppFutures
will be rendered differently if they are from this DFK or a different
DFK.

This PR implements render_future_description which tries to combine all
of the above:

 - if the future is an AppFuture, and is from the same
   DFK, render the task ID
 - otherwise render the repr of the future


# Changed Behaviour

human readable descriptions of dependencies/join inner futures will be
changed

# Fixes

Fixes # (issue)

## Type of change

Choose which options apply, and delete the ones which do not apply.

- Bug fix
- New feature
- Update to human readable text: Documentation/error messages/comments
- Code maintenance/cleanup
  • Loading branch information
benclifford authored Jan 14, 2025
1 parent c3091a0 commit 784256d
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 28 deletions.
37 changes: 17 additions & 20 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,24 +484,18 @@ def handle_join_update(self, task_record: TaskRecord, inner_app_future: Optional

# now we know each joinable Future is done
# so now look for any exceptions
exceptions_tids: List[Tuple[BaseException, Optional[str]]]
exceptions_tids: List[Tuple[BaseException, str]]
exceptions_tids = []
if isinstance(joinable, Future):
je = joinable.exception()
if je is not None:
if hasattr(joinable, 'task_record'):
tid = joinable.task_record['id']
else:
tid = None
tid = self.render_future_description(joinable)
exceptions_tids = [(je, tid)]
elif isinstance(joinable, list):
for future in joinable:
je = future.exception()
if je is not None:
if hasattr(joinable, 'task_record'):
tid = joinable.task_record['id']
else:
tid = None
tid = self.render_future_description(future)
exceptions_tids.append((je, tid))
else:
raise TypeError(f"Unknown joinable type {type(joinable)}")
Expand Down Expand Up @@ -918,13 +912,7 @@ def _unwrap_futures(self, args: Sequence[Any], kwargs: Dict[str, Any]) \
dep_failures = []

def append_failure(e: Exception, dep: Future) -> None:
# If this Future is associated with a task inside this DFK,
# then refer to the task ID.
# Otherwise make a repr of the Future object.
if hasattr(dep, 'task_record') and dep.task_record['dfk'] == self:
tid = "task " + repr(dep.task_record['id'])
else:
tid = repr(dep)
tid = self.render_future_description(dep)
dep_failures.extend([(e, tid)])

# Replace item in args
Expand Down Expand Up @@ -1076,10 +1064,7 @@ def submit(self,

depend_descs = []
for d in depends:
if isinstance(d, AppFuture) or isinstance(d, DataFuture):
depend_descs.append("task {}".format(d.tid))
else:
depend_descs.append(repr(d))
depend_descs.append(self.render_future_description(d))

if depend_descs != []:
waiting_message = "waiting on {}".format(", ".join(depend_descs))
Expand Down Expand Up @@ -1438,6 +1423,18 @@ def default_std_autopath(self, taskrecord, kw):
'' if label is None else '_{}'.format(label),
kw))

def render_future_description(self, dep: Future) -> str:
"""Renders a description of the future in the context of the
current DFK.
"""
if isinstance(dep, AppFuture) and dep.task_record['dfk'] == self:
tid = "task " + repr(dep.task_record['id'])
elif isinstance(dep, DataFuture):
tid = "DataFuture from task " + repr(dep.tid)
else:
tid = repr(dep)
return tid


class DataFlowKernelLoader:
"""Manage which DataFlowKernel is active.
Expand Down
3 changes: 3 additions & 0 deletions parsl/tests/test_python_apps/test_dep_standard_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,6 @@ def test_future_fail_dependency():
# Future, plain_fut, somewhere in its str

assert repr(plain_fut) in str(ex)
assert len(ex.dependent_exceptions_tids) == 1
assert isinstance(ex.dependent_exceptions_tids[0][0], ValueError)
assert ex.dependent_exceptions_tids[0][1].startswith("<Future ")
25 changes: 17 additions & 8 deletions parsl/tests/test_python_apps/test_fail.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,26 @@ def test_no_deps():
pass


@pytest.mark.parametrize("fail_probs", ((1, 0), (0, 1)))
def test_fail_sequence(fail_probs):
"""Test failure in a sequence of dependencies
def test_fail_sequence_first():
t1 = random_fail(fail_prob=1)
t2 = random_fail(fail_prob=0, inputs=[t1])
t_final = random_fail(fail_prob=0, inputs=[t2])

App1 -> App2 ... -> AppN
"""
with pytest.raises(DependencyError):
t_final.result()

t1_fail_prob, t2_fail_prob = fail_probs
t1 = random_fail(fail_prob=t1_fail_prob)
t2 = random_fail(fail_prob=t2_fail_prob, inputs=[t1])
assert len(t_final.exception().dependent_exceptions_tids) == 1
assert isinstance(t_final.exception().dependent_exceptions_tids[0][0], DependencyError)
assert t_final.exception().dependent_exceptions_tids[0][1].startswith("task ")


def test_fail_sequence_middle():
t1 = random_fail(fail_prob=0)
t2 = random_fail(fail_prob=1, inputs=[t1])
t_final = random_fail(fail_prob=0, inputs=[t2])

with pytest.raises(DependencyError):
t_final.result()

assert len(t_final.exception().dependent_exceptions_tids) == 1
assert isinstance(t_final.exception().dependent_exceptions_tids[0][0], ManufacturedTestFailure)
6 changes: 6 additions & 0 deletions parsl/tests/test_python_apps/test_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ def test_error():
f = outer_error()
e = f.exception()
assert isinstance(e, JoinError)

assert len(e.dependent_exceptions_tids) == 1
assert isinstance(e.dependent_exceptions_tids[0][0], InnerError)
assert e.dependent_exceptions_tids[0][1].startswith("task ")


def test_two_errors():
Expand All @@ -109,10 +112,12 @@ def test_two_errors():
de0 = e.dependent_exceptions_tids[0][0]
assert isinstance(de0, InnerError)
assert de0.args[0] == "Error A"
assert e.dependent_exceptions_tids[0][1].startswith("task ")

de1 = e.dependent_exceptions_tids[1][0]
assert isinstance(de1, InnerError)
assert de1.args[0] == "Error B"
assert e.dependent_exceptions_tids[1][1].startswith("task ")


def test_one_error_one_result():
Expand All @@ -125,6 +130,7 @@ def test_one_error_one_result():
de0 = e.dependent_exceptions_tids[0][0]
assert isinstance(de0, InnerError)
assert de0.args[0] == "Error A"
assert e.dependent_exceptions_tids[0][1].startswith("task ")


@join_app
Expand Down

0 comments on commit 784256d

Please sign in to comment.