diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b3bd420..04608657 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +### 1.17.0 +* feat: introduce a `ReconnectionListener` which the `ReconnectingClient` will call when trying to reconnect. This is + called more often than registered `ConnectionChangeListener`s, and is mostly useful for logging purposes. + ### 1.16.1 * Add non-authenticated errors for all ASCII operations diff --git a/folsom/src/main/java/com/spotify/folsom/MemcacheClientBuilder.java b/folsom/src/main/java/com/spotify/folsom/MemcacheClientBuilder.java index 14433469..6f74ef4f 100644 --- a/folsom/src/main/java/com/spotify/folsom/MemcacheClientBuilder.java +++ b/folsom/src/main/java/com/spotify/folsom/MemcacheClientBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014-2015 Spotify AB + * Copyright (c) 2014-2023 Spotify AB * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -41,7 +41,9 @@ import com.spotify.folsom.ketama.AddressAndClient; import com.spotify.folsom.ketama.KetamaMemcacheClient; import com.spotify.folsom.ketama.ResolvingKetamaClient; +import com.spotify.folsom.reconnect.CatchingReconnectionListener; import com.spotify.folsom.reconnect.ReconnectingClient; +import com.spotify.folsom.reconnect.ReconnectionListener; import com.spotify.folsom.retry.RetryingClient; import com.spotify.folsom.roundrobin.RoundRobinMemcacheClient; import com.spotify.folsom.transcoder.ByteArrayTranscoder; @@ -98,6 +100,8 @@ public class MemcacheClientBuilder { private final Transcoder valueTranscoder; private Metrics metrics = NoopMetrics.INSTANCE; private Tracer tracer = NoopTracer.INSTANCE; + private ReconnectionListener reconnectionListener = + new ReconnectingClient.StandardReconnectionListener(); private BackoffFunction backoffFunction = new ExponentialBackoff(10L, 60 * 1000L, 2.5); @@ -307,6 +311,25 @@ public MemcacheClientBuilder withTracer(final Tracer tracer) { return this; } + /** + * Specify how to handle reconnections. This does not retain the {@linkplain + * com.spotify.folsom.reconnect.ReconnectingClient.StandardReconnectionListener standard + * implementation} whatsoever, meaning you would need to either delegate to it, or reimplement its + * behaviour. + * + *

This method will implicitly wrap your listener in a {@link CatchingReconnectionListener}. + * This is to ensure they cannot break the client returned if they throw. + * + * @param reconnectionListener the listener for reconnections + * @return itself + */ + public MemcacheClientBuilder withReconnectionListener( + final ReconnectionListener reconnectionListener) { + requireNonNull(reconnectionListener, "reconnectionListener cannot be null"); + this.reconnectionListener = new CatchingReconnectionListener(reconnectionListener); + return this; + } + /** * Specify the maximum number of requests in the queue per server connection. If this is set too * low, requests will fail with {@link com.spotify.folsom.MemcacheOverloadedException}. If this is @@ -698,6 +721,7 @@ private RawMemcacheClient createReconnectingClient( backoffFunction, ReconnectingClient.singletonExecutor(), address, + reconnectionListener, maxOutstandingRequests, eventLoopThreadFlushMaxBatchSize, binary, diff --git a/folsom/src/main/java/com/spotify/folsom/reconnect/AbstractReconnectionListener.java b/folsom/src/main/java/com/spotify/folsom/reconnect/AbstractReconnectionListener.java new file mode 100644 index 00000000..828d87b4 --- /dev/null +++ b/folsom/src/main/java/com/spotify/folsom/reconnect/AbstractReconnectionListener.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2023 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.spotify.folsom.reconnect; + +import com.spotify.folsom.guava.HostAndPort; +import javax.annotation.Nullable; + +/** + * A listener for each reconnection event. This will be called more often than the {@link + * com.spotify.folsom.ConnectionChangeListener ConnectionChangeListener}s. For more information on + * this, see {@link ReconnectionListener}. + * + *

This class acts as a stable interface for {@link ReconnectionListener}. While {@link + * ReconnectionListener} can have source-incompatible changes in minor versions, this cannot do + * that. + * + * @see ReconnectionListener + */ +public abstract class AbstractReconnectionListener implements ReconnectionListener { + /** {@inheritDoc} */ + @Override + public void connectionFailure(final Throwable cause) { + // No-op. + } + + /** {@inheritDoc} */ + @Override + public void connectionLost(final @Nullable Throwable cause, final HostAndPort address) { + // No-op. + } + + /** {@inheritDoc} */ + @Override + public void reconnectionSuccessful( + final HostAndPort address, final int attempt, final boolean willStayConnected) { + // No-op. + } + + /** {@inheritDoc} */ + @Override + public void reconnectionCancelled() { + // No-op. + } + + /** {@inheritDoc} */ + @Override + public void reconnectionQueuedFromError( + final Throwable cause, + final HostAndPort address, + final long backOffMillis, + final int attempt) { + // No-op. + } +} diff --git a/folsom/src/main/java/com/spotify/folsom/reconnect/CatchingReconnectionListener.java b/folsom/src/main/java/com/spotify/folsom/reconnect/CatchingReconnectionListener.java new file mode 100644 index 00000000..409916e8 --- /dev/null +++ b/folsom/src/main/java/com/spotify/folsom/reconnect/CatchingReconnectionListener.java @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2023 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.spotify.folsom.reconnect; + +import com.spotify.folsom.guava.HostAndPort; +import java.util.Objects; +import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link ReconnectionListener} which ensures the methods never throw {@link Exception} (though + * {@link Error} and {@link Throwable} are not caught). + * + *

This should wrap any untrusted {@link ReconnectionListener}. + * + *

This class should be regarded as internal API. We recognise it may be useful outside + * internals, to which end it is exposed. The constructor is stable. All methods are + * unstable and can change with breakage in patch versions. If you want a stable interface, + * wrap your own reconnection listener in this, instead of extending it (and instead extend {@link + * AbstractReconnectionListener}). + */ +public class CatchingReconnectionListener implements ReconnectionListener { + + private static final Logger log = LoggerFactory.getLogger(CatchingReconnectionListener.class); + + private final ReconnectionListener delegate; + + public CatchingReconnectionListener(final ReconnectionListener delegate) { + this.delegate = Objects.requireNonNull(delegate, "delegate must not be null"); + } + + /** {@inheritDoc} */ + @Override + public void connectionFailure(final Throwable cause) { + try { + delegate.connectionFailure(cause); + } catch (final Exception ex) { + log.warn("Delegate ReconnectionListener threw on #connectionFailure: {}", this.delegate, ex); + } + } + + /** {@inheritDoc} */ + @Override + public void reconnectionCancelled() { + try { + delegate.reconnectionCancelled(); + } catch (final Exception ex) { + log.warn( + "Delegate ReconnectionListener threw on #reconnectionCancelled: {}", this.delegate, ex); + } + } + + /** {@inheritDoc} */ + @Override + public void reconnectionSuccessful( + final HostAndPort address, final int attempt, final boolean willStayConnected) { + try { + delegate.reconnectionSuccessful(address, attempt, willStayConnected); + } catch (final Exception ex) { + log.warn( + "Delegate ReconnectionListener threw on #reconnectionSuccessful: {}", this.delegate, ex); + } + } + + /** {@inheritDoc} */ + @Override + public void connectionLost(final @Nullable Throwable cause, final HostAndPort address) { + try { + delegate.connectionLost(cause, address); + } catch (final Exception ex) { + log.warn("Delegate ReconnectionListener threw on #connectionLost: {}", this.delegate, ex); + } + } + + /** {@inheritDoc} */ + @Override + public void reconnectionQueuedFromError( + final Throwable cause, + final HostAndPort address, + final long backOffMillis, + final int attempt) { + try { + delegate.reconnectionQueuedFromError(cause, address, backOffMillis, attempt); + } catch (final Exception ex) { + log.warn( + "Delegate ReconnectionListener threw on #reconnectionQueuedFromError: {}", + this.delegate, + ex); + } + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (!(o instanceof CatchingReconnectionListener)) { + return false; + } + final CatchingReconnectionListener that = (CatchingReconnectionListener) o; + return this.delegate.equals(that.delegate); + } + + @Override + public int hashCode() { + return Objects.hash(this.delegate); + } + + @Override + public String toString() { + return "CatchingReconnectionListener{delegate=" + this.delegate + '}'; + } +} diff --git a/folsom/src/main/java/com/spotify/folsom/reconnect/ReconnectingClient.java b/folsom/src/main/java/com/spotify/folsom/reconnect/ReconnectingClient.java index 9b822320..10d74749 100644 --- a/folsom/src/main/java/com/spotify/folsom/reconnect/ReconnectingClient.java +++ b/folsom/src/main/java/com/spotify/folsom/reconnect/ReconnectingClient.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014-2015 Spotify AB + * Copyright (c) 2014-2023 Spotify AB * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -38,6 +38,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; +import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,12 +49,13 @@ public class ReconnectingClient extends AbstractRawMemcacheClient { 1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("folsom-reconnecter").build()); - private final Logger log = LoggerFactory.getLogger(ReconnectingClient.class); + private static final Logger log = LoggerFactory.getLogger(ReconnectingClient.class); private final BackoffFunction backoffFunction; private final ScheduledExecutorService scheduledExecutorService; private final com.spotify.folsom.reconnect.Connector connector; private final HostAndPort address; + private final ReconnectionListener reconnectionListener; private volatile RawMemcacheClient client = NotConnectedClient.INSTANCE; private volatile int reconnectCount = 0; @@ -64,6 +66,7 @@ public ReconnectingClient( final BackoffFunction backoffFunction, final ScheduledExecutorService scheduledExecutorService, final HostAndPort address, + final ReconnectionListener reconnectionListener, final int outstandingRequestLimit, final int eventLoopThreadFlushMaxBatchSize, final boolean binary, @@ -92,7 +95,44 @@ public ReconnectingClient( eventLoopGroup, channelClass), authenticator, - address); + address, + reconnectionListener); + } + + public ReconnectingClient( + final BackoffFunction backoffFunction, + final ScheduledExecutorService scheduledExecutorService, + final HostAndPort address, + final int outstandingRequestLimit, + final int eventLoopThreadFlushMaxBatchSize, + final boolean binary, + final Authenticator authenticator, + final Executor executor, + final long connectionTimeoutMillis, + final Charset charset, + final Metrics metrics, + final int maxSetLength, + final EventLoopGroup eventLoopGroup, + final Class channelClass) { + this( + backoffFunction, + scheduledExecutorService, + () -> + DefaultRawMemcacheClient.connect( + address, + outstandingRequestLimit, + eventLoopThreadFlushMaxBatchSize, + binary, + executor, + connectionTimeoutMillis, + charset, + metrics, + maxSetLength, + eventLoopGroup, + channelClass), + authenticator, + address, + new StandardReconnectionListener()); } private ReconnectingClient( @@ -100,23 +140,27 @@ private ReconnectingClient( final ScheduledExecutorService scheduledExecutorService, final Connector connector, final Authenticator authenticator, - final HostAndPort address) { + final HostAndPort address, + final ReconnectionListener reconnectionListener) { this( backoffFunction, scheduledExecutorService, () -> AuthenticatingClient.authenticate(connector, authenticator), - address); + address, + reconnectionListener); } ReconnectingClient( final BackoffFunction backoffFunction, final ScheduledExecutorService scheduledExecutorService, final com.spotify.folsom.reconnect.Connector connector, - final HostAndPort address) { + final HostAndPort address, + final ReconnectionListener reconnectionListener) { super(); this.backoffFunction = backoffFunction; this.scheduledExecutorService = scheduledExecutorService; this.connector = connector; + this.reconnectionListener = reconnectionListener; this.address = address; retry(); @@ -170,21 +214,24 @@ private void retry() { future.whenComplete( (newClient, t) -> { if (t != null) { + this.reconnectionListener.connectionFailure(t); + if (t instanceof CompletionException && t.getCause() instanceof MemcacheAuthenticationException) { connectionFailure = t.getCause(); shutdown(); return; } - ReconnectingClient.this.onFailure(); + ReconnectingClient.this.onFailure(t); } else { - log.info("Successfully connected to {}", address); + reconnectionListener.reconnectionSuccessful(address, reconnectCount, stayConnected); reconnectCount = 0; client.shutdown(); client = newClient; // Protection against races with shutdown() if (!stayConnected) { + reconnectionListener.reconnectionCancelled(); newClient.shutdown(); notifyConnectionChange(); return; @@ -193,9 +240,11 @@ private void retry() { notifyConnectionChange(); newClient .disconnectFuture() + .whenComplete( + (unused, throwable) -> + reconnectionListener.connectionLost(throwable, address)) .thenRun( () -> { - log.info("Lost connection to {}", address); notifyConnectionChange(); if (stayConnected) { retry(); @@ -204,20 +253,20 @@ private void retry() { } }); } catch (final Exception e) { - ReconnectingClient.this.onFailure(); + ReconnectingClient.this.onFailure(e); } } - private void onFailure() { + private void onFailure(final Throwable cause) { if (!stayConnected) { + this.reconnectionListener.reconnectionCancelled(); return; } final long backOff = backoffFunction.getBackoffTimeMillis(reconnectCount); - log.warn( - "Attempting reconnect to {} in {} ms (retry number {})", address, backOff, reconnectCount); reconnectCount++; + this.reconnectionListener.reconnectionQueuedFromError(cause, address, backOff, reconnectCount); scheduledExecutorService.schedule( () -> { @@ -237,4 +286,36 @@ public static ScheduledExecutorService singletonExecutor() { public String toString() { return "Reconnecting(" + client + ")"; } + + /** + * This class should be regarded as internal API. We recognise it may be useful outside + * internals, to which end it is exposed. All methods are unstable and can change with + * breakage in patch versions. If you just want your own listener with a stable API, you could + * extend {@link AbstractReconnectionListener} yourself and delegate calls to this. + */ + public static class StandardReconnectionListener extends AbstractReconnectionListener { + + public StandardReconnectionListener() {} + + @Override + public void connectionLost(final @Nullable Throwable cause, final HostAndPort address) { + log.info("Lost connection to {}", address); + } + + @Override + public void reconnectionSuccessful( + final HostAndPort address, final int attempt, final boolean willStayConnected) { + log.info("Successfully connected to {}", address); + } + + @Override + public void reconnectionQueuedFromError( + final Throwable cause, + final HostAndPort address, + final long backOffMillis, + final int attempt) { + log.warn( + "Attempting reconnect to {} in {} ms (retry number {})", address, backOffMillis, attempt); + } + } } diff --git a/folsom/src/main/java/com/spotify/folsom/reconnect/ReconnectionListener.java b/folsom/src/main/java/com/spotify/folsom/reconnect/ReconnectionListener.java new file mode 100644 index 00000000..9c33475e --- /dev/null +++ b/folsom/src/main/java/com/spotify/folsom/reconnect/ReconnectionListener.java @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2023 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.spotify.folsom.reconnect; + +import com.spotify.folsom.guava.HostAndPort; +import javax.annotation.Nullable; + +/** + * A listener for each reconnection event. This will be called more often than the {@link + * com.spotify.folsom.ConnectionChangeListener ConnectionChangeListener}s. + * + *

If you implement this interface manually, expect there to be changes in minor versions + * which can cause source-incompatibility. If you want to implement this interface without this + * hassle, extend {@link AbstractReconnectionListener} instead. + */ +public interface ReconnectionListener { + + /** + * A connection to Memcached was attempted, but ultimately failed. + * + * @param cause why the connection failed, never {@code null} + */ + void connectionFailure(final Throwable cause); + + /** + * The reconnection was cancelled. This can be because the client was shut down between the + * connection and clean up upon a retry, or because a connection couldn't be acquired to begin + * with. + */ + void reconnectionCancelled(); + + /** + * A reconnect was successful, and a new connection has been set up. + * + * @param address the address to which the client connected, never {@code null} + * @param attempt which attempt it succeeded on; this is 1-indexed (i.e. 1st attempt is 1, 2nd is + * 2) + * @param willStayConnected whether the connection will immediately close after this, often due to + * race conditions + */ + void reconnectionSuccessful( + final HostAndPort address, final int attempt, final boolean willStayConnected); + + /** + * A connection to Memcached was lost after having been acquired. + * + *

This will be called regardless of if it was shut down intentionally, or in error. + * + * @param cause the cause of losing the connection, can be {@code null} + * @param address the address to the Memcached node we were connected, never {@code null} + */ + void connectionLost(final @Nullable Throwable cause, final HostAndPort address); + + /** + * A new reconnection has been queued on the executor. This is done after a failure occurred. + * + *

If the client is shut down between this call and the actual connection, you will never see a + * follow-up to any success or failure method. + * + * @param cause the cause of the reconnection, i.e. "what failed to cause this?", never {@code + * null} + * @param address the address to the Memcached node we will connect to, never {@code null} + * @param backOffMillis how many milliseconds we will wait before we attempt again, never {@code + * null} + * @param attempt which attempt we are on; this is 1-indexed (i.e. 1st attempt is 1, 2nd is 2) + */ + void reconnectionQueuedFromError( + final Throwable cause, + final HostAndPort address, + final long backOffMillis, + final int attempt); +} diff --git a/folsom/src/test/java/com/spotify/folsom/MisbehavingServerTest.java b/folsom/src/test/java/com/spotify/folsom/MisbehavingServerTest.java index 49620a68..d50ee8c9 100644 --- a/folsom/src/test/java/com/spotify/folsom/MisbehavingServerTest.java +++ b/folsom/src/test/java/com/spotify/folsom/MisbehavingServerTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015 Spotify AB + * Copyright (c) 2015-2023 Spotify AB * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -23,6 +23,7 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; import java.nio.charset.StandardCharsets; @@ -213,7 +214,7 @@ private MemcacheClient setupAscii(String response) throws Exception { server = new Server(response); MemcacheClient client = MemcacheClientBuilder.newStringClient() - .withAddress("127.0.0.8", server.port) + .withAddress(server.inetAddress.getHostAddress(), server.port) .withRequestTimeoutMillis(100L) .withRetry(false) .connectAscii(); @@ -222,6 +223,7 @@ private MemcacheClient setupAscii(String response) throws Exception { } private static class Server { + private final InetAddress inetAddress; private final int port; private final ServerSocket serverSocket; private final Thread thread; @@ -232,6 +234,7 @@ private static class Server { private Server(String responseString) throws IOException { final byte[] response = responseString.getBytes(StandardCharsets.UTF_8); serverSocket = new ServerSocket(0); + inetAddress = serverSocket.getInetAddress(); port = serverSocket.getLocalPort(); thread = new Thread( diff --git a/folsom/src/test/java/com/spotify/folsom/reconnect/ReconnectingClientTest.java b/folsom/src/test/java/com/spotify/folsom/reconnect/ReconnectingClientTest.java index d2f85762..4d6c3857 100644 --- a/folsom/src/test/java/com/spotify/folsom/reconnect/ReconnectingClientTest.java +++ b/folsom/src/test/java/com/spotify/folsom/reconnect/ReconnectingClientTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014-2015 Spotify AB + * Copyright (c) 2014-2023 Spotify AB * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -86,7 +86,8 @@ public void testInitialConnect() throws Exception { backoffFunction, scheduledExecutorService, connector, - HostAndPort.fromString("localhost:123")); + HostAndPort.fromString("localhost:123"), + new ReconnectingClient.StandardReconnectionListener()); verify(connector, times(3)).connect(); verify(scheduledExecutorService, times(1)) @@ -122,7 +123,8 @@ public void testLostConnectionRetry() throws Exception { backoffFunction, scheduledExecutorService, connector, - HostAndPort.fromString("localhost:123")); + HostAndPort.fromString("localhost:123"), + new ReconnectingClient.StandardReconnectionListener()); verify(connector, times(4)).connect(); verify(scheduledExecutorService, times(1)) @@ -153,7 +155,8 @@ public void testShutdown() throws Exception { backoffFunction, scheduledExecutorService, connector, - HostAndPort.fromString("localhost:123")); + HostAndPort.fromString("localhost:123"), + new ReconnectingClient.StandardReconnectionListener()); assertTrue(client.isConnected()); client.shutdown(); diff --git a/folsom/src/test/java/com/spotify/folsom/reconnect/ReconnectionListenerTest.java b/folsom/src/test/java/com/spotify/folsom/reconnect/ReconnectionListenerTest.java new file mode 100644 index 00000000..a7f39def --- /dev/null +++ b/folsom/src/test/java/com/spotify/folsom/reconnect/ReconnectionListenerTest.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2023 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.spotify.folsom.reconnect; + +import org.junit.Test; + +public class ReconnectionListenerTest { + @Test + public void ensureEmptyClassCompiles() { + // This class acts as a test. Nothing should be necessary from the abstract class, as such this + // one line of code should compile at all times. + // + // Needing to change this line should result in a minor version update, at least. + @SuppressWarnings("unused") + class EmptyReconnectionListener extends AbstractReconnectionListener {} + } +}