Make better use of available cpu on larger VMs (#57)
These changes allow the loader to make better use of all the CPU available on a
larger instance.

**1. CPU-intensive parsing/transforming is now parallelized**.
Parallelism is configured by a new config parameter
`cpuParallelismFactor`. The actual parallelism is chosen dynamically
based on the number of available CPUs, so the default value should be
appropriate for VMs of all sizes.

**2. We now open a new Snowflake ingest client per channel**. Note that the
Snowflake SDK recommends re-using a single Client per VM and opening
multiple Channels on the same Client, so here we are going against that
recommendation. We justify it because it gives the loader better
visibility into when the client's Future completes, signifying a complete
write to Snowflake.

**3. Upload parallelism is chosen dynamically**. Larger VMs benefit from
higher upload parallelism, in order to keep up with the faster rate of
batches produced by the CPU-intensive tasks. Parallelism is configured
by a new parameter `uploadParallelismFactor`, which gets multiplied by
the number of available CPUs. The default value should be appropriate for
VMs of all sizes. (A sketch of how both factors translate into parallelism
appears below.)

These new settings have been tested on pods ranging from 0.6 to 8
available CPUs.
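
A minimal sketch of how these two factors translate into effective parallelism, mirroring the `multiplyByCpuAndRoundUp` helper added to the `Environment` object below. The factor values are the new defaults; everything else here is illustrative:

```scala
object ParallelismSketch {

  // Same rounding as the helper in the Environment object: multiply the available
  // processor count by the configured factor and round up to a whole number.
  private def multiplyByCpuAndRoundUp(factor: BigDecimal): Int =
    (Runtime.getRuntime.availableProcessors * factor)
      .setScale(0, BigDecimal.RoundingMode.UP)
      .toInt

  def main(args: Array[String]): Unit = {
    val cpuParallelism    = multiplyByCpuAndRoundUp(BigDecimal(0.75)) // cpuParallelismFactor default
    val uploadParallelism = multiplyByCpuAndRoundUp(BigDecimal(2.5))  // uploadParallelismFactor default

    // On a 4-core VM: ceil(4 * 0.75) = 3 concurrent transform batches,
    // and ceil(4 * 2.5) = 10 concurrent uploads (one channel per upload slot).
    println(s"cpuParallelism=$cpuParallelism uploadParallelism=$uploadParallelism")
  }
}
```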
istreeter committed Nov 26, 2024
1 parent f2b2672 commit cc06bd6
Showing 13 changed files with 201 additions and 110 deletions.
15 changes: 12 additions & 3 deletions config/config.azure.reference.hocon
@@ -42,7 +42,9 @@
# -- name to use for the events table.
"table": "events"

# -- name to use for the snowflake channel.
# -- Prefix to use for the snowflake channels.
# -- The full name will be suffixed with a number, e.g. `snowplow-1`
# -- The prefix must be unique per loader VM
"channel": "snowplow"

# -- Timeouts used for JDBC operations
@@ -75,10 +77,17 @@
# - Events are emitted to Snowflake for a maximum of this duration, even if the `maxBytes` size has not been reached
"maxDelay": "1 second"

# - How many batches can we send simultaneously over the network to Snowflake.
"uploadConcurrency": 1
# - Controls how many batches we can send simultaneously over the network to Snowflake.
# -- E.g. If there are 4 available processors, and uploadParallelismFactor = 2.5, then we send up to 10 batches in parallel
# -- Adjusting this value can cause the app to use more or less of the available CPU.
"uploadParallelismFactor": 2.5
}

# -- Controls how the app splits the workload into concurrent batches which can be run in parallel.
# -- E.g. If there are 4 available processors, and cpuParallelismFactor = 0.75, then we process 3 batches concurrently.
# -- Adjusting this value can cause the app to use more or less of the available CPU.
"cpuParallelismFactor": 0.75

# Retry configuration for Snowflake operation failures
"retries": {

15 changes: 12 additions & 3 deletions config/config.kinesis.reference.hocon
@@ -56,7 +56,9 @@
# -- name to use for the events table.
"table": "events"

# -- name to use for the snowflake channel.
# -- Prefix to use for the snowflake channels.
# -- The full name will be suffixed with a number, e.g. `snowplow-1`
# -- The prefix must be unique per loader VM
"channel": "snowplow"

# -- Timeouts used for JDBC operations
@@ -92,10 +94,17 @@
# - Events are emitted to Snowflake for a maximum of this duration, even if the `maxBytes` size has not been reached
"maxDelay": "1 second"

# - How many batches can we send simultaneously over the network to Snowflake.
"uploadConcurrency": 1
# - Controls how many batches we can send simultaneously over the network to Snowflake.
# -- E.g. If there are 4 available processors, and uploadParallelismFactor = 2.5, then we send up to 10 batches in parallel
# -- Adjusting this value can cause the app to use more or less of the available CPU.
"uploadParallelismFactor": 2.5
}

# -- Controls how the app splits the workload into concurrent batches which can be run in parallel.
# -- E.g. If there are 4 available processors, and cpuParallelismFactor = 0.75, then we process 3 batches concurrently.
# -- Adjusting this value can cause the app to use more or less of the available CPU.
"cpuParallelismFactor": 0.75

# Retry configuration for Snowflake operation failures
"retries": {

15 changes: 12 additions & 3 deletions config/config.pubsub.reference.hocon
@@ -52,7 +52,9 @@
# -- name to use for the events table.
"table": "events"

# -- name to use for the snowflake channel.
# -- Prefix to use for the snowflake channels.
# -- The full name will be suffixed with a number, e.g. `snowplow-1`
# -- The prefix must be unique per loader VM
"channel": "snowplow"

# -- Timeouts used for JDBC operations
@@ -81,10 +83,17 @@
# - Events are emitted to Snowflake for a maximum of this duration, even if the `maxBytes` size has not been reached
"maxDelay": "1 second"

# - How many batches can we send simultaneously over the network to Snowflake.
"uploadConcurrency": 1
# - Controls how many batches we can send simultaneously over the network to Snowflake.
# -- E.g. If there are 4 available processors, and uploadParallelismFactor = 2.5, then we send up to 10 batches in parallel
# -- Adjusting this value can cause the app to use more or less of the available CPU.
"uploadParallelismFactor": 2.5
}

# -- Controls how the app splits the workload into concurrent batches which can be run in parallel.
# -- E.g. If there are 4 available processors, and cpuParallelismFactor = 0.75, then we process 3 batches concurrently.
# -- Adjusting this value can cause the app to use more or less of the available CPU.
"cpuParallelismFactor": 0.75

# Retry configuration for Snowflake operation failures
"retries": {

3 changes: 2 additions & 1 deletion modules/core/src/main/resources/reference.conf
@@ -21,8 +21,9 @@
"batching": {
"maxBytes": 16000000
"maxDelay": "1 second"
"uploadConcurrency": 3
"uploadParallelismFactor": 2.5
}
"cpuParallelismFactor": 0.75

"retries": {
"setupErrors": {
@@ -29,6 +29,7 @@ case class Config[+Source, +Sink](
input: Source,
output: Config.Output[Sink],
batching: Config.Batching,
cpuParallelismFactor: BigDecimal,
retries: Config.Retries,
skipSchemas: List[SchemaCriterion],
telemetry: Telemetry.Config,
@@ -69,7 +70,7 @@ object Config {
case class Batching(
maxBytes: Long,
maxDelay: FiniteDuration,
uploadConcurrency: Int
uploadParallelismFactor: BigDecimal
)

case class Metrics(
@@ -19,16 +19,27 @@ import com.snowplowanalytics.snowplow.snowflake.processing.{Channel, TableManage
import com.snowplowanalytics.snowplow.sources.SourceAndAck
import org.http4s.client.Client

/**
* Resources and runtime-derived configuration needed for processing events
*
* @param cpuParallelism
* The processing Pipe involves several steps, some of which are cpu-intensive. We run
* cpu-intensive steps in parallel, so that on big instances we can take advantage of all cores.
* For each of those cpu-intensive steps, `cpuParallelism` controls the parallelism of that step.
*
* Other params are self-explanatory
*/
case class Environment[F[_]](
appInfo: AppInfo,
source: SourceAndAck[F],
badSink: Sink[F],
httpClient: Client[F],
tableManager: TableManager[F],
channel: Channel.Provider[F],
channels: Vector[Channel.Provider[F]],
metrics: Metrics[F],
appHealth: AppHealth.Interface[F, Alert, RuntimeService],
batching: Config.Batching,
cpuParallelism: Int,
schemasToSkip: List[SchemaCriterion],
badRowMaxSize: Int
)
@@ -52,19 +63,47 @@ object Environment {
badSink <- toSink(config.output.bad.sink).onError(_ => Resource.eval(appHealth.beUnhealthyForRuntimeService(RuntimeService.BadSink)))
metrics <- Resource.eval(Metrics.build(config.monitoring.metrics))
tableManager <- Resource.eval(TableManager.make(config.output.good, appHealth, config.retries))
channelOpener <- Channel.opener(config.output.good, config.batching, config.retries, appHealth)
channelProvider <- Channel.provider(channelOpener, config.retries, appHealth)
cpuParallelism = chooseCpuParallelism(config)
uploadParallelism = chooseUploadParallelism(config)
channelProviders <- Vector.range(0, uploadParallelism).traverse { index =>
for {
channelOpener <- Channel.opener(config.output.good, config.retries, appHealth, index)
channelProvider <- Channel.provider(channelOpener, config.retries, appHealth)
} yield channelProvider
}
} yield Environment(
appInfo = appInfo,
source = sourceAndAck,
badSink = badSink,
httpClient = httpClient,
tableManager = tableManager,
channel = channelProvider,
metrics = metrics,
appHealth = appHealth,
batching = config.batching,
schemasToSkip = config.skipSchemas,
badRowMaxSize = config.output.bad.maxRecordSize
appInfo = appInfo,
source = sourceAndAck,
badSink = badSink,
httpClient = httpClient,
tableManager = tableManager,
channels = channelProviders,
metrics = metrics,
appHealth = appHealth,
batching = config.batching,
cpuParallelism = cpuParallelism,
schemasToSkip = config.skipSchemas,
badRowMaxSize = config.output.bad.maxRecordSize
)

/**
* See the description of `cpuParallelism` on the [[Environment]] class
*
* For bigger instances (more cores) we want more parallelism, so that cpu-intensive steps can
* take advantage of all the cores.
*/
private def chooseCpuParallelism(config: Config[Any, Any]): Int =
multiplyByCpuAndRoundUp(config.cpuParallelismFactor)

/**
* For bigger instances (more cores) we produce batches more quickly, and so need higher upload
* parallelism so that uploading does not become a bottleneck
*/
private def chooseUploadParallelism(config: Config[Any, Any]): Int =
multiplyByCpuAndRoundUp(config.batching.uploadParallelismFactor)

private def multiplyByCpuAndRoundUp(factor: BigDecimal): Int =
(Runtime.getRuntime.availableProcessors * factor)
.setScale(0, BigDecimal.RoundingMode.UP)
.toInt
}
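
The `Vector.range(0, uploadParallelism).traverse { ... }` block above opens one `Channel.Provider`, and therefore one ingest client, per upload slot. Combined with the name-suffixing change in `Channel.createChannel` further down, the channel names opened on a single VM would look like the following sketch (illustrative values; only the `s"${config.channel}-$index"` naming pattern comes from the commit):

```scala
// Illustrative only: default prefix, with uploadParallelism as derived on a 4-core VM.
val channelPrefix     = "snowplow" // the `channel` config value, now treated as a per-VM prefix
val uploadParallelism = 10         // ceil(4 cores * uploadParallelismFactor 2.5)

val channelNames = Vector.range(0, uploadParallelism).map(index => s"$channelPrefix-$index")
// Vector("snowplow-0", "snowplow-1", ..., "snowplow-9"), each backed by its own ingest client
```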
@@ -11,9 +11,12 @@
package com.snowplowanalytics.snowplow.snowflake

import cats.effect.{ExitCode, IO, Resource}
import cats.effect.metrics.CpuStarvationWarningMetrics
import io.circe.Decoder
import com.monovore.decline.effect.CommandIOApp
import com.monovore.decline.Opts
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import scala.concurrent.duration.DurationInt

@@ -28,6 +31,11 @@ abstract class LoaderApp[SourceConfig: Decoder, SinkConfig: Decoder](
override def runtimeConfig =
super.runtimeConfig.copy(cpuStarvationCheckInterval = 10.seconds)

private implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]

override def onCpuStarvationWarn(metrics: CpuStarvationWarningMetrics): IO[Unit] =
Logger[IO].debug(s"Cats Effect measured responsiveness in excess of ${metrics.starvationInterval * metrics.starvationThreshold}")

type SinkProvider = SinkConfig => Resource[IO, Sink[IO]]
type SourceProvider = SourceConfig => IO[SourceAndAck[IO]]

@@ -102,14 +102,14 @@ object Channel {

def opener[F[_]: Async](
config: Config.Snowflake,
batchingConfig: Config.Batching,
retriesConfig: Config.Retries,
appHealth: AppHealth.Interface[F, Alert, RuntimeService]
appHealth: AppHealth.Interface[F, Alert, RuntimeService],
index: Int
): Resource[F, Opener[F]] =
for {
client <- createClient(config, batchingConfig, retriesConfig, appHealth)
client <- createClient(config, retriesConfig, appHealth)
} yield new Opener[F] {
def open: F[CloseableChannel[F]] = createChannel[F](config, client).map(impl[F])
def open: F[CloseableChannel[F]] = createChannel[F](config, client, index).map(impl[F])
}

def provider[F[_]: Async](
@@ -177,23 +177,25 @@

private def createChannel[F[_]: Async](
config: Config.Snowflake,
client: SnowflakeStreamingIngestClient
client: SnowflakeStreamingIngestClient,
index: Int
): F[SnowflakeStreamingIngestChannel] = {
val channelName = s"${config.channel}-$index"
val request = OpenChannelRequest
.builder(config.channel)
.builder(channelName)
.setDBName(config.database)
.setSchemaName(config.schema)
.setTableName(config.table)
.setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
.setDefaultTimezone(ZoneOffset.UTC)
.build

Logger[F].info(s"Opening channel ${config.channel}") *>
Logger[F].info(s"Opening channel ${channelName}") *>
Async[F].blocking(client.openChannel(request)) <*
Logger[F].info(s"Successfully opened channel ${config.channel}")
Logger[F].info(s"Successfully opened channel ${channelName}")
}

private def channelProperties(config: Config.Snowflake, batchingConfig: Config.Batching): Properties = {
private def channelProperties(config: Config.Snowflake): Properties = {
val props = new Properties()
props.setProperty("user", config.user)
props.setProperty("private_key", config.privateKey)
@@ -211,14 +213,13 @@
props.setProperty(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, "0")
props.setProperty(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, "0")
props.setProperty(ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES, Long.MaxValue.toString)
props.setProperty(ParameterProvider.IO_TIME_CPU_RATIO, batchingConfig.uploadConcurrency.toString)
props.setProperty(ParameterProvider.IO_TIME_CPU_RATIO, "0")

props
}

private def createClient[F[_]: Async](
config: Config.Snowflake,
batchingConfig: Config.Batching,
retriesConfig: Config.Retries,
appHealth: AppHealth.Interface[F, Alert, RuntimeService]
): Resource[F, SnowflakeStreamingIngestClient] = {
@@ -228,7 +229,7 @@
Sync[F].blocking {
SnowflakeStreamingIngestClientFactory
.builder("Snowplow_Streaming")
.setProperties(channelProperties(config, batchingConfig))
.setProperties(channelProperties(config))
// .setParameterOverrides(Map.empty.asJava) // Not needed, as all params can also be set with Properties
.build
} <*