Skip to content

Commit

Permalink
AJ-1498: tracing and isolation level for updateStatisticsCache (#2652)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidangb authored Dec 5, 2023
1 parent d7ce758 commit d13c1ba
Showing 1 changed file with 48 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ import com.typesafe.scalalogging.LazyLogging
import io.opencensus.trace.{AttributeValue => OpenCensusAttributeValue}
import org.apache.commons.lang3.exception.ExceptionUtils
import org.broadinstitute.dsde.rawls.dataaccess.SlickDataSource
import org.broadinstitute.dsde.rawls.dataaccess.slick.DataAccess
import org.broadinstitute.dsde.rawls.metrics.RawlsInstrumented
import org.broadinstitute.dsde.rawls.model.RawlsTracingContext
import org.broadinstitute.dsde.rawls.monitor.EntityStatisticsCacheMonitor._
import org.broadinstitute.dsde.rawls.util.TracingUtils._
import slick.dbio.DBIO
import slick.jdbc.TransactionIsolation
import slick.jdbc.TransactionIsolation.ReadCommitted

import java.sql.Timestamp
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -105,6 +108,9 @@ trait EntityStatisticsCacheMonitor extends LazyLogging with RawlsInstrumented {
val workspaceCooldown: FiniteDuration
val timeoutPerWorkspace: Duration

// default isolation level for queries in this trait
final val isolationLevel: TransactionIsolation = TransactionIsolation.ReadCommitted

def sweep(): Future[EntityStatisticsCacheMessage] =
trace("EntityStatisticsCacheMonitor.sweep") { rootContext =>
dataSource.inTransaction(
Expand All @@ -123,8 +129,8 @@ trait EntityStatisticsCacheMonitor extends LazyLogging with RawlsInstrumented {
_.putAttribute("workspaceId", OpenCensusAttributeValue.stringAttributeValue(workspaceId.toString))
)
logger.info(s"EntityStatisticsCacheMonitor starting update attempt for workspace $workspaceId.")
traceDBIOWithParent("updateStatisticsCache", rootContext) { _ =>
DBIO.from(updateStatisticsCache(workspaceId, lastModified).map { _ =>
traceDBIOWithParent("updateStatisticsCache", rootContext) { innerSpan =>
DBIO.from(updateStatisticsCache(workspaceId, lastModified, innerSpan).map { _ =>
val outDated = lastModified.getTime - cacheLastUpdated.getOrElse(MIN_CACHE_TIME).getTime
logger.info(s"Updated entity cache for workspace $workspaceId. Cache was ${outDated}ms out of date.")
Sweep
Expand All @@ -138,42 +144,56 @@ trait EntityStatisticsCacheMonitor extends LazyLogging with RawlsInstrumented {
}
}
},
TransactionIsolation.ReadCommitted
isolationLevel
)
}

def updateStatisticsCache(workspaceId: UUID, timestamp: Timestamp): Future[Unit] = {
private def updateStatisticsCache(workspaceId: UUID,
timestamp: Timestamp,
parentSpan: RawlsTracingContext
): Future[Unit] = {
// allow 80% of the per-workspace timeout to be spent calculating the attribute names.
// note that other statements do not have timeouts and are unbounded.
val attrNamesTimeout = (timeoutPerWorkspace * .8).toSeconds.toInt

val updateFuture = dataSource.inTransaction { dataAccess =>
// TODO: beware contention on the approach of delete-all and batch-insert all below
// if we see contention we could move to encoding the entire metadata object as json
// and storing in a single column on WORKSPACE_ENTITY_CACHE
for {
// calculate entity statistics
entityTypesWithCounts <- dataAccess.entityQuery.getEntityTypesWithCounts(workspaceId)
// calculate entity attribute statistics
entityTypesWithAttrNames <- dataAccess.entityQuery.getAttrNamesAndEntityTypes(workspaceId, attrNamesTimeout)
_ <- dataAccess.entityCacheManagementQuery.saveEntityCache(workspaceId,
entityTypesWithCounts,
entityTypesWithAttrNames,
timestamp
)
} yield entityCacheSaveCounter.inc()
}
val updateFuture = dataSource.inTransaction(
dataAccess =>
// TODO: beware contention on the approach of delete-all and batch-insert all below
// if we see contention we could move to encoding the entire metadata object as json
// and storing in a single column on WORKSPACE_ENTITY_CACHE
for {
// calculate entity statistics
entityTypesWithCounts <- traceDBIOWithParent("getEntityTypesWithCounts", parentSpan) { _ =>
dataAccess.entityQuery.getEntityTypesWithCounts(workspaceId)
}
// calculate entity attribute statistics
entityTypesWithAttrNames <- traceDBIOWithParent("getAttrNamesAndEntityTypes", parentSpan) { _ =>
dataAccess.entityQuery.getAttrNamesAndEntityTypes(workspaceId, attrNamesTimeout)
}
_ <- traceDBIOWithParent("saveEntityCache", parentSpan) { _ =>
dataAccess.entityCacheManagementQuery.saveEntityCache(workspaceId,
entityTypesWithCounts,
entityTypesWithAttrNames,
timestamp
)
}
} yield entityCacheSaveCounter.inc(),
isolationLevel
)

updateFuture.recover { case t: Throwable =>
logger.error(s"Error updating statistics cache for workspaceId $workspaceId: ${t.getMessage}", t)
dataSource.inTransaction { dataAccess =>
logger.error(s"Workspace $workspaceId will be ignored by the entity cache monitor: ${t.getMessage}")
// We will set the cacheLastUpdated timestamp to the lowest possible value in MySQL.
// This is a "magic" value that allows the monitor to skip this problematic workspace
// so it does not get caught in a loop. These workspaces will require manual intervention.
val errMsg = s"${t.getMessage} ${ExceptionUtils.getStackTrace(t)}"
dataAccess.entityCacheQuery.updateCacheLastUpdated(workspaceId, MIN_CACHE_TIME, Some(errMsg))
}
dataSource.inTransaction(
{ dataAccess =>
logger.error(s"Workspace $workspaceId will be ignored by the entity cache monitor: ${t.getMessage}")
// We will set the cacheLastUpdated timestamp to the lowest possible value in MySQL.
// This is a "magic" value that allows the monitor to skip this problematic workspace
// so it does not get caught in a loop. These workspaces will require manual intervention.
val errMsg = s"${t.getMessage} ${ExceptionUtils.getStackTrace(t)}"
dataAccess.entityCacheQuery.updateCacheLastUpdated(workspaceId, MIN_CACHE_TIME, Some(errMsg))
},
isolationLevel
)
}
}

Expand Down

0 comments on commit d13c1ba

Please sign in to comment.