From 3edee4fb3832cd2bafd256b4b1731f508f105d38 Mon Sep 17 00:00:00 2001 From: Marc Talbott Date: Fri, 22 Nov 2024 15:59:29 -0500 Subject: [PATCH] [CORE-172] Change submission cost cap to per-workflow instead of aggregate submission cost (#3140) --- .../dsde/rawls/liquibase/changelog.xml | 1 + ...0_rename_submission_cost_cap_threshold.xml | 6 ++ .../slick/SubmissionComponent.scala | 16 ++-- .../jobexec/SubmissionMonitorActor.scala | 71 +++++++++--------- .../rawls/jobexec/SubmissionSupervisor.scala | 16 ++-- .../SubmissionRequestValidation.scala | 24 +++--- .../submissions/SubmissionsService.scala | 2 +- .../rawls/jobexec/SubmissionMonitorSpec.scala | 73 +++++++++++-------- .../SubmissionValidationSpec.scala | 19 ++--- .../dsde/rawls/model/ExecutionModel.scala | 10 +-- .../dsde/rawls/model/ExecutionModelSpec.scala | 16 ++-- 11 files changed, 136 insertions(+), 118 deletions(-) create mode 100644 core/src/main/resources/org/broadinstitute/dsde/rawls/liquibase/changesets/20241120_rename_submission_cost_cap_threshold.xml diff --git a/core/src/main/resources/org/broadinstitute/dsde/rawls/liquibase/changelog.xml b/core/src/main/resources/org/broadinstitute/dsde/rawls/liquibase/changelog.xml index b11b05bf0d..cf7eb7de97 100644 --- a/core/src/main/resources/org/broadinstitute/dsde/rawls/liquibase/changelog.xml +++ b/core/src/main/resources/org/broadinstitute/dsde/rawls/liquibase/changelog.xml @@ -128,4 +128,5 @@ + diff --git a/core/src/main/resources/org/broadinstitute/dsde/rawls/liquibase/changesets/20241120_rename_submission_cost_cap_threshold.xml b/core/src/main/resources/org/broadinstitute/dsde/rawls/liquibase/changesets/20241120_rename_submission_cost_cap_threshold.xml new file mode 100644 index 0000000000..ab76e128d3 --- /dev/null +++ b/core/src/main/resources/org/broadinstitute/dsde/rawls/liquibase/changesets/20241120_rename_submission_cost_cap_threshold.xml @@ -0,0 +1,6 @@ + + + + + + diff --git a/core/src/main/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/SubmissionComponent.scala b/core/src/main/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/SubmissionComponent.scala index 093c786738..3d90746956 100644 --- a/core/src/main/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/SubmissionComponent.scala +++ b/core/src/main/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/SubmissionComponent.scala @@ -41,7 +41,7 @@ case class SubmissionRecord(id: UUID, monitoringScript: Option[String], monitoringImage: Option[String], monitoringImageScript: Option[String], - costCapThreshold: Option[BigDecimal] + perWorkflowCostCap: Option[BigDecimal] ) case class SubmissionValidationRecord(id: Long, workflowId: Long, errorText: Option[String], inputName: String) @@ -80,7 +80,7 @@ trait SubmissionComponent { def monitoringScript = column[Option[String]]("MONITORING_SCRIPT") def monitoringImage = column[Option[String]]("MONITORING_IMAGE") def monitoringImageScript = column[Option[String]]("MONITORING_IMAGE_SCRIPT") - def costCapThreshold = column[Option[BigDecimal]]("COST_CAP_THRESHOLD") + def perWorkflowCostCap = column[Option[BigDecimal]]("PER_WORKFLOW_COST_CAP") def * = ( id, @@ -103,7 +103,7 @@ trait SubmissionComponent { monitoringScript, monitoringImage, monitoringImageScript, - costCapThreshold + perWorkflowCostCap ) <> (SubmissionRecord.tupled, SubmissionRecord.unapply) def workspace = foreignKey("FK_SUB_WORKSPACE", workspaceId, workspaceQuery)(_.id) @@ -299,16 +299,16 @@ trait SubmissionComponent { }) ) - def listActiveSubmissionIdsWithWorkspaceAndCostCapThreshold( + def listActiveSubmissionIdsWithWorkspaceAndPerWorkflowCostCap( limit: FiniteDuration ): ReadAction[Seq[(UUID, WorkspaceName, Option[BigDecimal])]] = { // Exclude submissions from monitoring if they are ancient/stuck [WX-820] val cutoffTime = new Timestamp(DateTime.now().minusDays(limit.toDays.toInt).getMillis) val query = findActiveSubmissionsAfterTime(cutoffTime) join workspaceQuery on (_.workspaceId === _.id) - val result = query.map { case (sub, ws) => (sub.id, ws.namespace, ws.name, sub.costCapThreshold) }.result + val result = query.map { case (sub, ws) => (sub.id, ws.namespace, ws.name, sub.perWorkflowCostCap) }.result result.map(rows => - rows.map { case (subId, wsNs, wsName, costCapThreshold) => - (subId, WorkspaceName(wsNs, wsName), costCapThreshold) + rows.map { case (subId, wsNs, wsName, perWorkflowCostCap) => + (subId, WorkspaceName(wsNs, wsName), perWorkflowCostCap) } ) } @@ -513,7 +513,7 @@ trait SubmissionComponent { submission.monitoringScript, submission.monitoringImage, submission.monitoringImageScript, - submission.costCapThreshold + submission.perWorkflowCostCap ) private def unmarshalSubmission(submissionRec: SubmissionRecord, diff --git a/core/src/main/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorActor.scala b/core/src/main/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorActor.scala index 23853e62bc..98831edc8d 100644 --- a/core/src/main/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorActor.scala +++ b/core/src/main/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorActor.scala @@ -68,7 +68,7 @@ object SubmissionMonitorActor { config: SubmissionMonitorConfig, queryTimeout: Duration, workbenchMetricBaseName: String, - costCapThreshold: Option[BigDecimal] = None + perWorkflowCostCap: Option[BigDecimal] = None ): Props = Props( new SubmissionMonitorActor( @@ -82,7 +82,7 @@ object SubmissionMonitorActor { config, queryTimeout, workbenchMetricBaseName, - costCapThreshold + perWorkflowCostCap ) ) @@ -126,7 +126,7 @@ class SubmissionMonitorActor(val workspaceName: WorkspaceName, val config: SubmissionMonitorConfig, val queryTimeout: Duration, override val workbenchMetricBaseName: String, - val costCapThreshold: Option[BigDecimal] + val perWorkflowCostCap: Option[BigDecimal] ) extends Actor with SubmissionMonitor with LazyLogging { @@ -188,7 +188,7 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum val executionServiceCluster: ExecutionServiceCluster val config: SubmissionMonitorConfig val queryTimeout: Duration - val costCapThreshold: Option[BigDecimal] + val perWorkflowCostCap: Option[BigDecimal] // Cache these metric builders since they won't change for this SubmissionMonitor protected lazy val workspaceMetricBuilder: ExpandedMetricBuilder = @@ -310,19 +310,36 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum private def execServiceStatus(workflowRec: WorkflowRecord, petUser: UserInfo)(implicit executionContext: ExecutionContext ): Future[Option[WorkflowRecord]] = - workflowRec.externalId match { + (workflowRec.externalId, perWorkflowCostCap) match { // fetch cost information for the workflow if submission has a cost cap threshold defined - case Some(externalId) if costCapThreshold.isDefined => - executionServiceCluster.getCost(workflowRec, petUser).map { costBreakdown => - Option(workflowRec.copy(status = costBreakdown.status, cost = costBreakdown.cost.some)) - } + case (Some(externalId), Some(costCap)) => + for { + costBreakdown <- executionServiceCluster.getCost(workflowRec, petUser) + updatedWorkflowRec <- + if (costBreakdown.cost > costCap) { + executionServiceCluster.abort(workflowRec, petUser).map { + case Success(abortedWfRec) => + logger.info( + s"Aborted workflow ${workflowRec.externalId} in submission $submissionId that exceeded per-workflow cost cap." + ) + Option(workflowRec.copy(status = abortedWfRec.status, cost = costBreakdown.cost.some)) + case Failure(t) => + logger.error( + s"Failed to abort workflow ${workflowRec.externalId} in submission $submissionId that exceeded per-workflow cost cap. Error: ${t.getMessage}" + ) + Option(workflowRec.copy(status = costBreakdown.status, cost = costBreakdown.cost.some)) + } + } else { + Future.successful(Option(workflowRec.copy(status = costBreakdown.status, cost = costBreakdown.cost.some))) + } + } yield updatedWorkflowRec // fetch workflow status only if cost cap threshold is not defined - case Some(externalId) => + case (Some(externalId), None) => executionServiceCluster.status(workflowRec, petUser).map { newStatus => if (newStatus.status != workflowRec.status) Option(workflowRec.copy(status = newStatus.status)) else None } - case None => Future.successful(None) + case _ => Future.successful(None) } private def execServiceOutputs(workflowRec: WorkflowRecord, petUser: UserInfo)(implicit @@ -461,7 +478,7 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum if (doRecordUpdate) { for { updateResult <- - if (costCapThreshold.isDefined) { + if (perWorkflowCostCap.isDefined) { dataAccess.workflowQuery.updateStatusAndCost(currentRec, WorkflowStatuses.withName(workflowRec.status), workflowRec.cost.getOrElse(BigDecimal(0)) @@ -582,24 +599,12 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum */ def updateSubmissionStatus( dataAccess: DataAccess - )(implicit executionContext: ExecutionContext): ReadWriteAction[Boolean] = { - val workflowRecsAction = if (costCapThreshold.isDefined) { - dataAccess.workflowQuery.listWorkflowRecsForSubmission(submissionId) - } else { - dataAccess.workflowQuery.listWorkflowRecsForSubmissionAndStatuses( - submissionId, - (WorkflowStatuses.queuedStatuses ++ WorkflowStatuses.runningStatuses): _* - ) - } - - workflowRecsAction.flatMap { workflowRecs => - val nonTerminalWorkflows = - if (costCapThreshold.isDefined) - workflowRecs - .filterNot(wf => WorkflowStatuses.terminalStatuses.contains(WorkflowStatuses.withName(wf.status))) - else workflowRecs - - if (nonTerminalWorkflows.isEmpty) { + )(implicit executionContext: ExecutionContext): ReadWriteAction[Boolean] = + dataAccess.workflowQuery.listWorkflowRecsForSubmissionAndStatuses( + submissionId, + (WorkflowStatuses.queuedStatuses ++ WorkflowStatuses.runningStatuses): _* + ) flatMap { workflowRecs => + if (workflowRecs.isEmpty) { dataAccess.submissionQuery.findById(submissionId).map(_.status).result.head.flatMap { status => val finalStatus = SubmissionStatuses.withName(status) match { case SubmissionStatuses.Aborting => SubmissionStatuses.Aborted @@ -612,16 +617,10 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum logger.debug(s"submission $submissionId terminating to status $newStatus") dataAccess.submissionQuery.updateStatus(submissionId, newStatus) } map (_ => true) - } else if (costCapThreshold.isDefined && costCapThreshold.get <= workflowRecs.flatMap(_.cost).sum) { - logger.info( - s"Submission $submissionId exceeded its cost cap and will be aborted. [costCap=${costCapThreshold.get},currentSubmissionCost=${workflowRecs.flatMap(_.cost).sum}]" - ) - dataAccess.submissionQuery.updateStatus(submissionId, SubmissionStatuses.Aborting).map(_ => false) } else { DBIO.successful(false) } } - } def handleOutputs(workflowsWithOutputs: Seq[(WorkflowRecord, ExecutionServiceOutputs)], dataAccess: DataAccess, diff --git a/core/src/main/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionSupervisor.scala b/core/src/main/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionSupervisor.scala index 1227c36877..c0dbe8d633 100644 --- a/core/src/main/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionSupervisor.scala +++ b/core/src/main/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionSupervisor.scala @@ -30,7 +30,7 @@ import scala.util.control.NonFatal object SubmissionSupervisor { sealed trait SubmissionSupervisorMessage - case class SubmissionStarted(workspaceName: WorkspaceName, submissionId: UUID, costCapThreshold: Option[BigDecimal]) + case class SubmissionStarted(workspaceName: WorkspaceName, submissionId: UUID, perWorkflowCostCap: Option[BigDecimal]) case object StartMonitorPass case object SubmissionMonitorPassComplete @@ -133,8 +133,8 @@ class SubmissionSupervisor(executionServiceCluster: ExecutionServiceCluster, override def receive = { case StartMonitorPass => startMonitoringNewSubmissions pipeTo self - case SubmissionStarted(workspaceName, submissionId, costCapThreshold) => - val child = startSubmissionMonitor(workspaceName, submissionId, costCapThreshold) + case SubmissionStarted(workspaceName, submissionId, perWorkflowCostCap) => + val child = startSubmissionMonitor(workspaceName, submissionId, perWorkflowCostCap) scheduleNextCheckCurrentWorkflowStatus(child) registerDetailedJobExecGauges(workspaceName, submissionId) @@ -181,7 +181,7 @@ class SubmissionSupervisor(executionServiceCluster: ExecutionServiceCluster, private def startSubmissionMonitor(workspaceName: WorkspaceName, submissionId: UUID, - costCapThreshold: Option[BigDecimal] + perWorkflowCostCap: Option[BigDecimal] ) = actorOf( SubmissionMonitorActor @@ -196,7 +196,7 @@ class SubmissionSupervisor(executionServiceCluster: ExecutionServiceCluster, submissionMonitorConfig, entityQueryTimeout, workbenchMetricBaseName, - costCapThreshold + perWorkflowCostCap ) .withDispatcher("submission-monitor-dispatcher"), submissionId.toString @@ -222,15 +222,15 @@ class SubmissionSupervisor(executionServiceCluster: ExecutionServiceCluster, val monitoredSubmissions = context.children.map(_.path.name).toSet datasource.inTransaction { dataAccess => - dataAccess.submissionQuery.listActiveSubmissionIdsWithWorkspaceAndCostCapThreshold(limit = + dataAccess.submissionQuery.listActiveSubmissionIdsWithWorkspaceAndPerWorkflowCostCap(limit = submissionMonitorConfig.submissionPollExpiration ) map { activeSubs => val unmonitoredSubmissions = activeSubs.filterNot { case (subId, _, _) => monitoredSubmissions.contains(subId.toString) } - unmonitoredSubmissions.foreach { case (subId, wsName, costCapThreshold) => - self ! SubmissionStarted(wsName, subId, costCapThreshold) + unmonitoredSubmissions.foreach { case (subId, wsName, perWorkflowCostCap) => + self ! SubmissionStarted(wsName, subId, perWorkflowCostCap) } SubmissionMonitorPassComplete } diff --git a/core/src/main/scala/org/broadinstitute/dsde/rawls/submissions/SubmissionRequestValidation.scala b/core/src/main/scala/org/broadinstitute/dsde/rawls/submissions/SubmissionRequestValidation.scala index 9941bbe006..363a805c62 100644 --- a/core/src/main/scala/org/broadinstitute/dsde/rawls/submissions/SubmissionRequestValidation.scala +++ b/core/src/main/scala/org/broadinstitute/dsde/rawls/submissions/SubmissionRequestValidation.scala @@ -19,8 +19,8 @@ object SubmissionRequestValidation extends StringValidationUtils { implicit val errorReportSource: ErrorReportSource = ErrorReportSource("rawls") - val COST_CAP_THRESHOLD_SCALE = 2; - val COST_CAP_THRESHOLD_PRECISION = 10; + val PER_WORKFLOW_COST_CAP_SCALE = 2; + val PER_WORKFLOW_COST_CAP_PRECISION = 10; // Note: this limit is also hard-coded in the terra-ui code to allow client-side validation. // If it is changed, it must also be updated in that repository. @@ -32,7 +32,7 @@ object SubmissionRequestValidation extends StringValidationUtils { def staticValidation(submission: SubmissionRequest, methodConfig: MethodConfiguration): Unit = { val errors = List( - validateCostCapThreshold(submission), + validatePerWorkflowCostCap(submission), validateEntityNameAndType(submission), validateMethodConfigRootEntity(submission, methodConfig), validateEntityAndDataReference(submission, methodConfig), @@ -45,19 +45,19 @@ object SubmissionRequestValidation extends StringValidationUtils { ) } - def validateCostCapThreshold(submissionRequest: SubmissionRequest): List[ErrorReport] = List( - submissionRequest.costCapThreshold.flatMap { threshold => - if (threshold.sign <= 0) Some(ErrorReport("costCapThreshold must be greater than zero")) else None + def validatePerWorkflowCostCap(submissionRequest: SubmissionRequest): List[ErrorReport] = List( + submissionRequest.perWorkflowCostCap.flatMap { threshold => + if (threshold.sign <= 0) Some(ErrorReport("perWorkflowCostCap must be greater than zero")) else None }, - submissionRequest.costCapThreshold.flatMap { threshold => - if (threshold.scale > COST_CAP_THRESHOLD_SCALE) { + submissionRequest.perWorkflowCostCap.flatMap { threshold => + if (threshold.scale > PER_WORKFLOW_COST_CAP_SCALE) { // TODO: improve messages - Some(ErrorReport(s"costCapThreshold scale is limited to $COST_CAP_THRESHOLD_SCALE decimal places")) + Some(ErrorReport(s"perWorkflowCostCap scale is limited to $PER_WORKFLOW_COST_CAP_SCALE decimal places")) } else None }, - submissionRequest.costCapThreshold.flatMap { threshold => - if (threshold.precision > COST_CAP_THRESHOLD_PRECISION) - Some(ErrorReport(s"costCapThreshold cannot be greater than $COST_CAP_THRESHOLD_PRECISION total digits")) + submissionRequest.perWorkflowCostCap.flatMap { threshold => + if (threshold.precision > PER_WORKFLOW_COST_CAP_PRECISION) + Some(ErrorReport(s"perWorkflowCostCap cannot be greater than $PER_WORKFLOW_COST_CAP_PRECISION total digits")) else None } ).flatten diff --git a/core/src/main/scala/org/broadinstitute/dsde/rawls/submissions/SubmissionsService.scala b/core/src/main/scala/org/broadinstitute/dsde/rawls/submissions/SubmissionsService.scala index fa11798930..6e2e0dc6d5 100644 --- a/core/src/main/scala/org/broadinstitute/dsde/rawls/submissions/SubmissionsService.scala +++ b/core/src/main/scala/org/broadinstitute/dsde/rawls/submissions/SubmissionsService.scala @@ -808,7 +808,7 @@ class SubmissionsService( monitoringScript = submissionRequest.monitoringScript, monitoringImage = submissionRequest.monitoringImage, monitoringImageScript = submissionRequest.monitoringImageScript, - costCapThreshold = submissionRequest.costCapThreshold + perWorkflowCostCap = submissionRequest.perWorkflowCostCap ) logAndCreateDbSubmission(workspaceContext, submissionId, submission, dataAccess) diff --git a/core/src/test/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorSpec.scala b/core/src/test/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorSpec.scala index 6be65d99f9..30a6af9349 100644 --- a/core/src/test/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorSpec.scala +++ b/core/src/test/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorSpec.scala @@ -11,10 +11,7 @@ import org.broadinstitute.dsde.rawls.coordination.{DataSourceAccess, Uncoordinat import org.broadinstitute.dsde.rawls.dataaccess._ import org.broadinstitute.dsde.rawls.dataaccess.slick.{TestDriverComponent, WorkflowRecord} import org.broadinstitute.dsde.rawls.expressions.{BoundOutputExpression, OutputExpression} -import org.broadinstitute.dsde.rawls.jobexec.SubmissionMonitorActor.{ - ExecutionServiceStatusResponse, - StatusCheckComplete -} +import org.broadinstitute.dsde.rawls.jobexec.SubmissionMonitorActor.{ExecutionServiceStatusResponse, StatusCheckComplete} import org.broadinstitute.dsde.rawls.metrics.RawlsStatsDTestUtils import org.broadinstitute.dsde.rawls.mock.{MockSamDAO, RemoteServicesMockServer} import org.broadinstitute.dsde.rawls.model._ @@ -22,12 +19,15 @@ import org.broadinstitute.dsde.rawls.monitor.HealthMonitor import org.broadinstitute.dsde.rawls.util.MockitoTestUtils import org.broadinstitute.dsde.workbench.dataaccess.NotificationDAO import org.broadinstitute.dsde.workbench.model.WorkbenchEmail +import org.joda.time.DateTime import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.Eventually import org.scalatest.flatspec.AnyFlatSpecLike import org.scalatest.matchers.should.Matchers +import spray.json.JsObject import java.util.UUID +import scala.Option import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.concurrent.{Await, Future} @@ -164,7 +164,7 @@ class SubmissionMonitorSpec(_system: ActorSystem) (WorkflowStatuses.runningStatuses.toSet ++ WorkflowStatuses.terminalStatuses -- Set(WorkflowStatuses.Succeeded, WorkflowStatuses.Submitted )).foreach { status => - it should s"queryExecutionServiceForCostAndStatus $status if costCapThreshold is defined" in withDefaultTestDatabase { + it should s"queryExecutionServiceForCostAndStatus $status if perWorkflowCostCap is defined" in withDefaultTestDatabase { dataSource: SlickDataSource => val monitor = createSubmissionMonitor( dataSource, @@ -173,7 +173,7 @@ class SubmissionMonitorSpec(_system: ActorSystem) testData.submission1, testData.wsName, new SubmissionTestExecutionServiceDAO(status.toString, BigDecimal(5)), - costCapThreshold = Option(BigDecimal(10.00)) + perWorkflowCostCap = Option(BigDecimal(10.00)) ) val workflowsRecs = @@ -1320,7 +1320,7 @@ class SubmissionMonitorSpec(_system: ActorSystem) testData.submissionUpdateEntity, testData.wsName, new SubmissionTestExecutionServiceDAO(WorkflowStatuses.Running.toString, BigDecimal(5)), - costCapThreshold = Option(BigDecimal(10)) + perWorkflowCostCap = Option(BigDecimal(10)) ) val workflowsRecs = runAndWait( workflowQuery.listWorkflowRecsForSubmission(UUID.fromString(testData.submissionUpdateEntity.submissionId)) @@ -1344,37 +1344,48 @@ class SubmissionMonitorSpec(_system: ActorSystem) ).foreach(rec => rec.cost shouldEqual (Option(BigDecimal(5)))) } - it should "abort a submission that has exceeded its cost cap" in withDefaultTestDatabase { + it should "abort a workflow that has exceeded the per-workflow cost cap but not abort the entire submission" in withDefaultTestDatabase { dataSource: SlickDataSource => + val cheapWorkflowId = UUID.randomUUID().toString + val expensiveWorkflowId = UUID.randomUUID().toString + val cheapWorkflow = Workflow(Some(cheapWorkflowId), WorkflowStatuses.Submitted, new DateTime(), Some(testData.sample1.toReference), Seq.empty) + val expensiveWorkflow = Workflow(Some(expensiveWorkflowId), WorkflowStatuses.Submitted, new DateTime(), Some(testData.sample2.toReference), Seq.empty) + val submission = testData.submission1.copy(submissionId = UUID.randomUUID().toString, workflows = Seq(cheapWorkflow, expensiveWorkflow)) + runAndWait(submissionQuery.create(testData.workspace, submission)) + runAndWait(updateWorkflowExecutionServiceKey("unittestdefault")) + + class CostCapTestExecutionServiceDAO(status: String) extends SubmissionTestExecutionServiceDAO(status) { + override def getCost(id: String, userInfo: UserInfo): Future[WorkflowCostBreakdown] = { + if (id.equals(cheapWorkflowId)) { + Future.successful(WorkflowCostBreakdown(id, BigDecimal(1), "USD", status, Seq.empty)) + } else if (id.equals(expensiveWorkflowId)) { + Future.successful(WorkflowCostBreakdown(id, BigDecimal(11), "USD", status, Seq.empty)) + } else { + Future.failed(new Exception("Unexpected workflow ID")) + } + } + } + val monitor = createSubmissionMonitor( dataSource, mockSamDAO, mockGoogleServicesDAO, - testData.submissionUpdateEntity, + submission, testData.wsName, - new SubmissionTestExecutionServiceDAO(WorkflowStatuses.Running.toString, BigDecimal(5)), - costCapThreshold = Option(BigDecimal(2)) - ) - val workflowsRecs = runAndWait( - workflowQuery.listWorkflowRecsForSubmission(UUID.fromString(testData.submissionUpdateEntity.submissionId)) + new CostCapTestExecutionServiceDAO(WorkflowStatuses.Running.toString), + perWorkflowCostCap = Option(BigDecimal(2)) ) - assertResult(StatusCheckComplete(false)) { - await( - monitor.handleStatusResponses( - ExecutionServiceStatusResponse( - workflowsRecs.map(r => - scala.util.Success( - Option((r.copy(status = WorkflowStatuses.Running.toString, cost = Option(BigDecimal(5))), None)) - ) - ) - ) - ) - ) - } + val workflowCosts = await(monitor.queryExecutionServiceForStatus()).statusResponse.collect { + case Success(Some(recordWithOutputs)) => recordWithOutputs._1.externalId.get -> (recordWithOutputs._1.status, recordWithOutputs._1.cost) + }.toMap + + workflowCosts(cheapWorkflowId) shouldEqual (WorkflowStatuses.Running.toString, Option(BigDecimal(1))) + workflowCosts(expensiveWorkflowId) shouldEqual (WorkflowStatuses.Aborting.toString, Option(BigDecimal(11))) + runAndWait( submissionQuery.loadSubmission(UUID.fromString(testData.submissionUpdateEntity.submissionId)) - ).getOrElse(fail()).status shouldBe SubmissionStatuses.Aborting + ).getOrElse(fail()).status shouldBe SubmissionStatuses.Submitted } it should "handleOutputs which are unbound by ignoring them" in withDefaultTestDatabase { @@ -2021,7 +2032,7 @@ class SubmissionMonitorSpec(_system: ActorSystem) wsName: WorkspaceName, execSvcDAO: ExecutionServiceDAO, attributesPerWorkflow: Int = 10, - costCapThreshold: Option[BigDecimal] = None + perWorkflowCostCap: Option[BigDecimal] = None ): SubmissionMonitor = { val config = SubmissionMonitorConfig(1 minutes, 30 days, true, attributesPerWorkflow, true) new TestSubmissionMonitor( @@ -2036,7 +2047,7 @@ class SubmissionMonitorSpec(_system: ActorSystem) config, ConfigFactory.load().getDuration("entities.queryTimeout").toScala, "test", - costCapThreshold + perWorkflowCostCap ) } @@ -2119,5 +2130,5 @@ class TestSubmissionMonitor(val workspaceName: WorkspaceName, val config: SubmissionMonitorConfig, val queryTimeout: Duration, override val workbenchMetricBaseName: String, - val costCapThreshold: Option[BigDecimal] + val perWorkflowCostCap: Option[BigDecimal] ) extends SubmissionMonitor {} diff --git a/core/src/test/scala/org/broadinstitute/dsde/rawls/submissions/SubmissionValidationSpec.scala b/core/src/test/scala/org/broadinstitute/dsde/rawls/submissions/SubmissionValidationSpec.scala index 417054e157..cad575a9db 100644 --- a/core/src/test/scala/org/broadinstitute/dsde/rawls/submissions/SubmissionValidationSpec.scala +++ b/core/src/test/scala/org/broadinstitute/dsde/rawls/submissions/SubmissionValidationSpec.scala @@ -20,32 +20,33 @@ class SubmissionValidationSpec extends AnyFlatSpec with Matchers with TableDrive SubmissionRequestValidation.staticValidation(submission, defaultValidMethodConfig) shouldBe () } - val costCapThresholdValidations = Table( - ("validation case", "costCapThresholdValue", "expected error text"), + val perWorkflowCostCapValidations = Table( + ("validation case", "perWorkflowCostCapValue", "expected error text"), ("no value", None, List()), ("a valid value", Some(BigDecimal("98765432.01")), List()), ("a negative value", Some(BigDecimal("-98765432.01")), List("greater than zero")), ("a value that is too large", Some(BigDecimal("198765432.01")), - List(s"cannot be greater than ${SubmissionRequestValidation.COST_CAP_THRESHOLD_PRECISION}") + List(s"cannot be greater than ${SubmissionRequestValidation.PER_WORKFLOW_COST_CAP_PRECISION}") ), ("a value with an invalid scale", Some(BigDecimal("8765432.019")), - List(s"scale is limited to ${SubmissionRequestValidation.COST_CAP_THRESHOLD_SCALE}") + List(s"scale is limited to ${SubmissionRequestValidation.PER_WORKFLOW_COST_CAP_SCALE}") ), ("a value with multiple failed validations", Some(BigDecimal("-198765432.012")), List( "greater than zero", - s"cannot be greater than ${SubmissionRequestValidation.COST_CAP_THRESHOLD_PRECISION}", - s"${SubmissionRequestValidation.COST_CAP_THRESHOLD_SCALE}" + s"cannot be greater than ${SubmissionRequestValidation.PER_WORKFLOW_COST_CAP_PRECISION}", + s"${SubmissionRequestValidation.PER_WORKFLOW_COST_CAP_SCALE}" ) ) ) - it should "validate costCapThreshold" in { - forAll(costCapThresholdValidations) { (_, value, expectedErrors) => - val submission = SubmissionRequest("name", "namespace", None, None, None, false, false, costCapThreshold = value) + it should "validate perWorkflowCostCap" in { + forAll(perWorkflowCostCapValidations) { (_, value, expectedErrors) => + val submission = + SubmissionRequest("name", "namespace", None, None, None, false, false, perWorkflowCostCap = value) if (expectedErrors.isEmpty) { SubmissionRequestValidation.staticValidation(submission, defaultValidMethodConfig) shouldBe () } else { diff --git a/model/src/main/scala/org/broadinstitute/dsde/rawls/model/ExecutionModel.scala b/model/src/main/scala/org/broadinstitute/dsde/rawls/model/ExecutionModel.scala index 708c1e8d48..2a21a8199b 100644 --- a/model/src/main/scala/org/broadinstitute/dsde/rawls/model/ExecutionModel.scala +++ b/model/src/main/scala/org/broadinstitute/dsde/rawls/model/ExecutionModel.scala @@ -36,7 +36,7 @@ case class SubmissionRequest( monitoringScript: Option[String] = None, monitoringImage: Option[String] = None, monitoringImageScript: Option[String] = None, - costCapThreshold: Option[BigDecimal] = None + perWorkflowCostCap: Option[BigDecimal] = None ) // This class contains values from the submission REST request @@ -171,7 +171,7 @@ case class Submission( monitoringScript: Option[String] = None, monitoringImage: Option[String] = None, monitoringImageScript: Option[String] = None, - costCapThreshold: Option[BigDecimal] = None + perWorkflowCostCap: Option[BigDecimal] = None ) case class SubmissionListResponse( @@ -192,7 +192,7 @@ case class SubmissionListResponse( cost: Option[Float] = None, externalEntityInfo: Option[ExternalEntityInfo] = None, userComment: Option[String] = None, - costCapThreshold: Option[BigDecimal] = None + perWorkflowCostCap: Option[BigDecimal] = None ) object SubmissionListResponse { @@ -407,7 +407,7 @@ trait ExecutionJsonSupport extends JsonSupport { Option("monitoringScript" -> obj.monitoringScript.toJson), Option("monitoringImage" -> obj.monitoringImage.toJson), Option("monitoringImageScript" -> obj.monitoringImageScript.toJson), - obj.costCapThreshold.map("costCapThreshold" -> _.toJson) + obj.perWorkflowCostCap.map("perWorkflowCostCap" -> _.toJson) ).flatten: _* ) @@ -435,7 +435,7 @@ trait ExecutionJsonSupport extends JsonSupport { monitoringScript = fields.get("monitoringScript").flatMap(_.convertTo[Option[String]]), monitoringImage = fields.get("monitoringImage").flatMap(_.convertTo[Option[String]]), monitoringImageScript = fields.get("monitoringImageScript").flatMap(_.convertTo[Option[String]]), - costCapThreshold = fields.get("costCapThreshold").map(_.convertTo[BigDecimal]) + perWorkflowCostCap = fields.get("perWorkflowCostCap").map(_.convertTo[BigDecimal]) // All new fields above this line MUST have defaults or be wrapped in Option[]! ) } diff --git a/model/src/test/scala/org/broadinstitute/dsde/rawls/model/ExecutionModelSpec.scala b/model/src/test/scala/org/broadinstitute/dsde/rawls/model/ExecutionModelSpec.scala index 74d9234767..a72a3f113b 100644 --- a/model/src/test/scala/org/broadinstitute/dsde/rawls/model/ExecutionModelSpec.scala +++ b/model/src/test/scala/org/broadinstitute/dsde/rawls/model/ExecutionModelSpec.scala @@ -10,23 +10,23 @@ import spray.json._ class ExecutionModelSpec extends AnyFlatSpec with Assertions with Matchers { behavior of "SubmissionRequest Deserialization" - it should "deserialize costCapThreshold correctly" in { + it should "deserialize perWorkflowCostCap correctly" in { val requestString = """{ | "methodConfigurationNamespace": "testNamespace", | "methodConfigurationName": "testName", | "useCallCache": false, | "deleteIntermediateOutputFiles": false, - | "costCapThreshold": 23456789.01 + | "perWorkflowCostCap": 23456789.01 |}""".stripMargin val requestObj = SubmissionRequestFormat.read(requestString.parseJson) - requestObj.costCapThreshold should be(Some(BigDecimal("23456789.01"))) + requestObj.perWorkflowCostCap should be(Some(BigDecimal("23456789.01"))) } behavior of "SubmissionListResponse Serialization" - it should "not include costCapThreshold in json if specified as None" in { + it should "not include perWorkflowCostCap in json if specified as None" in { val responseObj = new SubmissionListResponse( "id", DateTime.now(), @@ -47,11 +47,11 @@ class ExecutionModelSpec extends AnyFlatSpec with Assertions with Matchers { serializedObj should include("submissionId") serializedObj should include("submitter") serializedObj should include("testSubmitter") - serializedObj should not(include("costCapThreshold")) + serializedObj should not(include("perWorkflowCostCap")) } - it should "be able to write out the costCapThreshold to json" in { + it should "be able to write out the perWorkflowCostCap to json" in { val bigDecimalString = "23456789.01" val responseObj = new SubmissionListResponse( "id", @@ -68,11 +68,11 @@ class ExecutionModelSpec extends AnyFlatSpec with Assertions with Matchers { false, None, None, - costCapThreshold = Some(BigDecimal(bigDecimalString)) + perWorkflowCostCap = Some(BigDecimal(bigDecimalString)) ) val serializedObj = SubmissionListResponseFormat.write(responseObj).toString() - serializedObj should include("costCapThreshold") + serializedObj should include("perWorkflowCostCap") serializedObj should include(bigDecimalString) }