Skip to content

Commit

Permalink
CORE-229:save a workflow message when aborting due to cost limit (#3171)
Browse files Browse the repository at this point in the history
* save a workflow message when aborting due to cost limit

* skip noop cost updates
  • Loading branch information
davidangb authored Jan 24, 2025
1 parent 44fc4ae commit 5131bf8
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -323,23 +323,38 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum
.withName(costBreakdown.status)
)
) {
executionServiceCluster.abort(workflowRec, petUser).map {
executionServiceCluster.abort(workflowRec, petUser).flatMap {
case Success(abortedWfRec) =>
logger.info(
s"Aborted workflow ${workflowRec.externalId} in submission $submissionId that exceeded per-workflow cost cap."
)
Option(workflowRec.copy(status = abortedWfRec.status, cost = costBreakdown.cost.some))
datasource
.inTransaction { dataAccess =>
dataAccess.workflowQuery.saveMessages(
Seq(AttributeString("Cost limit reached. Workflow was aborted to prevent cost overrun.")),
workflowRec.id
)
}
.map { _ =>
logger.info(
s"Aborted workflow ${workflowRec.externalId} in submission $submissionId that exceeded per-workflow cost cap."
)
Option(workflowRec.copy(status = abortedWfRec.status, cost = costBreakdown.cost.some))
}
case Failure(t) =>
logger.error(
s"Failed to abort workflow ${workflowRec.externalId} in submission $submissionId that exceeded per-workflow cost cap. Error: ${t.getMessage}"
)
Option(workflowRec.copy(status = costBreakdown.status, cost = costBreakdown.cost.some))
Future.successful(
Option(workflowRec.copy(status = costBreakdown.status, cost = costBreakdown.cost.some))
)
}
} else {
// TODO CORE-217: don't update unless status or cost has actually changed?
// Do we need to incrementally update cost?
// If we track cost changes on every iteration, we're going to blow up the AUDIT_WORKFLOW_STATUS table
Future.successful(Option(workflowRec.copy(status = costBreakdown.status, cost = costBreakdown.cost.some)))
// don't update unless status or cost has actually changed
if (costBreakdown.status != workflowRec.status || Option(costBreakdown.cost) != workflowRec.cost) {
Future.successful(
Option(workflowRec.copy(status = costBreakdown.status, cost = costBreakdown.cost.some))
)
} else {
Future.successful(None)
}
}
} yield updatedWorkflowRec
// fetch workflow status only if cost cap threshold is not defined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1349,6 +1349,60 @@ class SubmissionMonitorSpec(_system: ActorSystem)
).foreach(rec => rec.cost shouldEqual (Option(BigDecimal(5))))
}

it should "skip updates to workflow costs if the cost or status hasn't changed" in withDefaultTestDatabase {
dataSource: SlickDataSource =>
val expensiveWorkflowId = UUID.randomUUID().toString
val expensiveWorkflow = Workflow(Some(expensiveWorkflowId),
WorkflowStatuses.Submitted,
new DateTime(),
Some(testData.sample2.toReference),
Seq.empty
)
val submission =
testData.submission1.copy(submissionId = UUID.randomUUID().toString, workflows = Seq(expensiveWorkflow))
runAndWait(submissionQuery.create(testData.workspace, submission))
runAndWait(updateWorkflowExecutionServiceKey("unittestdefault"))

class CostCapTestExecutionServiceDAO(status: String) extends SubmissionTestExecutionServiceDAO(status) {
override def getCost(id: String, userInfo: UserInfo): Future[WorkflowCostBreakdown] =
if (id.equals(expensiveWorkflowId)) {
Future.successful(WorkflowCostBreakdown(id, BigDecimal(3), "USD", status, Seq.empty))
} else {
Future.failed(new Exception("Unexpected workflow ID"))
}
}

val monitor = createSubmissionMonitor(
dataSource,
mockSamDAO,
mockGoogleServicesDAO,
submission,
testData.wsName,
new CostCapTestExecutionServiceDAO(WorkflowStatuses.Running.toString),
perWorkflowCostCap = Option(BigDecimal(99))
)

// Trigger a monitor pass. This first pass will update the workflow to Running/cost=3
// due to our CostCapTestExecutionServiceDAO.getCost override
val executionServiceStatusResponse = await(monitor.queryExecutionServiceForStatus())
val workflowsFirstPass = executionServiceStatusResponse.statusResponse.collect {
case Success(Some(recordWithOutputs)) => recordWithOutputs._1
}
workflowsFirstPass should have size 1
workflowsFirstPass.head.status shouldBe "Running"
workflowsFirstPass.head.cost should contain(BigDecimal(3))

// make sure the monitor processes the response of the first pass
await(monitor.handleStatusResponses(executionServiceStatusResponse))

// Trigger another monitor pass. This second pass will notice that nothing has changed about
// the workflow and will not return it.
val workflowsSecondPass = await(monitor.queryExecutionServiceForStatus()).statusResponse.collect {
case Success(Some(recordWithOutputs)) => recordWithOutputs._1
}
workflowsSecondPass should have size 0
}

it should "abort a workflow that has exceeded the per-workflow cost cap but not abort the entire submission" in withDefaultTestDatabase {
dataSource: SlickDataSource =>
val cheapWorkflowId = UUID.randomUUID().toString
Expand Down Expand Up @@ -1462,6 +1516,53 @@ class SubmissionMonitorSpec(_system: ActorSystem)
verify(spyExecutionServiceDAO, never()).abort(any[String], any[UserInfo])
}

it should "persist a message when aborting a workflow due to cost cap" in withDefaultTestDatabase {
dataSource: SlickDataSource =>
val expensiveWorkflowId = UUID.randomUUID().toString
val expensiveWorkflow = Workflow(Some(expensiveWorkflowId),
WorkflowStatuses.Submitted,
new DateTime(),
Some(testData.sample2.toReference),
Seq.empty
)
val submission =
testData.submission1.copy(submissionId = UUID.randomUUID().toString, workflows = Seq(expensiveWorkflow))
runAndWait(submissionQuery.create(testData.workspace, submission))
runAndWait(updateWorkflowExecutionServiceKey("unittestdefault"))

class CostCapTestExecutionServiceDAO(status: String) extends SubmissionTestExecutionServiceDAO(status) {
override def getCost(id: String, userInfo: UserInfo): Future[WorkflowCostBreakdown] =
if (id.equals(expensiveWorkflowId)) {
Future.successful(WorkflowCostBreakdown(id, BigDecimal(11), "USD", status, Seq.empty))
} else {
Future.failed(new Exception("Unexpected workflow ID"))
}
}

val monitor = createSubmissionMonitor(
dataSource,
mockSamDAO,
mockGoogleServicesDAO,
submission,
testData.wsName,
new CostCapTestExecutionServiceDAO(WorkflowStatuses.Running.toString),
perWorkflowCostCap = Option(BigDecimal(2))
)

// trigger a monitor pass, which should abort the workflow due to cost limit
val workflows = await(monitor.queryExecutionServiceForStatus()).statusResponse.collect {
case Success(Some(recordWithOutputs)) => recordWithOutputs._1
}
workflows should have size 1
// check the messages for the workflow
val actualMessages =
runAndWait(workflowQuery.get(workflows.head.id))
.getOrElse(fail())
.messages
actualMessages should have size 1
actualMessages.head.value shouldBe "Cost limit reached. Workflow was aborted to prevent cost overrun."
}

it should "handleOutputs which are unbound by ignoring them" in withDefaultTestDatabase {
dataSource: SlickDataSource =>
val unboundExprStr = AttributeString("")
Expand Down

0 comments on commit 5131bf8

Please sign in to comment.