From 9c4e0f908012989fafa7dc670dd9830198d48940 Mon Sep 17 00:00:00 2001 From: Bria Morgan Date: Fri, 17 Jan 2025 10:47:12 -0500 Subject: [PATCH] [CORE-226] check for aborted/ing status before aborting costcap workflow (#3165) * check for aborted/ing status before aborting costcap workflow --- .../jobexec/SubmissionMonitorActor.scala | 9 ++- .../rawls/jobexec/SubmissionMonitorSpec.scala | 59 +++++++++++++++++++ 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorActor.scala b/core/src/main/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorActor.scala index a55073434e..3ecc5cd7fc 100644 --- a/core/src/main/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorActor.scala +++ b/core/src/main/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorActor.scala @@ -316,8 +316,13 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum for { costBreakdown <- executionServiceCluster.getCost(workflowRec, petUser) updatedWorkflowRec <- - // TODO CORE-217: if workflow is already in Aborted or Aborting, don't try to re-abort it - if (costBreakdown.cost > costCap) { + if ( + costBreakdown.cost > costCap && + WorkflowStatuses.abortableStatuses.contains( + WorkflowStatuses + .withName(costBreakdown.status) + ) + ) { executionServiceCluster.abort(workflowRec, petUser).map { case Success(abortedWfRec) => logger.info( diff --git a/core/src/test/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorSpec.scala b/core/src/test/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorSpec.scala index 143477d8ee..4d63b9652a 100644 --- a/core/src/test/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorSpec.scala +++ b/core/src/test/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorSpec.scala @@ -23,6 +23,8 @@ import org.broadinstitute.dsde.rawls.util.MockitoTestUtils import org.broadinstitute.dsde.workbench.dataaccess.NotificationDAO import org.broadinstitute.dsde.workbench.model.WorkbenchEmail import org.joda.time.DateTime +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito.{mock, never, spy, verify} import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.Eventually import org.scalatest.flatspec.AnyFlatSpecLike @@ -1403,6 +1405,63 @@ class SubmissionMonitorSpec(_system: ActorSystem) ).getOrElse(fail()).status shouldBe SubmissionStatuses.Submitted } + it should "not re-abort a workflow that has exceeded the per-workflow cost cap but is already aborting" in withDefaultTestDatabase { + dataSource: SlickDataSource => + val cheapWorkflowId = UUID.randomUUID().toString + val expensiveWorkflowId = UUID.randomUUID().toString + val cheapWorkflow = Workflow(Some(cheapWorkflowId), + WorkflowStatuses.Submitted, + new DateTime(), + Some(testData.sample1.toReference), + Seq.empty + ) + val expensiveWorkflow = Workflow(Some(expensiveWorkflowId), + WorkflowStatuses.Submitted, + new DateTime(), + Some(testData.sample2.toReference), + Seq.empty + ) + val submission = testData.submission1.copy(submissionId = UUID.randomUUID().toString, + workflows = Seq(cheapWorkflow, expensiveWorkflow) + ) + runAndWait(submissionQuery.create(testData.workspace, submission)) + runAndWait(updateWorkflowExecutionServiceKey("unittestdefault")) + + class CostCapTestExecutionServiceDAO(status: String) extends SubmissionTestExecutionServiceDAO(status) { + override def getCost(id: String, userInfo: UserInfo): Future[WorkflowCostBreakdown] = + if (id.equals(cheapWorkflowId)) { + Future.successful(WorkflowCostBreakdown(id, BigDecimal(1), "USD", status, Seq.empty)) + } else if (id.equals(expensiveWorkflowId)) { + Future.successful( + WorkflowCostBreakdown(id, BigDecimal(11), "USD", WorkflowStatuses.Aborting.toString, Seq.empty) + ) + } else { + Future.failed(new Exception("Unexpected workflow ID")) + } + } + + val executionServiceDAO = new CostCapTestExecutionServiceDAO(WorkflowStatuses.Running.toString) + + val spyExecutionServiceDAO = spy(executionServiceDAO) + + val monitor = createSubmissionMonitor( + dataSource, + mockSamDAO, + mockGoogleServicesDAO, + submission, + testData.wsName, + spyExecutionServiceDAO, + perWorkflowCostCap = Option(BigDecimal(2)) + ) + + val workflowCosts = await(monitor.queryExecutionServiceForStatus()).statusResponse.collect { + case Success(Some(recordWithOutputs)) => + recordWithOutputs._1.externalId.get -> (recordWithOutputs._1.status, recordWithOutputs._1.cost) + }.toMap + + verify(spyExecutionServiceDAO, never()).abort(any[String], any[UserInfo]) + } + it should "handleOutputs which are unbound by ignoring them" in withDefaultTestDatabase { dataSource: SlickDataSource => val unboundExprStr = AttributeString("")