From fee17223ed8bdde03bbf28b1e764245057984555 Mon Sep 17 00:00:00 2001 From: "Le Gall, Benoit" Date: Tue, 17 Dec 2024 00:18:35 +0100 Subject: [PATCH] [consul] Add wait param to blocked queries --- README.md | 4 +- build.gradle.kts | 1 - .../consul/watch/WatchConfiguration.java | 68 +++++++++++++++---- .../consul/watch/client/BlockedQuery.java | 18 +++++ .../client/BlockedQueryClientFilter.java | 28 ++++++++ .../watch/client/WatchConsulClient.java | 10 ++- .../consul/watch/watcher/AbstractWatcher.java | 2 +- .../consul/watch/WatchConfigurationTest.java | 66 ++++++++++++++++++ .../client/BlockedQueryClientFilterTest.java | 59 ++++++++++++++++ .../consul/watch/watcher/WatcherTest.java | 4 +- 10 files changed, 235 insertions(+), 25 deletions(-) create mode 100644 src/main/java/com/frogdevelopment/micronaut/consul/watch/client/BlockedQuery.java create mode 100644 src/main/java/com/frogdevelopment/micronaut/consul/watch/client/BlockedQueryClientFilter.java create mode 100644 src/test/java/com/frogdevelopment/micronaut/consul/watch/WatchConfigurationTest.java create mode 100644 src/test/java/com/frogdevelopment/micronaut/consul/watch/client/BlockedQueryClientFilterTest.java diff --git a/README.md b/README.md index 79de229..56a02cd 100644 --- a/README.md +++ b/README.md @@ -40,9 +40,7 @@ consul: watch: disabled: false # to disable the watcher, during test for instance - retry-count: 3 # The maximum number of retry attempts - retry-delay: 1s # The delay between retry attempts - read-timeout: 10m # Sets the watch timeout + wait-timeout: 10m # Sets the maximum duration for the blocking request watch-delay: 500ms # Sets the watch delay before each call to avoid flooding ``` diff --git a/build.gradle.kts b/build.gradle.kts index 239857b..47156c3 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -23,7 +23,6 @@ dependencies { implementation(mn.micronaut.serde.jackson) implementation(mn.micronaut.discovery.client) implementation(mn.micronaut.reactor) - implementation(mn.micronaut.retry) implementation(mn.guava) // ----------- TESTS ----------- diff --git a/src/main/java/com/frogdevelopment/micronaut/consul/watch/WatchConfiguration.java b/src/main/java/com/frogdevelopment/micronaut/consul/watch/WatchConfiguration.java index c1faa96..5c0c679 100644 --- a/src/main/java/com/frogdevelopment/micronaut/consul/watch/WatchConfiguration.java +++ b/src/main/java/com/frogdevelopment/micronaut/consul/watch/WatchConfiguration.java @@ -5,6 +5,7 @@ import io.micronaut.context.annotation.ConfigurationProperties; import io.micronaut.core.annotation.Nullable; +import io.micronaut.core.convert.ConversionService; import io.micronaut.discovery.consul.ConsulConfiguration; import io.micronaut.discovery.consul.condition.RequiresConsul; import io.micronaut.http.client.HttpClientConfiguration; @@ -13,14 +14,14 @@ @ConfigurationProperties(WatchConfiguration.PREFIX) public class WatchConfiguration extends HttpClientConfiguration { - public static final String EXPR_CONSUL_WATCH_RETRY_COUNT = "${" + WatchConfiguration.PREFIX + ".retry-count:3}"; - public static final String EXPR_CONSUL_WATCH_RETRY_DELAY = "${" + WatchConfiguration.PREFIX + ".retry-delay:1s}"; - /** - * The default block timeout in minutes. + * The default wait timeout in minutes. */ - public static final long DEFAULT_BLOCK_TIMEOUT_MINUTES = 10; + public static final String DEFAULT_WAIT_TIMEOUT_MINUTES = "10m"; + /** + * The default watch delay in milliseconds. + */ public static final long DEFAULT_WATCH_DELAY_MILLISECONDS = 500; /** @@ -28,14 +29,25 @@ public class WatchConfiguration extends HttpClientConfiguration { */ public static final String PREFIX = "consul.watch"; - private Duration readTimeout = Duration.ofMinutes(DEFAULT_BLOCK_TIMEOUT_MINUTES); + private String waitTimeout = DEFAULT_WAIT_TIMEOUT_MINUTES; private Duration watchDelay = Duration.ofSeconds(DEFAULT_WATCH_DELAY_MILLISECONDS); + private Duration readTimeout = null; private final ConsulConfiguration consulConfiguration; + private final ConversionService conversionService; - public WatchConfiguration(final ConsulConfiguration consulConfiguration) { + /** + * Default constructor + * + * @param consulConfiguration {@link ConsulConfiguration} use as base. + * @param conversionService Use to calculate the {@link #readTimeout} from the {@link #waitTimeout}. + * @see ConsulConfiguration + */ + public WatchConfiguration(final ConsulConfiguration consulConfiguration, + final ConversionService conversionService) { super(consulConfiguration); this.consulConfiguration = consulConfiguration; + this.conversionService = conversionService; } @Override @@ -43,24 +55,54 @@ public ConnectionPoolConfiguration getConnectionPoolConfiguration() { return consulConfiguration.getConnectionPoolConfiguration(); } + /** + * @return The read timeout, depending on the {@link #waitTimeout} value. + */ @Override public Optional getReadTimeout() { - return Optional.ofNullable(readTimeout); + if (this.readTimeout == null) { + this.readTimeout = calculateReadTimeout(); + } + return Optional.of(this.readTimeout); } - @Override - public void setReadTimeout(@Nullable final Duration readTimeout) { - this.readTimeout = readTimeout; + private Duration calculateReadTimeout() { + final var waitValue = Optional.ofNullable(getWaitTimeout()) + .orElse(DEFAULT_WAIT_TIMEOUT_MINUTES); + + final var duration = conversionService.convertRequired(waitValue, Duration.class); + // to have the client timeout greater than the wait of the Blocked Query + return duration.plusMillis(duration.toMillis() / 16); } + /** + * @return The wait timeout. Defaults to {@value DEFAULT_WAIT_TIMEOUT_MINUTES}. + */ + public String getWaitTimeout() { + return this.waitTimeout; + } + + /** + * Specify the maximum duration for the blocking request. Default value ({@value #DEFAULT_WAIT_TIMEOUT_MINUTES}). + * + * @param waitTimeout The wait timeout + */ + public void setWaitTimeout(@Nullable final String waitTimeout) { + this.waitTimeout = waitTimeout; + this.readTimeout = calculateReadTimeout(); + } + + /** + * @return The watch delay. Defaults to {@value DEFAULT_WATCH_DELAY_MILLISECONDS} milliseconds. + */ public Duration getWatchDelay() { - return watchDelay; + return this.watchDelay; } /** * Sets the watch delay before each call to avoid flooding. Default value ({@value #DEFAULT_WATCH_DELAY_MILLISECONDS} milliseconds). * - * @param watchDelay The read timeout + * @param watchDelay The watch delay */ public void setWatchDelay(final Duration watchDelay) { this.watchDelay = watchDelay; diff --git a/src/main/java/com/frogdevelopment/micronaut/consul/watch/client/BlockedQuery.java b/src/main/java/com/frogdevelopment/micronaut/consul/watch/client/BlockedQuery.java new file mode 100644 index 0000000..f378fe6 --- /dev/null +++ b/src/main/java/com/frogdevelopment/micronaut/consul/watch/client/BlockedQuery.java @@ -0,0 +1,18 @@ +package com.frogdevelopment.micronaut.consul.watch.client; + +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.TYPE; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +import java.lang.annotation.Documented; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import io.micronaut.http.annotation.FilterMatcher; + +@FilterMatcher +@Documented +@Retention(RUNTIME) +@Target({METHOD, TYPE}) +public @interface BlockedQuery { +} diff --git a/src/main/java/com/frogdevelopment/micronaut/consul/watch/client/BlockedQueryClientFilter.java b/src/main/java/com/frogdevelopment/micronaut/consul/watch/client/BlockedQueryClientFilter.java new file mode 100644 index 0000000..5becd2e --- /dev/null +++ b/src/main/java/com/frogdevelopment/micronaut/consul/watch/client/BlockedQueryClientFilter.java @@ -0,0 +1,28 @@ +package com.frogdevelopment.micronaut.consul.watch.client; + +import lombok.RequiredArgsConstructor; + +import jakarta.inject.Singleton; + +import com.frogdevelopment.micronaut.consul.watch.WatchConfiguration; + +import io.micronaut.http.MutableHttpRequest; +import io.micronaut.http.annotation.ClientFilter; +import io.micronaut.http.annotation.RequestFilter; + +@BlockedQuery +@Singleton +@ClientFilter +@RequiredArgsConstructor +public class BlockedQueryClientFilter { + + private final WatchConfiguration watchConfiguration; + + @RequestFilter + public void filter(final MutableHttpRequest request) { + final var parameters = request.getParameters(); + if (parameters.contains("index")) { + parameters.add("wait", watchConfiguration.getWaitTimeout()); + } + } +} diff --git a/src/main/java/com/frogdevelopment/micronaut/consul/watch/client/WatchConsulClient.java b/src/main/java/com/frogdevelopment/micronaut/consul/watch/client/WatchConsulClient.java index 7cf5d5c..d3d550e 100644 --- a/src/main/java/com/frogdevelopment/micronaut/consul/watch/client/WatchConsulClient.java +++ b/src/main/java/com/frogdevelopment/micronaut/consul/watch/client/WatchConsulClient.java @@ -10,18 +10,16 @@ import io.micronaut.http.annotation.Get; import io.micronaut.http.annotation.QueryValue; import io.micronaut.http.client.annotation.Client; -import io.micronaut.retry.annotation.Retryable; import reactor.core.publisher.Mono; @Requires(beans = WatchConfiguration.class) @Client(id = ConsulClient.SERVICE_ID, path = "/v1", configuration = WatchConfiguration.class) public interface WatchConsulClient { + @BlockedQuery @Get(uri = "/kv/{+key}?{&recurse}{&index}", single = true) - @Retryable( - attempts = WatchConfiguration.EXPR_CONSUL_WATCH_RETRY_COUNT, - delay = WatchConfiguration.EXPR_CONSUL_WATCH_RETRY_DELAY - ) - Mono> watchValues(String key, @Nullable @QueryValue Boolean recurse, @Nullable @QueryValue Integer index); + Mono> watchValues(String key, + @Nullable @QueryValue Boolean recurse, + @Nullable @QueryValue Integer index); } diff --git a/src/main/java/com/frogdevelopment/micronaut/consul/watch/watcher/AbstractWatcher.java b/src/main/java/com/frogdevelopment/micronaut/consul/watch/watcher/AbstractWatcher.java index 7569e40..ecd2be3 100644 --- a/src/main/java/com/frogdevelopment/micronaut/consul/watch/watcher/AbstractWatcher.java +++ b/src/main/java/com/frogdevelopment/micronaut/consul/watch/watcher/AbstractWatcher.java @@ -126,7 +126,7 @@ private void onError(final String kvPath, final Throwable throwable) { log.trace("No KV found with kvPath={}", kvPath); listeners.remove(kvPath); } else if (throwable instanceof ReadTimeoutException) { - log.debug("Timeout for kvPath={}", kvPath); + log.warn("Timeout for kvPath={}", kvPath); watchKvPath(kvPath); } else { log.error("Watching kvPath={} failed", kvPath, throwable); diff --git a/src/test/java/com/frogdevelopment/micronaut/consul/watch/WatchConfigurationTest.java b/src/test/java/com/frogdevelopment/micronaut/consul/watch/WatchConfigurationTest.java new file mode 100644 index 0000000..532e71a --- /dev/null +++ b/src/test/java/com/frogdevelopment/micronaut/consul/watch/WatchConfigurationTest.java @@ -0,0 +1,66 @@ +package com.frogdevelopment.micronaut.consul.watch; + +import static com.frogdevelopment.micronaut.consul.watch.WatchConfiguration.DEFAULT_WAIT_TIMEOUT_MINUTES; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.Mockito.times; + +import java.time.Duration; +import java.util.Optional; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import io.micronaut.core.convert.ConversionService; +import io.micronaut.core.reflect.ReflectionUtils; +import io.micronaut.discovery.consul.ConsulConfiguration; + +@ExtendWith(MockitoExtension.class) +class WatchConfigurationTest { + + @InjectMocks + private WatchConfiguration watchConfiguration; + + @Mock + private ConsulConfiguration consulConfiguration; + @Mock + private ConversionService conversionService; + + @Test + void should_useDefaultWatchTimeout() { + // given + given(conversionService.convertRequired(DEFAULT_WAIT_TIMEOUT_MINUTES, Duration.class)).willReturn(Duration.ofMinutes(10)); + + // when + final var readTimeout1 = watchConfiguration.getReadTimeout(); + final var readTimeout2 = watchConfiguration.getReadTimeout(); + + // then + assertThat(readTimeout1) + .hasValueSatisfying(value -> assertThat(value).hasSeconds(637)); + assertThat(readTimeout2).isEqualTo(readTimeout1); + then(conversionService).should(times(1)).convertRequired(DEFAULT_WAIT_TIMEOUT_MINUTES, Duration.class); + } + + @Test + void should_calculateReadTimeout_when_settingWaitTimeout() { + // given + assertThat(getReadTimeoutValue()).isEmpty(); + given(conversionService.convertRequired("16s", Duration.class)).willReturn(Duration.ofSeconds(16)); + + // when + watchConfiguration.setWaitTimeout("16s"); + + // then + assertThat(getReadTimeoutValue()).hasValue(Duration.ofSeconds(17)); + } + + private Optional getReadTimeoutValue() { + return ReflectionUtils.getFieldValue(WatchConfiguration.class, "readTimeout", watchConfiguration); + } + +} diff --git a/src/test/java/com/frogdevelopment/micronaut/consul/watch/client/BlockedQueryClientFilterTest.java b/src/test/java/com/frogdevelopment/micronaut/consul/watch/client/BlockedQueryClientFilterTest.java new file mode 100644 index 0000000..d2cd7c5 --- /dev/null +++ b/src/test/java/com/frogdevelopment/micronaut/consul/watch/client/BlockedQueryClientFilterTest.java @@ -0,0 +1,59 @@ +package com.frogdevelopment.micronaut.consul.watch.client; + +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.frogdevelopment.micronaut.consul.watch.WatchConfiguration; + +import io.micronaut.http.MutableHttpParameters; +import io.micronaut.http.MutableHttpRequest; + +@ExtendWith(MockitoExtension.class) +class BlockedQueryClientFilterTest { + + @InjectMocks + private BlockedQueryClientFilter blockedQueryClientFilter; + + @Mock + private WatchConfiguration watchConfiguration; + @Mock + private MutableHttpRequest request; + @Mock + private MutableHttpParameters parameters; + + @Test + void should_doNothing_when_indexIsNotPresent() { + // given + given(request.getParameters()).willReturn(parameters); + given(parameters.contains("index")).willReturn(false); + + // when + blockedQueryClientFilter.filter(request); + + // then + then(parameters).shouldHaveNoMoreInteractions(); + then(watchConfiguration).shouldHaveNoInteractions(); + } + + @Test + void should_addWait_when_indexIsPresent() { + // given + given(request.getParameters()).willReturn(parameters); + given(parameters.contains("index")).willReturn(true); + given(watchConfiguration.getWaitTimeout()).willReturn("666s"); + + // when + blockedQueryClientFilter.filter(request); + + // then + then(parameters).should().add("wait", "666s"); + } + + +} diff --git a/src/test/java/com/frogdevelopment/micronaut/consul/watch/watcher/WatcherTest.java b/src/test/java/com/frogdevelopment/micronaut/consul/watch/watcher/WatcherTest.java index 7ba3e72..1c0a585 100644 --- a/src/test/java/com/frogdevelopment/micronaut/consul/watch/watcher/WatcherTest.java +++ b/src/test/java/com/frogdevelopment/micronaut/consul/watch/watcher/WatcherTest.java @@ -300,7 +300,7 @@ void should_handle_client_error_timeout() { given(consulClient.watchValues("path/to/timeout", false, null)).willReturn(Mono.error(exception)); given(watchConfiguration.getWatchDelay()).willReturn(Duration.ofMillis(500)); - CLASS_LOGGER.setLevel(Level.DEBUG); + CLASS_LOGGER.setLevel(Level.WARN); // when watcher.start(); @@ -313,6 +313,8 @@ void should_handle_client_error_timeout() { then(propertiesChangeHandler).shouldHaveNoInteractions(); softAssertions.assertThat(listAppender.list) + .filteredOn(event -> event.getLevel().equals(Level.WARN)) + .hasSize(1) .extracting(ILoggingEvent::getFormattedMessage) .contains("Timeout for kvPath=path/to/timeout"); }));