Skip to content

Commit

Permalink
CORE-133: Optimize SQL in billing account change synchronizer (#3145)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidangb authored Dec 4, 2024
1 parent 6ff3602 commit 587aa68
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 101 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.broadinstitute.dsde.rawls.dataaccess.slick

import cats.implicits.catsSyntaxOptionId
import com.google.common.annotations.VisibleForTesting
import org.broadinstitute.dsde.rawls.RawlsException
import org.broadinstitute.dsde.rawls.dataaccess.GoogleApiTypes.GoogleApiType
import org.broadinstitute.dsde.rawls.dataaccess.GoogleOperationNames.GoogleOperationName
Expand Down Expand Up @@ -540,6 +541,7 @@ trait RawlsBillingProjectComponent {
)
}

@VisibleForTesting
def getLastChange(billingProject: RawlsBillingProjectName): ReadAction[Option[BillingAccountChange]] =
BillingAccountChanges
.withProjectName(billingProject)
Expand Down Expand Up @@ -574,27 +576,6 @@ trait RawlsBillingProjectComponent {
/** Narrows this query to the changes whose billing project name equals `billingProjectName`. */
def withProjectName(billingProjectName: RawlsBillingProjectName): BillingAccountChangeQuery = {
  val projectName = billingProjectName.value
  query.filter(change => change.billingProjectName === projectName)
}

/* SELECT *
* FROM BILLING_ACCOUNT_CHANGES BAC,
* ( SELECT BILLING_PROJECT_NAME, MAX(ID) AS MAXID
* FROM BILLING_ACCOUNT_CHANGES
* GROUP BY BILLING_PROJECT_NAME
* ) AS SUBTABLE
* WHERE SUBTABLE.MAXID = BAC.ID
*/
/**
  * Keeps, for every billing project present in this query, only the change with the
  * highest id, and returns those changes ordered by ascending id.
  */
def latestChanges: BillingAccountChangeQuery = {
  // One row per billing project: the maximum change id recorded for that project.
  val newestIdPerProject = query
    .groupBy(change => change.billingProjectName)
    .map { case (_, changesForProject) => changesForProject.map(_.id).max }

  query
    .filter(change => change.id.in(newestIdPerProject))
    .sortBy(change => change.id.asc)
}

/** Narrows this query to the changes that have no `googleSyncTime` recorded. */
def unsynced: BillingAccountChangeQuery =
  query.filter(change => change.googleSyncTime.isEmpty)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@ import cats.implicits.{
toFunctorOps
}
import cats.mtl.Ask
import cats.{Applicative, Functor, Monad, MonadThrow}
import cats.{Functor, Monad, MonadThrow}
import com.typesafe.scalalogging.LazyLogging
import org.broadinstitute.dsde.rawls.dataaccess.slick.{
BillingAccountChange,
BillingAccountChangeStatus,
ReadWriteAction,
WriteAction
ReadWriteAction
}
import org.broadinstitute.dsde.rawls.dataaccess.{GoogleServicesDAO, SamDAO, SlickDataSource}
import org.broadinstitute.dsde.rawls.model._
Expand All @@ -47,7 +46,7 @@ object BillingAccountChangeSynchronizer {
Behaviors.setup { context =>
val actor = BillingAccountChangeSynchronizer(dataSource, gcsDAO, samDAO)
Behaviors.withTimers { scheduler =>
scheduler.startTimerAtFixedRate(UpdateBillingAccounts, initialDelay, pollInterval)
scheduler.startTimerWithFixedDelay(UpdateBillingAccounts, initialDelay, pollInterval)
Behaviors.receiveMessage { case UpdateBillingAccounts =>
try actor.updateBillingAccounts.unsafeRunSync()
catch {
Expand Down Expand Up @@ -88,9 +87,7 @@ final case class BillingAccountChangeSynchronizer(dataSource: SlickDataSource,

/**
  * Reads the next billing account change to process.
  *
  * Runs `BillingAccountChanges.nextOutstanding()` inside a transaction.
  *
  * @return the first row produced by that query, or `None` when it produced no rows
  */
def readABillingProjectChange: IO[Option[BillingAccountChange]] =
  inTransaction {
    BillingAccountChanges.nextOutstanding().result
  }.map(_.headOption)

private def syncBillingAccountChange[F[_]](tracingContext: RawlsTracingContext)(implicit
Expand All @@ -101,12 +98,6 @@ final case class BillingAccountChangeSynchronizer(dataSource: SlickDataSource,
for {
billingProject <- loadBillingProject

// Try out the new query using the STATUS column and compare results. If validation fails, don't
// fail the entire process.
_ <- validateStatusColumn.recover { case e: Exception =>
logger.warn(s"BillingAccountChangeStatus logic failed with ${e.getMessage}", e)
}

// v1 billing projects are backed by google projects and are used for v1 workspace billing
updateBillingProjectOutcome <- M.ifM(isV1BillingProject(billingProject.projectName))(
updateBillingProjectGoogleProject(billingProject, tracingContext),
Expand All @@ -117,44 +108,6 @@ final case class BillingAccountChangeSynchronizer(dataSource: SlickDataSource,
_ <- writeBillingAccountChangeOutcome(updateBillingProjectOutcome |+| updateWorkspacesOutcome)
} yield ()

/**
  * Cross-checks the billing account change supplied through `Ask` against the rows
  * returned by `BillingAccountChanges.nextOutstanding()`.
  *
  * Logs at info level when the by-status lookup returns exactly one row, that row has the
  * same id as the current change, and the current change has status `Outstanding`.
  * Otherwise logs one warning that lists every discrepancy found.
  */
private def validateStatusColumn[F[_]](implicit
  R: Ask[F, BillingAccountChange],
  M: Monad[F],
  L: LiftIO[F]
): F[Unit] =
  for {
    (changeStatus, changeId) <- R.reader(x => (x.status, x.id))
    byStatus <- inTransaction(BillingAccountChanges.nextOutstanding().result)
  } yield
    /* validation:
        - byStatus should find exactly 1
        - byStatus should have same id
        - changeStatus should have status == Outstanding
     */
    if (
      byStatus.length == 1 &&
      byStatus.head.id == changeId &&
      changeStatus == BillingAccountChangeStatus.Outstanding
    ) {
      logger.info(s"BillingAccountChangeStatus logic correct for change id $changeId")
    } else {
      // collect errors
      val sb = new StringBuilder(s"BillingAccountChangeStatus logic error for change id $changeId!")
      if (byStatus.isEmpty) {
        sb.append(" By-status lookup was empty.")
      }
      if (byStatus.length > 1) {
        sb.append(s" By-status lookup had length ${byStatus.length}.")
      }
      if (changeStatus != BillingAccountChangeStatus.Outstanding) {
        sb.append(s" Legacy lookup had status $changeStatus.")
      }
      // headOption rather than head: the lookup may be empty here, and calling head on an
      // empty result would throw before the discrepancies collected above are logged.
      byStatus.headOption
        .filter(first => changeId != first.id)
        .foreach { first =>
          sb.append(s" Legacy found id $changeId, but by-status head had id ${first.id}")
        }
      logger.warn(sb.toString())
    }

private def loadBillingProject[F[_]](implicit
R: Ask[F, BillingAccountChange],
M: Monad[F],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,35 +361,6 @@ class RawlsBillingProjectComponentSpec
next shouldBe empty
}

"BillingAccountChange" should "be able to load records that need to be sync'd" in withDefaultTestDatabase {
  runAndWait {
    import driver.api._
    for {
      // Three updates in order: project 1, project 2, then project 1 again.
      _ <- rawlsBillingProjectQuery.updateBillingAccount(
        testData.testProject1Name,
        billingAccount = RawlsBillingAccountName("bananas").some,
        testData.userOwner.userSubjectId
      )
      _ <- rawlsBillingProjectQuery.updateBillingAccount(
        testData.testProject2Name,
        billingAccount = RawlsBillingAccountName("kumquat").some,
        testData.userOwner.userSubjectId
      )
      _ <- rawlsBillingProjectQuery.updateBillingAccount(
        testData.testProject1Name,
        billingAccount = RawlsBillingAccountName("kumquat").some,
        testData.userOwner.userSubjectId
      )
      latest <- BillingAccountChanges.latestChanges.result

      // Only the most recent change per billing project matters for syncing, and the
      // latest changes come back ordered by id. Ordering by the time of the first skipped
      // change would also be possible but is more complicated, so id order is used for now.
      project2Latest <- BillingAccountChanges.getLastChange(testData.testProject2Name)
      project1Latest <- BillingAccountChanges.getLastChange(testData.testProject1Name)
    } yield latest shouldBe List(project2Latest, project1Latest).map(_.value)
  }
}

// =========== test helpers

// for a given billing project, return the distinct statuses and their counts from the db
Expand Down

0 comments on commit 587aa68

Please sign in to comment.