From 8e3e946a88cdda412647b629e282dd60f997662f Mon Sep 17 00:00:00 2001 From: Maxim-Gadalov Date: Mon, 4 Mar 2024 11:42:38 +0100 Subject: [PATCH 1/6] fix: investigate missing resource issue --- .../epam/aidial/core/service/ResourceService.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/epam/aidial/core/service/ResourceService.java b/src/main/java/com/epam/aidial/core/service/ResourceService.java index 185b97829..adb5f7145 100644 --- a/src/main/java/com/epam/aidial/core/service/ResourceService.java +++ b/src/main/java/com/epam/aidial/core/service/ResourceService.java @@ -182,16 +182,20 @@ public String getResource(ResourceDescription descriptor) { @Nullable public String getResource(ResourceDescription descriptor, boolean lock) { + String resourceUrl = descriptor.getUrl(); + log.info("Downloading resource {}", resourceUrl); String redisKey = redisKey(descriptor); Result result = redisGet(redisKey, true); if (result == null) { + log.info("Resource not found in redis {}", resourceUrl); try (var ignore = lock ? lockService.lock(redisKey) : null) { result = redisGet(redisKey, true); if (result == null) { String blobKey = blobKey(descriptor); result = blobGet(blobKey, true); + log.info("Resource loaded from storage {}, exists {}, is empty {}", resourceUrl, result.exists, result.body.isEmpty()); redisPut(redisKey, result); } } @@ -206,12 +210,15 @@ public ResourceItemMetadata putResource(ResourceDescription descriptor, String b public ResourceItemMetadata putResource(ResourceDescription descriptor, String body, boolean overwrite, boolean lock) { + String resourceUrl = descriptor.getUrl(); + log.info("Saving resource {}", resourceUrl); String redisKey = redisKey(descriptor); String blobKey = blobKey(descriptor); try (var ignore = lock ? lockService.lock(redisKey) : null) { Result result = redisGet(redisKey, false); if (result == null) { + log.info("Resource {} not found in redis", resourceUrl); result = blobGet(blobKey, false); } @@ -222,9 +229,11 @@ public ResourceItemMetadata putResource(ResourceDescription descriptor, String b long updatedAt = time(); long createdAt = result.exists ? result.createdAt : updatedAt; redisPut(redisKey, new Result(body, createdAt, updatedAt, false, true)); + log.info("Resource saved in redis {}", resourceUrl); if (!result.exists) { blobPut(blobKey, "", createdAt, updatedAt); // create an empty object for listing + log.info("Resource with empty body saved on disc {}", resourceUrl); } return new ResourceItemMetadata(descriptor).setCreatedAt(createdAt).setUpdatedAt(updatedAt); @@ -298,11 +307,11 @@ private void sync(String redisKey) { String blobKey = blobKeyFromRedisKey(redisKey); if (result.exists) { - log.debug("Syncing resource: {}. Blob updating", redisKey); + log.info("Syncing resource: {}. Blob updating", redisKey); result = redisGet(redisKey, true); blobPut(blobKey, result.body, result.createdAt, result.updatedAt); } else { - log.debug("Syncing resource: {}. Blob deleting", redisKey); + log.info("Syncing resource: {}. Blob deleting", redisKey); blobDelete(blobKey); } From c83439521fffb490985de91d6b6445f720b089ee Mon Sep 17 00:00:00 2001 From: Maxim-Gadalov Date: Mon, 4 Mar 2024 16:03:38 +0100 Subject: [PATCH 2/6] add more logs --- .../java/com/epam/aidial/core/service/ResourceService.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/java/com/epam/aidial/core/service/ResourceService.java b/src/main/java/com/epam/aidial/core/service/ResourceService.java index adb5f7145..232fabddb 100644 --- a/src/main/java/com/epam/aidial/core/service/ResourceService.java +++ b/src/main/java/com/epam/aidial/core/service/ResourceService.java @@ -282,6 +282,7 @@ private Void sync() { long now = time(); for (String redisKey : set.valueRange(Double.NEGATIVE_INFINITY, true, now, true, 0, syncBatch)) { + log.info("Redis key from queue {}", redisKey); sync(redisKey); } } catch (Throwable e) { @@ -431,6 +432,7 @@ private void redisPut(String key, Result result) { map.putAll(fields); if (result.synced) { // cleanup because it is already synced + log.info("RedisPut key removed from queue {}", key); map.expire(cacheExpiration); set.remove(key); } @@ -442,6 +444,7 @@ private void redisSync(String key) { map.expire(cacheExpiration); RScoredSortedSet set = redis.getScoredSortedSet(REDIS_QUEUE, StringCodec.INSTANCE); + log.info("RedisSync key removed from queue {}", key); set.remove(key); } From 25d116365f5f9ac4b457ec559a839f88167f2f68 Mon Sep 17 00:00:00 2001 From: Maxim-Gadalov Date: Mon, 4 Mar 2024 17:05:16 +0100 Subject: [PATCH 3/6] fix redis queue --- .../epam/aidial/core/service/ResourceService.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/epam/aidial/core/service/ResourceService.java b/src/main/java/com/epam/aidial/core/service/ResourceService.java index 232fabddb..63987ed50 100644 --- a/src/main/java/com/epam/aidial/core/service/ResourceService.java +++ b/src/main/java/com/epam/aidial/core/service/ResourceService.java @@ -278,7 +278,7 @@ public boolean deleteResource(ResourceDescription descriptor) { private Void sync() { log.debug("Syncing"); try { - RScoredSortedSet set = redis.getScoredSortedSet(REDIS_QUEUE, StringCodec.INSTANCE); + RScoredSortedSet set = redis.getScoredSortedSet(getRedisQueue(), StringCodec.INSTANCE); long now = time(); for (String redisKey : set.valueRange(Double.NEGATIVE_INFINITY, true, now, true, 0, syncBatch)) { @@ -302,7 +302,7 @@ private void sync(String redisKey) { Result result = redisGet(redisKey, false); if (result == null || result.synced) { redis.getMap(redisKey, StringCodec.INSTANCE).expireIfNotSet(cacheExpiration); - redis.getScoredSortedSet(REDIS_QUEUE, StringCodec.INSTANCE).remove(redisKey); + redis.getScoredSortedSet(getRedisQueue(), StringCodec.INSTANCE).remove(redisKey); return; } @@ -413,7 +413,7 @@ private Result redisGet(String key, boolean withBody) { } private void redisPut(String key, Result result) { - RScoredSortedSet set = redis.getScoredSortedSet(REDIS_QUEUE, StringCodec.INSTANCE); + RScoredSortedSet set = redis.getScoredSortedSet(getRedisQueue(), StringCodec.INSTANCE); set.add(time() + syncDelay, key); // add resource to sync set before changing because calls below can fail RMap map = redis.getMap(key, StringCodec.INSTANCE); @@ -443,11 +443,15 @@ private void redisSync(String key) { map.put("synced", "true"); map.expire(cacheExpiration); - RScoredSortedSet set = redis.getScoredSortedSet(REDIS_QUEUE, StringCodec.INSTANCE); + RScoredSortedSet set = redis.getScoredSortedSet(getRedisQueue(), StringCodec.INSTANCE); log.info("RedisSync key removed from queue {}", key); set.remove(key); } + private String getRedisQueue() { + return BlobStorageUtil.toStoragePath(prefix, REDIS_QUEUE); + } + private String redisKey(ResourceDescription descriptor) { String resourcePath = BlobStorageUtil.toStoragePath(prefix, descriptor.getAbsoluteFilePath()); return descriptor.getType().name().toLowerCase() + ":" + resourcePath; From 34dc1e6659ece630d0ece286adcfd87b2f69abe5 Mon Sep 17 00:00:00 2001 From: Maxim-Gadalov Date: Tue, 5 Mar 2024 18:13:33 +0100 Subject: [PATCH 4/6] clean up logs --- .../aidial/core/service/ResourceService.java | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/src/main/java/com/epam/aidial/core/service/ResourceService.java b/src/main/java/com/epam/aidial/core/service/ResourceService.java index 63987ed50..f99c8100f 100644 --- a/src/main/java/com/epam/aidial/core/service/ResourceService.java +++ b/src/main/java/com/epam/aidial/core/service/ResourceService.java @@ -182,20 +182,16 @@ public String getResource(ResourceDescription descriptor) { @Nullable public String getResource(ResourceDescription descriptor, boolean lock) { - String resourceUrl = descriptor.getUrl(); - log.info("Downloading resource {}", resourceUrl); String redisKey = redisKey(descriptor); Result result = redisGet(redisKey, true); if (result == null) { - log.info("Resource not found in redis {}", resourceUrl); try (var ignore = lock ? lockService.lock(redisKey) : null) { result = redisGet(redisKey, true); if (result == null) { String blobKey = blobKey(descriptor); result = blobGet(blobKey, true); - log.info("Resource loaded from storage {}, exists {}, is empty {}", resourceUrl, result.exists, result.body.isEmpty()); redisPut(redisKey, result); } } @@ -210,15 +206,12 @@ public ResourceItemMetadata putResource(ResourceDescription descriptor, String b public ResourceItemMetadata putResource(ResourceDescription descriptor, String body, boolean overwrite, boolean lock) { - String resourceUrl = descriptor.getUrl(); - log.info("Saving resource {}", resourceUrl); String redisKey = redisKey(descriptor); String blobKey = blobKey(descriptor); try (var ignore = lock ? lockService.lock(redisKey) : null) { Result result = redisGet(redisKey, false); if (result == null) { - log.info("Resource {} not found in redis", resourceUrl); result = blobGet(blobKey, false); } @@ -229,11 +222,9 @@ public ResourceItemMetadata putResource(ResourceDescription descriptor, String b long updatedAt = time(); long createdAt = result.exists ? result.createdAt : updatedAt; redisPut(redisKey, new Result(body, createdAt, updatedAt, false, true)); - log.info("Resource saved in redis {}", resourceUrl); if (!result.exists) { blobPut(blobKey, "", createdAt, updatedAt); // create an empty object for listing - log.info("Resource with empty body saved on disc {}", resourceUrl); } return new ResourceItemMetadata(descriptor).setCreatedAt(createdAt).setUpdatedAt(updatedAt); @@ -282,7 +273,6 @@ private Void sync() { long now = time(); for (String redisKey : set.valueRange(Double.NEGATIVE_INFINITY, true, now, true, 0, syncBatch)) { - log.info("Redis key from queue {}", redisKey); sync(redisKey); } } catch (Throwable e) { @@ -308,11 +298,11 @@ private void sync(String redisKey) { String blobKey = blobKeyFromRedisKey(redisKey); if (result.exists) { - log.info("Syncing resource: {}. Blob updating", redisKey); + log.debug("Syncing resource: {}. Blob updating", redisKey); result = redisGet(redisKey, true); blobPut(blobKey, result.body, result.createdAt, result.updatedAt); } else { - log.info("Syncing resource: {}. Blob deleting", redisKey); + log.debug("Syncing resource: {}. Blob deleting", redisKey); blobDelete(blobKey); } @@ -432,7 +422,6 @@ private void redisPut(String key, Result result) { map.putAll(fields); if (result.synced) { // cleanup because it is already synced - log.info("RedisPut key removed from queue {}", key); map.expire(cacheExpiration); set.remove(key); } @@ -444,7 +433,6 @@ private void redisSync(String key) { map.expire(cacheExpiration); RScoredSortedSet set = redis.getScoredSortedSet(getRedisQueue(), StringCodec.INSTANCE); - log.info("RedisSync key removed from queue {}", key); set.remove(key); } From d9ed93d77ef10e1e19fe4ffed50b516c23cbbbb9 Mon Sep 17 00:00:00 2001 From: Maxim-Gadalov Date: Tue, 5 Mar 2024 21:35:00 +0100 Subject: [PATCH 5/6] address review comments --- .../epam/aidial/core/service/ResourceService.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/epam/aidial/core/service/ResourceService.java b/src/main/java/com/epam/aidial/core/service/ResourceService.java index dbdbd6a28..cf2429489 100644 --- a/src/main/java/com/epam/aidial/core/service/ResourceService.java +++ b/src/main/java/com/epam/aidial/core/service/ResourceService.java @@ -34,7 +34,6 @@ @Slf4j public class ResourceService implements AutoCloseable { - private static final String REDIS_QUEUE = "resource:queue"; private static final Set REDIS_FIELDS = Set.of("body", "created_at", "updated_at", "synced", "exists"); private static final Set REDIS_FIELDS_NO_BODY = Set.of("created_at", "updated_at", "synced", "exists"); @@ -52,6 +51,7 @@ public class ResourceService implements AutoCloseable { private final Duration cacheExpiration; private final int compressionMinSize; private final String prefix; + private final String resourceQueue; public ResourceService(Vertx vertx, RedissonClient redis, @@ -99,6 +99,7 @@ public ResourceService(Vertx vertx, this.cacheExpiration = Duration.ofMillis(cacheExpiration); this.compressionMinSize = compressionMinSize; this.prefix = prefix; + this.resourceQueue = BlobStorageUtil.toStoragePath(prefix, "resource:queue"); // vertex timer is called from event loop, so sync is done in worker thread to not block event loop this.syncTimer = vertx.setPeriodic(syncPeriod, syncPeriod, ignore -> vertx.executeBlocking(() -> sync())); @@ -267,7 +268,7 @@ public boolean deleteResource(ResourceDescription descriptor) { private Void sync() { log.debug("Syncing"); try { - RScoredSortedSet set = redis.getScoredSortedSet(getRedisQueue(), StringCodec.INSTANCE); + RScoredSortedSet set = redis.getScoredSortedSet(resourceQueue, StringCodec.INSTANCE); long now = time(); for (String redisKey : set.valueRange(Double.NEGATIVE_INFINITY, true, now, true, 0, syncBatch)) { @@ -290,7 +291,7 @@ private void sync(String redisKey) { Result result = redisGet(redisKey, false); if (result == null || result.synced) { redis.getMap(redisKey, StringCodec.INSTANCE).expireIfNotSet(cacheExpiration); - redis.getScoredSortedSet(getRedisQueue(), StringCodec.INSTANCE).remove(redisKey); + redis.getScoredSortedSet(resourceQueue, StringCodec.INSTANCE).remove(redisKey); return; } @@ -396,7 +397,7 @@ private Result redisGet(String key, boolean withBody) { } private void redisPut(String key, Result result) { - RScoredSortedSet set = redis.getScoredSortedSet(getRedisQueue(), StringCodec.INSTANCE); + RScoredSortedSet set = redis.getScoredSortedSet(resourceQueue, StringCodec.INSTANCE); set.add(time() + syncDelay, key); // add resource to sync set before changing because calls below can fail RMap map = redis.getMap(key, StringCodec.INSTANCE); @@ -425,14 +426,10 @@ private void redisSync(String key) { map.put("synced", "true"); map.expire(cacheExpiration); - RScoredSortedSet set = redis.getScoredSortedSet(getRedisQueue(), StringCodec.INSTANCE); + RScoredSortedSet set = redis.getScoredSortedSet(resourceQueue, StringCodec.INSTANCE); set.remove(key); } - private String getRedisQueue() { - return BlobStorageUtil.toStoragePath(prefix, REDIS_QUEUE); - } - private String redisKey(ResourceDescription descriptor) { String resourcePath = BlobStorageUtil.toStoragePath(prefix, descriptor.getAbsoluteFilePath()); return descriptor.getType().name().toLowerCase() + ":" + resourcePath; From 0f07a6a98283f6b664809d29a933812aca8741f0 Mon Sep 17 00:00:00 2001 From: Maxim-Gadalov Date: Tue, 5 Mar 2024 22:49:23 +0100 Subject: [PATCH 6/6] address review comments --- src/main/java/com/epam/aidial/core/service/ResourceService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/epam/aidial/core/service/ResourceService.java b/src/main/java/com/epam/aidial/core/service/ResourceService.java index cf2429489..2659b4857 100644 --- a/src/main/java/com/epam/aidial/core/service/ResourceService.java +++ b/src/main/java/com/epam/aidial/core/service/ResourceService.java @@ -99,7 +99,7 @@ public ResourceService(Vertx vertx, this.cacheExpiration = Duration.ofMillis(cacheExpiration); this.compressionMinSize = compressionMinSize; this.prefix = prefix; - this.resourceQueue = BlobStorageUtil.toStoragePath(prefix, "resource:queue"); + this.resourceQueue = "resource:" + BlobStorageUtil.toStoragePath(prefix, "queue"); // vertex timer is called from event loop, so sync is done in worker thread to not block event loop this.syncTimer = vertx.setPeriodic(syncPeriod, syncPeriod, ignore -> vertx.executeBlocking(() -> sync()));