Skip to content

Commit

Permalink
Merge pull request #232 from Proximyst/mariellh/log-cause-on-reconnect
Browse files Browse the repository at this point in the history
feat: introduce a ReconnectionListener
  • Loading branch information
thiagocesarj authored Oct 16, 2023
2 parents c979630 + 42d7ab4 commit 2c91846
Show file tree
Hide file tree
Showing 9 changed files with 444 additions and 20 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
### 1.17.0
* feat: introduce a `ReconnectionListener` which the `ReconnectingClient` will call when trying to reconnect. This is
called more often than registered `ConnectionChangeListener`s, and is mostly useful for logging purposes.

### 1.16.1
* Add non-authenticated errors for all ASCII operations

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2014-2015 Spotify AB
* Copyright (c) 2014-2023 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
Expand Down Expand Up @@ -41,7 +41,9 @@
import com.spotify.folsom.ketama.AddressAndClient;
import com.spotify.folsom.ketama.KetamaMemcacheClient;
import com.spotify.folsom.ketama.ResolvingKetamaClient;
import com.spotify.folsom.reconnect.CatchingReconnectionListener;
import com.spotify.folsom.reconnect.ReconnectingClient;
import com.spotify.folsom.reconnect.ReconnectionListener;
import com.spotify.folsom.retry.RetryingClient;
import com.spotify.folsom.roundrobin.RoundRobinMemcacheClient;
import com.spotify.folsom.transcoder.ByteArrayTranscoder;
Expand Down Expand Up @@ -98,6 +100,8 @@ public class MemcacheClientBuilder<V> {
private final Transcoder<V> valueTranscoder;
private Metrics metrics = NoopMetrics.INSTANCE;
private Tracer tracer = NoopTracer.INSTANCE;
private ReconnectionListener reconnectionListener =
new ReconnectingClient.StandardReconnectionListener();

private BackoffFunction backoffFunction = new ExponentialBackoff(10L, 60 * 1000L, 2.5);

Expand Down Expand Up @@ -307,6 +311,25 @@ public MemcacheClientBuilder<V> withTracer(final Tracer tracer) {
return this;
}

/**
* Specify how to handle reconnections. This does not retain the {@linkplain
* com.spotify.folsom.reconnect.ReconnectingClient.StandardReconnectionListener standard
* implementation} whatsoever, meaning you would need to either delegate to it, or reimplement its
* behaviour.
*
* <p>This method will implicitly wrap your listener in a {@link CatchingReconnectionListener}.
* This is to ensure they cannot break the client returned if they throw.
*
* @param reconnectionListener the listener for reconnections
* @return itself
*/
public MemcacheClientBuilder<V> withReconnectionListener(
final ReconnectionListener reconnectionListener) {
requireNonNull(reconnectionListener, "reconnectionListener cannot be null");
this.reconnectionListener = new CatchingReconnectionListener(reconnectionListener);
return this;
}

/**
* Specify the maximum number of requests in the queue per server connection. If this is set too
* low, requests will fail with {@link com.spotify.folsom.MemcacheOverloadedException}. If this is
Expand Down Expand Up @@ -698,6 +721,7 @@ private RawMemcacheClient createReconnectingClient(
backoffFunction,
ReconnectingClient.singletonExecutor(),
address,
reconnectionListener,
maxOutstandingRequests,
eventLoopThreadFlushMaxBatchSize,
binary,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2023 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.spotify.folsom.reconnect;

import com.spotify.folsom.guava.HostAndPort;
import javax.annotation.Nullable;

/**
* A listener for each reconnection event. This will be called more often than the {@link
* com.spotify.folsom.ConnectionChangeListener ConnectionChangeListener}s. For more information on
* this, see {@link ReconnectionListener}.
*
* <p>This class acts as a stable interface for {@link ReconnectionListener}. While {@link
* ReconnectionListener} can have source-incompatible changes in minor versions, this cannot do
* that.
*
* @see ReconnectionListener
*/
public abstract class AbstractReconnectionListener implements ReconnectionListener {
/** {@inheritDoc} */
@Override
public void connectionFailure(final Throwable cause) {
// No-op.
}

/** {@inheritDoc} */
@Override
public void connectionLost(final @Nullable Throwable cause, final HostAndPort address) {
// No-op.
}

/** {@inheritDoc} */
@Override
public void reconnectionSuccessful(
final HostAndPort address, final int attempt, final boolean willStayConnected) {
// No-op.
}

/** {@inheritDoc} */
@Override
public void reconnectionCancelled() {
// No-op.
}

/** {@inheritDoc} */
@Override
public void reconnectionQueuedFromError(
final Throwable cause,
final HostAndPort address,
final long backOffMillis,
final int attempt) {
// No-op.
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright (c) 2023 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.spotify.folsom.reconnect;

import com.spotify.folsom.guava.HostAndPort;
import java.util.Objects;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A {@link ReconnectionListener} which ensures the methods never throw {@link Exception} (though
* {@link Error} and {@link Throwable} are not caught).
*
* <p>This should wrap any untrusted {@link ReconnectionListener}.
*
* <p>This class should be regarded as <b>internal API</b>. We recognise it may be useful outside
* internals, to which end it is exposed. The constructor is <b>stable</b>. All methods are
* <b>unstable</b> and can change with breakage in patch versions. If you want a stable interface,
* wrap your own reconnection listener in this, instead of extending it (and instead extend {@link
* AbstractReconnectionListener}).
*/
public class CatchingReconnectionListener implements ReconnectionListener {

private static final Logger log = LoggerFactory.getLogger(CatchingReconnectionListener.class);

private final ReconnectionListener delegate;

public CatchingReconnectionListener(final ReconnectionListener delegate) {
this.delegate = Objects.requireNonNull(delegate, "delegate must not be null");
}

/** {@inheritDoc} */
@Override
public void connectionFailure(final Throwable cause) {
try {
delegate.connectionFailure(cause);
} catch (final Exception ex) {
log.warn("Delegate ReconnectionListener threw on #connectionFailure: {}", this.delegate, ex);
}
}

/** {@inheritDoc} */
@Override
public void reconnectionCancelled() {
try {
delegate.reconnectionCancelled();
} catch (final Exception ex) {
log.warn(
"Delegate ReconnectionListener threw on #reconnectionCancelled: {}", this.delegate, ex);
}
}

/** {@inheritDoc} */
@Override
public void reconnectionSuccessful(
final HostAndPort address, final int attempt, final boolean willStayConnected) {
try {
delegate.reconnectionSuccessful(address, attempt, willStayConnected);
} catch (final Exception ex) {
log.warn(
"Delegate ReconnectionListener threw on #reconnectionSuccessful: {}", this.delegate, ex);
}
}

/** {@inheritDoc} */
@Override
public void connectionLost(final @Nullable Throwable cause, final HostAndPort address) {
try {
delegate.connectionLost(cause, address);
} catch (final Exception ex) {
log.warn("Delegate ReconnectionListener threw on #connectionLost: {}", this.delegate, ex);
}
}

/** {@inheritDoc} */
@Override
public void reconnectionQueuedFromError(
final Throwable cause,
final HostAndPort address,
final long backOffMillis,
final int attempt) {
try {
delegate.reconnectionQueuedFromError(cause, address, backOffMillis, attempt);
} catch (final Exception ex) {
log.warn(
"Delegate ReconnectionListener threw on #reconnectionQueuedFromError: {}",
this.delegate,
ex);
}
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (!(o instanceof CatchingReconnectionListener)) {
return false;
}
final CatchingReconnectionListener that = (CatchingReconnectionListener) o;
return this.delegate.equals(that.delegate);
}

@Override
public int hashCode() {
return Objects.hash(this.delegate);
}

@Override
public String toString() {
return "CatchingReconnectionListener{delegate=" + this.delegate + '}';
}
}
Loading

0 comments on commit 2c91846

Please sign in to comment.