Skip to content

Commit

Permalink
[AN-226] Enable submitting workflows to Cromwell's GCP Batch backend (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
sam-schu authored Dec 9, 2024
1 parent cefaea5 commit 83bca16
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 12 deletions.
5 changes: 4 additions & 1 deletion .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ Ticket: <Link to Jira ticket>
- [ ] Include the JIRA issue number in the PR description and title
- [ ] Make sure Swagger is updated if API changes
- [ ] **...and Orchestration's Swagger too!**
- [ ] If you changed anything in `model/`, then you should [publish a new official `rawls-model`](https://github.com/broadinstitute/rawls/blob/develop/README.md#publish-rawls-model) and update `rawls-model` in [Orchestration's dependencies](https://github.com/broadinstitute/firecloud-orchestration/blob/develop/project/Dependencies.scala).
- [ ] If you changed anything in `model/`, then you should publish a new official `rawls-model` and perform the corresponding dependency updates as specified in the [README](https://github.com/broadinstitute/rawls/blob/develop/README.md#publish-rawls-model):
- [ ] in the automation subdirectory
- [ ] in workbench-libs
- [ ] in firecloud-orchestration
- [ ] Get two thumbsworth of PR review
- [ ] Verify all tests go green, including CI tests
- [ ] **Squash commits and merge** to develop (branches are automatically deleted after merging)
Expand Down
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,10 @@ VAULT_TOKEN=$(cat ~/.vault-token) ARTIFACTORY_USERNAME=dsdejenkins ARTIFACTORY_P
You can browse the published artifacts in Artifactory here: https://broadinstitute.jfrog.io/broadinstitute/webapp/#/home

After publishing:
* update [model/CHANGELOG.md](model/CHANGELOG.md) properly
* update the rawls-model dependency in the automation subdirectory, and ensure that sbt project is healthy
* update the rawls-model dependency in workbench-libs serviceTest, and ensure that sbt project is healthy
* Update [model/CHANGELOG.md](model/CHANGELOG.md) properly
* Update the rawls-model dependency in the automation subdirectory, and ensure that the sbt project is healthy
* In workbench-libs, update the rawls-model dependency in the project dependencies, update the README and the serviceTest changelog accordingly, and ensure that the sbt project is healthy
* In firecloud-orchestration, update the rawls-model dependency in the project dependencies, and ensure that the sbt project is healthy


## Troubleshooting
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/org/broadinstitute/dsde/rawls/Boot.scala
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@ object Boot extends IOApp with LazyLogging {
CromwellBackend(appConfigManager.conf.getString("executionservice.defaultNetworkBackend"))
val highSecurityNetworkCromwellBackend: CromwellBackend =
CromwellBackend(appConfigManager.conf.getString("executionservice.highSecurityNetworkBackend"))
val gcpBatchBackend: CromwellBackend =
CromwellBackend(appConfigManager.conf.getString("executionservice.gcpBatchBackend"))

val wdlParsingConfig = WDLParserConfig(appConfigManager.conf.getConfig("wdl-parsing"))
def cromwellSwaggerClient = new CromwellSwaggerClient(wdlParsingConfig.serverBasePath)
Expand Down Expand Up @@ -599,6 +601,7 @@ object Boot extends IOApp with LazyLogging {
useWorkflowCollectionLabel,
defaultNetworkCromwellBackend,
highSecurityNetworkCromwellBackend,
gcpBatchBackend,
methodConfigResolver,
bardService,
workspaceSettingRepository
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ object WorkflowSubmissionActor {
useWorkflowCollectionLabel: Boolean,
defaultNetworkCromwellBackend: CromwellBackend,
highSecurityNetworkCromwellBackend: CromwellBackend,
gcpBatchBackend: CromwellBackend,
methodConfigResolver: MethodConfigResolver,
bardService: BardService,
workspaceSettingRepository: WorkspaceSettingRepository
Expand All @@ -69,6 +70,7 @@ object WorkflowSubmissionActor {
useWorkflowCollectionLabel,
defaultNetworkCromwellBackend,
highSecurityNetworkCromwellBackend,
gcpBatchBackend,
methodConfigResolver,
bardService,
workspaceSettingRepository
Expand Down Expand Up @@ -105,6 +107,7 @@ class WorkflowSubmissionActor(val dataSource: SlickDataSource,
val useWorkflowCollectionLabel: Boolean,
val defaultNetworkCromwellBackend: CromwellBackend,
val highSecurityNetworkCromwellBackend: CromwellBackend,
val gcpBatchBackend: CromwellBackend,
val methodConfigResolver: MethodConfigResolver,
val bardService: BardService,
val workspaceSettingRepository: WorkspaceSettingRepository
Expand Down Expand Up @@ -159,6 +162,7 @@ trait WorkflowSubmission extends FutureSupport with LazyLogging with MethodWiths
val useWorkflowCollectionLabel: Boolean
val defaultNetworkCromwellBackend: CromwellBackend
val highSecurityNetworkCromwellBackend: CromwellBackend
val gcpBatchBackend: CromwellBackend
val methodConfigResolver: MethodConfigResolver
val bardService: BardService
val workspaceSettingRepository: WorkspaceSettingRepository
Expand Down Expand Up @@ -301,7 +305,17 @@ trait WorkflowSubmission extends FutureSupport with LazyLogging with MethodWiths
// - final_workflow_outputs_dir = submissions/final-outputs
// - final_workflow_outputs_mode = "copy".

useCromwellGcpBatchBackend: Boolean = currentSettings.exists {
case backendSetting: UseCromwellGcpBatchBackendSetting => backendSetting.config.enabled
case _ => false
}
cromwellSubmissionBackend =
if (useCromwellGcpBatchBackend) gcpBatchBackend else highSecurityNetworkCromwellBackend

executionServiceWorkflowOptions = ExecutionServiceWorkflowOptions(
// We pass the submission root as the value for two options,
// one for the PAPI Cromwell backend and one for the GCP Batch backend.
submission.submissionRoot,
submission.submissionRoot,
final_workflow_outputs_dir,
final_workflow_outputs_dir_metadata,
Expand All @@ -315,7 +329,7 @@ trait WorkflowSubmission extends FutureSupport with LazyLogging with MethodWiths
deleteIntermediateOutputFiles,
useReferenceDisks,
memoryRetryMultiplier,
highSecurityNetworkCromwellBackend,
cromwellSubmissionBackend,
workflowFailureMode,
google_labels = Map("terra-submission-id" -> s"terra-${submission.id.toString}"),
ignoreEmptyOutputs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ object BootMonitors extends LazyLogging {
useWorkflowCollectionLabel: Boolean,
defaultNetworkCromwellBackend: CromwellBackend,
highSecurityNetworkCromwellBackend: CromwellBackend,
gcpBatchBackend: CromwellBackend,
methodConfigResolver: MethodConfigResolver,
bardService: BardService,
workspaceSettingRepository: WorkspaceSettingRepository
Expand Down Expand Up @@ -147,6 +148,7 @@ object BootMonitors extends LazyLogging {
useWorkflowCollectionLabel,
defaultNetworkCromwellBackend,
highSecurityNetworkCromwellBackend,
gcpBatchBackend,
methodConfigResolver,
bardService,
workspaceSettingRepository
Expand Down Expand Up @@ -338,6 +340,7 @@ object BootMonitors extends LazyLogging {
useWorkflowCollectionLabel: Boolean,
defaultNetworkCromwellBackend: CromwellBackend,
highSecurityNetworkCromwellBackend: CromwellBackend,
gcpBatchBackend: CromwellBackend,
methodConfigResolver: MethodConfigResolver,
bardService: BardService,
workspaceSettingRepository: WorkspaceSettingRepository
Expand Down Expand Up @@ -366,6 +369,7 @@ object BootMonitors extends LazyLogging {
useWorkflowCollectionLabel,
defaultNetworkCromwellBackend,
highSecurityNetworkCromwellBackend,
gcpBatchBackend,
methodConfigResolver,
bardService,
workspaceSettingRepository
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import org.broadinstitute.dsde.rawls.jobexec.WorkflowSubmissionActor.{
import org.broadinstitute.dsde.rawls.metrics.{BardService, RawlsStatsDTestUtils}
import org.broadinstitute.dsde.rawls.mock.{MockBardService, MockSamDAO, RemoteServicesMockServer}
import org.broadinstitute.dsde.rawls.model.ExecutionJsonSupport.ExecutionServiceWorkflowOptionsFormat
import org.broadinstitute.dsde.rawls.model.WorkspaceSettingConfig.SeparateSubmissionFinalOutputsConfig
import org.broadinstitute.dsde.rawls.model.WorkspaceSettingTypes.SeparateSubmissionFinalOutputs
import org.broadinstitute.dsde.rawls.model.{WorkspaceSetting, _}
import org.broadinstitute.dsde.rawls.model.WorkspaceSettingConfig.{
SeparateSubmissionFinalOutputsConfig,
UseCromwellGcpBatchBackendConfig
}
import org.broadinstitute.dsde.rawls.model._
import org.broadinstitute.dsde.rawls.util.MockitoTestUtils
import org.broadinstitute.dsde.rawls.workspace.WorkspaceSettingRepository
import org.broadinstitute.dsde.rawls.{RawlsExceptionWithErrorReport, RawlsTestUtils}
Expand Down Expand Up @@ -120,6 +122,7 @@ class WorkflowSubmissionSpec(_system: ActorSystem)
val useWorkflowCollectionLabel: Boolean = false,
val defaultNetworkCromwellBackend: CromwellBackend = CromwellBackend("PAPIv2"),
val highSecurityNetworkCromwellBackend: CromwellBackend = CromwellBackend("PAPIv2-CloudNAT"),
val gcpBatchBackend: CromwellBackend = CromwellBackend("GCPBatch"),
val methodConfigResolver: MethodConfigResolver = methodConfigResolver,
val bardService: BardService = mockBardService,
val workspaceSettingRepository: WorkspaceSettingRepository = mockWorkspaceSettingRepository
Expand Down Expand Up @@ -381,6 +384,7 @@ class WorkflowSubmissionSpec(_system: ActorSystem)
Some(
ExecutionServiceWorkflowOptions(
jes_gcs_root = s"gs://${testData.workspace.bucketName}/${testData.submission1.submissionId}",
gcp_batch_gcs_root = s"gs://${testData.workspace.bucketName}/${testData.submission1.submissionId}",
None,
None,
google_project = testData.workspace.googleProjectId.value,
Expand Down Expand Up @@ -937,6 +941,7 @@ class WorkflowSubmissionSpec(_system: ActorSystem)
false,
CromwellBackend("PAPIv2"),
CromwellBackend("PAPIv2-CloudNAT"),
CromwellBackend("GCPBatch"),
methodConfigResolver,
mockBardService,
mockWorkspaceSettingRepository
Expand Down Expand Up @@ -1005,6 +1010,7 @@ class WorkflowSubmissionSpec(_system: ActorSystem)
false,
CromwellBackend("PAPIv2"),
CromwellBackend("PAPIv2-CloudNAT"),
CromwellBackend("GCPBatch"),
methodConfigResolver,
mockBardService,
mockWorkspaceSettingRepository
Expand Down Expand Up @@ -1202,6 +1208,90 @@ class WorkflowSubmissionSpec(_system: ActorSystem)
}
}

// Verifies that when the workspace's settings contain an enabled
// UseCromwellGcpBatchBackendSetting, the workflow options sent to the execution
// service name the GCP Batch backend.
it should "submit workflows to Cromwell's GCP Batch backend when UseCromwellGcpBatchBackendSetting is true" in withDefaultTestDatabase {
  val mockExecCluster = MockShardedExecutionServiceCluster.fromDAO(new MockExecutionServiceDAO(), slickDataSource)

  // Stub the settings lookup so this workspace reports the GCP Batch setting as enabled.
  val workspaceSettingRepository = mock[WorkspaceSettingRepository]
  when(workspaceSettingRepository.getWorkspaceSettings(UUID.fromString(testData.workspace.workspaceId))).thenReturn(
    Future.successful(List(UseCromwellGcpBatchBackendSetting(UseCromwellGcpBatchBackendConfig(true))))
  )

  val workflowSubmission =
    new TestWorkflowSubmission(slickDataSource, workspaceSettingRepository = workspaceSettingRepository) {
      override val executionServiceCluster = mockExecCluster
    }

  val (workflowRecs, submissionRec, workspaceRec) =
    getWorkflowSubmissionWorkspaceRecords(testData.regionalSubmission, testData.workspace)

  Await.result(
    workflowSubmission.submitWorkflowBatch(WorkflowBatch(workflowRecs.map(_.id), submissionRec, workspaceRec)),
    Duration.Inf
  )

  // Read back the options JSON captured by the mock DAO and parse it into the model type.
  val workflowOptions = mockExecCluster.getDefaultSubmitMember
    .asInstanceOf[MockExecutionServiceDAO]
    .submitOptions
    .map(_.parseJson.convertTo[ExecutionServiceWorkflowOptions])

  // Compare against Some(...) instead of calling .get, so that a run in which no options
  // were captured fails with a matcher message rather than a NoSuchElementException.
  workflowOptions.map(_.backend) should be(Some(CromwellBackend("GCPBatch")))
}

// Verifies that when the workspace's UseCromwellGcpBatchBackendSetting is present but
// disabled, the workflow options sent to the execution service keep the high security
// network backend.
it should "submit workflows to Cromwell's high security network backend when UseCromwellGcpBatchBackendSetting is false" in withDefaultTestDatabase {
  val mockExecCluster = MockShardedExecutionServiceCluster.fromDAO(new MockExecutionServiceDAO(), slickDataSource)

  // Stub the settings lookup so this workspace reports the GCP Batch setting as disabled.
  val workspaceSettingRepository = mock[WorkspaceSettingRepository]
  when(workspaceSettingRepository.getWorkspaceSettings(UUID.fromString(testData.workspace.workspaceId))).thenReturn(
    Future.successful(List(UseCromwellGcpBatchBackendSetting(UseCromwellGcpBatchBackendConfig(false))))
  )

  val workflowSubmission =
    new TestWorkflowSubmission(slickDataSource, workspaceSettingRepository = workspaceSettingRepository) {
      override val executionServiceCluster = mockExecCluster
    }

  val (workflowRecs, submissionRec, workspaceRec) =
    getWorkflowSubmissionWorkspaceRecords(testData.regionalSubmission, testData.workspace)

  Await.result(
    workflowSubmission.submitWorkflowBatch(WorkflowBatch(workflowRecs.map(_.id), submissionRec, workspaceRec)),
    Duration.Inf
  )

  // Read back the options JSON captured by the mock DAO and parse it into the model type.
  val workflowOptions = mockExecCluster.getDefaultSubmitMember
    .asInstanceOf[MockExecutionServiceDAO]
    .submitOptions
    .map(_.parseJson.convertTo[ExecutionServiceWorkflowOptions])

  // Compare against Some(...) instead of calling .get, so that a run in which no options
  // were captured fails with a matcher message rather than a NoSuchElementException.
  workflowOptions.map(_.backend) should be(Some(CromwellBackend("PAPIv2-CloudNAT")))
}

// Verifies that when the workspace has no settings at all, the workflow options sent to
// the execution service use the high security network backend.
it should "submit workflows to Cromwell's high security network backend when UseCromwellGcpBatchBackendSetting is not set" in withDefaultTestDatabase {
  val mockExecCluster = MockShardedExecutionServiceCluster.fromDAO(new MockExecutionServiceDAO(), slickDataSource)

  // Stub the settings lookup to return an empty list, i.e. the setting is absent.
  val workspaceSettingRepository = mock[WorkspaceSettingRepository]
  when(workspaceSettingRepository.getWorkspaceSettings(UUID.fromString(testData.workspace.workspaceId))).thenReturn(
    Future.successful(List())
  )

  val workflowSubmission =
    new TestWorkflowSubmission(slickDataSource, workspaceSettingRepository = workspaceSettingRepository) {
      override val executionServiceCluster = mockExecCluster
    }

  val (workflowRecs, submissionRec, workspaceRec) =
    getWorkflowSubmissionWorkspaceRecords(testData.regionalSubmission, testData.workspace)

  Await.result(
    workflowSubmission.submitWorkflowBatch(WorkflowBatch(workflowRecs.map(_.id), submissionRec, workspaceRec)),
    Duration.Inf
  )

  // Read back the options JSON captured by the mock DAO and parse it into the model type.
  val workflowOptions = mockExecCluster.getDefaultSubmitMember
    .asInstanceOf[MockExecutionServiceDAO]
    .submitOptions
    .map(_.parseJson.convertTo[ExecutionServiceWorkflowOptions])

  // Compare against Some(...) instead of calling .get, so that a run in which no options
  // were captured fails with a matcher message rather than a NoSuchElementException.
  workflowOptions.map(_.backend) should be(Some(CromwellBackend("PAPIv2-CloudNAT")))
}

private def setWorkflowBatchToQueued(batchSize: Int, submissionId: String): Seq[WorkflowRecord] =
runAndWait(
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ class ExecutionModelSpec extends AnyFlatSpec with Matchers {
"ExecutionServiceWorkflowOptions" should "serialize/deserialize to/from JSON" in {
val test = ExecutionServiceWorkflowOptions(
jes_gcs_root = "jes_gcs_root",
gcp_batch_gcs_root = "example_gcp_batch_gcs_root",
final_workflow_outputs_dir = None,
final_workflow_outputs_dir_metadata = None,
google_project = "google_project",
Expand Down Expand Up @@ -162,6 +163,7 @@ class ExecutionModelSpec extends AnyFlatSpec with Matchers {
"""
|{
| "jes_gcs_root": "jes_gcs_root",
| "gcp_batch_gcs_root": "example_gcp_batch_gcs_root",
| "google_project": "google_project",
| "account_name": "account_name",
| "google_compute_service_account": "[email protected]",
Expand Down Expand Up @@ -192,6 +194,7 @@ class ExecutionModelSpec extends AnyFlatSpec with Matchers {
"""
|{
| "jes_gcs_root": "jes_gcs_root",
| "gcp_batch_gcs_root": "example_gcp_batch_gcs_root",
| "google_project": "google_project",
| "account_name": "account_name",
| "google_compute_service_account": "[email protected]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class SubmissionApiServiceSpec extends ApiServiceSpec with TableDrivenPropertyCh
false,
CromwellBackend("PAPIv2"),
CromwellBackend("PAPIv2-CloudNAT"),
CromwellBackend("GCPBatch"),
methodConfigResolver,
new MockBardService(),
new WorkspaceSettingRepository(slickDataSource)
Expand Down
5 changes: 3 additions & 2 deletions local-dev/templates/rawls.conf
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ dataRepo {

resourceBuffer {
projectPool {
regular = "cwb_ws_dev_v7"
exfiltrationControlled = "vpc_sc_v10"
regular = "cwb_ws_dev_v8"
exfiltrationControlled = "vpc_sc_v13"
}
url = "https://buffer.dsde-dev.broadinstitute.org"
saEmail = "[email protected]"
Expand Down Expand Up @@ -138,6 +138,7 @@ executionservice {

defaultNetworkBackend = "PAPIv2-beta"
highSecurityNetworkBackend = "PAPIv2-CloudNAT"
gcpBatchBackend = "GCPBatch"

cromiamUrl = "https://cromiam-priv.dsde-dev.broadinstitute.org"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,14 @@ case class ExecutionServiceCallLogs(
)

// https://cromwell.readthedocs.io/en/stable/wf_options/Google/
// We provide both the key jes_gcs_root and the key gcp_batch_gcs_root
// to accommodate the PAPI and GCP Batch Cromwell backends. Each backend
// ignores the key that does not correspond to it. This is a temporary
// measure, and jes_gcs_root can be removed when we complete the full
// transition to GCP Batch.
case class ExecutionServiceWorkflowOptions(
jes_gcs_root: String,
gcp_batch_gcs_root: String,
final_workflow_outputs_dir: Option[String],
final_workflow_outputs_dir_metadata: Option[String],
google_project: String,
Expand All @@ -110,7 +116,8 @@ case class ExecutionServiceWorkflowOptions(
monitoring_image_script: Option[String] = None
)

// current possible backends are "JES" and "PAPIv2" but this is subject to change in the future
// Name of a Cromwell backend, wrapped in a single-field value object so it is
// not passed around as a bare String.
// Current possible backends are "PAPIv2-beta" (not in current use),
// "PAPIv2-CloudNAT", and "GCPBatch".
final case class CromwellBackend(value: String) extends ValueObject

case class ExecutionServiceLabelResponse(
Expand Down Expand Up @@ -469,7 +476,7 @@ trait ExecutionJsonSupport extends JsonSupport {

implicit val ExecutionServiceLogsFormat: RootJsonFormat[ExecutionServiceLogs] = jsonFormat2(ExecutionServiceLogs)

implicit val ExecutionServiceWorkflowOptionsFormat: RootJsonFormat[ExecutionServiceWorkflowOptions] = jsonFormat20(
implicit val ExecutionServiceWorkflowOptionsFormat: RootJsonFormat[ExecutionServiceWorkflowOptions] = jsonFormat21(
ExecutionServiceWorkflowOptions
)

Expand Down

0 comments on commit 83bca16

Please sign in to comment.