Skip to content
This repository has been archived by the owner on Jan 4, 2025. It is now read-only.

[consul] Add wait params to blocked queries #13

Merged
merged 1 commit into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ consul:

watch:
disabled: false # to disable the watcher, during test for instance
retry-count: 3 # The maximum number of retry attempts
retry-delay: 1s # The delay between retry attempts
read-timeout: 10m # Sets the watch timeout
wait-timeout: 10m # Sets the maximum duration for the blocking request
watch-delay: 500ms # Sets the watch delay before each call to avoid flooding
```

Expand Down
1 change: 0 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ dependencies {
implementation(mn.micronaut.serde.jackson)
implementation(mn.micronaut.discovery.client)
implementation(mn.micronaut.reactor)
implementation(mn.micronaut.retry)
implementation(mn.guava)

// ----------- TESTS -----------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.discovery.consul.ConsulConfiguration;
import io.micronaut.discovery.consul.condition.RequiresConsul;
import io.micronaut.http.client.HttpClientConfiguration;
Expand All @@ -13,54 +14,95 @@
@ConfigurationProperties(WatchConfiguration.PREFIX)
public class WatchConfiguration extends HttpClientConfiguration {

public static final String EXPR_CONSUL_WATCH_RETRY_COUNT = "${" + WatchConfiguration.PREFIX + ".retry-count:3}";
public static final String EXPR_CONSUL_WATCH_RETRY_DELAY = "${" + WatchConfiguration.PREFIX + ".retry-delay:1s}";

/**
* The default block timeout in minutes.
* The default wait timeout in minutes.
*/
public static final long DEFAULT_BLOCK_TIMEOUT_MINUTES = 10;
public static final String DEFAULT_WAIT_TIMEOUT_MINUTES = "10m";

/**
* The default watch delay in milliseconds.
*/
public static final long DEFAULT_WATCH_DELAY_MILLISECONDS = 500;

/**
* The prefix to use for all Consul settings.
*/
public static final String PREFIX = "consul.watch";

private Duration readTimeout = Duration.ofMinutes(DEFAULT_BLOCK_TIMEOUT_MINUTES);
private String waitTimeout = DEFAULT_WAIT_TIMEOUT_MINUTES;
private Duration watchDelay = Duration.ofSeconds(DEFAULT_WATCH_DELAY_MILLISECONDS);
private Duration readTimeout = null;

private final ConsulConfiguration consulConfiguration;
private final ConversionService conversionService;

public WatchConfiguration(final ConsulConfiguration consulConfiguration) {
/**
* Default constructor
*
* @param consulConfiguration {@link ConsulConfiguration} use as base.
* @param conversionService Use to calculate the {@link #readTimeout} from the {@link #waitTimeout}.
* @see ConsulConfiguration
*/
public WatchConfiguration(final ConsulConfiguration consulConfiguration,
final ConversionService conversionService) {
super(consulConfiguration);
this.consulConfiguration = consulConfiguration;
this.conversionService = conversionService;
}

@Override
public ConnectionPoolConfiguration getConnectionPoolConfiguration() {
return consulConfiguration.getConnectionPoolConfiguration();
}

/**
* @return The read timeout, depending on the {@link #waitTimeout} value.
*/
@Override
public Optional<Duration> getReadTimeout() {
return Optional.ofNullable(readTimeout);
if (this.readTimeout == null) {
this.readTimeout = calculateReadTimeout();
}
return Optional.of(this.readTimeout);
}

@Override
public void setReadTimeout(@Nullable final Duration readTimeout) {
this.readTimeout = readTimeout;
private Duration calculateReadTimeout() {
final var waitValue = Optional.ofNullable(getWaitTimeout())
.orElse(DEFAULT_WAIT_TIMEOUT_MINUTES);

final var duration = conversionService.convertRequired(waitValue, Duration.class);
// to have the client timeout greater than the wait of the Blocked Query
return duration.plusMillis(duration.toMillis() / 16);
}

/**
* @return The wait timeout. Defaults to {@value DEFAULT_WAIT_TIMEOUT_MINUTES}.
*/
public String getWaitTimeout() {
return this.waitTimeout;
}

/**
* Specify the maximum duration for the blocking request. Default value ({@value #DEFAULT_WAIT_TIMEOUT_MINUTES}).
*
* @param waitTimeout The wait timeout
*/
public void setWaitTimeout(@Nullable final String waitTimeout) {
this.waitTimeout = waitTimeout;
this.readTimeout = calculateReadTimeout();
}

/**
* @return The watch delay. Defaults to {@value DEFAULT_WATCH_DELAY_MILLISECONDS} milliseconds.
*/
public Duration getWatchDelay() {
return watchDelay;
return this.watchDelay;
}

/**
* Sets the watch delay before each call to avoid flooding. Default value ({@value #DEFAULT_WATCH_DELAY_MILLISECONDS} milliseconds).
*
* @param watchDelay The read timeout
* @param watchDelay The watch delay
*/
public void setWatchDelay(final Duration watchDelay) {
this.watchDelay = watchDelay;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.frogdevelopment.micronaut.consul.watch.client;

import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import io.micronaut.http.annotation.FilterMatcher;

@FilterMatcher
@Documented
@Retention(RUNTIME)
@Target({METHOD, TYPE})
public @interface BlockedQuery {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.frogdevelopment.micronaut.consul.watch.client;

import lombok.RequiredArgsConstructor;

import jakarta.inject.Singleton;

import com.frogdevelopment.micronaut.consul.watch.WatchConfiguration;

import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.annotation.ClientFilter;
import io.micronaut.http.annotation.RequestFilter;

@BlockedQuery
@Singleton
@ClientFilter
@RequiredArgsConstructor
public class BlockedQueryClientFilter {

private final WatchConfiguration watchConfiguration;

@RequestFilter
public void filter(final MutableHttpRequest<?> request) {
final var parameters = request.getParameters();
if (parameters.contains("index")) {
parameters.add("wait", watchConfiguration.getWaitTimeout());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,16 @@
import io.micronaut.http.annotation.Get;
import io.micronaut.http.annotation.QueryValue;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.retry.annotation.Retryable;
import reactor.core.publisher.Mono;

@Requires(beans = WatchConfiguration.class)
@Client(id = ConsulClient.SERVICE_ID, path = "/v1", configuration = WatchConfiguration.class)
public interface WatchConsulClient {

@BlockedQuery
@Get(uri = "/kv/{+key}?{&recurse}{&index}", single = true)
@Retryable(
attempts = WatchConfiguration.EXPR_CONSUL_WATCH_RETRY_COUNT,
delay = WatchConfiguration.EXPR_CONSUL_WATCH_RETRY_DELAY
)
Mono<List<KeyValue>> watchValues(String key, @Nullable @QueryValue Boolean recurse, @Nullable @QueryValue Integer index);
Mono<List<KeyValue>> watchValues(String key,
@Nullable @QueryValue Boolean recurse,
@Nullable @QueryValue Integer index);

}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private void onError(final String kvPath, final Throwable throwable) {
log.trace("No KV found with kvPath={}", kvPath);
listeners.remove(kvPath);
} else if (throwable instanceof ReadTimeoutException) {
log.debug("Timeout for kvPath={}", kvPath);
log.warn("Timeout for kvPath={}", kvPath);
watchKvPath(kvPath);
} else {
log.error("Watching kvPath={} failed", kvPath, throwable);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.frogdevelopment.micronaut.consul.watch;

import static com.frogdevelopment.micronaut.consul.watch.WatchConfiguration.DEFAULT_WAIT_TIMEOUT_MINUTES;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.times;

import java.time.Duration;
import java.util.Optional;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.reflect.ReflectionUtils;
import io.micronaut.discovery.consul.ConsulConfiguration;

@ExtendWith(MockitoExtension.class)
class WatchConfigurationTest {

@InjectMocks
private WatchConfiguration watchConfiguration;

@Mock
private ConsulConfiguration consulConfiguration;
@Mock
private ConversionService conversionService;

@Test
void should_useDefaultWatchTimeout() {
// given
given(conversionService.convertRequired(DEFAULT_WAIT_TIMEOUT_MINUTES, Duration.class)).willReturn(Duration.ofMinutes(10));

// when
final var readTimeout1 = watchConfiguration.getReadTimeout();
final var readTimeout2 = watchConfiguration.getReadTimeout();

// then
assertThat(readTimeout1)
.hasValueSatisfying(value -> assertThat(value).hasSeconds(637));
assertThat(readTimeout2).isEqualTo(readTimeout1);
then(conversionService).should(times(1)).convertRequired(DEFAULT_WAIT_TIMEOUT_MINUTES, Duration.class);
}

@Test
void should_calculateReadTimeout_when_settingWaitTimeout() {
// given
assertThat(getReadTimeoutValue()).isEmpty();
given(conversionService.convertRequired("16s", Duration.class)).willReturn(Duration.ofSeconds(16));

// when
watchConfiguration.setWaitTimeout("16s");

// then
assertThat(getReadTimeoutValue()).hasValue(Duration.ofSeconds(17));
}

private Optional<Object> getReadTimeoutValue() {
return ReflectionUtils.getFieldValue(WatchConfiguration.class, "readTimeout", watchConfiguration);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.frogdevelopment.micronaut.consul.watch.client;

import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import com.frogdevelopment.micronaut.consul.watch.WatchConfiguration;

import io.micronaut.http.MutableHttpParameters;
import io.micronaut.http.MutableHttpRequest;

@ExtendWith(MockitoExtension.class)
class BlockedQueryClientFilterTest {

@InjectMocks
private BlockedQueryClientFilter blockedQueryClientFilter;

@Mock
private WatchConfiguration watchConfiguration;
@Mock
private MutableHttpRequest<?> request;
@Mock
private MutableHttpParameters parameters;

@Test
void should_doNothing_when_indexIsNotPresent() {
// given
given(request.getParameters()).willReturn(parameters);
given(parameters.contains("index")).willReturn(false);

// when
blockedQueryClientFilter.filter(request);

// then
then(parameters).shouldHaveNoMoreInteractions();
then(watchConfiguration).shouldHaveNoInteractions();
}

@Test
void should_addWait_when_indexIsPresent() {
// given
given(request.getParameters()).willReturn(parameters);
given(parameters.contains("index")).willReturn(true);
given(watchConfiguration.getWaitTimeout()).willReturn("666s");

// when
blockedQueryClientFilter.filter(request);

// then
then(parameters).should().add("wait", "666s");
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ void should_handle_client_error_timeout() {
given(consulClient.watchValues("path/to/timeout", false, null)).willReturn(Mono.error(exception));
given(watchConfiguration.getWatchDelay()).willReturn(Duration.ofMillis(500));

CLASS_LOGGER.setLevel(Level.DEBUG);
CLASS_LOGGER.setLevel(Level.WARN);

// when
watcher.start();
Expand All @@ -313,6 +313,8 @@ void should_handle_client_error_timeout() {
then(propertiesChangeHandler).shouldHaveNoInteractions();

softAssertions.assertThat(listAppender.list)
.filteredOn(event -> event.getLevel().equals(Level.WARN))
.hasSize(1)
.extracting(ILoggingEvent::getFormattedMessage)
.contains("Timeout for kvPath=path/to/timeout");
}));
Expand Down
Loading