Skip to content

Commit

Permalink
Merge pull request #320 from lsst/tickets/DM-47730
Browse files Browse the repository at this point in the history
DM-47730: add "success caveats" system to report on NoWorkFound and similar cases
  • Loading branch information
TallJimbo authored Jan 21, 2025
2 parents f3af793 + d771db7 commit 1d9d8a2
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 5 deletions.
4 changes: 4 additions & 0 deletions doc/changes/DM-47730.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Track "success caveats" like `NoWorkFound` in execution.

This adds additional information to the task metadata and additional summary reporting to `pipetask report --force-v2`.
It relies on changes in `lsst.pipe.base` for the `QuantumSuccessCaveats` flag enum and new logic in `QuantumProvenanceGraph`.
15 changes: 14 additions & 1 deletion python/lsst/ctrl/mpexec/cli/script/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

from astropy.table import Table
from lsst.daf.butler import Butler
from lsst.pipe.base import QuantumGraph
from lsst.pipe.base import QuantumGraph, QuantumSuccessCaveats
from lsst.pipe.base.execution_reports import QuantumGraphExecutionReport
from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph, Summary

Expand Down Expand Up @@ -262,11 +262,19 @@ def print_summary(summary: Summary, full_output_filename: str | None, brief: boo
"Messages": quantum_summary.messages,
}
)
if len(task_summary.caveats) > 1:
caveats = "(multiple)"
elif len(task_summary.caveats) == 1:
((code, data_ids),) = task_summary.caveats.items()
caveats = f"{code}({len(data_ids)})"
else:
caveats = ""
quanta_table.append(
{
"Task": label,
"Unknown": task_summary.n_unknown,
"Successful": task_summary.n_successful,
"Caveats": caveats,
"Blocked": task_summary.n_blocked,
"Failed": task_summary.n_failed,
"Wonky": task_summary.n_wonky,
Expand Down Expand Up @@ -294,6 +302,11 @@ def print_summary(summary: Summary, full_output_filename: str | None, brief: boo
)
quanta = Table(quanta_table)
quanta.pprint_all()
print("")
print("Caveat codes:")
for k, v in QuantumSuccessCaveats.legend().items():
print(f"{k}: {v}")
print("")
# Dataset loop
dataset_table = []
cursed_datasets = []
Expand Down
15 changes: 14 additions & 1 deletion python/lsst/ctrl/mpexec/simple_pipeline_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

__all__ = ("SimplePipelineExecutor",)

import datetime
import getpass
from collections.abc import Iterable, Iterator, Mapping
from typing import Any

Expand Down Expand Up @@ -382,7 +384,18 @@ def from_pipeline_graph(
quantum_graph_builder = AllDimensionsQuantumGraphBuilder(
pipeline_graph, butler, where=where, bind=bind
)
quantum_graph = quantum_graph_builder.build(attach_datastore_records=attach_datastore_records)
metadata = {
"input": list(butler.collections.defaults),
"output_run": butler.run,
"skip_existing_in": [],
"skip_existing": False,
"data_query": where,
"user": getpass.getuser(),
"time": str(datetime.datetime.now()),
}
quantum_graph = quantum_graph_builder.build(
metadata=metadata, attach_datastore_records=attach_datastore_records
)
return cls(
quantum_graph=quantum_graph,
butler=butler,
Expand Down
20 changes: 18 additions & 2 deletions python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
NoWorkFound,
PipelineTask,
QuantumContext,
QuantumSuccessCaveats,
TaskFactory,
)
from lsst.pipe.base.pipeline_graph import TaskNode
Expand Down Expand Up @@ -230,6 +231,7 @@ def _execute(self, task_node: TaskNode, /, quantum: Quantum) -> Quantum:
quantum.dataId,
str(exc),
)
quantumMetadata["caveats"] = QuantumSuccessCaveats.from_adjust_quantum_no_work().value
# Make empty metadata that looks something like what a
# do-nothing task would write (but we don't bother with empty
# nested PropertySets for subtasks). This is slightly
Expand Down Expand Up @@ -266,7 +268,7 @@ def _execute(self, task_node: TaskNode, /, quantum: Quantum) -> Quantum:
task = self.taskFactory.makeTask(task_node, limited_butler, init_input_refs)
logInfo(None, "start", metadata=quantumMetadata) # type: ignore[arg-type]
try:
self.runQuantum(task, quantum, task_node, limited_butler)
quantumMetadata["caveats"] = self.runQuantum(task, quantum, task_node, limited_butler).value
except Exception as e:
_LOG.error(
"Execution of task '%s' on quantum %s failed. Exception %s: %s",
Expand Down Expand Up @@ -473,7 +475,7 @@ def runQuantum(
task_node: TaskNode,
/,
limited_butler: LimitedButler,
) -> None:
) -> QuantumSuccessCaveats:
"""Execute task on a single quantum.
Parameters
Expand All @@ -486,7 +488,14 @@ def runQuantum(
Task definition structure.
limited_butler : `~lsst.daf.butler.LimitedButler`
Butler to use for dataset I/O.
Returns
-------
flags : `QuantumSuccessCaveats`
Flags that describe qualified successes.
"""
flags = QuantumSuccessCaveats.NO_CAVEATS

# Create a butler that operates in the context of a quantum
butlerQC = QuantumContext(limited_butler, quantum, resources=self.resources)

Expand All @@ -504,6 +513,7 @@ def runQuantum(
quantum.dataId,
str(err),
)
flags |= err.FLAGS
except AnnotatedPartialOutputsError as caught:
error: BaseException
if caught.__cause__ is None:
Expand Down Expand Up @@ -531,6 +541,12 @@ def runQuantum(
quantum.dataId,
)
_LOG.error(error, exc_info=error)
flags |= caught.FLAGS
if not butlerQC.outputsPut:
flags |= QuantumSuccessCaveats.ALL_OUTPUTS_MISSING
if not butlerQC.outputsPut == butlerQC.allOutputs:
flags |= QuantumSuccessCaveats.ANY_OUTPUTS_MISSING
return flags

def writeMetadata(
self, quantum: Quantum, metadata: Any, task_node: TaskNode, /, limited_butler: LimitedButler
Expand Down
65 changes: 64 additions & 1 deletion tests/test_simple_pipeline_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@
import unittest

import lsst.daf.butler
import lsst.pipe.base.quantum_provenance_graph as qpg
import lsst.utils.tests
from lsst.ctrl.mpexec import SimplePipelineExecutor
from lsst.pipe.base import PipelineGraph, RepeatableQuantumError
from lsst.pipe.base import PipelineGraph, QuantumSuccessCaveats, RepeatableQuantumError
from lsst.pipe.base.tests.mocks import (
DynamicConnectionConfig,
DynamicTestPipelineTask,
Expand Down Expand Up @@ -256,6 +257,68 @@ def test_partial_outputs_success(self):
(_, _) = executor.as_generator(register_dataset_types=True)
self.assertFalse(self.butler.exists("intermediate"))
self.assertEqual(self.butler.get("output").storage_class, get_mock_name("StructuredDataDict"))
prov = qpg.QuantumProvenanceGraph()
prov.assemble_quantum_provenance_graph(self.butler, [executor.quantum_graph])
(quantum_key_a,) = prov.quanta["a"]
quantum_info_a = prov.get_quantum_info(quantum_key_a)
self.assertEqual(
quantum_info_a["caveats"],
QuantumSuccessCaveats.ALL_OUTPUTS_MISSING
| QuantumSuccessCaveats.ANY_OUTPUTS_MISSING
| QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR,
)
(quantum_key_b,) = prov.quanta["b"]
quantum_info_b = prov.get_quantum_info(quantum_key_b)
self.assertEqual(quantum_info_b["caveats"], QuantumSuccessCaveats.NO_CAVEATS)

def test_no_work_found(self):
    """Test caveat reporting when the first of two quanta raises
    `NoWorkFound` from `runQuantum`, which in turn causes the second
    quantum to raise `NoWorkFound` from `adjustQuantum`.
    """
    # Task "a" reads the overall input but is configured to raise
    # NoWorkFound inside runQuantum (its fail condition always holds).
    upstream_config = DynamicTestPipelineTaskConfig()
    upstream_config.inputs["i"] = DynamicConnectionConfig(
        dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False
    )
    upstream_config.fail_exception = "lsst.pipe.base.NoWorkFound"
    upstream_config.fail_condition = "1=1"  # butler query expression that is true
    upstream_config.outputs["o"] = DynamicConnectionConfig(
        dataset_type_name="intermediate", storage_class="StructuredDataDict"
    )
    # Task "b" consumes the intermediate that "a" will never write.
    downstream_config = DynamicTestPipelineTaskConfig()
    downstream_config.inputs["i"] = DynamicConnectionConfig(
        dataset_type_name="intermediate", storage_class="StructuredDataDict"
    )
    downstream_config.outputs["o"] = DynamicConnectionConfig(
        dataset_type_name="output", storage_class="StructuredDataDict"
    )
    graph = PipelineGraph()
    graph.add_task("a", DynamicTestPipelineTask, upstream_config)
    graph.add_task("b", DynamicTestPipelineTask, downstream_config)
    # Consider the partial a success and proceed.
    executor = SimplePipelineExecutor.from_pipeline_graph(
        graph, butler=self.butler, raise_on_partial_outputs=False
    )
    (_, _) = executor.as_generator(register_dataset_types=True)
    provenance = qpg.QuantumProvenanceGraph()
    provenance.assemble_quantum_provenance_graph(self.butler, [executor.quantum_graph])

    def caveats_for(label):
        # Each task label has exactly one quantum; return its caveats.
        (key,) = provenance.quanta[label]
        return provenance.get_quantum_info(key)["caveats"]

    self.assertEqual(
        caveats_for("a"),
        QuantumSuccessCaveats.ALL_OUTPUTS_MISSING
        | QuantumSuccessCaveats.ANY_OUTPUTS_MISSING
        | QuantumSuccessCaveats.NO_WORK,
    )
    self.assertEqual(
        caveats_for("b"),
        QuantumSuccessCaveats.ALL_OUTPUTS_MISSING
        | QuantumSuccessCaveats.ANY_OUTPUTS_MISSING
        | QuantumSuccessCaveats.ADJUST_QUANTUM_RAISED
        | QuantumSuccessCaveats.NO_WORK,
    )

def test_partial_outputs_failure(self):
"""Test executing two quanta where the first raises
Expand Down

0 comments on commit 1d9d8a2

Please sign in to comment.