Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-45340: Improve exception handling in quantum executors #297

Merged
merged 2 commits on Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 28 additions & 12 deletions python/lsst/ctrl/mpexec/mpGraphExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,15 +167,26 @@
CliLog.replayConfigState(logConfigState)

quantum = pickle.loads(quantum_pickle)
report: QuantumReport | None = None
try:
quantumExecutor.execute(task_node, quantum)
_, report = quantumExecutor.execute(task_node, quantum)
except Exception as exc:
_LOG.debug("exception from task %s dataId %s: %s", task_node.label, quantum.dataId, exc)
report = QuantumReport.from_exception(
exception=exc,
dataId=quantum.dataId,
taskLabel=task_node.label,
)
raise
andy-slac marked this conversation as resolved.
Show resolved Hide resolved
finally:
# If sending fails we do not want this new exception to be exposed.
try:
report = quantumExecutor.getReport()
snd_conn.send(report)
except Exception:
pass
if report is not None:
# If sending fails we do not want this new exception to be
# exposed.
try:
_LOG.debug("sending report for task %s dataId %s", task_node.label, quantum.dataId)
snd_conn.send(report)
except Exception:
pass

Check warning on line 189 in python/lsst/ctrl/mpexec/mpGraphExecutor.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/mpGraphExecutor.py#L188-L189

Added lines #L188 - L189 were not covered by tests

def stop(self) -> None:
"""Stop the process."""
Expand Down Expand Up @@ -480,9 +491,18 @@

_LOG.debug("Executing %s", qnode)
try:
self.quantumExecutor.execute(task_node, qnode.quantum)
_, quantum_report = self.quantumExecutor.execute(task_node, qnode.quantum)
if quantum_report:
report.quantaReports.append(quantum_report)
successCount += 1
except Exception as exc:
quantum_report = QuantumReport.from_exception(
exception=exc,
dataId=qnode.quantum.dataId,
taskLabel=task_node.label,
)
report.quantaReports.append(quantum_report)

if self.pdb and sys.stdin.isatty() and sys.stdout.isatty():
_LOG.error(
"Task <%s dataId=%s> failed; dropping into pdb.",
Expand Down Expand Up @@ -522,10 +542,6 @@
# times, run a collection loop here explicitly.
gc.collect()

quantum_report = self.quantumExecutor.getReport()
if quantum_report:
report.quantaReports.append(quantum_report)

_LOG.info(
"Executed %d quanta successfully, %d failed and %d remain out of total %d quanta.",
successCount,
Expand Down
25 changes: 7 additions & 18 deletions python/lsst/ctrl/mpexec/quantumGraphExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ class QuantumExecutor(ABC):
"""

@abstractmethod
def execute(self, task_node: TaskNode | TaskDef, /, quantum: Quantum) -> Quantum:
def execute(
self, task_node: TaskNode | TaskDef, /, quantum: Quantum
) -> tuple[Quantum, QuantumReport | None]:
"""Execute single quantum.

Parameters
Expand All @@ -66,6 +68,10 @@ def execute(self, task_node: TaskNode | TaskDef, /, quantum: Quantum) -> Quantum
-------
quantum : `~lsst.daf.butler.Quantum`
The quantum actually executed.
report : `~lsst.ctrl.mpexec.QuantumReport`
Structure describing the status of the execution of a quantum.
`None` is returned if the implementation does not support this
feature.

Notes
-----
Expand All @@ -74,23 +80,6 @@ def execute(self, task_node: TaskNode | TaskDef, /, quantum: Quantum) -> Quantum
"""
raise NotImplementedError()

def getReport(self) -> QuantumReport | None:
"""Return execution report from last call to `execute`.

Returns
-------
report : `~lsst.ctrl.mpexec.QuantumReport`
Structure describing the status of the execution of a quantum.
`None` is returned if the implementation does not support this
feature.

Raises
------
RuntimeError
Raised if this method is called before `execute`.
"""
return None


class QuantumGraphExecutor(ABC):
"""Class which abstracts QuantumGraph execution.
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/ctrl/mpexec/simple_pipeline_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,5 +414,5 @@ def as_generator(
# which might be useful for callers who want to check the state of the
# repo in between.
return (
single_quantum_executor.execute(qnode.task_node, qnode.quantum) for qnode in self.quantum_graph
single_quantum_executor.execute(qnode.task_node, qnode.quantum)[0] for qnode in self.quantum_graph
)
26 changes: 6 additions & 20 deletions python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ def __init__(
self.clobberOutputs = clobberOutputs
self.exitOnKnownError = exitOnKnownError
self.limited_butler_factory = limited_butler_factory
self.report: QuantumReport | None = None
self.resources = resources

if self.butler is None:
Expand All @@ -164,26 +163,19 @@ def __init__(
collectionTypes=CollectionType.RUN,
)

def execute(self, task_node: TaskDef | TaskNode, /, quantum: Quantum) -> Quantum:
def execute(
self, task_node: TaskNode | TaskDef, /, quantum: Quantum
) -> tuple[Quantum, QuantumReport | None]:
# Docstring inherited from QuantumExecutor.execute
assert quantum.dataId is not None, "Quantum DataId cannot be None"

task_node = self._conform_task_def(task_node)
if self.butler is not None:
self.butler.registry.refresh()

# Catch any exception and make a report based on that.
try:
result = self._execute(task_node, quantum)
self.report = QuantumReport(dataId=quantum.dataId, taskLabel=task_node.label)
return result
except Exception as exc:
self.report = QuantumReport.from_exception(
exception=exc,
dataId=quantum.dataId,
taskLabel=task_node.label,
)
raise
kfindeisen marked this conversation as resolved.
Show resolved Hide resolved
result = self._execute(task_node, quantum)
report = QuantumReport(dataId=quantum.dataId, taskLabel=task_node.label)
return result, report

def _conform_task_def(self, task_node: TaskDef | TaskNode) -> TaskNode:
"""Convert the given object to a TaskNode and emit a deprecation
Expand Down Expand Up @@ -589,9 +581,3 @@ def initGlobals(self, quantum: Quantum) -> None:
else:
oneInstrument = instrument
Instrument.fromName(instrument, self.butler.registry)

def getReport(self) -> QuantumReport | None:
# Docstring inherited from base class
if self.report is None:
raise RuntimeError("getReport() called before execute()")
return self.report
Loading
Loading