diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index e9bf934cf1..5e91268057 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -484,24 +484,18 @@ def handle_join_update(self, task_record: TaskRecord, inner_app_future: Optional # now we know each joinable Future is done # so now look for any exceptions - exceptions_tids: List[Tuple[BaseException, Optional[str]]] + exceptions_tids: List[Tuple[BaseException, str]] exceptions_tids = [] if isinstance(joinable, Future): je = joinable.exception() if je is not None: - if hasattr(joinable, 'task_record'): - tid = joinable.task_record['id'] - else: - tid = None + tid = self.render_future_description(joinable) exceptions_tids = [(je, tid)] elif isinstance(joinable, list): for future in joinable: je = future.exception() if je is not None: - if hasattr(joinable, 'task_record'): - tid = joinable.task_record['id'] - else: - tid = None + tid = self.render_future_description(future) exceptions_tids.append((je, tid)) else: raise TypeError(f"Unknown joinable type {type(joinable)}") @@ -918,13 +912,7 @@ def _unwrap_futures(self, args: Sequence[Any], kwargs: Dict[str, Any]) \ dep_failures = [] def append_failure(e: Exception, dep: Future) -> None: - # If this Future is associated with a task inside this DFK, - # then refer to the task ID. - # Otherwise make a repr of the Future object. - if hasattr(dep, 'task_record') and dep.task_record['dfk'] == self: - tid = "task " + repr(dep.task_record['id']) - else: - tid = repr(dep) + tid = self.render_future_description(dep) dep_failures.extend([(e, tid)]) # Replace item in args @@ -1076,10 +1064,7 @@ def submit(self, depend_descs = [] for d in depends: - if isinstance(d, AppFuture) or isinstance(d, DataFuture): - depend_descs.append("task {}".format(d.tid)) - else: - depend_descs.append(repr(d)) + depend_descs.append(self.render_future_description(d)) if depend_descs != []: waiting_message = "waiting on {}".format(", ".join(depend_descs)) @@ -1438,6 +1423,18 @@ def default_std_autopath(self, taskrecord, kw): '' if label is None else '_{}'.format(label), kw)) + def render_future_description(self, dep: Future) -> str: + """Renders a description of the future in the context of the + current DFK. + """ + if isinstance(dep, AppFuture) and dep.task_record['dfk'] == self: + tid = "task " + repr(dep.task_record['id']) + elif isinstance(dep, DataFuture): + tid = "DataFuture from task " + repr(dep.tid) + else: + tid = repr(dep) + return tid + class DataFlowKernelLoader: """Manage which DataFlowKernel is active. diff --git a/parsl/tests/test_python_apps/test_dep_standard_futures.py b/parsl/tests/test_python_apps/test_dep_standard_futures.py index 4856888d35..29bb6b2709 100644 --- a/parsl/tests/test_python_apps/test_dep_standard_futures.py +++ b/parsl/tests/test_python_apps/test_dep_standard_futures.py @@ -43,3 +43,6 @@ def test_future_fail_dependency(): # Future, plain_fut, somewhere in its str assert repr(plain_fut) in str(ex) + assert len(ex.dependent_exceptions_tids) == 1 + assert isinstance(ex.dependent_exceptions_tids[0][0], ValueError) + assert ex.dependent_exceptions_tids[0][1].startswith(" App2 ... -> AppN - """ + with pytest.raises(DependencyError): + t_final.result() - t1_fail_prob, t2_fail_prob = fail_probs - t1 = random_fail(fail_prob=t1_fail_prob) - t2 = random_fail(fail_prob=t2_fail_prob, inputs=[t1]) + assert len(t_final.exception().dependent_exceptions_tids) == 1 + assert isinstance(t_final.exception().dependent_exceptions_tids[0][0], DependencyError) + assert t_final.exception().dependent_exceptions_tids[0][1].startswith("task ") + + +def test_fail_sequence_middle(): + t1 = random_fail(fail_prob=0) + t2 = random_fail(fail_prob=1, inputs=[t1]) t_final = random_fail(fail_prob=0, inputs=[t2]) with pytest.raises(DependencyError): t_final.result() + + assert len(t_final.exception().dependent_exceptions_tids) == 1 + assert isinstance(t_final.exception().dependent_exceptions_tids[0][0], ManufacturedTestFailure) diff --git a/parsl/tests/test_python_apps/test_join.py b/parsl/tests/test_python_apps/test_join.py index dcb855a6f7..926d90bf63 100644 --- a/parsl/tests/test_python_apps/test_join.py +++ b/parsl/tests/test_python_apps/test_join.py @@ -97,7 +97,10 @@ def test_error(): f = outer_error() e = f.exception() assert isinstance(e, JoinError) + + assert len(e.dependent_exceptions_tids) == 1 assert isinstance(e.dependent_exceptions_tids[0][0], InnerError) + assert e.dependent_exceptions_tids[0][1].startswith("task ") def test_two_errors(): @@ -109,10 +112,12 @@ def test_two_errors(): de0 = e.dependent_exceptions_tids[0][0] assert isinstance(de0, InnerError) assert de0.args[0] == "Error A" + assert e.dependent_exceptions_tids[0][1].startswith("task ") de1 = e.dependent_exceptions_tids[1][0] assert isinstance(de1, InnerError) assert de1.args[0] == "Error B" + assert e.dependent_exceptions_tids[1][1].startswith("task ") def test_one_error_one_result(): @@ -125,6 +130,7 @@ def test_one_error_one_result(): de0 = e.dependent_exceptions_tids[0][0] assert isinstance(de0, InnerError) assert de0.args[0] == "Error A" + assert e.dependent_exceptions_tids[0][1].startswith("task ") @join_app