Skip to content

Commit

Permalink
[CORE-267] Query BQ once per BP for spend report, handle errors (#3166)
Browse files Browse the repository at this point in the history
* split allworkspace query by bp
* handle errors from bq
* group by exporttable instead of bp
  • Loading branch information
calypsomatic authored Jan 23, 2025
1 parent 9e60b98 commit 44fc4ae
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,21 @@ case class SpendReportingAggregationKeyWithSub(key: SpendReportingAggregationKey
subAggregationKey: Option[SpendReportingAggregationKey] = None
)

case class SpendReportingResults(spendDetails: Seq[SpendReportingAggregation], spendSummary: SpendReportingForDateRange)
case class SpendReportingResults(spendDetails: Seq[SpendReportingAggregation],
spendSummary: SpendReportingForDateRange
) {
def +(other: SpendReportingResults): SpendReportingResults =
new SpendReportingResults(
spendDetails ++ other.spendDetails,
SpendReportingForDateRange(
(BigDecimal(this.spendSummary.cost) + BigDecimal(other.spendSummary.cost)).toString,
(BigDecimal(this.spendSummary.credits) + BigDecimal(other.spendSummary.credits)).toString,
this.spendSummary.currency,
this.spendSummary.startTime,
this.spendSummary.endTime
)
)
}
object SpendReportingResults {
def apply(spendReport: bio.terra.profile.model.SpendReport): SpendReportingResults = {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,8 @@ class SpendReportingService(
}

def getAllUserWorkspaceQuery(
billingProjects: Map[BillingProjectSpendExport, Seq[(GoogleProjectId, WorkspaceName)]],
spendExportTable: String,
workspaces: Seq[GoogleProjectId],
pageSize: Int,
offset: Int
): String = {
Expand All @@ -383,16 +384,16 @@ class SpendReportingService(
| spend_category,
| currency""".stripMargin.trim

val bpSubQuery = billingProjects
.map { bp =>
val tableName = bp._1.spendExportTable.getOrElse(spendReportingServiceConfig.defaultTableName)
val timePartitionColumn: String = getTimePartitionColumn(tableName)
baseQuery
.replace("_PARTITIONTIME", timePartitionColumn)
.replace("_BILLING_ACCOUNT_TABLE", tableName)
.replace("_PROJECT_ID_LIST", "(" + bp._2.map(tuple => s""""${tuple._1.value}"""").mkString(", ") + ")")
}
.mkString("\nUNION ALL\n")
val timePartitionColumn: String = getTimePartitionColumn(spendExportTable)
val bpSubQuery =
baseQuery
.replace("_PARTITIONTIME", timePartitionColumn)
.replace("_BILLING_ACCOUNT_TABLE", spendExportTable)
.replace("_PROJECT_ID_LIST",
workspaces
.map(projectId => s""""${projectId}"""")
.mkString("(", ", ", ")")
)

s"""WITH spend_categories AS (
|$bpSubQuery
Expand Down Expand Up @@ -568,15 +569,27 @@ class SpendReportingService(
return Future.successful(None)
}
projectNames: Map[GoogleProjectId, WorkspaceName] = billingMap.values.flatten.toMap
query = getAllUserWorkspaceQuery(billingMap, pageSize, offset)
queryJob = setUpAllUserWorkspaceQuery(query, start, end)
tableToProjectIdsMap: Map[String, Seq[GoogleProjectId]] = billingMap.map { case (table, workspaces) =>
table -> workspaces.map(_._1)
}
results <- Future.sequence(tableToProjectIdsMap.map { case (spendExportTable, projects) =>
val query = getAllUserWorkspaceQuery(spendExportTable, projects, pageSize, offset)
val queryJob = setUpAllUserWorkspaceQuery(query, start, end)
runBigQueryJob(queryJob, childContext)
.map { result =>
result.getValues.asScala.toList match {
case Nil => None
case rows => Some(extractCrossBillingProjectSpendReportingResults(rows, start, end, projectNames))
}
}
.recoverWith { case ex: Throwable =>
logger.warn(s"Error fetching results from BigQuery: ${ex.getMessage}")
Future.successful(None)
}
})
combinedResults = results.flatten.reduceOption((acc, res) => acc + res)
} yield combinedResults

result <- runBigQueryJob(queryJob, childContext)
} yield result.getValues.asScala.toList match {
case Nil =>
None
case rows => Some(extractCrossBillingProjectSpendReportingResults(rows, start, end, projectNames))
}
}

def runBigQueryJob(queryJob: JobInfo, ctx: RawlsRequestContext): Future[TableResult] =
Expand All @@ -590,7 +603,7 @@ class SpendReportingService(

def getBillingWithSpendPermission(
parentContext: RawlsRequestContext
): Future[Map[BillingProjectSpendExport, Seq[(GoogleProjectId, WorkspaceName)]]] =
): Future[Map[String, Seq[(GoogleProjectId, WorkspaceName)]]] =
traceFutureWithParent("getBillingWithSpendPermission", parentContext) { childContext =>
for {
ownerWorkspaces <- samDAO.listResourcesWithActions(
Expand All @@ -617,10 +630,17 @@ class SpendReportingService(
} else {
getSpendExportConfigurations(groupedWorkspaces.keys.toList)
}
} yield spendConfigs.map { config =>
config -> groupedWorkspaces
.getOrElse(RawlsBillingProjectName(config.billingProjectName.value), Seq.empty)
.map(ws => (ws.googleProjectId, ws.toWorkspaceName))
}.toMap
groupedByTable = spendConfigs.groupBy(
_.spendExportTable.getOrElse(spendReportingServiceConfig.defaultTableName)
)
combinedResults = groupedByTable.map { case (table, configs) =>
val combinedWorkspaces = configs.flatMap { config =>
groupedWorkspaces
.getOrElse(RawlsBillingProjectName(config.billingProjectName.value), Seq.empty)
.map(ws => (ws.googleProjectId, ws.toWorkspaceName))
}
table -> combinedWorkspaces
}
} yield combinedResults
}
}
Loading

0 comments on commit 44fc4ae

Please sign in to comment.