Skip to content

Commit

Permalink
[CORE-172] Change submission cost cap to per-workflow instead of aggr…
Browse files Browse the repository at this point in the history
…egate submission cost (#3140)
  • Loading branch information
marctalbott authored Nov 22, 2024
1 parent 0c7a057 commit 3edee4f
Show file tree
Hide file tree
Showing 11 changed files with 136 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,5 @@
<include file="changesets/20240830_workflow_cost.xml" relativeToChangelogFile="true"/>
<include file="changesets/20241106_streamline_entity_attr_temp_table_procedure.xml" relativeToChangelogFile="true"/>
<include file="changesets/20241106_streamline_workspace_attr_temp_table_procedure.xml" relativeToChangelogFile="true"/>
<include file="changesets/20241120_rename_submission_cost_cap_threshold.xml" relativeToChangelogFile="true"/>
</databaseChangeLog>
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<databaseChangeLog logicalFilePath="dummy" xmlns="http://www.liquibase.org/xml/ns/dbchangelog" xmlns:ext="http://www.liquibase.org/xml/ns/dbchangelog-ext" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog-ext http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.4.xsd">
<changeSet id="rename_submission_cost_cap_threshold" author="mtalbott" logicalFilePath="dummy">
<renameColumn tableName="SUBMISSION" oldColumnName="COST_CAP_THRESHOLD" newColumnName="PER_WORKFLOW_COST_CAP" columnDataType="NUMBER(10,2)"/>
</changeSet>
</databaseChangeLog>
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ case class SubmissionRecord(id: UUID,
monitoringScript: Option[String],
monitoringImage: Option[String],
monitoringImageScript: Option[String],
costCapThreshold: Option[BigDecimal]
perWorkflowCostCap: Option[BigDecimal]
)

case class SubmissionValidationRecord(id: Long, workflowId: Long, errorText: Option[String], inputName: String)
Expand Down Expand Up @@ -80,7 +80,7 @@ trait SubmissionComponent {
def monitoringScript = column[Option[String]]("MONITORING_SCRIPT")
def monitoringImage = column[Option[String]]("MONITORING_IMAGE")
def monitoringImageScript = column[Option[String]]("MONITORING_IMAGE_SCRIPT")
def costCapThreshold = column[Option[BigDecimal]]("COST_CAP_THRESHOLD")
def perWorkflowCostCap = column[Option[BigDecimal]]("PER_WORKFLOW_COST_CAP")

def * = (
id,
Expand All @@ -103,7 +103,7 @@ trait SubmissionComponent {
monitoringScript,
monitoringImage,
monitoringImageScript,
costCapThreshold
perWorkflowCostCap
) <> (SubmissionRecord.tupled, SubmissionRecord.unapply)

def workspace = foreignKey("FK_SUB_WORKSPACE", workspaceId, workspaceQuery)(_.id)
Expand Down Expand Up @@ -299,16 +299,16 @@ trait SubmissionComponent {
})
)

def listActiveSubmissionIdsWithWorkspaceAndCostCapThreshold(
def listActiveSubmissionIdsWithWorkspaceAndPerWorkflowCostCap(
limit: FiniteDuration
): ReadAction[Seq[(UUID, WorkspaceName, Option[BigDecimal])]] = {
// Exclude submissions from monitoring if they are ancient/stuck [WX-820]
val cutoffTime = new Timestamp(DateTime.now().minusDays(limit.toDays.toInt).getMillis)
val query = findActiveSubmissionsAfterTime(cutoffTime) join workspaceQuery on (_.workspaceId === _.id)
val result = query.map { case (sub, ws) => (sub.id, ws.namespace, ws.name, sub.costCapThreshold) }.result
val result = query.map { case (sub, ws) => (sub.id, ws.namespace, ws.name, sub.perWorkflowCostCap) }.result
result.map(rows =>
rows.map { case (subId, wsNs, wsName, costCapThreshold) =>
(subId, WorkspaceName(wsNs, wsName), costCapThreshold)
rows.map { case (subId, wsNs, wsName, perWorkflowCostCap) =>
(subId, WorkspaceName(wsNs, wsName), perWorkflowCostCap)
}
)
}
Expand Down Expand Up @@ -513,7 +513,7 @@ trait SubmissionComponent {
submission.monitoringScript,
submission.monitoringImage,
submission.monitoringImageScript,
submission.costCapThreshold
submission.perWorkflowCostCap
)

private def unmarshalSubmission(submissionRec: SubmissionRecord,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ object SubmissionMonitorActor {
config: SubmissionMonitorConfig,
queryTimeout: Duration,
workbenchMetricBaseName: String,
costCapThreshold: Option[BigDecimal] = None
perWorkflowCostCap: Option[BigDecimal] = None
): Props =
Props(
new SubmissionMonitorActor(
Expand All @@ -82,7 +82,7 @@ object SubmissionMonitorActor {
config,
queryTimeout,
workbenchMetricBaseName,
costCapThreshold
perWorkflowCostCap
)
)

Expand Down Expand Up @@ -126,7 +126,7 @@ class SubmissionMonitorActor(val workspaceName: WorkspaceName,
val config: SubmissionMonitorConfig,
val queryTimeout: Duration,
override val workbenchMetricBaseName: String,
val costCapThreshold: Option[BigDecimal]
val perWorkflowCostCap: Option[BigDecimal]
) extends Actor
with SubmissionMonitor
with LazyLogging {
Expand Down Expand Up @@ -188,7 +188,7 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum
val executionServiceCluster: ExecutionServiceCluster
val config: SubmissionMonitorConfig
val queryTimeout: Duration
val costCapThreshold: Option[BigDecimal]
val perWorkflowCostCap: Option[BigDecimal]

// Cache these metric builders since they won't change for this SubmissionMonitor
protected lazy val workspaceMetricBuilder: ExpandedMetricBuilder =
Expand Down Expand Up @@ -310,19 +310,36 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum
private def execServiceStatus(workflowRec: WorkflowRecord, petUser: UserInfo)(implicit
executionContext: ExecutionContext
): Future[Option[WorkflowRecord]] =
workflowRec.externalId match {
(workflowRec.externalId, perWorkflowCostCap) match {
// fetch cost information for the workflow if submission has a cost cap threshold defined
case Some(externalId) if costCapThreshold.isDefined =>
executionServiceCluster.getCost(workflowRec, petUser).map { costBreakdown =>
Option(workflowRec.copy(status = costBreakdown.status, cost = costBreakdown.cost.some))
}
case (Some(externalId), Some(costCap)) =>
for {
costBreakdown <- executionServiceCluster.getCost(workflowRec, petUser)
updatedWorkflowRec <-
if (costBreakdown.cost > costCap) {
executionServiceCluster.abort(workflowRec, petUser).map {
case Success(abortedWfRec) =>
logger.info(
s"Aborted workflow ${workflowRec.externalId} in submission $submissionId that exceeded per-workflow cost cap."
)
Option(workflowRec.copy(status = abortedWfRec.status, cost = costBreakdown.cost.some))
case Failure(t) =>
logger.error(
s"Failed to abort workflow ${workflowRec.externalId} in submission $submissionId that exceeded per-workflow cost cap. Error: ${t.getMessage}"
)
Option(workflowRec.copy(status = costBreakdown.status, cost = costBreakdown.cost.some))
}
} else {
Future.successful(Option(workflowRec.copy(status = costBreakdown.status, cost = costBreakdown.cost.some)))
}
} yield updatedWorkflowRec
// fetch workflow status only if cost cap threshold is not defined
case Some(externalId) =>
case (Some(externalId), None) =>
executionServiceCluster.status(workflowRec, petUser).map { newStatus =>
if (newStatus.status != workflowRec.status) Option(workflowRec.copy(status = newStatus.status))
else None
}
case None => Future.successful(None)
case _ => Future.successful(None)
}

private def execServiceOutputs(workflowRec: WorkflowRecord, petUser: UserInfo)(implicit
Expand Down Expand Up @@ -461,7 +478,7 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum
if (doRecordUpdate) {
for {
updateResult <-
if (costCapThreshold.isDefined) {
if (perWorkflowCostCap.isDefined) {
dataAccess.workflowQuery.updateStatusAndCost(currentRec,
WorkflowStatuses.withName(workflowRec.status),
workflowRec.cost.getOrElse(BigDecimal(0))
Expand Down Expand Up @@ -582,24 +599,12 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum
*/
def updateSubmissionStatus(
dataAccess: DataAccess
)(implicit executionContext: ExecutionContext): ReadWriteAction[Boolean] = {
val workflowRecsAction = if (costCapThreshold.isDefined) {
dataAccess.workflowQuery.listWorkflowRecsForSubmission(submissionId)
} else {
dataAccess.workflowQuery.listWorkflowRecsForSubmissionAndStatuses(
submissionId,
(WorkflowStatuses.queuedStatuses ++ WorkflowStatuses.runningStatuses): _*
)
}

workflowRecsAction.flatMap { workflowRecs =>
val nonTerminalWorkflows =
if (costCapThreshold.isDefined)
workflowRecs
.filterNot(wf => WorkflowStatuses.terminalStatuses.contains(WorkflowStatuses.withName(wf.status)))
else workflowRecs

if (nonTerminalWorkflows.isEmpty) {
)(implicit executionContext: ExecutionContext): ReadWriteAction[Boolean] =
dataAccess.workflowQuery.listWorkflowRecsForSubmissionAndStatuses(
submissionId,
(WorkflowStatuses.queuedStatuses ++ WorkflowStatuses.runningStatuses): _*
) flatMap { workflowRecs =>
if (workflowRecs.isEmpty) {
dataAccess.submissionQuery.findById(submissionId).map(_.status).result.head.flatMap { status =>
val finalStatus = SubmissionStatuses.withName(status) match {
case SubmissionStatuses.Aborting => SubmissionStatuses.Aborted
Expand All @@ -612,16 +617,10 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum
logger.debug(s"submission $submissionId terminating to status $newStatus")
dataAccess.submissionQuery.updateStatus(submissionId, newStatus)
} map (_ => true)
} else if (costCapThreshold.isDefined && costCapThreshold.get <= workflowRecs.flatMap(_.cost).sum) {
logger.info(
s"Submission $submissionId exceeded its cost cap and will be aborted. [costCap=${costCapThreshold.get},currentSubmissionCost=${workflowRecs.flatMap(_.cost).sum}]"
)
dataAccess.submissionQuery.updateStatus(submissionId, SubmissionStatuses.Aborting).map(_ => false)
} else {
DBIO.successful(false)
}
}
}

def handleOutputs(workflowsWithOutputs: Seq[(WorkflowRecord, ExecutionServiceOutputs)],
dataAccess: DataAccess,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import scala.util.control.NonFatal
object SubmissionSupervisor {
sealed trait SubmissionSupervisorMessage

case class SubmissionStarted(workspaceName: WorkspaceName, submissionId: UUID, costCapThreshold: Option[BigDecimal])
case class SubmissionStarted(workspaceName: WorkspaceName, submissionId: UUID, perWorkflowCostCap: Option[BigDecimal])
case object StartMonitorPass
case object SubmissionMonitorPassComplete

Expand Down Expand Up @@ -133,8 +133,8 @@ class SubmissionSupervisor(executionServiceCluster: ExecutionServiceCluster,
override def receive = {
case StartMonitorPass =>
startMonitoringNewSubmissions pipeTo self
case SubmissionStarted(workspaceName, submissionId, costCapThreshold) =>
val child = startSubmissionMonitor(workspaceName, submissionId, costCapThreshold)
case SubmissionStarted(workspaceName, submissionId, perWorkflowCostCap) =>
val child = startSubmissionMonitor(workspaceName, submissionId, perWorkflowCostCap)
scheduleNextCheckCurrentWorkflowStatus(child)
registerDetailedJobExecGauges(workspaceName, submissionId)

Expand Down Expand Up @@ -181,7 +181,7 @@ class SubmissionSupervisor(executionServiceCluster: ExecutionServiceCluster,

private def startSubmissionMonitor(workspaceName: WorkspaceName,
submissionId: UUID,
costCapThreshold: Option[BigDecimal]
perWorkflowCostCap: Option[BigDecimal]
) =
actorOf(
SubmissionMonitorActor
Expand All @@ -196,7 +196,7 @@ class SubmissionSupervisor(executionServiceCluster: ExecutionServiceCluster,
submissionMonitorConfig,
entityQueryTimeout,
workbenchMetricBaseName,
costCapThreshold
perWorkflowCostCap
)
.withDispatcher("submission-monitor-dispatcher"),
submissionId.toString
Expand All @@ -222,15 +222,15 @@ class SubmissionSupervisor(executionServiceCluster: ExecutionServiceCluster,
val monitoredSubmissions = context.children.map(_.path.name).toSet

datasource.inTransaction { dataAccess =>
dataAccess.submissionQuery.listActiveSubmissionIdsWithWorkspaceAndCostCapThreshold(limit =
dataAccess.submissionQuery.listActiveSubmissionIdsWithWorkspaceAndPerWorkflowCostCap(limit =
submissionMonitorConfig.submissionPollExpiration
) map { activeSubs =>
val unmonitoredSubmissions = activeSubs.filterNot { case (subId, _, _) =>
monitoredSubmissions.contains(subId.toString)
}

unmonitoredSubmissions.foreach { case (subId, wsName, costCapThreshold) =>
self ! SubmissionStarted(wsName, subId, costCapThreshold)
unmonitoredSubmissions.foreach { case (subId, wsName, perWorkflowCostCap) =>
self ! SubmissionStarted(wsName, subId, perWorkflowCostCap)
}
SubmissionMonitorPassComplete
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ object SubmissionRequestValidation extends StringValidationUtils {

implicit val errorReportSource: ErrorReportSource = ErrorReportSource("rawls")

val COST_CAP_THRESHOLD_SCALE = 2;
val COST_CAP_THRESHOLD_PRECISION = 10;
val PER_WORKFLOW_COST_CAP_SCALE = 2;
val PER_WORKFLOW_COST_CAP_PRECISION = 10;

// Note: this limit is also hard-coded in the terra-ui code to allow client-side validation.
// If it is changed, it must also be updated in that repository.
Expand All @@ -32,7 +32,7 @@ object SubmissionRequestValidation extends StringValidationUtils {
def staticValidation(submission: SubmissionRequest, methodConfig: MethodConfiguration): Unit = {

val errors = List(
validateCostCapThreshold(submission),
validatePerWorkflowCostCap(submission),
validateEntityNameAndType(submission),
validateMethodConfigRootEntity(submission, methodConfig),
validateEntityAndDataReference(submission, methodConfig),
Expand All @@ -45,19 +45,19 @@ object SubmissionRequestValidation extends StringValidationUtils {
)
}

def validateCostCapThreshold(submissionRequest: SubmissionRequest): List[ErrorReport] = List(
submissionRequest.costCapThreshold.flatMap { threshold =>
if (threshold.sign <= 0) Some(ErrorReport("costCapThreshold must be greater than zero")) else None
def validatePerWorkflowCostCap(submissionRequest: SubmissionRequest): List[ErrorReport] = List(
submissionRequest.perWorkflowCostCap.flatMap { threshold =>
if (threshold.sign <= 0) Some(ErrorReport("perWorkflowCostCap must be greater than zero")) else None
},
submissionRequest.costCapThreshold.flatMap { threshold =>
if (threshold.scale > COST_CAP_THRESHOLD_SCALE) {
submissionRequest.perWorkflowCostCap.flatMap { threshold =>
if (threshold.scale > PER_WORKFLOW_COST_CAP_SCALE) {
// TODO: improve messages
Some(ErrorReport(s"costCapThreshold scale is limited to $COST_CAP_THRESHOLD_SCALE decimal places"))
Some(ErrorReport(s"perWorkflowCostCap scale is limited to $PER_WORKFLOW_COST_CAP_SCALE decimal places"))
} else None
},
submissionRequest.costCapThreshold.flatMap { threshold =>
if (threshold.precision > COST_CAP_THRESHOLD_PRECISION)
Some(ErrorReport(s"costCapThreshold cannot be greater than $COST_CAP_THRESHOLD_PRECISION total digits"))
submissionRequest.perWorkflowCostCap.flatMap { threshold =>
if (threshold.precision > PER_WORKFLOW_COST_CAP_PRECISION)
Some(ErrorReport(s"perWorkflowCostCap cannot be greater than $PER_WORKFLOW_COST_CAP_PRECISION total digits"))
else None
}
).flatten
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,7 @@ class SubmissionsService(
monitoringScript = submissionRequest.monitoringScript,
monitoringImage = submissionRequest.monitoringImage,
monitoringImageScript = submissionRequest.monitoringImageScript,
costCapThreshold = submissionRequest.costCapThreshold
perWorkflowCostCap = submissionRequest.perWorkflowCostCap
)

logAndCreateDbSubmission(workspaceContext, submissionId, submission, dataAccess)
Expand Down
Loading

0 comments on commit 3edee4f

Please sign in to comment.