Skip to content

Commit

Permalink
Merge pull request #241 from emeraldpay/fix/grpc-status-disconnects
Browse files Browse the repository at this point in the history
  • Loading branch information
splix authored May 10, 2023
2 parents 61aea11 + efa4809 commit dca1a2e
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -340,11 +340,12 @@ open class ConfiguredUpstreams(
this.options = options
}
log.info("Using ALL CHAINS (gRPC) upstream, at ${endpoint.host}:${endpoint.port}")
ds.start()
ds.subscribeUpstreamChanges()
.doOnNext {
log.info("Chain ${it.chain} ${it.type} through gRPC at ${endpoint.host}:${endpoint.port}. With caps: ${it.upstream.getCapabilities()}")
}
.subscribe(currentUpstreams::update)
ds.startStatusUpdates()
}

private fun buildHttpClient(config: UpstreamsConfig.Upstream<out UpstreamsConfig.RpcConnection>): JsonRpcHttpClient? {
Expand Down
78 changes: 43 additions & 35 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcHead.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,23 @@ package io.emeraldpay.dshackle.upstream.grpc
import io.emeraldpay.api.proto.BlockchainOuterClass
import io.emeraldpay.api.proto.Common
import io.emeraldpay.api.proto.ReactorBlockchainGrpc
import io.emeraldpay.dshackle.Defaults
import io.emeraldpay.dshackle.SilentException
import io.emeraldpay.dshackle.commons.DurableFlux
import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.upstream.AbstractHead
import io.emeraldpay.dshackle.upstream.DefaultUpstream
import io.emeraldpay.dshackle.upstream.UpstreamAvailability
import io.emeraldpay.etherjar.rpc.RpcException
import io.emeraldpay.grpc.BlockchainType
import io.emeraldpay.grpc.Chain
import org.reactivestreams.Publisher
import org.slf4j.LoggerFactory
import org.springframework.context.Lifecycle
import org.springframework.util.backoff.ExponentialBackOff
import reactor.core.Disposable
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.util.retry.Retry
import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean
import java.util.function.Function

class GrpcHead(
Expand All @@ -54,6 +56,7 @@ class GrpcHead(
}

private var headSubscription: Disposable? = null
private val shouldBeRunning = AtomicBoolean(false)

/**
* Initiate a new head subscription with connection to the remote
Expand All @@ -64,51 +67,54 @@ class GrpcHead(
}
log.debug("Start Head subscription to ${parent.getId()}")

val source = Flux.concat(
// first connect immediately
Flux.just(remote),
// following requests do with delay, give it a time to recover
Flux.just(remote).repeat().delayElements(Defaults.retryConnection)
).flatMap(this::subscribeHead)

internalStart(source)
val blocks = DurableFlux(
{ connect(remote) },
ExponentialBackOff(100, 1.5),
log,
shouldBeRunning,
)
headSubscription = super.follow(blocks.connect())
}

fun subscribeHead(client: ReactorBlockchainGrpc.ReactorBlockchainStub): Publisher<BlockchainOuterClass.ChainHead> {
private fun connect(remote: ReactorBlockchainGrpc.ReactorBlockchainStub): Flux<BlockContainer> {
val chainRef = Common.Chain.newBuilder()
.setTypeValue(chain.id)
.build()
return client.subscribeHead(chainRef)
// simple retry on failure, if eventually failed then it supposed to resubscribe later from outer method
.retryWhen(Retry.backoff(4, Duration.ofSeconds(1)))
.onErrorContinue { err, _ ->
log.warn("Disconnected $chain from ${parent.getId()}: ${err.message}")
return remote.subscribeHead(chainRef)
// if nothing returned for a relatively long period it's probably because of a broken connection, so in this case we force to drop the connection
.timeout(
expectEventsTime(),
Mono.fromCallable { log.info("No events received from ${parent.getId()}. Reconnecting...") }
.then(Mono.error(SilentException.Timeout("No Events")))
)
.doOnError { err ->
if (err !is SilentException) {
log.warn("Disconnected $chain from ${parent.getId()}: ${err.message}")
}
parent.setStatus(UpstreamAvailability.UNAVAILABLE)
Mono.empty<BlockchainOuterClass.ChainHead>()
}
.map(converter)
.distinctUntilChanged(BlockContainer::hash)
.transform(enhanced())
}

/**
* Initiate a new head from provided source of head details
*/
private fun internalStart(source: Flux<BlockchainOuterClass.ChainHead>) {
var blocks = source.map(converter)
.distinctUntilChanged {
it.hash
private fun expectEventsTime(): Duration {
return try {
when (BlockchainType.from(chain)) {
BlockchainType.BITCOIN -> Duration.ofHours(1)
BlockchainType.ETHEREUM -> Duration.ofMinutes(5)
}
if (enhancer != null) {
blocks = blocks.flatMap(enhancer)
} catch (e: IllegalArgumentException) {
Duration.ofMinutes(15)
}
}

blocks = blocks.onErrorContinue { err, _ ->
if (err is RpcException) {
log.error("Head subscription error on ${parent.getId()}. ${err.javaClass.name}:${err.message}")
} else {
log.error("Head subscription error on ${parent.getId()}. ${err.javaClass.name}:${err.message}", err)
}
private fun enhanced(): Function<Flux<BlockContainer>, Flux<BlockContainer>> {
return if (enhancer != null) {
Function { blocks -> blocks.flatMap(enhancer) }
} else {
Function.identity()
}

headSubscription = super.follow(blocks)
}

override fun isRunning(): Boolean {
Expand All @@ -117,10 +123,12 @@ class GrpcHead(

override fun start() {
headSubscription?.dispose()
shouldBeRunning.set(true)
this.internalStart(remote)
}

override fun stop() {
shouldBeRunning.set(false)
headSubscription?.dispose()
}
}
174 changes: 101 additions & 73 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package io.emeraldpay.dshackle.upstream.grpc
import io.emeraldpay.api.proto.BlockchainOuterClass
import io.emeraldpay.api.proto.ReactorBlockchainGrpc
import io.emeraldpay.dshackle.FileResolver
import io.emeraldpay.dshackle.commons.DurableFlux
import io.emeraldpay.dshackle.config.AuthConfig
import io.emeraldpay.dshackle.config.UpstreamsConfig
import io.emeraldpay.dshackle.monitoring.Channel
Expand Down Expand Up @@ -46,12 +47,15 @@ import io.netty.handler.ssl.SslContextBuilder
import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.exception.ExceptionUtils
import org.slf4j.LoggerFactory
import org.springframework.util.backoff.ExponentialBackOff
import reactor.core.Disposable
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
import java.net.ConnectException
import java.time.Duration
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import java.util.function.Supplier
import kotlin.concurrent.withLock

class GrpcUpstreams(
Expand All @@ -66,40 +70,79 @@ class GrpcUpstreams(

var options = UpstreamsConfig.PartialOptions.getDefaults().build()

private var client: ReactorBlockchainGrpc.ReactorBlockchainStub? = null
private var clientValue: ReactorBlockchainGrpc.ReactorBlockchainStub? = null
private val known = HashMap<Chain, DefaultUpstream>()
private val lock = ReentrantLock()

fun start(): Flux<UpstreamChange> {
val channel: ManagedChannelBuilder<*> = if (conn.auth != null && StringUtils.isNotEmpty(conn.auth!!.ca)) {
NettyChannelBuilder.forAddress(conn.host, conn.port)
// some messages are very large. many of them in megabytes, some even in gigabytes (ex. ETH Traces)
.maxInboundMessageSize(Int.MAX_VALUE)
.useTransportSecurity()
.enableRetry()
.maxRetryAttempts(3)
.sslContext(withTls(conn.auth!!))
} else {
ManagedChannelBuilder.forAddress(conn.host, conn.port)
.let {
if (conn.autoTls == true) {
it.useTransportSecurity()
} else {
log.warn("Using insecure connection to ${conn.host}:${conn.port}")
it.usePlaintext()
private val client: ReactorBlockchainGrpc.ReactorBlockchainStub
get() {
if (clientValue != null) {
return clientValue!!
}
val channel: ManagedChannelBuilder<*> = if (conn.auth != null && StringUtils.isNotEmpty(conn.auth!!.ca)) {
NettyChannelBuilder.forAddress(conn.host, conn.port)
// some messages are very large. many of them in megabytes, some even in gigabytes (ex. ETH Traces)
.maxInboundMessageSize(Int.MAX_VALUE)
.useTransportSecurity()
.enableRetry()
.maxRetryAttempts(3)
.sslContext(withTls(conn.auth!!))
} else {
ManagedChannelBuilder.forAddress(conn.host, conn.port)
.let {
if (conn.autoTls == true) {
it.useTransportSecurity()
} else {
log.warn("Using insecure connection to ${conn.host}:${conn.port}")
it.usePlaintext()
}
}
}
}

this.clientValue = ReactorBlockchainGrpc.newReactorStub(channel.build())
return this.clientValue!!
}

val client = ReactorBlockchainGrpc.newReactorStub(channel.build())
this.client = client
fun subscribeUpstreamChanges(): Flux<UpstreamChange> {
val connect = {
Flux.interval(Duration.ZERO, Duration.ofMinutes(1))
.flatMap { client.describe(BlockchainOuterClass.DescribeRequest.newBuilder().build()) }
.transform(catchIOError())
.flatMap(::processDescription)
.doOnError { t -> log.error("Failed to process update from gRPC upstream $id", t) }
}

val statusSubscription = AtomicReference<Disposable>()
return DurableFlux(
connect,
ExponentialBackOff(100L, 1.5),
log,
AtomicBoolean(true)
).connect()
}

val updates = Flux.interval(Duration.ZERO, Duration.ofMinutes(1))
.flatMap {
client.describe(BlockchainOuterClass.DescribeRequest.newBuilder().build())
}.onErrorContinue { t, _ ->
fun startStatusUpdates(): Disposable {
val connection = DurableFlux(
{
client
.subscribeStatus(BlockchainOuterClass.StatusRequest.newBuilder().build())
.transform(catchIOError())
},
ExponentialBackOff(100L, 1.5),
log,
AtomicBoolean(true)
)
return connection
.connect()
.subscribeOn(Schedulers.boundedElastic())
.subscribe { value ->
val chain = Chain.byId(value.chain.number)
known[chain]?.onStatus(value)
}
}

fun <T> catchIOError(): java.util.function.Function<Flux<T>, Flux<T>> {
return java.util.function.Function<Flux<T>, Flux<T>> { source ->
source.onErrorContinue { t, _ ->
if (ExceptionUtils.indexOfType(t, ConnectException::class.java) >= 0) {
log.warn("gRPC upstream ${conn.host}:${conn.port} is unavailable. (${t.javaClass}: ${t.message})")
known.values.forEach {
Expand All @@ -108,25 +151,8 @@ class GrpcUpstreams(
} else {
log.error("Failed to get description from ${conn.host}:${conn.port}", t)
}
}.flatMap { value ->
processDescription(value)
}.doOnNext {
val subscription = client.subscribeStatus(BlockchainOuterClass.StatusRequest.newBuilder().build())
.subscribe { value ->
val chain = Chain.byId(value.chain.number)
if (chain != Chain.UNSPECIFIED) {
known[chain]?.onStatus(value)
}
}
statusSubscription.updateAndGet { prev ->
prev?.dispose()
subscription
}
}.doOnError { t ->
log.error("Failed to process update from gRPC upstream $id", t)
}

return updates
}
}

fun processDescription(value: BlockchainOuterClass.DescribeResponse): Flux<UpstreamChange> {
Expand Down Expand Up @@ -180,29 +206,31 @@ class GrpcUpstreams(
}

fun getOrCreate(chain: Chain): UpstreamChange {
val metricsTags = listOf(
Tag.of("upstream", id),
Tag.of("chain", chain.chainCode)
)
val metrics = Supplier {
val metricsTags = listOf(
Tag.of("upstream", id),
Tag.of("chain", chain.chainCode)
)

val metrics = RpcMetrics(
metricsTags,
timer = Timer.builder("upstream.grpc.conn")
.description("Request time through a Dshackle/gRPC connection")
.tags(metricsTags)
.publishPercentileHistogram()
.register(Metrics.globalRegistry),
fails = Counter.builder("upstream.grpc.fail")
.description("Number of failures of Dshackle/gRPC requests")
.tags(metricsTags)
.register(Metrics.globalRegistry),
responseSize = DistributionSummary.builder("upstream.grpc.response.size")
.description("Size of Dshackle/gRPC responses")
.baseUnit("Bytes")
.tags(metricsTags)
.register(Metrics.globalRegistry),
connectionMetrics = ConnectionMetrics(metricsTags)
)
RpcMetrics(
metricsTags,
timer = Timer.builder("upstream.grpc.conn")
.description("Request time through a Dshackle/gRPC connection")
.tags(metricsTags)
.publishPercentileHistogram()
.register(Metrics.globalRegistry),
fails = Counter.builder("upstream.grpc.fail")
.description("Number of failures of Dshackle/gRPC requests")
.tags(metricsTags)
.register(Metrics.globalRegistry),
responseSize = DistributionSummary.builder("upstream.grpc.response.size")
.description("Size of Dshackle/gRPC responses")
.baseUnit("Bytes")
.tags(metricsTags)
.register(Metrics.globalRegistry),
connectionMetrics = ConnectionMetrics(metricsTags)
)
}

val blockchainType = BlockchainType.from(chain)
if (blockchainType == BlockchainType.ETHEREUM) {
Expand All @@ -214,14 +242,14 @@ class GrpcUpstreams(
}
}

fun getOrCreateEthereum(chain: Chain, metrics: RpcMetrics): UpstreamChange {
fun getOrCreateEthereum(chain: Chain, metrics: Supplier<RpcMetrics>): UpstreamChange {
lock.withLock {
val current = known[chain]
return if (current == null) {
val rpcClient = JsonRpcGrpcClient(client!!, chain, metrics) {
val rpcClient = JsonRpcGrpcClient(client, chain, metrics.get()) {
currentRequestLogWriter.wrap(it, id, Channel.DSHACKLE)
}
val created = EthereumGrpcUpstream(id, forkWatchFactory.create(chain), role, chain, this.options, client!!, rpcClient)
val created = EthereumGrpcUpstream(id, forkWatchFactory.create(chain), role, chain, this.options, client, rpcClient)
created.timeout = this.options.timeout
known[chain] = created
created.start()
Expand All @@ -232,14 +260,14 @@ class GrpcUpstreams(
}
}

fun getOrCreateBitcoin(chain: Chain, metrics: RpcMetrics): UpstreamChange {
fun getOrCreateBitcoin(chain: Chain, metrics: Supplier<RpcMetrics>): UpstreamChange {
lock.withLock {
val current = known[chain]
return if (current == null) {
val rpcClient = JsonRpcGrpcClient(client!!, chain, metrics) {
val rpcClient = JsonRpcGrpcClient(client, chain, metrics.get()) {
currentRequestLogWriter.wrap(it, id, Channel.DSHACKLE)
}
val created = BitcoinGrpcUpstream(id, forkWatchFactory.create(chain), role, chain, this.options, client!!, rpcClient)
val created = BitcoinGrpcUpstream(id, forkWatchFactory.create(chain), role, chain, this.options, client, rpcClient)
created.timeout = this.options.timeout
known[chain] = created
created.start()
Expand Down

0 comments on commit dca1a2e

Please sign in to comment.