Skip to content

Commit

Permalink
Enforce affinity during recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
acogoluegnes committed Aug 1, 2024
1 parent 607b60f commit c768f89
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 23 deletions.
49 changes: 40 additions & 9 deletions src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,23 @@ static NativeConnectionWrapper enforceAffinity(
affinityCache.nodenameToAddress(connectionWrapper.nodename, connectionWrapper.address);
if (nodesWithAffinity.contains(connectionWrapper.nodename)) {
LOGGER.debug("Affinity {} found with node {}", affinity, connectionWrapper.nodename);
pickedConnection = connectionWrapper;
if (!queueInfoRefreshed) {
LOGGER.debug("Refreshing queue information.");
management.init();
info = management.queueInfo(affinity.queue());
affinityCache.queueInfo(info);
nodesWithAffinity = ConnectionUtils.findAffinity(affinity, info);
LOGGER.debug("Nodes matching affinity {}: {}", affinity, nodesWithAffinity);
queueInfoRefreshed = true;
if (nodesWithAffinity.contains(connectionWrapper.nodename)) {
pickedConnection = connectionWrapper;
} else {
management.releaseResources();
connectionWrapper.connection.close();
}
} else {
pickedConnection = connectionWrapper;
}
} else if (attemptCount == 5) {
LOGGER.debug(
"Could not find affinity {} after {} attempt(s), using last connection.",
Expand Down Expand Up @@ -389,7 +405,8 @@ private void recoverAfterConnectionFailure(
AmqpException failureCause,
AtomicReference<BiConsumer<org.apache.qpid.protonj2.client.Connection, DisconnectionEvent>>
disconnectedHandlerReference) {
LOGGER.info("Connection to {} failed, trying to recover", this.currentConnectionLabel());
LOGGER.info(
"Connection to {} has been disconnected, trying to recover", this.currentConnectionLabel());
this.state(RECOVERING, failureCause);
this.changeStateOfPublishers(RECOVERING, failureCause);
this.changeStateOfConsumers(RECOVERING, failureCause);
Expand All @@ -399,9 +416,13 @@ private void recoverAfterConnectionFailure(
this.releaseManagementResources();
try {
this.recoveringConnection.set(true);
this.nativeConnection =
NativeConnectionWrapper ncw =
recoverNativeConnection(
recoveryConfiguration, connectionName, disconnectedHandlerReference);
this.connectionAddress = ncw.address;
this.connectionNodename = ncw.nodename;
this.nativeConnection = ncw.connection;
LOGGER.debug("Reconnected to {}", this.currentConnectionLabel());
} catch (Exception ex) {
if (ex instanceof InterruptedException) {
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -432,7 +453,7 @@ private void recoverAfterConnectionFailure(
}
}

private org.apache.qpid.protonj2.client.Connection recoverNativeConnection(
private NativeConnectionWrapper recoverNativeConnection(
AmqpConnectionBuilder.AmqpRecoveryConfiguration recoveryConfiguration,
String connectionName,
AtomicReference<BiConsumer<org.apache.qpid.protonj2.client.Connection, DisconnectionEvent>>
Expand All @@ -455,11 +476,21 @@ private org.apache.qpid.protonj2.client.Connection recoverNativeConnection(

try {
NativeConnectionWrapper result =
connect(
this.connectionSettings, connectionName, disconnectedHandlerReference.get(), null);
this.connectionAddress = result.address;
LOGGER.debug("Reconnected to {}", this.currentConnectionLabel());
return result.connection;
enforceAffinity(
addrs -> {
NativeConnectionWrapper wrapper =
connect(
this.connectionSettings,
connectionName,
disconnectedHandlerReference.get(),
addrs);
this.nativeConnection = wrapper.connection;
return wrapper;
},
this.management,
this.affinity,
this.environment.affinityCache());
return result;
} catch (Exception ex) {
LOGGER.info("Error while trying to recover connection", ex);
if (!RECOVERY_PREDICATE.test(ex)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ void init() {
}
} catch (ClientException e) {
java.util.function.Consumer<String> log =
this.closed.get() ? m -> LOGGER.debug(m, e) : m -> LOGGER.warn(m, e);
this.closed.get() ? m -> LOGGER.debug(m, e) : m -> LOGGER.info(m, e);
log.accept("Error while polling AMQP receiver");
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// [email protected].
package com.rabbitmq.client.amqp.impl;

import static com.rabbitmq.client.amqp.Management.QueueType.QUORUM;
import static com.rabbitmq.client.amqp.Management.QueueType.STREAM;
import static com.rabbitmq.client.amqp.impl.AmqpConnection.enforceAffinity;
import static org.assertj.core.api.Assertions.fail;
Expand Down Expand Up @@ -48,6 +49,11 @@ public class AmqpConnectionAffinityUnitTest {
private static final String FOLLOWER2_NODENAME = "f2";
private static final Address FOLLOWER2_ADDRESS = new Address(FOLLOWER2_NODENAME, 5672);
private static final String Q = "my-queue";
private static final Map<String, Address> NODES =
Map.of(
LEADER_NODENAME, LEADER_ADDRESS,
FOLLOWER1_NODENAME, FOLLOWER1_ADDRESS,
FOLLOWER2_NODENAME, FOLLOWER2_ADDRESS);

AutoCloseable mocks;

Expand All @@ -71,17 +77,39 @@ void tearDown() throws Exception {
}

@Test
void noInfoLookupIfAlreadyInCache() {
void infoInCache_ShouldLookUpInfoAndCheckIt_ShouldUseConnectionIfMatch() {
cache.queueInfo(info());
when(management.queueInfo(Q)).thenReturn(info());
when(cf.apply(anyList())).thenReturn(leaderConnection());
AmqpConnection.NativeConnectionWrapper w = enforceAffinity(cf, management, affinity(), cache);
assertThat(w).isLeader();
verifyNoInteractions(management);
verify(management, times(1)).queueInfo(Q);
verify(cf, times(1)).apply(anyList());
verify(nativeConnection, never()).close();
assertThat(cache).contains(info()).hasMapping(LEADER_NODENAME, LEADER_ADDRESS);
}

@Test
void infoInCache_ShouldLookUpInfoAndCheckIt_ShouldRetryIfConnectionDoesNotMatch() {
String initialLeader = LEADER_NODENAME;
when(management.queueInfo(Q)).thenReturn(info(initialLeader));
when(cf.apply(anyList())).thenReturn(leaderConnection());
AmqpConnection.NativeConnectionWrapper w = enforceAffinity(cf, management, affinity(), cache);
assertThat(w).hasNodename(initialLeader);

String newLeader = FOLLOWER1_NODENAME;
// the QQ leader moves to another node for some reason
// the cache is stale, the management is the authority
when(management.queueInfo(Q)).thenReturn(info(newLeader));
when(cf.apply(anyList())).thenReturn(leaderConnection()).thenReturn(follower1Connection());
// we want the returned connection to be on the new leader
w = enforceAffinity(cf, management, affinity(), cache);
assertThat(w).hasNodename(newLeader);
verify(management, times(2)).queueInfo(Q);
verify(cf, times(3)).apply(anyList());
verify(nativeConnection, times(1)).close();
}

@Test
void infoLookupIfNotInCache() {
when(cf.apply(anyList())).thenReturn(leaderConnection());
Expand Down Expand Up @@ -145,10 +173,20 @@ AmqpConnection.NativeConnectionWrapper follower2Connection() {
this.nativeConnection, FOLLOWER2_NODENAME, FOLLOWER2_ADDRESS);
}

AmqpConnection.NativeConnectionWrapper connection(String nodename) {
return new AmqpConnection.NativeConnectionWrapper(
this.nativeConnection, nodename, NODES.get(nodename));
}

static ConnectionUtils.ConnectionAffinity affinity() {
return new ConnectionUtils.ConnectionAffinity(Q, ConnectionSettings.Affinity.Operation.PUBLISH);
}

static Management.QueueInfo info(String leader) {
return new TestQueueInfo(
Q, QUORUM, leader, List.of(LEADER_NODENAME, FOLLOWER1_NODENAME, FOLLOWER2_NODENAME));
}

static Management.QueueInfo info() {
return info(
Management.QueueType.QUORUM, LEADER_NODENAME, FOLLOWER1_NODENAME, FOLLOWER2_NODENAME);
Expand Down
21 changes: 21 additions & 0 deletions src/test/java/com/rabbitmq/client/amqp/impl/Cli.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,23 @@ public static String rabbitmqctlCommand() {
}
}

static String rabbitmqQueuesCommand() {
String rabbitmqctl = rabbitmqctlCommand();
int lastIndex = rabbitmqctl.lastIndexOf("rabbitmqctl");
if (lastIndex == -1) {
throw new IllegalArgumentException("Not a valid rabbitqmctl command: " + rabbitmqctl);
}
return rabbitmqctl.substring(0, lastIndex) + "rabbitmq-queues";
}

static ProcessState rabbitmqctl(String command) {
return executeCommand(rabbitmqctlCommand() + " " + command);
}

static ProcessState rabbitmqQueues(String command) {
return executeCommand(rabbitmqQueuesCommand() + " " + command);
}

static ProcessState rabbitmqctlIgnoreError(String command) {
return executeCommand(rabbitmqctlCommand() + " " + command, true);
}
Expand Down Expand Up @@ -177,6 +190,14 @@ static boolean exchangeExists(String exchange) {
return Arrays.asList(output.split("\n")).contains(exchange);
}

static void addQuorumQueueMember(String queue, String node) {
rabbitmqQueues(" add_member " + queue + " " + node);
}

static void deleteQuorumQueueMember(String queue, String node) {
rabbitmqQueues(" delete_member " + queue + " " + node);
}

static List<ConnectionInfo> listConnections() {
String output = rabbitmqctl("list_connections -q pid peer_port client_properties").output();
// output (header line presence depends on broker version):
Expand Down
68 changes: 58 additions & 10 deletions src/test/java/com/rabbitmq/client/amqp/impl/ClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static com.rabbitmq.client.amqp.ConnectionSettings.Affinity.Operation.CONSUME;
import static com.rabbitmq.client.amqp.ConnectionSettings.Affinity.Operation.PUBLISH;
import static java.time.Duration.ofMillis;
import static org.assertj.core.api.Assertions.assertThat;

import com.rabbitmq.client.amqp.*;
Expand All @@ -32,13 +33,15 @@
@TestUtils.DisabledIfNotCluster
public class ClusterTest {

static final BackOffDelayPolicy BACK_OFF_DELAY_POLICY = BackOffDelayPolicy.fixed(ofMillis(100));
Environment environment;
Connection connection;
Management management;
String name;
String q, name;

@BeforeEach
void init(TestInfo info) {
this.q = TestUtils.name(info);
this.name = TestUtils.name(info);
environment =
new AmqpEnvironmentBuilder()
Expand All @@ -60,24 +63,69 @@ void tearDown() {
@ParameterizedTest
void connectionsShouldBeMemberLocalReplicatedQueues(Management.QueueType type) {
try {
management.queue(name).type(type).declare();
AmqpConnection consumeConnection =
connection(b -> b.affinity().queue(name).operation(CONSUME));
AmqpConnection publishConnection =
connection(b -> b.affinity().queue(name).operation(PUBLISH));
Management.QueueInfo info = connection.management().queueInfo(name);
management.queue(q).type(type).declare();
AmqpConnection consumeConnection = connection(b -> b.affinity().queue(q).operation(CONSUME));
AmqpConnection publishConnection = connection(b -> b.affinity().queue(q).operation(PUBLISH));
Management.QueueInfo info = connection.management().queueInfo(q);
assertThat(publishConnection.connectionNodename()).isEqualTo(info.leader());
assertThat(consumeConnection.connectionNodename())
.isIn(info.replicas())
.isNotEqualTo(info.leader());
assertThat(Cli.listConnections()).hasSize(3);
} finally {
management.queueDeletion().delete(name);
management.queueDeletion().delete(q);
}
}

@Test
void connectionShouldRecoverToNewQuorumQueueLeaderAfterAfterItHasMoved() {
try {
management.queue(q).type(Management.QueueType.QUORUM).declare();
Management.QueueInfo info = queueInfo();
String initialLeader = info.leader();

TestUtils.Sync recoveredSync = TestUtils.sync();
AmqpConnection publishConnection =
connection(
b ->
b.name(name)
.listeners(
context -> {
if (context.previousState() == Resource.State.RECOVERING
&& context.currentState() == Resource.State.OPEN) {
recoveredSync.down();
}
})
.affinity()
.queue(q)
.operation(PUBLISH));
assertThat(publishConnection.connectionNodename()).isEqualTo(initialLeader);

int initialReplicaCount = info.replicas().size();
Cli.deleteQuorumQueueMember(q, initialLeader);
TestUtils.waitAtMost(() -> !queueInfo().leader().equals(initialLeader));
assertThat(queueInfo().replicas()).hasSize(initialReplicaCount - 1);
Cli.addQuorumQueueMember(q, initialLeader);
TestUtils.waitAtMost(() -> queueInfo().replicas().size() == initialReplicaCount);
info = queueInfo();
TestUtils.assertThat(info).doesNotHaveLeader(initialLeader);
String newLeader = info.leader();

Cli.closeConnection(name);
TestUtils.assertThat(recoveredSync).completes();
assertThat(publishConnection.connectionNodename()).isEqualTo(newLeader);
} finally {
management.queueDeletion().delete(q);
}
}

AmqpConnection connection(Consumer<ConnectionBuilder> operation) {
ConnectionBuilder builder = environment.connectionBuilder();
Management.QueueInfo queueInfo() {
return this.management.queueInfo(q);
}

AmqpConnection connection(Consumer<AmqpConnectionBuilder> operation) {
AmqpConnectionBuilder builder = (AmqpConnectionBuilder) environment.connectionBuilder();
builder.recovery().backOffDelayPolicy(BACK_OFF_DELAY_POLICY);
operation.accept(builder);
return (AmqpConnection) builder.build();
}
Expand Down
18 changes: 18 additions & 0 deletions src/test/java/com/rabbitmq/client/amqp/impl/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,24 @@ QueueInfoAssert is(Management.QueueType type) {
return this;
}

QueueInfoAssert hasLeader(String leader) {
Assert.notNull(leader, "Expected leader cannot be null");
isNotNull();
if (!leader.equals(actual.leader())) {
fail("Queue leader should be '%s' but is '%s'", leader, actual.leader());
}
return this;
}

QueueInfoAssert doesNotHaveLeader(String leader) {
Assert.notNull(leader, "Leader cannot be null");
isNotNull();
if (leader.equals(actual.leader())) {
fail("Queue leader should not be '%s'", leader);
}
return this;
}

QueueInfoAssert hasArgument(String key, Object value) {
isNotNull();
if (!actual.arguments().containsKey(key) || !actual.arguments().get(key).equals(value)) {
Expand Down
2 changes: 1 addition & 1 deletion src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
</encoder>
</appender>

<logger name="com.rabbitmq.client.amqp.AmqpClientTestExtension" level="info" />
<logger name="com.rabbitmq.client.amqp.AmqpClientTestExtension" level="warn" />
<logger name="com.rabbitmq.client.amqp" level="warn" />
<logger name="com.rabbitmq.client.amqp.impl.EntityRecovery" level="warn" />
<logger name="com.rabbitmq.client.amqp.impl.AmqpConnection" level="warn" />
Expand Down

0 comments on commit c768f89

Please sign in to comment.