diff --git a/core/src/main/resources/org/broadinstitute/dsde/rawls/liquibase/changelog.xml b/core/src/main/resources/org/broadinstitute/dsde/rawls/liquibase/changelog.xml index 1e4525ee6c..eb43ae9af3 100644 --- a/core/src/main/resources/org/broadinstitute/dsde/rawls/liquibase/changelog.xml +++ b/core/src/main/resources/org/broadinstitute/dsde/rawls/liquibase/changelog.xml @@ -125,4 +125,5 @@ + diff --git a/core/src/main/resources/org/broadinstitute/dsde/rawls/liquibase/changesets/20231130_limit_clone_workspace_file_transfer.xml b/core/src/main/resources/org/broadinstitute/dsde/rawls/liquibase/changesets/20231130_limit_clone_workspace_file_transfer.xml new file mode 100644 index 0000000000..96e9007633 --- /dev/null +++ b/core/src/main/resources/org/broadinstitute/dsde/rawls/liquibase/changesets/20231130_limit_clone_workspace_file_transfer.xml @@ -0,0 +1,23 @@ + + + + + + + + + + + + + + + + + + + + diff --git a/core/src/main/resources/swagger/api-docs.yaml b/core/src/main/resources/swagger/api-docs.yaml index 11998b7391..9cae13e4fc 100644 --- a/core/src/main/resources/swagger/api-docs.yaml +++ b/core/src/main/resources/swagger/api-docs.yaml @@ -5851,6 +5851,13 @@ components: description: timestamp (UTC) marking the date that the bucket usage was last updated (YYYY-MM-DDThh:mm:ss.fffZ) description: "" PendingCloneWorkspaceFileTransfer: + required: + - destWorkspaceId + - sourceWorkspaceBucketName + - destWorkspaceBucketName + - copyFilesWithPrefix + - destWorkspaceGoogleProjectId + - created type: object properties: destWorkspaceId: @@ -5868,6 +5875,22 @@ components: destWorkspaceGoogleProjectId: type: string description: "The Google project that the destination workspace belongs to" + created: + type: string + description: "The time the file transfer started in yyyy-MM-ddTHH:mm:ss.SSSZZ + format." + format: date-time + finished: + type: string + description: "The time the file transfer finished in yyyy-MM-ddTHH:mm:ss.SSSZZ + format." + format: date-time + outcome: + type: string + description: "The outcome of a finished file transfer." + enum: + - Success + - Failure Attribute: type: object properties: diff --git a/core/src/main/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/CloneWorkspaceFileTransferComponent.scala b/core/src/main/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/CloneWorkspaceFileTransferComponent.scala index f15a373a1e..7abfa07a23 100644 --- a/core/src/main/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/CloneWorkspaceFileTransferComponent.scala +++ b/core/src/main/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/CloneWorkspaceFileTransferComponent.scala @@ -1,13 +1,18 @@ package org.broadinstitute.dsde.rawls.dataaccess.slick import org.broadinstitute.dsde.rawls.model.{GoogleProjectId, PendingCloneWorkspaceFileTransfer} +import org.joda.time.DateTime +import java.sql.Timestamp import java.util.UUID case class CloneWorkspaceFileTransferRecord(id: Long, destWorkspaceId: UUID, sourceWorkspaceId: UUID, - copyFilesWithPrefix: String + copyFilesWithPrefix: String, + created: Timestamp, + finished: Option[Timestamp], + outcome: Option[String] ) trait CloneWorkspaceFileTransferComponent { @@ -21,11 +26,17 @@ trait CloneWorkspaceFileTransferComponent { def destWorkspaceId = column[UUID]("DEST_WORKSPACE_ID") def sourceWorkspaceId = column[UUID]("SOURCE_WORKSPACE_ID") def copyFilesWithPrefix = column[String]("COPY_FILES_WITH_PREFIX", O.Length(254)) + def created = column[Timestamp]("CREATED") + def finished = column[Option[Timestamp]]("FINISHED") + def outcome = column[Option[String]]("OUTCOME") def * = (id, destWorkspaceId, sourceWorkspaceId, - copyFilesWithPrefix + copyFilesWithPrefix, + created, + finished, + outcome ) <> (CloneWorkspaceFileTransferRecord.tupled, CloneWorkspaceFileTransferRecord.unapply) } @@ -43,16 +54,22 @@ trait CloneWorkspaceFileTransferComponent { def listPendingTransfers(workspaceId: Option[UUID] = None): ReadAction[Seq[PendingCloneWorkspaceFileTransfer]] = { val query = for { - fileTransfer <- cloneWorkspaceFileTransferQuery.filterOpt(workspaceId) { case (table, workspaceId) => - table.destWorkspaceId === workspaceId - } + fileTransfer <- + cloneWorkspaceFileTransferQuery + .filter(_.finished.isEmpty) + .filterOpt(workspaceId) { case (table, workspaceId) => + table.destWorkspaceId === workspaceId + } sourceWorkspace <- workspaceQuery if sourceWorkspace.id === fileTransfer.sourceWorkspaceId destWorkspace <- workspaceQuery if destWorkspace.id === fileTransfer.destWorkspaceId } yield (destWorkspace.id, sourceWorkspace.bucketName, destWorkspace.bucketName, fileTransfer.copyFilesWithPrefix, - destWorkspace.googleProjectId + destWorkspace.googleProjectId, + fileTransfer.created, + fileTransfer.finished, + fileTransfer.outcome ) query.result.map(results => @@ -61,18 +78,34 @@ trait CloneWorkspaceFileTransferComponent { sourceWorkspaceBucketName, destWorkspaceBucketName, copyFilesWithPrefix, - destWorkspaceGoogleProjectId + destWorkspaceGoogleProjectId, + created, + finished, + outcome ) => - PendingCloneWorkspaceFileTransfer(destWorkspaceId, - sourceWorkspaceBucketName, - destWorkspaceBucketName, - copyFilesWithPrefix, - GoogleProjectId(destWorkspaceGoogleProjectId) + PendingCloneWorkspaceFileTransfer( + destWorkspaceId, + sourceWorkspaceBucketName, + destWorkspaceBucketName, + copyFilesWithPrefix, + GoogleProjectId(destWorkspaceGoogleProjectId), + new DateTime(created), + finished.map(new DateTime(_)), + outcome ) } ) } + def update(pendingCloneWorkspaceFileTransfer: PendingCloneWorkspaceFileTransfer): ReadWriteAction[Int] = + findByDestWorkspaceId(pendingCloneWorkspaceFileTransfer.destWorkspaceId) + .map(ft => (ft.finished, ft.outcome)) + .update( + (pendingCloneWorkspaceFileTransfer.finished.map(f => new Timestamp(f.getMillis)), + pendingCloneWorkspaceFileTransfer.outcome + ) + ) + def delete(destWorkspaceId: UUID): ReadWriteAction[Int] = findByDestWorkspaceId(destWorkspaceId).delete diff --git a/core/src/main/scala/org/broadinstitute/dsde/rawls/monitor/CloneWorkspaceFileTransferMonitor.scala b/core/src/main/scala/org/broadinstitute/dsde/rawls/monitor/CloneWorkspaceFileTransferMonitor.scala index 88ffc9eba9..b39cc8272b 100644 --- a/core/src/main/scala/org/broadinstitute/dsde/rawls/monitor/CloneWorkspaceFileTransferMonitor.scala +++ b/core/src/main/scala/org/broadinstitute/dsde/rawls/monitor/CloneWorkspaceFileTransferMonitor.scala @@ -11,8 +11,8 @@ import com.typesafe.scalalogging.LazyLogging import org.broadinstitute.dsde.rawls.dataaccess._ import org.broadinstitute.dsde.rawls.model._ import org.broadinstitute.dsde.rawls.monitor.CloneWorkspaceFileTransferMonitor.CheckAll +import org.joda.time.DateTime -import java.util.Date import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} import scala.language.postfixOps @@ -50,7 +50,7 @@ class CloneWorkspaceFileTransferMonitorActor(val dataSource: SlickDataSource, } _ <- pendingTransfers.toList .traverse { pendingTransfer => - IO.fromFuture(IO(copyBucketFiles(pendingTransfer))).attempt.map { + IO.fromFuture(IO(attemptTransfer(pendingTransfer))).attempt.map { case Left(e) => // We do not want to throw e here. traverse stops executing as soon as it encounters a Failure, but we // want to continue traversing the list to transfer the rest of the buckets even if one of the @@ -66,6 +66,24 @@ class CloneWorkspaceFileTransferMonitorActor(val dataSource: SlickDataSource, .unsafeToFuture() } yield () + private def attemptTransfer( + pendingTransfer: PendingCloneWorkspaceFileTransfer + )(implicit executionContext: ExecutionContext): Future[List[Option[StorageObject]]] = { + val transferExpired = pendingTransfer.created.isBefore(DateTime.now().minusDays(1)) + for { + copiedObjects <- + if (!transferExpired) copyBucketFiles(pendingTransfer) + else { + logger.warn( + s"File transfer from ${pendingTransfer.sourceWorkspaceBucketName} to ${pendingTransfer.destWorkspaceBucketName} did not succeed within allowed time and will no longer be retried. [workspaceId=${pendingTransfer.destWorkspaceId}]" + ) + Future.successful(List.empty) + } + + _ <- markTransferAsComplete(pendingTransfer, transferSucceeded = !transferExpired) + } yield copiedObjects + } + private def copyBucketFiles( pendingCloneWorkspaceFileTransfer: PendingCloneWorkspaceFileTransfer )(implicit executionContext: ExecutionContext): Future[List[Option[StorageObject]]] = @@ -103,21 +121,28 @@ class CloneWorkspaceFileTransferMonitorActor(val dataSource: SlickDataSource, _ = logger.info( s"successfully copied files with prefix ${pendingCloneWorkspaceFileTransfer.copyFilesWithPrefix} from ${pendingCloneWorkspaceFileTransfer.sourceWorkspaceBucketName} to ${pendingCloneWorkspaceFileTransfer.destWorkspaceBucketName}" ) - _ <- markTransferAsComplete(pendingCloneWorkspaceFileTransfer) } yield copiedObjects private def markTransferAsComplete( - pendingCloneWorkspaceFileTransfer: PendingCloneWorkspaceFileTransfer - ): Future[Unit] = + pendingCloneWorkspaceFileTransfer: PendingCloneWorkspaceFileTransfer, + transferSucceeded: Boolean + ): Future[Unit] = { + val currentTime = DateTime.now() + dataSource.inTransaction { dataAccess => for { - _ <- dataAccess.cloneWorkspaceFileTransferQuery.delete(pendingCloneWorkspaceFileTransfer.destWorkspaceId) + _ <- dataAccess.cloneWorkspaceFileTransferQuery.update( + pendingCloneWorkspaceFileTransfer.copy(finished = currentTime.some, + outcome = if (transferSucceeded) "Success".some else "Failure".some + ) + ) _ <- dataAccess.workspaceQuery.updateCompletedCloneWorkspaceFileTransfer( pendingCloneWorkspaceFileTransfer.destWorkspaceId, - new Date() + currentTime.toDate ) } yield () } + } } final case class CloneWorkspaceFileTransferMonitorConfig(pollInterval: FiniteDuration, initialDelay: FiniteDuration) diff --git a/core/src/test/scala/org/broadinstitute/dsde/rawls/monitor/CloneWorkspaceFileTransferMonitorSpec.scala b/core/src/test/scala/org/broadinstitute/dsde/rawls/monitor/CloneWorkspaceFileTransferMonitorSpec.scala index 6b6a4116e1..2befe0f055 100644 --- a/core/src/test/scala/org/broadinstitute/dsde/rawls/monitor/CloneWorkspaceFileTransferMonitorSpec.scala +++ b/core/src/test/scala/org/broadinstitute/dsde/rawls/monitor/CloneWorkspaceFileTransferMonitorSpec.scala @@ -17,6 +17,7 @@ import org.scalatest.time.{Seconds, Span} import org.scalatest.{BeforeAndAfterAll, OptionValues} import org.scalatestplus.mockito.MockitoSugar +import java.sql.Timestamp import java.util.UUID import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} @@ -212,8 +213,9 @@ class CloneWorkspaceFileTransferMonitorSpec(_system: ActorSystem) ) val mockGcsDAO = mock[GoogleServicesDAO](RETURNS_SMART_NULLS) - val failureMessage = "because I feel like it" - val exception = new HttpResponseException.Builder(403, failureMessage, new HttpHeaders()).build + val failureMessage = "expected test exception" + val exception = + new HttpResponseException.Builder(403, failureMessage, new HttpHeaders()).setMessage(failureMessage).build when( mockGcsDAO.listObjectsWithPrefix(sourceBucketName, copyFilesWithPrefix, Option(destWorkspace.googleProjectId)) ) @@ -301,8 +303,9 @@ class CloneWorkspaceFileTransferMonitorSpec(_system: ActorSystem) ) val mockGcsDAO = mock[GoogleServicesDAO](RETURNS_SMART_NULLS) - val failureMessage = "because I feel like it" - val exception = new HttpResponseException.Builder(403, failureMessage, new HttpHeaders()).build + val failureMessage = "expected test exception" + val exception = + new HttpResponseException.Builder(403, failureMessage, new HttpHeaders()).setMessage(failureMessage).build when( mockGcsDAO.listObjectsWithPrefix(sourceBucketName, copyFilesWithPrefix, Option(destWorkspace.googleProjectId)) ) @@ -402,8 +405,9 @@ class CloneWorkspaceFileTransferMonitorSpec(_system: ActorSystem) ) val mockGcsDAO = mock[GoogleServicesDAO](RETURNS_SMART_NULLS) - val failureMessage = "because I feel like it" - val exception = new HttpResponseException.Builder(403, failureMessage, new HttpHeaders()).build + val failureMessage = "expected test exception" + val exception = + new HttpResponseException.Builder(403, failureMessage, new HttpHeaders()).setMessage(failureMessage).build when( mockGcsDAO.listObjectsWithPrefix(sourceBucketName, copyFilesWithPrefix, Option(destWorkspace.googleProjectId)) ) @@ -423,7 +427,7 @@ class CloneWorkspaceFileTransferMonitorSpec(_system: ActorSystem) destinationBucketName, goodObjectToCopy.getName, Option(destWorkspace.googleProjectId) - ) + )(system.dispatchers.defaultGlobalDispatcher) ) .thenReturn(Future.successful(Option(goodObjectToCopy))) @@ -545,8 +549,9 @@ class CloneWorkspaceFileTransferMonitorSpec(_system: ActorSystem) ) val mockGcsDAO = mock[GoogleServicesDAO](RETURNS_SMART_NULLS) - val failureMessage = "because I feel like it" - val exception = new HttpResponseException.Builder(403, failureMessage, new HttpHeaders()).build + val failureMessage = "expected test exception" + val exception = + new HttpResponseException.Builder(403, failureMessage, new HttpHeaders()).setMessage(failureMessage).build when( mockGcsDAO.listObjectsWithPrefix(sourceBucketName, copyFilesWithPrefix, @@ -610,4 +615,117 @@ class CloneWorkspaceFileTransferMonitorSpec(_system: ActorSystem) system.stop(actor) } } + + it should "eventually stop trying to copy files" in { + withEmptyTestDatabase { dataSource: SlickDataSource => + val billingProject = RawlsBillingProject(defaultBillingProjectName, + CreationStatuses.Ready, + Option(defaultBillingAccountName), + None, + googleProjectNumber = Option(defaultGoogleProjectNumber) + ) + val sourceBucketName = "sourceBucket" + val destinationBucketName = "destinationBucket" + val copyFilesWithPrefix = "prefix" + val objectToCopy = new StorageObject().setName("copy-me") + val sourceWorkspace = Workspace( + billingProject.projectName.value, + "source", + UUID.randomUUID().toString, + sourceBucketName, + None, + DateTime.now, + DateTime.now, + "creator@example.com", + Map.empty, + false, + WorkspaceVersions.V2, + GoogleProjectId("some-project"), + Option(GoogleProjectNumber("43")), + billingProject.billingAccount, + None, + Option(DateTime.now), + WorkspaceType.RawlsWorkspace, + WorkspaceState.Ready + ) + val destWorkspace = Workspace( + billingProject.projectName.value, + "destination", + UUID.randomUUID().toString, + destinationBucketName, + None, + DateTime.now, + DateTime.now, + "creator@example.com", + Map.empty, + false, + WorkspaceVersions.V2, + GoogleProjectId("different-project"), + Option(GoogleProjectNumber("44")), + billingProject.billingAccount, + None, + None, + WorkspaceType.RawlsWorkspace, + WorkspaceState.Ready + ) + + runAndWait(rawlsBillingProjectQuery.create(billingProject)) + runAndWait(workspaceQuery.createOrUpdate(sourceWorkspace)) + runAndWait(workspaceQuery.createOrUpdate(destWorkspace)) + runAndWait( + cloneWorkspaceFileTransferQuery.save(destWorkspace.workspaceIdAsUUID, + sourceWorkspace.workspaceIdAsUUID, + copyFilesWithPrefix + ) + ) + + val mockGcsDAO = mock[GoogleServicesDAO](RETURNS_SMART_NULLS) + val failureMessage = "expected test exception" + val exception = new HttpResponseException.Builder(403, failureMessage, new HttpHeaders()).build + when( + mockGcsDAO.listObjectsWithPrefix(sourceBucketName, copyFilesWithPrefix, Option(destWorkspace.googleProjectId)) + ) + .thenReturn(Future.successful(List(objectToCopy))) + when( + mockGcsDAO.copyFile(sourceBucketName, + objectToCopy.getName, + destinationBucketName, + objectToCopy.getName, + Option(destWorkspace.googleProjectId) + )(system.dispatchers.defaultGlobalDispatcher) + ) + .thenReturn(Future.failed(exception)) + + val actor = createCloneWorkspaceFileTransferMonitor(dataSource, mockGcsDAO) + import driver.api._ + runAndWait( + cloneWorkspaceFileTransferQuery + .filter(_.destWorkspaceId === destWorkspace.workspaceIdAsUUID) + .map(_.created) + .update(new Timestamp(DateTime.now().minusDays(2).getMillis)) + ) + + eventually(timeout = timeout(Span(10, Seconds))) { + runAndWait(workspaceQuery.findById(destWorkspace.workspaceIdAsUUID.toString)) + .getOrElse(fail(s"${destWorkspace.name} not found")) + .completedCloneWorkspaceFileTransfer + .isDefined shouldBe true + runAndWait(cloneWorkspaceFileTransferQuery.listPendingTransfers()) shouldBe empty + + val allWorkspaceTransfers = runAndWait( + cloneWorkspaceFileTransferQuery + .filter(_.destWorkspaceId === destWorkspace.workspaceIdAsUUID) + .map(r => (r.finished, r.outcome)) + .result + ) + allWorkspaceTransfers should have size 1 + + val transferResult = allWorkspaceTransfers.head + transferResult._1 shouldBe defined + transferResult._2 shouldBe Some("Failure") + } + + system.stop(actor) + } + } } diff --git a/core/src/test/scala/org/broadinstitute/dsde/rawls/webservice/WorkspaceApiServiceSpec.scala b/core/src/test/scala/org/broadinstitute/dsde/rawls/webservice/WorkspaceApiServiceSpec.scala index 7cd63299e2..57c9166da1 100644 --- a/core/src/test/scala/org/broadinstitute/dsde/rawls/webservice/WorkspaceApiServiceSpec.scala +++ b/core/src/test/scala/org/broadinstitute/dsde/rawls/webservice/WorkspaceApiServiceSpec.scala @@ -2696,14 +2696,15 @@ class WorkspaceApiServiceSpec extends ApiServiceSpec { } val clonedWorkspaceResult = runAndWait(workspaceQuery.findByName(clonedWorkspaceName)).get - val expected = Seq( - PendingCloneWorkspaceFileTransfer( - clonedWorkspaceResult.workspaceIdAsUUID, - testData.workspace.bucketName, - clonedWorkspaceResult.bucketName, - workspaceCopy.copyFilesWithPrefix.get, - clonedWorkspaceResult.googleProjectId - ) + val expected = PendingCloneWorkspaceFileTransfer( + clonedWorkspaceResult.workspaceIdAsUUID, + testData.workspace.bucketName, + clonedWorkspaceResult.bucketName, + workspaceCopy.copyFilesWithPrefix.get, + clonedWorkspaceResult.googleProjectId, + DateTime.now(), + None, + None ) Get(s"${clonedWorkspaceName.path}/fileTransfers") ~> @@ -2712,9 +2713,16 @@ class WorkspaceApiServiceSpec extends ApiServiceSpec { assertResult(StatusCodes.OK) { status } - assertResult(expected) { - responseAs[Seq[PendingCloneWorkspaceFileTransfer]] - } + + val allTransfers = responseAs[Seq[PendingCloneWorkspaceFileTransfer]] + allTransfers should have size 1 + + val res = allTransfers.headOption.getOrElse(fail("pending transfer expected but not found")) + res.destWorkspaceId shouldBe expected.destWorkspaceId + res.sourceWorkspaceBucketName shouldBe expected.sourceWorkspaceBucketName + res.destWorkspaceBucketName shouldBe expected.destWorkspaceBucketName + res.copyFilesWithPrefix shouldBe expected.copyFilesWithPrefix + res.destWorkspaceGoogleProjectId shouldBe expected.destWorkspaceGoogleProjectId } } diff --git a/model/src/main/scala/org/broadinstitute/dsde/rawls/model/WorkspaceModel.scala b/model/src/main/scala/org/broadinstitute/dsde/rawls/model/WorkspaceModel.scala index d23d87cbc6..bd4f2f740b 100644 --- a/model/src/main/scala/org/broadinstitute/dsde/rawls/model/WorkspaceModel.scala +++ b/model/src/main/scala/org/broadinstitute/dsde/rawls/model/WorkspaceModel.scala @@ -938,7 +938,10 @@ case class PendingCloneWorkspaceFileTransfer(destWorkspaceId: UUID, sourceWorkspaceBucketName: String, destWorkspaceBucketName: String, copyFilesWithPrefix: String, - destWorkspaceGoogleProjectId: GoogleProjectId + destWorkspaceGoogleProjectId: GoogleProjectId, + created: DateTime, + finished: Option[DateTime], + outcome: Option[String] ) case class ManagedGroupAccessInstructions(groupName: String, instructions: String) @@ -1263,7 +1266,7 @@ class WorkspaceJsonSupport extends JsonSupport { implicit val WorkspaceResponseFormat: RootJsonFormat[WorkspaceResponse] = jsonFormat10(WorkspaceResponse) - implicit val PendingCloneWorkspaceFileTransferFormat: RootJsonFormat[PendingCloneWorkspaceFileTransfer] = jsonFormat5( + implicit val PendingCloneWorkspaceFileTransferFormat: RootJsonFormat[PendingCloneWorkspaceFileTransfer] = jsonFormat8( PendingCloneWorkspaceFileTransfer )