Skip to content

Commit

Permalink
CORE-133: add status column for billing account change synchronizer (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
davidangb authored Nov 25, 2024
1 parent d280688 commit 1f24e53
Show file tree
Hide file tree
Showing 6 changed files with 340 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,5 @@
<include file="changesets/20241106_streamline_entity_attr_temp_table_procedure.xml" relativeToChangelogFile="true"/>
<include file="changesets/20241106_streamline_workspace_attr_temp_table_procedure.xml" relativeToChangelogFile="true"/>
<include file="changesets/20241120_rename_submission_cost_cap_threshold.xml" relativeToChangelogFile="true"/>
<include file="changesets/20241121_billing_account_changes_status.xml" relativeToChangelogFile="true"/>
</databaseChangeLog>
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<databaseChangeLog logicalFilePath="dummy" xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:ext="http://www.liquibase.org/xml/ns/dbchangelog-ext"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog-ext http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd http://www.liquibase.org/xml/ns/dbchangelog https://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.30.xsd">
<changeSet id="add_billing_account_changes_status" author="davidan" logicalFilePath="dummy">
<!-- add BILLING_ACCOUNT_CHANGES.STATUS column, defaulting to 'outstanding' -->
<addColumn tableName="BILLING_ACCOUNT_CHANGES">
<column name="STATUS"
type="ENUM('Ignored','Outstanding','Synchronized')"
defaultValue="Outstanding"
afterColumn="GOOGLE_SYNC_TIME">
<constraints nullable="false"/>
</column>
</addColumn>
<!-- add an index on the BILLING_ACCOUNT_CHANGES.STATUS column -->
<createIndex tableName="BILLING_ACCOUNT_CHANGES" indexName="BILLING_ACCOUNT_CHANGES_STATUS" unique="false">
<column name="STATUS"
remarks="Has this change been synchronized with GCP?"/>
</createIndex>
<!-- set STATUS='synchronized' for all previously-synchronized rows -->
<sql>update BILLING_ACCOUNT_CHANGES
set STATUS='Synchronized'
where GOOGLE_SYNC_TIME is not null;</sql>
</changeSet>
</databaseChangeLog>
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import cats.implicits.catsSyntaxOptionId
import org.broadinstitute.dsde.rawls.RawlsException
import org.broadinstitute.dsde.rawls.dataaccess.GoogleApiTypes.GoogleApiType
import org.broadinstitute.dsde.rawls.dataaccess.GoogleOperationNames.GoogleOperationName
import org.broadinstitute.dsde.rawls.dataaccess.slick.BillingAccountChangeStatus.BillingAccountChangeStatus
import org.broadinstitute.dsde.rawls.dataaccess.{GoogleApiTypes, GoogleOperationNames}
import org.broadinstitute.dsde.rawls.model.CreationStatuses.CreationStatus
import org.broadinstitute.dsde.rawls.model._
Expand Down Expand Up @@ -101,9 +102,24 @@ final case class BillingAccountChange(id: Long,
newBillingAccount: Option[RawlsBillingAccountName],
created: Instant,
googleSyncTime: Option[Instant],
status: BillingAccountChangeStatus,
outcome: Option[Outcome]
)

object BillingAccountChangeStatus extends Enumeration {
type BillingAccountChangeStatus = Value
val Ignored: BillingAccountChangeStatus = Value("Ignored")
val Outstanding: BillingAccountChangeStatus = Value("Outstanding")
val Synchronized: BillingAccountChangeStatus = Value("Synchronized")

def apply(value: String): BillingAccountChangeStatus = value match {
case "Ignored" => Ignored
case "Outstanding" => Outstanding
case "Synchronized" => Synchronized
case _ => throw new NoSuchElementException()
}
}

trait RawlsBillingProjectComponent {
this: DriverComponent =>

Expand Down Expand Up @@ -168,6 +184,8 @@ trait RawlsBillingProjectComponent {

def googleSyncTime = column[Option[Timestamp]]("GOOGLE_SYNC_TIME")

def status = column[String]("STATUS")

def outcome = column[Option[String]]("OUTCOME")

def message = column[Option[String]]("MESSAGE")
Expand All @@ -181,6 +199,7 @@ trait RawlsBillingProjectComponent {
newBillingAccount,
created,
googleSyncTime,
status,
outcome,
message
) <> (
Expand Down Expand Up @@ -447,12 +466,16 @@ trait RawlsBillingProjectComponent {
// - so the `WorkspaceBillingAccountActor` can synchronise the changes with google
// - to keep an audit log of billing account changes
_ <- DBIO.sequence(billingProjects.map { project =>
BillingAccountChanges.create(
project.projectName,
project.billingAccount,
billingAccount,
userSubjectId
)
// ignore all currently-outstanding changes for this project
BillingAccountChanges.ignoreAllOutstanding(project.projectName) andThen {
// insert the most recent change for this project
BillingAccountChanges.create(
project.projectName,
project.billingAccount,
billingAccount,
userSubjectId
)
}
})
} yield count
}
Expand All @@ -469,6 +492,7 @@ trait RawlsBillingProjectComponent {
Option[String], // New billing account
Timestamp, // Created
Option[Timestamp], // Google sync time
String, // status
Option[String], // Outcome
Option[String] // Message
)
Expand All @@ -481,6 +505,7 @@ trait RawlsBillingProjectComponent {
newBillingAccount,
created,
googleSyncTime,
status,
outcome,
message
) =>
Expand All @@ -493,6 +518,7 @@ trait RawlsBillingProjectComponent {
newBillingAccount.map(RawlsBillingAccountName),
created.toInstant,
googleSyncTime.map(_.toInstant),
BillingAccountChangeStatus.apply(status),
outcome
)
}
Expand All @@ -508,6 +534,7 @@ trait RawlsBillingProjectComponent {
billingAccountChange.newBillingAccount.map(_.value),
Timestamp.from(billingAccountChange.created),
billingAccountChange.googleSyncTime.map(Timestamp.from),
billingAccountChange.status.toString,
outcome,
message
)
Expand Down Expand Up @@ -577,5 +604,20 @@ trait RawlsBillingProjectComponent {

def setGoogleSyncTime(syncTime: Option[Instant]): WriteAction[Int] =
query.map(_.googleSyncTime).update(syncTime.map(Timestamp.from))

def setStatus(status: BillingAccountChangeStatus): WriteAction[Int] =
query.map(_.status).update(status.toString)

def ignoreAllOutstanding(billingProjectName: RawlsBillingProjectName): WriteAction[Int] =
query
.filter(c =>
c.billingProjectName === billingProjectName.value && c.status === BillingAccountChangeStatus.Outstanding.toString
)
.map(_.status)
.update(BillingAccountChangeStatus.Ignored.toString)

def nextOutstanding(): BillingAccountChangeQuery =
query.filter(_.status === BillingAccountChangeStatus.Outstanding.toString).sortBy(_.id).take(1)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ import cats.implicits.{
import cats.mtl.Ask
import cats.{Applicative, Functor, Monad, MonadThrow}
import com.typesafe.scalalogging.LazyLogging
import org.broadinstitute.dsde.rawls.dataaccess.slick.{BillingAccountChange, ReadWriteAction, WriteAction}
import org.broadinstitute.dsde.rawls.dataaccess.slick.{
BillingAccountChange,
BillingAccountChangeStatus,
ReadWriteAction,
WriteAction
}
import org.broadinstitute.dsde.rawls.dataaccess.{GoogleServicesDAO, SamDAO, SlickDataSource}
import org.broadinstitute.dsde.rawls.model._
import org.broadinstitute.dsde.rawls.monitor.migration.MigrationUtils.Implicits._
Expand Down Expand Up @@ -96,6 +101,12 @@ final case class BillingAccountChangeSynchronizer(dataSource: SlickDataSource,
for {
billingProject <- loadBillingProject

// Try out the new query using the STATUS column and compare results. If validation fails, don't
// fail the entire process.
_ <- validateStatusColumn.recover { case e: Exception =>
logger.warn(s"BillingAccountChangeStatus logic failed with ${e.getMessage}", e)
}

// v1 billing projects are backed by google projects and are used for v1 workspace billing
updateBillingProjectOutcome <- M.ifM(isV1BillingProject(billingProject.projectName))(
updateBillingProjectGoogleProject(billingProject, tracingContext),
Expand All @@ -106,6 +117,44 @@ final case class BillingAccountChangeSynchronizer(dataSource: SlickDataSource,
_ <- writeBillingAccountChangeOutcome(updateBillingProjectOutcome |+| updateWorkspacesOutcome)
} yield ()

private def validateStatusColumn[F[_]](implicit
R: Ask[F, BillingAccountChange],
M: Monad[F],
L: LiftIO[F]
): F[Unit] =
for {
(changeStatus, changeId) <- R.reader(x => (x.status, x.id))
byStatus <- inTransaction(BillingAccountChanges.nextOutstanding().result)
} yield
/* validation:
- byStatus should find exactly 1
- byStatus should have same id
- changeStatus should have status == Outstanding
*/
if (
byStatus.length == 1 &&
byStatus.head.id == changeId &&
changeStatus == BillingAccountChangeStatus.Outstanding
) {
logger.info(s"BillingAccountChangeStatus logic correct for change id $changeId")
} else {
// collect errors
val sb = new StringBuilder(s"BillingAccountChangeStatus logic error for change id $changeId!")
if (byStatus.isEmpty) {
sb.append(" By-status lookup was empty.")
}
if (byStatus.length > 1) {
sb.append(s" By-status lookup had length ${byStatus.length}.")
}
if (changeStatus != BillingAccountChangeStatus.Outstanding) {
sb.append(s" Legacy lookup had status $changeStatus.")
}
if (changeId != byStatus.head.id) {
sb.append(s" Legacy found id $changeId, but by-status head had id ${byStatus.head.id}")
}
logger.warn(sb.toString())
}

private def loadBillingProject[F[_]](implicit
R: Ask[F, BillingAccountChange],
M: Monad[F],
Expand Down Expand Up @@ -278,11 +327,12 @@ final case class BillingAccountChangeSynchronizer(dataSource: SlickDataSource,
case Success => info("Successfully synchronized Billing Account change")
case Failure(message) => warn("Failed to synchronize Billing Account change", "details" -> message)
}

changeId <- R.reader(_.id)
record = BillingAccountChanges.withId(changeId)
_ <- inTransaction {
record.setGoogleSyncTime(Instant.now().some) *> record.setOutcome(outcome.some)
record.setGoogleSyncTime(Instant.now().some) *>
record.setOutcome(outcome.some) *>
record.setStatus(BillingAccountChangeStatus.Synchronized)
}
} yield ()

Expand Down
Loading

0 comments on commit 1f24e53

Please sign in to comment.