From 5cf96c905534ebb68ffa7a9d9fe6678c8ff24a18 Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Thu, 23 Jan 2025 14:18:17 +0100 Subject: [PATCH] Redis Cache: replace the KEYS command with SCAN in invalidation The `KEYS` command in Redis can block the Redis server for quite a while. Moreover, certain implementations of Redis (such as Amazon ElastiCache) do not allow the `KEYS` command at all. Hence, this commit replaces the usage of `KEYS` with `SCAN`, which can possibly take multiple iterations to collect all the keys, but each iteration takes only a short while and in between iterations, other commands may be invoked against the Redis server. The `COUNT` argument may be configured; by default, it is not included in the `SCAN` command at all. --- .../redis/deployment/BasicRedisCacheTest.java | 31 +++++--- .../redis/deployment/NamedRedisCacheTest.java | 31 +++++--- .../RedisCacheWithOptimisticLockingTest.java | 31 +++++--- .../cache/redis/runtime/RedisCacheImpl.java | 73 ++++++++++++++----- .../cache/redis/runtime/RedisCacheInfo.java | 8 ++ .../redis/runtime/RedisCacheInfoBuilder.java | 6 ++ .../runtime/RedisCacheRuntimeConfig.java | 8 ++ .../redis/runtime/RedisCacheImplTest.java | 2 +- 8 files changed, 137 insertions(+), 53 deletions(-) diff --git a/extensions/redis-cache/deployment/src/test/java/io/quarkus/cache/redis/deployment/BasicRedisCacheTest.java b/extensions/redis-cache/deployment/src/test/java/io/quarkus/cache/redis/deployment/BasicRedisCacheTest.java index 67a08643f0683..432478b2b57ec 100644 --- a/extensions/redis-cache/deployment/src/test/java/io/quarkus/cache/redis/deployment/BasicRedisCacheTest.java +++ b/extensions/redis-cache/deployment/src/test/java/io/quarkus/cache/redis/deployment/BasicRedisCacheTest.java @@ -107,27 +107,36 @@ public void testAllCacheAnnotations() { TestUtil.allRedisKeys(redisDataSource).size()); // STEP 7 + // Action: add 100 cached keys, to make sure the SCAN command in next step requires multiple iterations + // Expected effect: + 100 keys in Redis + // Verified by: comparison with previous number of keys + for (int i = 0; i < 100; i++) { + simpleCachedService.cachedMethod("extra-" + i); + } + assertEquals(allKeysAtStart.size() + 102, TestUtil.allRedisKeys(redisDataSource).size()); + + // STEP 8 // Action: full cache invalidation. // Expected effect: empty cache. - // Verified by: STEPS 8 and 9. + // Verified by: comparison with previous number of keys, STEPS 9 and 10. simpleCachedService.invalidateAll(); newKeys = TestUtil.allRedisKeys(redisDataSource); assertEquals(allKeysAtStart.size(), newKeys.size()); Assertions.assertThat(newKeys).doesNotContain(expectedCacheKey(KEY_1), expectedCacheKey(KEY_2)); - // STEP 8 + // STEP 9 // Action: same call as STEP 5. - // Expected effect: method invoked because of STEP 7 and result cached. - // Verified by: different objects references between STEPS 5 and 8 results. - String value8 = simpleCachedService.cachedMethod(KEY_1); - assertNotEquals(value5, value8); + // Expected effect: method invoked because of STEP 8 and result cached. + // Verified by: different objects references between STEPS 5 and 9 results. + String value9 = simpleCachedService.cachedMethod(KEY_1); + assertNotEquals(value5, value9); - // STEP 9 + // STEP 10 // Action: same call as STEP 6. - // Expected effect: method invoked because of STEP 7 and result cached. - // Verified by: different objects references between STEPS 6 and 9 results. - String value9 = simpleCachedService.cachedMethod(KEY_2); - assertNotEquals(value6, value9); + // Expected effect: method invoked because of STEP 8 and result cached. + // Verified by: different objects references between STEPS 6 and 10 results. + String value10 = simpleCachedService.cachedMethod(KEY_2); + assertNotEquals(value6, value10); } private static String expectedCacheKey(String key) { diff --git a/extensions/redis-cache/deployment/src/test/java/io/quarkus/cache/redis/deployment/NamedRedisCacheTest.java b/extensions/redis-cache/deployment/src/test/java/io/quarkus/cache/redis/deployment/NamedRedisCacheTest.java index 5e03be1ef6e8d..988ae7e6752d6 100644 --- a/extensions/redis-cache/deployment/src/test/java/io/quarkus/cache/redis/deployment/NamedRedisCacheTest.java +++ b/extensions/redis-cache/deployment/src/test/java/io/quarkus/cache/redis/deployment/NamedRedisCacheTest.java @@ -84,25 +84,34 @@ public void testAllCacheAnnotations() { assertEquals(allKeysAtStart.size() + 2, TestUtil.allRedisKeys(redisDataSource).size()); // STEP 7 + // Action: add 100 cached keys, to make sure the SCAN command in next step requires multiple iterations + // Expected effect: + 100 keys in Redis + // Verified by: comparison with previous number of keys + for (int i = 0; i < 100; i++) { + simpleCachedService.cachedMethod("extra-" + i); + } + assertEquals(allKeysAtStart.size() + 102, TestUtil.allRedisKeys(redisDataSource).size()); + + // STEP 8 // Action: full cache invalidation. // Expected effect: empty cache. - // Verified by: STEPS 8 and 9. + // Verified by: comparison with previous number of keys, STEPS 9 and 10. simpleCachedService.invalidateAll(); assertEquals(allKeysAtStart.size(), TestUtil.allRedisKeys(redisDataSource).size()); - // STEP 8 + // STEP 9 // Action: same call as STEP 5. - // Expected effect: method invoked because of STEP 7 and result cached. - // Verified by: different objects references between STEPS 5 and 8 results. - String value8 = simpleCachedService.cachedMethod(KEY_1); - assertNotEquals(value5, value8); + // Expected effect: method invoked because of STEP 8 and result cached. + // Verified by: different objects references between STEPS 5 and 9 results. + String value9 = simpleCachedService.cachedMethod(KEY_1); + assertNotEquals(value5, value9); - // STEP 9 + // STEP 10 // Action: same call as STEP 6. - // Expected effect: method invoked because of STEP 7 and result cached. - // Verified by: different objects references between STEPS 6 and 9 results. - String value9 = simpleCachedService.cachedMethod(KEY_2); - assertNotEquals(value6, value9); + // Expected effect: method invoked because of STEP 8 and result cached. + // Verified by: different objects references between STEPS 6 and 10 results. + String value10 = simpleCachedService.cachedMethod(KEY_2); + assertNotEquals(value6, value10); } } diff --git a/extensions/redis-cache/deployment/src/test/java/io/quarkus/cache/redis/deployment/RedisCacheWithOptimisticLockingTest.java b/extensions/redis-cache/deployment/src/test/java/io/quarkus/cache/redis/deployment/RedisCacheWithOptimisticLockingTest.java index 10362aa07c000..fb93f4193e95e 100644 --- a/extensions/redis-cache/deployment/src/test/java/io/quarkus/cache/redis/deployment/RedisCacheWithOptimisticLockingTest.java +++ b/extensions/redis-cache/deployment/src/test/java/io/quarkus/cache/redis/deployment/RedisCacheWithOptimisticLockingTest.java @@ -105,27 +105,36 @@ public void testAllCacheAnnotations() { TestUtil.allRedisKeys(redisDataSource).size()); // STEP 7 + // Action: add 100 cached keys, to make sure the SCAN command in next step requires multiple iterations + // Expected effect: + 100 keys in Redis + // Verified by: comparison with previous number of keys + for (int i = 0; i < 100; i++) { + simpleCachedService.cachedMethod("extra-" + i); + } + assertEquals(allKeysAtStart.size() + 102, TestUtil.allRedisKeys(redisDataSource).size()); + + // STEP 8 // Action: full cache invalidation. // Expected effect: empty cache. - // Verified by: STEPS 8 and 9. + // Verified by: comparison with previous number of keys, STEPS 9 and 10. simpleCachedService.invalidateAll(); newKeys = TestUtil.allRedisKeys(redisDataSource); assertEquals(allKeysAtStart.size(), newKeys.size()); Assertions.assertThat(newKeys).doesNotContain(expectedCacheKey(KEY_1), expectedCacheKey(KEY_2)); - // STEP 8 + // STEP 9 // Action: same call as STEP 5. - // Expected effect: method invoked because of STEP 7 and result cached. - // Verified by: different objects references between STEPS 5 and 8 results. - String value8 = simpleCachedService.cachedMethod(KEY_1); - assertNotEquals(value5, value8); + // Expected effect: method invoked because of STEP 8 and result cached. + // Verified by: different objects references between STEPS 5 and 9 results. + String value9 = simpleCachedService.cachedMethod(KEY_1); + assertNotEquals(value5, value9); - // STEP 9 + // STEP 10 // Action: same call as STEP 6. - // Expected effect: method invoked because of STEP 7 and result cached. - // Verified by: different objects references between STEPS 6 and 9 results. - String value9 = simpleCachedService.cachedMethod(KEY_2); - assertNotEquals(value6, value9); + // Expected effect: method invoked because of STEP 8 and result cached. + // Verified by: different objects references between STEPS 6 and 10 results. + String value10 = simpleCachedService.cachedMethod(KEY_2); + assertNotEquals(value6, value10); } private static String expectedCacheKey(String key) { diff --git a/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheImpl.java b/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheImpl.java index fc9d3460f26b1..bfcfb77377b1f 100644 --- a/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheImpl.java +++ b/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheImpl.java @@ -4,9 +4,11 @@ import java.net.ConnectException; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.List; +import java.util.HashSet; import java.util.Objects; import java.util.Optional; +import java.util.Set; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -24,6 +26,7 @@ import io.quarkus.redis.runtime.datasource.Marshaller; import io.quarkus.runtime.BlockingOperationControl; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.subscription.UniEmitter; import io.smallrye.mutiny.unchecked.Unchecked; import io.smallrye.mutiny.unchecked.UncheckedFunction; import io.smallrye.mutiny.vertx.MutinyHelper; @@ -371,29 +374,61 @@ public Uni invalidateAll() { @Override public Uni invalidateIf(Predicate predicate) { - return redis.send(Request.cmd(Command.KEYS).arg(getKeyPattern())) - .> map(response -> marshaller.decodeAsList(response, String.class)) - .chain(new Function, Uni>() { + return Uni.createFrom().emitter(new Consumer>>() { + @Override + public void accept(UniEmitter> uniEmitter) { + scanForKeys("0", new HashSet<>(), uniEmitter); + } + }).chain(new Function, Uni>() { + @Override + public Uni apply(Set setOfKeys) { + var req = Request.cmd(Command.DEL); + boolean hasAtLeastOneMatch = false; + for (String key : setOfKeys) { + Object userKey = computeUserKey(key); + if (predicate.test(userKey)) { + hasAtLeastOneMatch = true; + req.arg(marshaller.encode(key)); + } + } + if (hasAtLeastOneMatch) { + // We cannot send the command without parameters, it would not be a valid command. + return redis.send(req); + } else { + return Uni.createFrom().voidItem(); + } + } + }) + .replaceWithVoid(); + } + + private void scanForKeys(String cursor, Set result, UniEmitter> em) { + Request cmd = Request.cmd(Command.SCAN).arg(cursor) + .arg("MATCH").arg(getKeyPattern()); + if (cacheInfo.invalidationScanSize.isPresent()) { + cmd.arg("COUNT").arg(cacheInfo.invalidationScanSize.getAsInt()); + } + redis.send(cmd) + .subscribe().with(new Consumer() { @Override - public Uni apply(List listOfKeys) { - var req = Request.cmd(Command.DEL); - boolean hasAtLEastOneMatch = false; - for (String key : listOfKeys) { - Object userKey = computeUserKey(key); - if (predicate.test(userKey)) { - hasAtLEastOneMatch = true; - req.arg(marshaller.encode(key)); - } + public void accept(Response response) { + String newCursor = response.get(0).toString(); + Response partResponse = response.get(1); + if (partResponse != null) { + result.addAll(marshaller.decodeAsList(partResponse, String.class)); } - if (hasAtLEastOneMatch) { - // We cannot send the command with parameters, it would not be a valid command. - return redis.send(req); + if ("0".equals(newCursor)) { + em.complete(result); } else { - return Uni.createFrom().voidItem(); + scanForKeys(newCursor, result, em); } } - }) - .replaceWithVoid(); + }, new Consumer() { + @Override + public void accept(Throwable throwable) { + em.fail(throwable); + } + }); } String computeActualKey(String key) { diff --git a/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheInfo.java b/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheInfo.java index e47448ccd2fbf..1efeaff5126e5 100644 --- a/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheInfo.java +++ b/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheInfo.java @@ -3,6 +3,7 @@ import java.lang.reflect.Type; import java.time.Duration; import java.util.Optional; +import java.util.OptionalInt; public class RedisCacheInfo { @@ -43,4 +44,11 @@ public class RedisCacheInfo { * Locking for details. */ public boolean useOptimisticLocking = false; + + /** + * If set, the {@code SCAN} command (used to implement invalidation) will have + * the {@code COUNT} argument with given value. If not set (default), + * no {@code COUNT} argument is present. + */ + public OptionalInt invalidationScanSize = OptionalInt.empty(); } diff --git a/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheInfoBuilder.java b/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheInfoBuilder.java index e457d348bcdec..6ec8ddbe13dc0 100644 --- a/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheInfoBuilder.java +++ b/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheInfoBuilder.java @@ -61,6 +61,12 @@ public static Set build(Set cacheNames, RedisCachesConfi cacheInfo.useOptimisticLocking = defaultRuntimeConfig.useOptimisticLocking.get(); } + if (namedRuntimeConfig != null && namedRuntimeConfig.invalidationScanSize.isPresent()) { + cacheInfo.invalidationScanSize = namedRuntimeConfig.invalidationScanSize; + } else if (defaultRuntimeConfig.invalidationScanSize.isPresent()) { + cacheInfo.invalidationScanSize = defaultRuntimeConfig.invalidationScanSize; + } + result.add(cacheInfo); } return result; diff --git a/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheRuntimeConfig.java b/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheRuntimeConfig.java index a82902c308a74..2a2afe6860483 100644 --- a/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheRuntimeConfig.java +++ b/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheRuntimeConfig.java @@ -2,6 +2,7 @@ import java.time.Duration; import java.util.Optional; +import java.util.OptionalInt; import io.quarkus.runtime.annotations.ConfigGroup; import io.quarkus.runtime.annotations.ConfigItem; @@ -48,4 +49,11 @@ public class RedisCacheRuntimeConfig { @ConfigItem public Optional useOptimisticLocking; + /** + * If set, the {@code SCAN} command (used to implement invalidation) will have + * the {@code COUNT} argument with given value. If not set (default), + * no {@code COUNT} argument is present. + */ + @ConfigItem + public OptionalInt invalidationScanSize; } diff --git a/extensions/redis-cache/runtime/src/test/java/io/quarkus/cache/redis/runtime/RedisCacheImplTest.java b/extensions/redis-cache/runtime/src/test/java/io/quarkus/cache/redis/runtime/RedisCacheImplTest.java index b4c8583c0d77e..f2ca7a5f5f816 100644 --- a/extensions/redis-cache/runtime/src/test/java/io/quarkus/cache/redis/runtime/RedisCacheImplTest.java +++ b/extensions/redis-cache/runtime/src/test/java/io/quarkus/cache/redis/runtime/RedisCacheImplTest.java @@ -572,7 +572,7 @@ void testInvalidation() { assertThat(getAllKeys()).hasSize(9); - cache.invalidateIf(o -> o instanceof String && ((String) o).startsWith("key")).await().indefinitely(); + cache.invalidateIf(o -> o instanceof String s && s.startsWith("key")).await().indefinitely(); assertThatTheKeyDoesNotExist("cache:test-invalidation:key1"); assertThatTheKeyDoesNotExist("cache:test-invalidation:key2"); assertThatTheKeyDoesExist("key6");