From c768f890b5915b0844cdecb1030221d41e08729f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Thu, 1 Aug 2024 17:58:50 +0200 Subject: [PATCH] Enforce affinity during recovery --- .../client/amqp/impl/AmqpConnection.java | 49 ++++++++++--- .../client/amqp/impl/AmqpManagement.java | 2 +- .../impl/AmqpConnectionAffinityUnitTest.java | 42 +++++++++++- .../com/rabbitmq/client/amqp/impl/Cli.java | 21 ++++++ .../client/amqp/impl/ClusterTest.java | 68 ++++++++++++++++--- .../rabbitmq/client/amqp/impl/TestUtils.java | 18 +++++ src/test/resources/logback-test.xml | 2 +- 7 files changed, 179 insertions(+), 23 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java index 85ed17199..430307544 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java @@ -282,7 +282,23 @@ static NativeConnectionWrapper enforceAffinity( affinityCache.nodenameToAddress(connectionWrapper.nodename, connectionWrapper.address); if (nodesWithAffinity.contains(connectionWrapper.nodename)) { LOGGER.debug("Affinity {} found with node {}", affinity, connectionWrapper.nodename); - pickedConnection = connectionWrapper; + if (!queueInfoRefreshed) { + LOGGER.debug("Refreshing queue information."); + management.init(); + info = management.queueInfo(affinity.queue()); + affinityCache.queueInfo(info); + nodesWithAffinity = ConnectionUtils.findAffinity(affinity, info); + LOGGER.debug("Nodes matching affinity {}: {}", affinity, nodesWithAffinity); + queueInfoRefreshed = true; + if (nodesWithAffinity.contains(connectionWrapper.nodename)) { + pickedConnection = connectionWrapper; + } else { + management.releaseResources(); + connectionWrapper.connection.close(); + } + } else { + pickedConnection = connectionWrapper; + } } else if (attemptCount == 5) { LOGGER.debug( "Could not find affinity {} after {} attempt(s), using last connection.", @@ -389,7 +405,8 @@ private void recoverAfterConnectionFailure( AmqpException failureCause, AtomicReference> disconnectedHandlerReference) { - LOGGER.info("Connection to {} failed, trying to recover", this.currentConnectionLabel()); + LOGGER.info( + "Connection to {} has been disconnected, trying to recover", this.currentConnectionLabel()); this.state(RECOVERING, failureCause); this.changeStateOfPublishers(RECOVERING, failureCause); this.changeStateOfConsumers(RECOVERING, failureCause); @@ -399,9 +416,13 @@ private void recoverAfterConnectionFailure( this.releaseManagementResources(); try { this.recoveringConnection.set(true); - this.nativeConnection = + NativeConnectionWrapper ncw = recoverNativeConnection( recoveryConfiguration, connectionName, disconnectedHandlerReference); + this.connectionAddress = ncw.address; + this.connectionNodename = ncw.nodename; + this.nativeConnection = ncw.connection; + LOGGER.debug("Reconnected to {}", this.currentConnectionLabel()); } catch (Exception ex) { if (ex instanceof InterruptedException) { Thread.currentThread().interrupt(); @@ -432,7 +453,7 @@ private void recoverAfterConnectionFailure( } } - private org.apache.qpid.protonj2.client.Connection recoverNativeConnection( + private NativeConnectionWrapper recoverNativeConnection( AmqpConnectionBuilder.AmqpRecoveryConfiguration recoveryConfiguration, String connectionName, AtomicReference> @@ -455,11 +476,21 @@ private org.apache.qpid.protonj2.client.Connection recoverNativeConnection( try { NativeConnectionWrapper result = - connect( - this.connectionSettings, connectionName, disconnectedHandlerReference.get(), null); - this.connectionAddress = result.address; - LOGGER.debug("Reconnected to {}", this.currentConnectionLabel()); - return result.connection; + enforceAffinity( + addrs -> { + NativeConnectionWrapper wrapper = + connect( + this.connectionSettings, + connectionName, + disconnectedHandlerReference.get(), + addrs); + this.nativeConnection = wrapper.connection; + return wrapper; + }, + this.management, + this.affinity, + this.environment.affinityCache()); + return result; } catch (Exception ex) { LOGGER.info("Error while trying to recover connection", ex); if (!RECOVERY_PREDICATE.test(ex)) { diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java index 914b014d1..71ff14a5f 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java @@ -255,7 +255,7 @@ void init() { } } catch (ClientException e) { java.util.function.Consumer log = - this.closed.get() ? m -> LOGGER.debug(m, e) : m -> LOGGER.warn(m, e); + this.closed.get() ? m -> LOGGER.debug(m, e) : m -> LOGGER.info(m, e); log.accept("Error while polling AMQP receiver"); } }; diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpConnectionAffinityUnitTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpConnectionAffinityUnitTest.java index 455f1be74..1fc992c00 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpConnectionAffinityUnitTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpConnectionAffinityUnitTest.java @@ -17,6 +17,7 @@ // info@rabbitmq.com. package com.rabbitmq.client.amqp.impl; +import static com.rabbitmq.client.amqp.Management.QueueType.QUORUM; import static com.rabbitmq.client.amqp.Management.QueueType.STREAM; import static com.rabbitmq.client.amqp.impl.AmqpConnection.enforceAffinity; import static org.assertj.core.api.Assertions.fail; @@ -48,6 +49,11 @@ public class AmqpConnectionAffinityUnitTest { private static final String FOLLOWER2_NODENAME = "f2"; private static final Address FOLLOWER2_ADDRESS = new Address(FOLLOWER2_NODENAME, 5672); private static final String Q = "my-queue"; + private static final Map NODES = + Map.of( + LEADER_NODENAME, LEADER_ADDRESS, + FOLLOWER1_NODENAME, FOLLOWER1_ADDRESS, + FOLLOWER2_NODENAME, FOLLOWER2_ADDRESS); AutoCloseable mocks; @@ -71,17 +77,39 @@ void tearDown() throws Exception { } @Test - void noInfoLookupIfAlreadyInCache() { + void infoInCache_ShouldLookUpInfoAndCheckIt_ShouldUseConnectionIfMatch() { cache.queueInfo(info()); + when(management.queueInfo(Q)).thenReturn(info()); when(cf.apply(anyList())).thenReturn(leaderConnection()); AmqpConnection.NativeConnectionWrapper w = enforceAffinity(cf, management, affinity(), cache); assertThat(w).isLeader(); - verifyNoInteractions(management); + verify(management, times(1)).queueInfo(Q); verify(cf, times(1)).apply(anyList()); verify(nativeConnection, never()).close(); assertThat(cache).contains(info()).hasMapping(LEADER_NODENAME, LEADER_ADDRESS); } + @Test + void infoInCache_ShouldLookUpInfoAndCheckIt_ShouldRetryIfConnectionDoesNotMatch() { + String initialLeader = LEADER_NODENAME; + when(management.queueInfo(Q)).thenReturn(info(initialLeader)); + when(cf.apply(anyList())).thenReturn(leaderConnection()); + AmqpConnection.NativeConnectionWrapper w = enforceAffinity(cf, management, affinity(), cache); + assertThat(w).hasNodename(initialLeader); + + String newLeader = FOLLOWER1_NODENAME; + // the QQ leader moves to another node for some reason + // the cache is stale, the management is the authority + when(management.queueInfo(Q)).thenReturn(info(newLeader)); + when(cf.apply(anyList())).thenReturn(leaderConnection()).thenReturn(follower1Connection()); + // we want the returned connection to be on the new leader + w = enforceAffinity(cf, management, affinity(), cache); + assertThat(w).hasNodename(newLeader); + verify(management, times(2)).queueInfo(Q); + verify(cf, times(3)).apply(anyList()); + verify(nativeConnection, times(1)).close(); + } + @Test void infoLookupIfNotInCache() { when(cf.apply(anyList())).thenReturn(leaderConnection()); @@ -145,10 +173,20 @@ AmqpConnection.NativeConnectionWrapper follower2Connection() { this.nativeConnection, FOLLOWER2_NODENAME, FOLLOWER2_ADDRESS); } + AmqpConnection.NativeConnectionWrapper connection(String nodename) { + return new AmqpConnection.NativeConnectionWrapper( + this.nativeConnection, nodename, NODES.get(nodename)); + } + static ConnectionUtils.ConnectionAffinity affinity() { return new ConnectionUtils.ConnectionAffinity(Q, ConnectionSettings.Affinity.Operation.PUBLISH); } + static Management.QueueInfo info(String leader) { + return new TestQueueInfo( + Q, QUORUM, leader, List.of(LEADER_NODENAME, FOLLOWER1_NODENAME, FOLLOWER2_NODENAME)); + } + static Management.QueueInfo info() { return info( Management.QueueType.QUORUM, LEADER_NODENAME, FOLLOWER1_NODENAME, FOLLOWER2_NODENAME); diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/Cli.java b/src/test/java/com/rabbitmq/client/amqp/impl/Cli.java index f6e95c8f4..17c40309f 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/Cli.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/Cli.java @@ -53,10 +53,23 @@ public static String rabbitmqctlCommand() { } } + static String rabbitmqQueuesCommand() { + String rabbitmqctl = rabbitmqctlCommand(); + int lastIndex = rabbitmqctl.lastIndexOf("rabbitmqctl"); + if (lastIndex == -1) { + throw new IllegalArgumentException("Not a valid rabbitqmctl command: " + rabbitmqctl); + } + return rabbitmqctl.substring(0, lastIndex) + "rabbitmq-queues"; + } + static ProcessState rabbitmqctl(String command) { return executeCommand(rabbitmqctlCommand() + " " + command); } + static ProcessState rabbitmqQueues(String command) { + return executeCommand(rabbitmqQueuesCommand() + " " + command); + } + static ProcessState rabbitmqctlIgnoreError(String command) { return executeCommand(rabbitmqctlCommand() + " " + command, true); } @@ -177,6 +190,14 @@ static boolean exchangeExists(String exchange) { return Arrays.asList(output.split("\n")).contains(exchange); } + static void addQuorumQueueMember(String queue, String node) { + rabbitmqQueues(" add_member " + queue + " " + node); + } + + static void deleteQuorumQueueMember(String queue, String node) { + rabbitmqQueues(" delete_member " + queue + " " + node); + } + static List listConnections() { String output = rabbitmqctl("list_connections -q pid peer_port client_properties").output(); // output (header line presence depends on broker version): diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/ClusterTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/ClusterTest.java index 2fb6af364..22465c6a3 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/ClusterTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/ClusterTest.java @@ -19,6 +19,7 @@ import static com.rabbitmq.client.amqp.ConnectionSettings.Affinity.Operation.CONSUME; import static com.rabbitmq.client.amqp.ConnectionSettings.Affinity.Operation.PUBLISH; +import static java.time.Duration.ofMillis; import static org.assertj.core.api.Assertions.assertThat; import com.rabbitmq.client.amqp.*; @@ -32,13 +33,15 @@ @TestUtils.DisabledIfNotCluster public class ClusterTest { + static final BackOffDelayPolicy BACK_OFF_DELAY_POLICY = BackOffDelayPolicy.fixed(ofMillis(100)); Environment environment; Connection connection; Management management; - String name; + String q, name; @BeforeEach void init(TestInfo info) { + this.q = TestUtils.name(info); this.name = TestUtils.name(info); environment = new AmqpEnvironmentBuilder() @@ -60,24 +63,69 @@ void tearDown() { @ParameterizedTest void connectionsShouldBeMemberLocalReplicatedQueues(Management.QueueType type) { try { - management.queue(name).type(type).declare(); - AmqpConnection consumeConnection = - connection(b -> b.affinity().queue(name).operation(CONSUME)); - AmqpConnection publishConnection = - connection(b -> b.affinity().queue(name).operation(PUBLISH)); - Management.QueueInfo info = connection.management().queueInfo(name); + management.queue(q).type(type).declare(); + AmqpConnection consumeConnection = connection(b -> b.affinity().queue(q).operation(CONSUME)); + AmqpConnection publishConnection = connection(b -> b.affinity().queue(q).operation(PUBLISH)); + Management.QueueInfo info = connection.management().queueInfo(q); assertThat(publishConnection.connectionNodename()).isEqualTo(info.leader()); assertThat(consumeConnection.connectionNodename()) .isIn(info.replicas()) .isNotEqualTo(info.leader()); assertThat(Cli.listConnections()).hasSize(3); } finally { - management.queueDeletion().delete(name); + management.queueDeletion().delete(q); + } + } + + @Test + void connectionShouldRecoverToNewQuorumQueueLeaderAfterAfterItHasMoved() { + try { + management.queue(q).type(Management.QueueType.QUORUM).declare(); + Management.QueueInfo info = queueInfo(); + String initialLeader = info.leader(); + + TestUtils.Sync recoveredSync = TestUtils.sync(); + AmqpConnection publishConnection = + connection( + b -> + b.name(name) + .listeners( + context -> { + if (context.previousState() == Resource.State.RECOVERING + && context.currentState() == Resource.State.OPEN) { + recoveredSync.down(); + } + }) + .affinity() + .queue(q) + .operation(PUBLISH)); + assertThat(publishConnection.connectionNodename()).isEqualTo(initialLeader); + + int initialReplicaCount = info.replicas().size(); + Cli.deleteQuorumQueueMember(q, initialLeader); + TestUtils.waitAtMost(() -> !queueInfo().leader().equals(initialLeader)); + assertThat(queueInfo().replicas()).hasSize(initialReplicaCount - 1); + Cli.addQuorumQueueMember(q, initialLeader); + TestUtils.waitAtMost(() -> queueInfo().replicas().size() == initialReplicaCount); + info = queueInfo(); + TestUtils.assertThat(info).doesNotHaveLeader(initialLeader); + String newLeader = info.leader(); + + Cli.closeConnection(name); + TestUtils.assertThat(recoveredSync).completes(); + assertThat(publishConnection.connectionNodename()).isEqualTo(newLeader); + } finally { + management.queueDeletion().delete(q); } } - AmqpConnection connection(Consumer operation) { - ConnectionBuilder builder = environment.connectionBuilder(); + Management.QueueInfo queueInfo() { + return this.management.queueInfo(q); + } + + AmqpConnection connection(Consumer operation) { + AmqpConnectionBuilder builder = (AmqpConnectionBuilder) environment.connectionBuilder(); + builder.recovery().backOffDelayPolicy(BACK_OFF_DELAY_POLICY); operation.accept(builder); return (AmqpConnection) builder.build(); } diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/TestUtils.java b/src/test/java/com/rabbitmq/client/amqp/impl/TestUtils.java index 26440ead7..84ff96958 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/TestUtils.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/TestUtils.java @@ -589,6 +589,24 @@ QueueInfoAssert is(Management.QueueType type) { return this; } + QueueInfoAssert hasLeader(String leader) { + Assert.notNull(leader, "Expected leader cannot be null"); + isNotNull(); + if (!leader.equals(actual.leader())) { + fail("Queue leader should be '%s' but is '%s'", leader, actual.leader()); + } + return this; + } + + QueueInfoAssert doesNotHaveLeader(String leader) { + Assert.notNull(leader, "Leader cannot be null"); + isNotNull(); + if (leader.equals(actual.leader())) { + fail("Queue leader should not be '%s'", leader); + } + return this; + } + QueueInfoAssert hasArgument(String key, Object value) { isNotNull(); if (!actual.arguments().containsKey(key) || !actual.arguments().get(key).equals(value)) { diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index d80cc6627..7be25eb64 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -5,7 +5,7 @@ - +