Skip to content

Commit

Permalink
[CORE-226] check for aborted/ing status before aborting costcap workf…
Browse files Browse the repository at this point in the history
…low (#3165)

* check for aborted/ing status before aborting costcap workflow
  • Loading branch information
calypsomatic authored Jan 17, 2025
1 parent 88dbae0 commit 9c4e0f9
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,13 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum
for {
costBreakdown <- executionServiceCluster.getCost(workflowRec, petUser)
updatedWorkflowRec <-
// TODO CORE-217: if workflow is already in Aborted or Aborting, don't try to re-abort it
if (costBreakdown.cost > costCap) {
if (
costBreakdown.cost > costCap &&
WorkflowStatuses.abortableStatuses.contains(
WorkflowStatuses
.withName(costBreakdown.status)
)
) {
executionServiceCluster.abort(workflowRec, petUser).map {
case Success(abortedWfRec) =>
logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import org.broadinstitute.dsde.rawls.util.MockitoTestUtils
import org.broadinstitute.dsde.workbench.dataaccess.NotificationDAO
import org.broadinstitute.dsde.workbench.model.WorkbenchEmail
import org.joda.time.DateTime
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{mock, never, spy, verify}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.Eventually
import org.scalatest.flatspec.AnyFlatSpecLike
Expand Down Expand Up @@ -1403,6 +1405,63 @@ class SubmissionMonitorSpec(_system: ActorSystem)
).getOrElse(fail()).status shouldBe SubmissionStatuses.Submitted
}

it should "not re-abort a workflow that has exceeded the per-workflow cost cap but is already aborting" in withDefaultTestDatabase {
dataSource: SlickDataSource =>
val cheapWorkflowId = UUID.randomUUID().toString
val expensiveWorkflowId = UUID.randomUUID().toString
val cheapWorkflow = Workflow(Some(cheapWorkflowId),
WorkflowStatuses.Submitted,
new DateTime(),
Some(testData.sample1.toReference),
Seq.empty
)
val expensiveWorkflow = Workflow(Some(expensiveWorkflowId),
WorkflowStatuses.Submitted,
new DateTime(),
Some(testData.sample2.toReference),
Seq.empty
)
val submission = testData.submission1.copy(submissionId = UUID.randomUUID().toString,
workflows = Seq(cheapWorkflow, expensiveWorkflow)
)
runAndWait(submissionQuery.create(testData.workspace, submission))
runAndWait(updateWorkflowExecutionServiceKey("unittestdefault"))

class CostCapTestExecutionServiceDAO(status: String) extends SubmissionTestExecutionServiceDAO(status) {
override def getCost(id: String, userInfo: UserInfo): Future[WorkflowCostBreakdown] =
if (id.equals(cheapWorkflowId)) {
Future.successful(WorkflowCostBreakdown(id, BigDecimal(1), "USD", status, Seq.empty))
} else if (id.equals(expensiveWorkflowId)) {
Future.successful(
WorkflowCostBreakdown(id, BigDecimal(11), "USD", WorkflowStatuses.Aborting.toString, Seq.empty)
)
} else {
Future.failed(new Exception("Unexpected workflow ID"))
}
}

val executionServiceDAO = new CostCapTestExecutionServiceDAO(WorkflowStatuses.Running.toString)

val spyExecutionServiceDAO = spy(executionServiceDAO)

val monitor = createSubmissionMonitor(
dataSource,
mockSamDAO,
mockGoogleServicesDAO,
submission,
testData.wsName,
spyExecutionServiceDAO,
perWorkflowCostCap = Option(BigDecimal(2))
)

val workflowCosts = await(monitor.queryExecutionServiceForStatus()).statusResponse.collect {
case Success(Some(recordWithOutputs)) =>
recordWithOutputs._1.externalId.get -> (recordWithOutputs._1.status, recordWithOutputs._1.cost)
}.toMap

verify(spyExecutionServiceDAO, never()).abort(any[String], any[UserInfo])
}

it should "handleOutputs which are unbound by ignoring them" in withDefaultTestDatabase {
dataSource: SlickDataSource =>
val unboundExprStr = AttributeString("")
Expand Down

0 comments on commit 9c4e0f9

Please sign in to comment.