Skip to content

Commit

Permalink
Merge pull request #45828 from Ladicek/redis-cache-use-scan-instead-o…
Browse files Browse the repository at this point in the history
…f-keys
  • Loading branch information
cescoffier authored Jan 23, 2025
2 parents afc43d3 + 5cf96c9 commit 7d37ec4
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,27 +107,36 @@ public void testAllCacheAnnotations() {
TestUtil.allRedisKeys(redisDataSource).size());

// STEP 7
// Action: add 100 cached keys, to make sure the SCAN command in next step requires multiple iterations
// Expected effect: + 100 keys in Redis
// Verified by: comparison with previous number of keys
for (int i = 0; i < 100; i++) {
simpleCachedService.cachedMethod("extra-" + i);
}
assertEquals(allKeysAtStart.size() + 102, TestUtil.allRedisKeys(redisDataSource).size());

// STEP 8
// Action: full cache invalidation.
// Expected effect: empty cache.
// Verified by: STEPS 8 and 9.
// Verified by: comparison with previous number of keys, STEPS 9 and 10.
simpleCachedService.invalidateAll();
newKeys = TestUtil.allRedisKeys(redisDataSource);
assertEquals(allKeysAtStart.size(), newKeys.size());
Assertions.assertThat(newKeys).doesNotContain(expectedCacheKey(KEY_1), expectedCacheKey(KEY_2));

// STEP 8
// STEP 9
// Action: same call as STEP 5.
// Expected effect: method invoked because of STEP 7 and result cached.
// Verified by: different objects references between STEPS 5 and 8 results.
String value8 = simpleCachedService.cachedMethod(KEY_1);
assertNotEquals(value5, value8);
// Expected effect: method invoked because of STEP 8 and result cached.
// Verified by: different objects references between STEPS 5 and 9 results.
String value9 = simpleCachedService.cachedMethod(KEY_1);
assertNotEquals(value5, value9);

// STEP 9
// STEP 10
// Action: same call as STEP 6.
// Expected effect: method invoked because of STEP 7 and result cached.
// Verified by: different objects references between STEPS 6 and 9 results.
String value9 = simpleCachedService.cachedMethod(KEY_2);
assertNotEquals(value6, value9);
// Expected effect: method invoked because of STEP 8 and result cached.
// Verified by: different objects references between STEPS 6 and 10 results.
String value10 = simpleCachedService.cachedMethod(KEY_2);
assertNotEquals(value6, value10);
}

private static String expectedCacheKey(String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,25 +84,34 @@ public void testAllCacheAnnotations() {
assertEquals(allKeysAtStart.size() + 2, TestUtil.allRedisKeys(redisDataSource).size());

// STEP 7
// Action: add 100 cached keys, to make sure the SCAN command in next step requires multiple iterations
// Expected effect: + 100 keys in Redis
// Verified by: comparison with previous number of keys
for (int i = 0; i < 100; i++) {
simpleCachedService.cachedMethod("extra-" + i);
}
assertEquals(allKeysAtStart.size() + 102, TestUtil.allRedisKeys(redisDataSource).size());

// STEP 8
// Action: full cache invalidation.
// Expected effect: empty cache.
// Verified by: STEPS 8 and 9.
// Verified by: comparison with previous number of keys, STEPS 9 and 10.
simpleCachedService.invalidateAll();
assertEquals(allKeysAtStart.size(), TestUtil.allRedisKeys(redisDataSource).size());

// STEP 8
// STEP 9
// Action: same call as STEP 5.
// Expected effect: method invoked because of STEP 7 and result cached.
// Verified by: different objects references between STEPS 5 and 8 results.
String value8 = simpleCachedService.cachedMethod(KEY_1);
assertNotEquals(value5, value8);
// Expected effect: method invoked because of STEP 8 and result cached.
// Verified by: different objects references between STEPS 5 and 9 results.
String value9 = simpleCachedService.cachedMethod(KEY_1);
assertNotEquals(value5, value9);

// STEP 9
// STEP 10
// Action: same call as STEP 6.
// Expected effect: method invoked because of STEP 7 and result cached.
// Verified by: different objects references between STEPS 6 and 9 results.
String value9 = simpleCachedService.cachedMethod(KEY_2);
assertNotEquals(value6, value9);
// Expected effect: method invoked because of STEP 8 and result cached.
// Verified by: different objects references between STEPS 6 and 10 results.
String value10 = simpleCachedService.cachedMethod(KEY_2);
assertNotEquals(value6, value10);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -105,27 +105,36 @@ public void testAllCacheAnnotations() {
TestUtil.allRedisKeys(redisDataSource).size());

// STEP 7
// Action: add 100 cached keys, to make sure the SCAN command in next step requires multiple iterations
// Expected effect: + 100 keys in Redis
// Verified by: comparison with previous number of keys
for (int i = 0; i < 100; i++) {
simpleCachedService.cachedMethod("extra-" + i);
}
assertEquals(allKeysAtStart.size() + 102, TestUtil.allRedisKeys(redisDataSource).size());

// STEP 8
// Action: full cache invalidation.
// Expected effect: empty cache.
// Verified by: STEPS 8 and 9.
// Verified by: comparison with previous number of keys, STEPS 9 and 10.
simpleCachedService.invalidateAll();
newKeys = TestUtil.allRedisKeys(redisDataSource);
assertEquals(allKeysAtStart.size(), newKeys.size());
Assertions.assertThat(newKeys).doesNotContain(expectedCacheKey(KEY_1), expectedCacheKey(KEY_2));

// STEP 8
// STEP 9
// Action: same call as STEP 5.
// Expected effect: method invoked because of STEP 7 and result cached.
// Verified by: different objects references between STEPS 5 and 8 results.
String value8 = simpleCachedService.cachedMethod(KEY_1);
assertNotEquals(value5, value8);
// Expected effect: method invoked because of STEP 8 and result cached.
// Verified by: different objects references between STEPS 5 and 9 results.
String value9 = simpleCachedService.cachedMethod(KEY_1);
assertNotEquals(value5, value9);

// STEP 9
// STEP 10
// Action: same call as STEP 6.
// Expected effect: method invoked because of STEP 7 and result cached.
// Verified by: different objects references between STEPS 6 and 9 results.
String value9 = simpleCachedService.cachedMethod(KEY_2);
assertNotEquals(value6, value9);
// Expected effect: method invoked because of STEP 8 and result cached.
// Verified by: different objects references between STEPS 6 and 10 results.
String value10 = simpleCachedService.cachedMethod(KEY_2);
assertNotEquals(value6, value10);
}

private static String expectedCacheKey(String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand All @@ -24,6 +26,7 @@
import io.quarkus.redis.runtime.datasource.Marshaller;
import io.quarkus.runtime.BlockingOperationControl;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.subscription.UniEmitter;
import io.smallrye.mutiny.unchecked.Unchecked;
import io.smallrye.mutiny.unchecked.UncheckedFunction;
import io.smallrye.mutiny.vertx.MutinyHelper;
Expand Down Expand Up @@ -371,29 +374,61 @@ public Uni<Void> invalidateAll() {

@Override
public Uni<Void> invalidateIf(Predicate<Object> predicate) {
return redis.send(Request.cmd(Command.KEYS).arg(getKeyPattern()))
.<List<String>> map(response -> marshaller.decodeAsList(response, String.class))
.chain(new Function<List<String>, Uni<?>>() {
return Uni.createFrom().emitter(new Consumer<UniEmitter<? super Set<String>>>() {
@Override
public void accept(UniEmitter<? super Set<String>> uniEmitter) {
scanForKeys("0", new HashSet<>(), uniEmitter);
}
}).chain(new Function<Set<String>, Uni<?>>() {
@Override
public Uni<?> apply(Set<String> setOfKeys) {
var req = Request.cmd(Command.DEL);
boolean hasAtLeastOneMatch = false;
for (String key : setOfKeys) {
Object userKey = computeUserKey(key);
if (predicate.test(userKey)) {
hasAtLeastOneMatch = true;
req.arg(marshaller.encode(key));
}
}
if (hasAtLeastOneMatch) {
// We cannot send the command without parameters, it would not be a valid command.
return redis.send(req);
} else {
return Uni.createFrom().voidItem();
}
}
})
.replaceWithVoid();
}

private void scanForKeys(String cursor, Set<String> result, UniEmitter<? super Set<String>> em) {
Request cmd = Request.cmd(Command.SCAN).arg(cursor)
.arg("MATCH").arg(getKeyPattern());
if (cacheInfo.invalidationScanSize.isPresent()) {
cmd.arg("COUNT").arg(cacheInfo.invalidationScanSize.getAsInt());
}
redis.send(cmd)
.subscribe().with(new Consumer<Response>() {
@Override
public Uni<?> apply(List<String> listOfKeys) {
var req = Request.cmd(Command.DEL);
boolean hasAtLEastOneMatch = false;
for (String key : listOfKeys) {
Object userKey = computeUserKey(key);
if (predicate.test(userKey)) {
hasAtLEastOneMatch = true;
req.arg(marshaller.encode(key));
}
public void accept(Response response) {
String newCursor = response.get(0).toString();
Response partResponse = response.get(1);
if (partResponse != null) {
result.addAll(marshaller.decodeAsList(partResponse, String.class));
}
if (hasAtLEastOneMatch) {
// We cannot send the command with parameters, it would not be a valid command.
return redis.send(req);
if ("0".equals(newCursor)) {
em.complete(result);
} else {
return Uni.createFrom().voidItem();
scanForKeys(newCursor, result, em);
}
}
})
.replaceWithVoid();
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
em.fail(throwable);
}
});
}

String computeActualKey(String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.Optional;
import java.util.OptionalInt;

public class RedisCacheInfo {

Expand Down Expand Up @@ -43,4 +44,11 @@ public class RedisCacheInfo {
* Locking</a> for details.
*/
public boolean useOptimisticLocking = false;

/**
* If set, the {@code SCAN} command (used to implement invalidation) will have
* the {@code COUNT} argument with given value. If not set (default),
* no {@code COUNT} argument is present.
*/
public OptionalInt invalidationScanSize = OptionalInt.empty();
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ public static Set<RedisCacheInfo> build(Set<String> cacheNames, RedisCachesConfi
cacheInfo.useOptimisticLocking = defaultRuntimeConfig.useOptimisticLocking.get();
}

if (namedRuntimeConfig != null && namedRuntimeConfig.invalidationScanSize.isPresent()) {
cacheInfo.invalidationScanSize = namedRuntimeConfig.invalidationScanSize;
} else if (defaultRuntimeConfig.invalidationScanSize.isPresent()) {
cacheInfo.invalidationScanSize = defaultRuntimeConfig.invalidationScanSize;
}

result.add(cacheInfo);
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.time.Duration;
import java.util.Optional;
import java.util.OptionalInt;

import io.quarkus.runtime.annotations.ConfigGroup;
import io.quarkus.runtime.annotations.ConfigItem;
Expand Down Expand Up @@ -48,4 +49,11 @@ public class RedisCacheRuntimeConfig {
@ConfigItem
public Optional<Boolean> useOptimisticLocking;

/**
* If set, the {@code SCAN} command (used to implement invalidation) will have
* the {@code COUNT} argument with given value. If not set (default),
* no {@code COUNT} argument is present.
*/
@ConfigItem
public OptionalInt invalidationScanSize;
}
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ void testInvalidation() {

assertThat(getAllKeys()).hasSize(9);

cache.invalidateIf(o -> o instanceof String && ((String) o).startsWith("key")).await().indefinitely();
cache.invalidateIf(o -> o instanceof String s && s.startsWith("key")).await().indefinitely();
assertThatTheKeyDoesNotExist("cache:test-invalidation:key1");
assertThatTheKeyDoesNotExist("cache:test-invalidation:key2");
assertThatTheKeyDoesExist("key6");
Expand Down

0 comments on commit 7d37ec4

Please sign in to comment.