Skip to content

Commit

Permalink
fix: support prefix for resource queue (#258)(#256)
Browse files Browse the repository at this point in the history
  • Loading branch information
Maxim-Gadalov authored Mar 5, 2024
1 parent e8c5dc2 commit f3be802
Showing 1 changed file with 6 additions and 5 deletions.
11 changes: 6 additions & 5 deletions src/main/java/com/epam/aidial/core/service/ResourceService.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

@Slf4j
public class ResourceService implements AutoCloseable {
private static final String REDIS_QUEUE = "resource:queue";
private static final Set<String> REDIS_FIELDS = Set.of("body", "created_at", "updated_at", "synced", "exists");
private static final Set<String> REDIS_FIELDS_NO_BODY = Set.of("created_at", "updated_at", "synced", "exists");

Expand All @@ -52,6 +51,7 @@ public class ResourceService implements AutoCloseable {
private final Duration cacheExpiration;
private final int compressionMinSize;
private final String prefix;
private final String resourceQueue;

public ResourceService(Vertx vertx,
RedissonClient redis,
Expand Down Expand Up @@ -99,6 +99,7 @@ public ResourceService(Vertx vertx,
this.cacheExpiration = Duration.ofMillis(cacheExpiration);
this.compressionMinSize = compressionMinSize;
this.prefix = prefix;
this.resourceQueue = "resource:" + BlobStorageUtil.toStoragePath(prefix, "queue");

// vertex timer is called from event loop, so sync is done in worker thread to not block event loop
this.syncTimer = vertx.setPeriodic(syncPeriod, syncPeriod, ignore -> vertx.executeBlocking(() -> sync()));
Expand Down Expand Up @@ -267,7 +268,7 @@ public boolean deleteResource(ResourceDescription descriptor) {
private Void sync() {
log.debug("Syncing");
try {
RScoredSortedSet<String> set = redis.getScoredSortedSet(REDIS_QUEUE, StringCodec.INSTANCE);
RScoredSortedSet<String> set = redis.getScoredSortedSet(resourceQueue, StringCodec.INSTANCE);
long now = time();

for (String redisKey : set.valueRange(Double.NEGATIVE_INFINITY, true, now, true, 0, syncBatch)) {
Expand All @@ -290,7 +291,7 @@ private void sync(String redisKey) {
Result result = redisGet(redisKey, false);
if (result == null || result.synced) {
redis.getMap(redisKey, StringCodec.INSTANCE).expireIfNotSet(cacheExpiration);
redis.getScoredSortedSet(REDIS_QUEUE, StringCodec.INSTANCE).remove(redisKey);
redis.getScoredSortedSet(resourceQueue, StringCodec.INSTANCE).remove(redisKey);
return;
}

Expand Down Expand Up @@ -396,7 +397,7 @@ private Result redisGet(String key, boolean withBody) {
}

private void redisPut(String key, Result result) {
RScoredSortedSet<String> set = redis.getScoredSortedSet(REDIS_QUEUE, StringCodec.INSTANCE);
RScoredSortedSet<String> set = redis.getScoredSortedSet(resourceQueue, StringCodec.INSTANCE);
set.add(time() + syncDelay, key); // add resource to sync set before changing because calls below can fail

RMap<String, String> map = redis.getMap(key, StringCodec.INSTANCE);
Expand Down Expand Up @@ -425,7 +426,7 @@ private void redisSync(String key) {
map.put("synced", "true");
map.expire(cacheExpiration);

RScoredSortedSet<String> set = redis.getScoredSortedSet(REDIS_QUEUE, StringCodec.INSTANCE);
RScoredSortedSet<String> set = redis.getScoredSortedSet(resourceQueue, StringCodec.INSTANCE);
set.remove(key);
}

Expand Down

0 comments on commit f3be802

Please sign in to comment.