Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WOR-161] Stop trying to clone workspace files if it hasn't succeeded within a day #2644

Merged
merged 4 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,5 @@
<include file="changesets/20230810_track_mrb_sts_progress.xml" relativeToChangelogFile="true"/>
<include file="changesets/20230829_mrb_add_sts_project.xml" relativeToChangelogFile="true"/>
<include file="changesets/20231013_submission_monitor_script.xml" relativeToChangelogFile="true"/>
<include file="changesets/20231130_limit_clone_workspace_file_transfer.xml" relativeToChangelogFile="true"/>
</databaseChangeLog>
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<databaseChangeLog logicalFilePath="dummy" xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:ext="http://www.liquibase.org/xml/ns/dbchangelog-ext"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog-ext http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.4.xsd">
<changeSet logicalFilePath="dummy" author="mtalbott" id="add_timestamps_outcome_CLONE_WORKSPACE_FILE_TRANSFER">
<addColumn tableName="CLONE_WORKSPACE_FILE_TRANSFER">
<column name="CREATED" type="DATETIME" defaultValueComputed="CURRENT_TIMESTAMP">
<constraints nullable="false" />
</column>
</addColumn>
<addColumn tableName="CLONE_WORKSPACE_FILE_TRANSFER">
<column name="FINISHED" type="DATETIME">
<constraints nullable="true" />
</column>
</addColumn>
<addColumn tableName="CLONE_WORKSPACE_FILE_TRANSFER">
<column name="OUTCOME" type="VARCHAR(254)">
<constraints nullable="true" />
</column>
</addColumn>
</changeSet>
</databaseChangeLog>
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package org.broadinstitute.dsde.rawls.dataaccess.slick

import org.broadinstitute.dsde.rawls.model.{GoogleProjectId, PendingCloneWorkspaceFileTransfer}
import org.joda.time.DateTime

import java.sql.Timestamp
import java.util.UUID

case class CloneWorkspaceFileTransferRecord(id: Long,
destWorkspaceId: UUID,
sourceWorkspaceId: UUID,
copyFilesWithPrefix: String
copyFilesWithPrefix: String,
created: Timestamp,
finished: Option[Timestamp],
outcome: Option[String]
)

trait CloneWorkspaceFileTransferComponent {
Expand All @@ -21,11 +26,17 @@ trait CloneWorkspaceFileTransferComponent {
def destWorkspaceId = column[UUID]("DEST_WORKSPACE_ID")
def sourceWorkspaceId = column[UUID]("SOURCE_WORKSPACE_ID")
def copyFilesWithPrefix = column[String]("COPY_FILES_WITH_PREFIX", O.Length(254))
def created = column[Timestamp]("CREATED")
def finished = column[Option[Timestamp]]("FINISHED")
def outcome = column[Option[String]]("OUTCOME")

def * = (id,
destWorkspaceId,
sourceWorkspaceId,
copyFilesWithPrefix
copyFilesWithPrefix,
created,
finished,
outcome
) <> (CloneWorkspaceFileTransferRecord.tupled, CloneWorkspaceFileTransferRecord.unapply)
}

Expand All @@ -43,16 +54,22 @@ trait CloneWorkspaceFileTransferComponent {

def listPendingTransfers(workspaceId: Option[UUID] = None): ReadAction[Seq[PendingCloneWorkspaceFileTransfer]] = {
val query = for {
fileTransfer <- cloneWorkspaceFileTransferQuery.filterOpt(workspaceId) { case (table, workspaceId) =>
table.destWorkspaceId === workspaceId
}
fileTransfer <-
cloneWorkspaceFileTransferQuery
.filter(_.finished.isEmpty)
.filterOpt(workspaceId) { case (table, workspaceId) =>
table.destWorkspaceId === workspaceId
}
sourceWorkspace <- workspaceQuery if sourceWorkspace.id === fileTransfer.sourceWorkspaceId
destWorkspace <- workspaceQuery if destWorkspace.id === fileTransfer.destWorkspaceId
} yield (destWorkspace.id,
sourceWorkspace.bucketName,
destWorkspace.bucketName,
fileTransfer.copyFilesWithPrefix,
destWorkspace.googleProjectId
destWorkspace.googleProjectId,
fileTransfer.created,
fileTransfer.finished,
fileTransfer.outcome
)

query.result.map(results =>
Expand All @@ -61,18 +78,34 @@ trait CloneWorkspaceFileTransferComponent {
sourceWorkspaceBucketName,
destWorkspaceBucketName,
copyFilesWithPrefix,
destWorkspaceGoogleProjectId
destWorkspaceGoogleProjectId,
created,
finished,
outcome
) =>
PendingCloneWorkspaceFileTransfer(destWorkspaceId,
sourceWorkspaceBucketName,
destWorkspaceBucketName,
copyFilesWithPrefix,
GoogleProjectId(destWorkspaceGoogleProjectId)
PendingCloneWorkspaceFileTransfer(
destWorkspaceId,
sourceWorkspaceBucketName,
destWorkspaceBucketName,
copyFilesWithPrefix,
GoogleProjectId(destWorkspaceGoogleProjectId),
new DateTime(created),
finished.map(new DateTime(_)),
outcome
)
}
)
}

def update(pendingCloneWorkspaceFileTransfer: PendingCloneWorkspaceFileTransfer): ReadWriteAction[Int] =
findByDestWorkspaceId(pendingCloneWorkspaceFileTransfer.destWorkspaceId)
.map(ft => (ft.finished, ft.outcome))
.update(
(pendingCloneWorkspaceFileTransfer.finished.map(f => new Timestamp(f.getMillis)),
pendingCloneWorkspaceFileTransfer.outcome
)
)

def delete(destWorkspaceId: UUID): ReadWriteAction[Int] =
findByDestWorkspaceId(destWorkspaceId).delete

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import com.typesafe.scalalogging.LazyLogging
import org.broadinstitute.dsde.rawls.dataaccess._
import org.broadinstitute.dsde.rawls.model._
import org.broadinstitute.dsde.rawls.monitor.CloneWorkspaceFileTransferMonitor.CheckAll
import org.joda.time.DateTime

import java.util.Date
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.language.postfixOps
Expand Down Expand Up @@ -50,7 +50,7 @@ class CloneWorkspaceFileTransferMonitorActor(val dataSource: SlickDataSource,
}
_ <- pendingTransfers.toList
.traverse { pendingTransfer =>
IO.fromFuture(IO(copyBucketFiles(pendingTransfer))).attempt.map {
IO.fromFuture(IO(attemptTransfer(pendingTransfer))).attempt.map {
case Left(e) =>
// We do not want to throw e here. traverse stops executing as soon as it encounters a Failure, but we
// want to continue traversing the list to transfer the rest of the buckets even if one of the
Expand All @@ -66,6 +66,24 @@ class CloneWorkspaceFileTransferMonitorActor(val dataSource: SlickDataSource,
.unsafeToFuture()
} yield ()

private def attemptTransfer(
pendingTransfer: PendingCloneWorkspaceFileTransfer
)(implicit executionContext: ExecutionContext): Future[List[Option[StorageObject]]] = {
val transferExpired = pendingTransfer.created.isBefore(DateTime.now().minusDays(1))
for {
copiedObjects <-
if (!transferExpired) copyBucketFiles(pendingTransfer)
else {
logger.warn(
s"File transfer from ${pendingTransfer.sourceWorkspaceBucketName} to ${pendingTransfer.destWorkspaceBucketName} did not succeed within allowed time and will no longer be retried. [workspaceId=${pendingTransfer.destWorkspaceId}]"
)
Future.successful(List.empty)
}

_ <- markTransferAsComplete(pendingTransfer, transferSucceeded = !transferExpired)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So copyBucketFiles doesn't complete until the transfer is complete? But somehow we get to the point where we can kill off a transfer that is taking too long….

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This addresses cases where a source workspace's files can't be copied to a destination workspace for longer than a day due to persistent errors. When that happens, the monitor currently tries to clone the files nonstop and fills up our logs unnecessarily.

It doesn't attempt to protect against long running copy operations while transferring large buckets. While it would be nice to have those protections, the solution is likely to switch to STS which would be more involved.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, so copyBucketFIles would throw an exception at some point, and then the monitor kicks off this code again?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's correct. And if copyBucketFiles continues to throw exceptions for over a day for a given workspace, this will make it so the monitor stops trying to transfer the files for that workspace

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the idea is to stop retries, not to interrupt an in-progress operation?
And then if it fails, this entire function will return a failed future (it won't even get to markTransferAsComplete).

} yield copiedObjects
}

private def copyBucketFiles(
pendingCloneWorkspaceFileTransfer: PendingCloneWorkspaceFileTransfer
)(implicit executionContext: ExecutionContext): Future[List[Option[StorageObject]]] =
Expand Down Expand Up @@ -103,21 +121,28 @@ class CloneWorkspaceFileTransferMonitorActor(val dataSource: SlickDataSource,
_ = logger.info(
s"successfully copied files with prefix ${pendingCloneWorkspaceFileTransfer.copyFilesWithPrefix} from ${pendingCloneWorkspaceFileTransfer.sourceWorkspaceBucketName} to ${pendingCloneWorkspaceFileTransfer.destWorkspaceBucketName}"
)
_ <- markTransferAsComplete(pendingCloneWorkspaceFileTransfer)
} yield copiedObjects

private def markTransferAsComplete(
pendingCloneWorkspaceFileTransfer: PendingCloneWorkspaceFileTransfer
): Future[Unit] =
pendingCloneWorkspaceFileTransfer: PendingCloneWorkspaceFileTransfer,
transferSucceeded: Boolean
): Future[Unit] = {
val currentTime = DateTime.now()

dataSource.inTransaction { dataAccess =>
for {
_ <- dataAccess.cloneWorkspaceFileTransferQuery.delete(pendingCloneWorkspaceFileTransfer.destWorkspaceId)
_ <- dataAccess.cloneWorkspaceFileTransferQuery.update(
pendingCloneWorkspaceFileTransfer.copy(finished = currentTime.some,
outcome = if (transferSucceeded) "Success".some else "Failure".some
)
)
_ <- dataAccess.workspaceQuery.updateCompletedCloneWorkspaceFileTransfer(
pendingCloneWorkspaceFileTransfer.destWorkspaceId,
new Date()
currentTime.toDate
)
} yield ()
}
}
}

final case class CloneWorkspaceFileTransferMonitorConfig(pollInterval: FiniteDuration, initialDelay: FiniteDuration)
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import org.scalatest.time.{Seconds, Span}
import org.scalatest.{BeforeAndAfterAll, OptionValues}
import org.scalatestplus.mockito.MockitoSugar

import java.sql.Timestamp
import java.util.UUID
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -213,7 +214,8 @@ class CloneWorkspaceFileTransferMonitorSpec(_system: ActorSystem)

val mockGcsDAO = mock[GoogleServicesDAO](RETURNS_SMART_NULLS)
val failureMessage = "because I feel like it"
val exception = new HttpResponseException.Builder(403, failureMessage, new HttpHeaders()).build
val exception =
new HttpResponseException.Builder(403, failureMessage, new HttpHeaders()).setMessage(failureMessage).build
Copy link
Contributor Author

@marctalbott marctalbott Dec 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added .setMessage to this exceptions and others in the tests since they were showing up in the logs with null messages and I was initially concerned something was wrong. The message makes it more clear that these exceptions are expected.

Before:

com.google.api.client.http.HttpResponseException: null
	at com.google.api.client.http.HttpResponseException$Builder.build(HttpResponseException.java:293)
	...

After:

com.google.api.client.http.HttpResponseException: because I feel like it
	at com.google.api.client.http.HttpResponseException$Builder.build(HttpResponseException.java:293)
	...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps make the exception text more meaningful, like "expected test exception"?

when(
mockGcsDAO.listObjectsWithPrefix(sourceBucketName, copyFilesWithPrefix, Option(destWorkspace.googleProjectId))
)
Expand Down Expand Up @@ -302,7 +304,8 @@ class CloneWorkspaceFileTransferMonitorSpec(_system: ActorSystem)

val mockGcsDAO = mock[GoogleServicesDAO](RETURNS_SMART_NULLS)
val failureMessage = "because I feel like it"
val exception = new HttpResponseException.Builder(403, failureMessage, new HttpHeaders()).build
val exception =
new HttpResponseException.Builder(403, failureMessage, new HttpHeaders()).setMessage(failureMessage).build
when(
mockGcsDAO.listObjectsWithPrefix(sourceBucketName, copyFilesWithPrefix, Option(destWorkspace.googleProjectId))
)
Expand Down Expand Up @@ -403,7 +406,8 @@ class CloneWorkspaceFileTransferMonitorSpec(_system: ActorSystem)

val mockGcsDAO = mock[GoogleServicesDAO](RETURNS_SMART_NULLS)
val failureMessage = "because I feel like it"
val exception = new HttpResponseException.Builder(403, failureMessage, new HttpHeaders()).build
val exception =
new HttpResponseException.Builder(403, failureMessage, new HttpHeaders()).setMessage(failureMessage).build
when(
mockGcsDAO.listObjectsWithPrefix(sourceBucketName, copyFilesWithPrefix, Option(destWorkspace.googleProjectId))
)
Expand All @@ -423,7 +427,7 @@ class CloneWorkspaceFileTransferMonitorSpec(_system: ActorSystem)
destinationBucketName,
goodObjectToCopy.getName,
Option(destWorkspace.googleProjectId)
)
)(system.dispatchers.defaultGlobalDispatcher)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was causing a NPE since the call wasn't properly mocked. The test still succeeded because the verify only checks that the method was called 5+ times, not that the method succeeded on those calls.

)
.thenReturn(Future.successful(Option(goodObjectToCopy)))

Expand Down Expand Up @@ -546,7 +550,8 @@ class CloneWorkspaceFileTransferMonitorSpec(_system: ActorSystem)

val mockGcsDAO = mock[GoogleServicesDAO](RETURNS_SMART_NULLS)
val failureMessage = "because I feel like it"
val exception = new HttpResponseException.Builder(403, failureMessage, new HttpHeaders()).build
val exception =
new HttpResponseException.Builder(403, failureMessage, new HttpHeaders()).setMessage(failureMessage).build
when(
mockGcsDAO.listObjectsWithPrefix(sourceBucketName,
copyFilesWithPrefix,
Expand Down Expand Up @@ -610,4 +615,117 @@ class CloneWorkspaceFileTransferMonitorSpec(_system: ActorSystem)
system.stop(actor)
}
}

it should "eventually stop trying to copy files" in {
withEmptyTestDatabase { dataSource: SlickDataSource =>
val billingProject = RawlsBillingProject(defaultBillingProjectName,
CreationStatuses.Ready,
Option(defaultBillingAccountName),
None,
googleProjectNumber = Option(defaultGoogleProjectNumber)
)
val sourceBucketName = "sourceBucket"
val destinationBucketName = "destinationBucket"
val copyFilesWithPrefix = "prefix"
val objectToCopy = new StorageObject().setName("copy-me")
val sourceWorkspace = Workspace(
billingProject.projectName.value,
"source",
UUID.randomUUID().toString,
sourceBucketName,
None,
DateTime.now,
DateTime.now,
"[email protected]",
Map.empty,
false,
WorkspaceVersions.V2,
GoogleProjectId("some-project"),
Option(GoogleProjectNumber("43")),
billingProject.billingAccount,
None,
Option(DateTime.now),
WorkspaceType.RawlsWorkspace,
WorkspaceState.Ready
)
val destWorkspace = Workspace(
billingProject.projectName.value,
"destination",
UUID.randomUUID().toString,
destinationBucketName,
None,
DateTime.now,
DateTime.now,
"[email protected]",
Map.empty,
false,
WorkspaceVersions.V2,
GoogleProjectId("different-project"),
Option(GoogleProjectNumber("44")),
billingProject.billingAccount,
None,
None,
WorkspaceType.RawlsWorkspace,
WorkspaceState.Ready
)

runAndWait(rawlsBillingProjectQuery.create(billingProject))
runAndWait(workspaceQuery.createOrUpdate(sourceWorkspace))
runAndWait(workspaceQuery.createOrUpdate(destWorkspace))
runAndWait(
cloneWorkspaceFileTransferQuery.save(destWorkspace.workspaceIdAsUUID,
sourceWorkspace.workspaceIdAsUUID,
copyFilesWithPrefix
)
)

val mockGcsDAO = mock[GoogleServicesDAO](RETURNS_SMART_NULLS)
val failureMessage = "because I feel like it"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, not again!!

val exception = new HttpResponseException.Builder(403, failureMessage, new HttpHeaders()).build
when(
mockGcsDAO.listObjectsWithPrefix(sourceBucketName, copyFilesWithPrefix, Option(destWorkspace.googleProjectId))
)
.thenReturn(Future.successful(List(objectToCopy)))
when(
mockGcsDAO.copyFile(sourceBucketName,
objectToCopy.getName,
destinationBucketName,
objectToCopy.getName,
Option(destWorkspace.googleProjectId)
)(system.dispatchers.defaultGlobalDispatcher)
)
.thenReturn(Future.failed(exception))

val actor = createCloneWorkspaceFileTransferMonitor(dataSource, mockGcsDAO)
import driver.api._
runAndWait(
cloneWorkspaceFileTransferQuery
.filter(_.destWorkspaceId === destWorkspace.workspaceIdAsUUID)
.map(_.created)
.update(new Timestamp(DateTime.now().minusDays(2).getMillis))
)

eventually(timeout = timeout(Span(10, Seconds))) {
runAndWait(workspaceQuery.findById(destWorkspace.workspaceIdAsUUID.toString))
.getOrElse(fail(s"${destWorkspace.name} not found"))
.completedCloneWorkspaceFileTransfer
.isDefined shouldBe true
runAndWait(cloneWorkspaceFileTransferQuery.listPendingTransfers()) shouldBe empty

val allWorkspaceTransfers = runAndWait(
cloneWorkspaceFileTransferQuery
.filter(_.destWorkspaceId === destWorkspace.workspaceIdAsUUID)
.map(r => (r.finished, r.outcome))
.result
)
allWorkspaceTransfers should have size 1

val transferResult = allWorkspaceTransfers.head
transferResult._1 shouldBe defined
transferResult._2 shouldBe Some("Failure")
}

system.stop(actor)
}
}
}
Loading
Loading