From 36ebeb1d3194896b9cd029a50f2557471333f7ca Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Thu, 19 Dec 2024 11:00:37 -0700 Subject: [PATCH] loadbalancer-experimental: add CatchAllLoadBalancerObserver (#3147) Motivation: We generally promise that if observers throw it won't mess up the greater state of the system. However, we don't currently do that in DefaultLoadBalancer. Modifications: - Add the CatchAllLoadBalancerObserver to catch exceptions thrown by the underlying underlying observers. - Install it when using anything other than the NoopLoadBalancerObserver. --- .../CatchAllLoadBalancerObserver.java | 168 ++++++++++++++++++ .../loadbalancer/DefaultLoadBalancer.java | 5 +- .../NoopLoadBalancerObserver.java | 4 +- 3 files changed, 173 insertions(+), 4 deletions(-) create mode 100644 servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/CatchAllLoadBalancerObserver.java diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/CatchAllLoadBalancerObserver.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/CatchAllLoadBalancerObserver.java new file mode 100644 index 0000000000..2800ba3929 --- /dev/null +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/CatchAllLoadBalancerObserver.java @@ -0,0 +1,168 @@ +/* + * Copyright © 2024 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.loadbalancer; + +import io.servicetalk.client.api.NoActiveHostException; +import io.servicetalk.client.api.NoAvailableHostException; +import io.servicetalk.client.api.ServiceDiscovererEvent; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import javax.annotation.Nullable; + +import static java.util.Objects.requireNonNull; + +final class CatchAllLoadBalancerObserver implements LoadBalancerObserver { + + private static final Logger LOGGER = LoggerFactory.getLogger(CatchAllLoadBalancerObserver.class); + + private final LoadBalancerObserver delegate; + + private CatchAllLoadBalancerObserver(LoadBalancerObserver delegate) { + this.delegate = delegate; + } + + @Override + public HostObserver hostObserver(Object resolvedAddress) { + try { + return new CatchAllHostObserver(delegate.hostObserver(resolvedAddress)); + } catch (Throwable ex) { + LOGGER.warn("Unexpected exception from {} while getting a HostObserver", delegate, ex); + return NoopLoadBalancerObserver.NoopHostObserver.INSTANCE; + } + } + + @Override + public void onServiceDiscoveryEvent(Collection> events) { + try { + delegate.onServiceDiscoveryEvent(events); + } catch (Throwable unexpected) { + LOGGER.warn("Unexpected exception from {} while reporting an onServiceDiscoveryEvent event", + delegate, unexpected); + } + } + + @Override + public void onHostsUpdate(Collection oldHosts, Collection newHosts) { + try { + delegate.onHostsUpdate(oldHosts, newHosts); + } catch (Throwable unexpected) { + LOGGER.warn("Unexpected exception from {} while reporting an onHostsUpdate event", delegate, unexpected); + } + } + + @Override + public void onNoAvailableHostException(NoAvailableHostException exception) { + try { + delegate.onNoAvailableHostException(exception); + } catch (Throwable unexpected) { + LOGGER.warn("Unexpected exception from {} while reporting an onNoAvailableHostException event", + delegate, unexpected); + } + } + + @Override + public void onNoActiveHostException(Collection hosts, NoActiveHostException exception) { + try { + delegate.onNoActiveHostException(hosts, exception); + } catch (Throwable unexpected) { + LOGGER.warn("Unexpected exception from {} while reporting an onNoActiveHostException event", + delegate, unexpected); + } + } + + private static final class CatchAllHostObserver implements HostObserver { + + private final HostObserver delegate; + + CatchAllHostObserver(HostObserver delegate) { + this.delegate = requireNonNull(delegate, "delegate"); + } + + @Override + public void onHostMarkedExpired(int connectionCount) { + try { + delegate.onHostMarkedExpired(connectionCount); + } catch (Throwable unexpected) { + LOGGER.warn("Unexpected exception from {} while reporting an onHostMarkedExpired event", + delegate, unexpected); + } + } + + @Override + public void onActiveHostRemoved(int connectionCount) { + try { + delegate.onActiveHostRemoved(connectionCount); + } catch (Throwable unexpected) { + LOGGER.warn("Unexpected exception from {} while reporting an onActiveHostRemoved event", + delegate, unexpected); + } + } + + @Override + public void onExpiredHostRevived(int connectionCount) { + try { + delegate.onExpiredHostRevived(connectionCount); + } catch (Throwable unexpected) { + LOGGER.warn("Unexpected exception from {} while reporting an onExpiredHostRevived event", + delegate, unexpected); + } + } + + @Override + public void onExpiredHostRemoved(int connectionCount) { + try { + delegate.onExpiredHostRemoved(connectionCount); + } catch (Throwable unexpected) { + LOGGER.warn("Unexpected exception from {} while reporting an onExpiredHostRemoved event", + delegate, unexpected); + } + } + + @Override + public void onHostMarkedUnhealthy(@Nullable Throwable cause) { + try { + delegate.onHostMarkedUnhealthy(cause); + } catch (Throwable unexpected) { + LOGGER.warn("Unexpected exception from {} while reporting an onHostMarkedUnhealthy event", + delegate, unexpected); + } + } + + @Override + public void onHostRevived() { + try { + delegate.onHostRevived(); + } catch (Throwable unexpected) { + LOGGER.warn("Unexpected exception from {} while reporting an onHostRevived event", + delegate, unexpected); + } + } + } + + static LoadBalancerObserver wrap(LoadBalancerObserver observer) { + requireNonNull(observer, "observer"); + if (observer instanceof CatchAllLoadBalancerObserver) { + return observer; + } + if (observer instanceof NoopLoadBalancerObserver) { + return observer; + } + return new CatchAllLoadBalancerObserver(observer); + } +} diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java index 57cd5f76eb..d8d40da251 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java @@ -162,8 +162,9 @@ final class DefaultLoadBalancer LOGGER.error("{}: Uncaught exception in {}", this, this.getClass().getSimpleName(), uncaughtException)); diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/NoopLoadBalancerObserver.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/NoopLoadBalancerObserver.java index ddbc2770e3..048b3415b3 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/NoopLoadBalancerObserver.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/NoopLoadBalancerObserver.java @@ -56,9 +56,9 @@ public void onNoActiveHostException(Collection hosts, NoActiveHo // noop } - private static final class NoopHostObserver implements LoadBalancerObserver.HostObserver { + static final class NoopHostObserver implements LoadBalancerObserver.HostObserver { - private static final HostObserver INSTANCE = new NoopHostObserver(); + static final HostObserver INSTANCE = new NoopHostObserver(); private NoopHostObserver() { }