Skip to content

Commit

Permalink
feat: support storage prefix (#208)
Browse files Browse the repository at this point in the history
  • Loading branch information
Maxim-Gadalov authored Feb 19, 2024
1 parent b1efac4 commit 22b3146
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 28 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ Static settings are used on startup and cannot be changed while application is r
| storage.credential | - | Blob storage secret key. Can be optional for filesystem and aws-s3 providers
| storage.bucket | - | Blob storage bucket
| storage.overrides.* | - | Key-value pairs to override storage settings
| storage.createBucket | false | Indicates whether bucket should be created on start-up
| storage.createBucket | false | Indicates whether bucket should be created on start-up
| storage.prefix | - | Base prefix for all stored resources. Must not contain path separators or any invalid chars
| encryption.password | - | Password used for AES encryption
| encryption.salt | - | Salt used for AES encryption
| resources.maxSize | 1048576 | Max allowed size in bytes for a resource
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/epam/aidial/core/AiDial.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ void start() throws Exception {

if (redis != null) {
LockService lockService = new LockService(redis);
resourceService = new ResourceService(vertx, redis, storage, lockService, settings("resources"));
resourceService = new ResourceService(vertx, redis, storage, lockService, settings("resources"), storage.getPrefix());
invitationService = new InvitationService(resourceService, encryptionService, settings("invitations"));
shareService = new ShareService(resourceService, invitationService, encryptionService);
} else {
Expand Down
8 changes: 7 additions & 1 deletion src/main/java/com/epam/aidial/core/config/Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class Storage {
@Nullable
String credential;
/**
* container name/root bucket
* Container name/root bucket
*/
String bucket;

Expand All @@ -41,4 +41,10 @@ public class Storage {
*/
@Nullable
Properties overrides;

/**
* Optional. Name of the root folder in a bucket, base folder for all resource. Must not contain path separators or any illegal chars
*/
@Nullable
String prefix;
}
28 changes: 20 additions & 8 deletions src/main/java/com/epam/aidial/core/service/ResourceService.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.epam.aidial.core.data.ResourceFolderMetadata;
import com.epam.aidial.core.data.ResourceItemMetadata;
import com.epam.aidial.core.storage.BlobStorage;
import com.epam.aidial.core.storage.BlobStorageUtil;
import com.epam.aidial.core.storage.ResourceDescription;
import com.epam.aidial.core.util.Compression;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -50,19 +51,22 @@ public class ResourceService implements AutoCloseable {
private final int syncBatch;
private final Duration cacheExpiration;
private final int compressionMinSize;
private final String prefix;

public ResourceService(Vertx vertx,
RedissonClient redis,
BlobStorage blobStore,
LockService lockService,
JsonObject settings) {
JsonObject settings,
String prefix) {
this(vertx, redis, blobStore, lockService,
settings.getInteger("maxSize"),
settings.getLong("syncPeriod"),
settings.getLong("syncDelay"),
settings.getInteger("syncBatch"),
settings.getLong("cacheExpiration"),
settings.getInteger("compressionMinSize")
settings.getInteger("compressionMinSize"),
prefix
);
}

Expand All @@ -83,7 +87,8 @@ public ResourceService(Vertx vertx,
long syncDelay,
int syncBatch,
long cacheExpiration,
int compressionMinSize) {
int compressionMinSize,
String prefix) {
this.vertx = vertx;
this.redis = redis;
this.blobStore = blobStore;
Expand All @@ -93,6 +98,7 @@ public ResourceService(Vertx vertx,
this.syncBatch = syncBatch;
this.cacheExpiration = Duration.ofMillis(cacheExpiration);
this.compressionMinSize = compressionMinSize;
this.prefix = prefix;

// vertex timer is called from event loop, so sync is done in worker thread to not block event loop
this.syncTimer = vertx.setPeriodic(syncPeriod, syncPeriod, ignore -> vertx.executeBlocking(() -> sync()));
Expand Down Expand Up @@ -365,9 +371,11 @@ private static String blobKey(ResourceDescription descriptor) {
return descriptor.isFolder() ? path : (path + BLOB_EXTENSION);
}

private static String blobKeyFromRedisKey(String redisKey) {
int i = redisKey.indexOf(":");
return redisKey.substring(i + 1) + BLOB_EXTENSION;
private String blobKeyFromRedisKey(String redisKey) {
// redis key may have prefix, we need to subtract it, because BlobStore manage prefix on its own
int delimiterIndex = redisKey.indexOf(":");
int prefixChars = prefix != null ? prefix.length() + 1 : 0;
return redisKey.substring(prefixChars + delimiterIndex + 1) + BLOB_EXTENSION;
}

private static String fromBlobKey(String blobKey) {
Expand Down Expand Up @@ -426,8 +434,12 @@ private void redisSync(String key) {
set.remove(key);
}

private static String redisKey(ResourceDescription descriptor) {
return descriptor.getType().name().toLowerCase() + ":" + descriptor.getAbsoluteFilePath();
private String redisKey(ResourceDescription descriptor) {
String resourcePath = descriptor.getAbsoluteFilePath();
if (prefix != null) {
resourcePath = prefix + BlobStorageUtil.PATH_SEPARATOR + resourcePath;
}
return descriptor.getType().name().toLowerCase() + ":" + resourcePath;
}

private static long time() {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/epam/aidial/core/service/ShareService.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

@AllArgsConstructor
@Slf4j
@AllArgsConstructor
public class ShareService {

private static final String SHARE_RESOURCE_FILENAME = "share";
Expand Down Expand Up @@ -326,7 +326,7 @@ private ResourceDescription getResourceFromLink(String url) {
}
}

private static ResourceDescription getShareResource(ResourceType shareResourceType, ResourceType requestedResourceType, String bucket, String location) {
private ResourceDescription getShareResource(ResourceType shareResourceType, ResourceType requestedResourceType, String bucket, String location) {
return ResourceDescription.fromDecoded(shareResourceType, bucket, location,
requestedResourceType.getGroup() + BlobStorageUtil.PATH_SEPARATOR + SHARE_RESOURCE_FILENAME);
}
Expand Down
82 changes: 69 additions & 13 deletions src/main/java/com/epam/aidial/core/storage/BlobStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.epam.aidial.core.data.ResourceFolderMetadata;
import com.epam.aidial.core.data.ResourceType;
import io.vertx.core.buffer.Buffer;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.jclouds.ContextBuilder;
import org.jclouds.blobstore.BlobStore;
Expand All @@ -18,6 +19,8 @@
import org.jclouds.blobstore.domain.StorageMetadata;
import org.jclouds.blobstore.domain.Tier;
import org.jclouds.blobstore.domain.internal.BlobMetadataImpl;
import org.jclouds.blobstore.domain.internal.MutableStorageMetadataImpl;
import org.jclouds.blobstore.domain.internal.PageSetImpl;
import org.jclouds.blobstore.options.ListContainerOptions;
import org.jclouds.blobstore.options.PutOptions;
import org.jclouds.io.ContentMetadata;
Expand All @@ -29,6 +32,9 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

@Slf4j
public class BlobStorage implements Closeable {
Expand All @@ -42,6 +48,11 @@ public class BlobStorage implements Closeable {
private final BlobStore blobStore;
private final String bucketName;

// defines a root folder for all resources in bucket
@Getter
@Nullable
private final String prefix;

public BlobStorage(Storage config) {
String provider = config.getProvider();
ContextBuilder builder = ContextBuilder.newBuilder(provider);
Expand All @@ -57,6 +68,7 @@ public BlobStorage(Storage config) {
this.storeContext = builder.buildView(BlobStoreContext.class);
this.blobStore = storeContext.getBlobStore();
this.bucketName = config.getBucket();
this.prefix = config.getPrefix();
createBucketIfNeeded(config);
}

Expand All @@ -68,7 +80,8 @@ public BlobStorage(Storage config) {
*/
@SuppressWarnings("UnstableApiUsage") // multipart upload uses beta API
public MultipartUpload initMultipartUpload(String absoluteFilePath, String contentType) {
BlobMetadata metadata = buildBlobMetadata(absoluteFilePath, contentType, bucketName);
String storageLocation = getStorageLocation(absoluteFilePath);
BlobMetadata metadata = buildBlobMetadata(storageLocation, contentType, bucketName);
return blobStore.initiateMultipartUpload(bucketName, metadata, PutOptions.NONE);
}

Expand Down Expand Up @@ -110,7 +123,8 @@ public void abortMultipartUpload(MultipartUpload multipart) {
* @param data whole content data
*/
public void store(String absoluteFilePath, String contentType, Buffer data) {
Blob blob = blobStore.blobBuilder(absoluteFilePath)
String storageLocation = getStorageLocation(absoluteFilePath);
Blob blob = blobStore.blobBuilder(storageLocation)
.payload(new BufferPayload(data))
.contentLength(data.length())
.contentType(contentType)
Expand All @@ -132,7 +146,8 @@ public void store(String absoluteFilePath,
String contentEncoding,
Map<String, String> metadata,
byte[] data) {
Blob blob = blobStore.blobBuilder(absoluteFilePath)
String storageLocation = getStorageLocation(absoluteFilePath);
Blob blob = blobStore.blobBuilder(storageLocation)
.payload(data)
.contentLength(data.length)
.contentType(contentType)
Expand All @@ -150,15 +165,18 @@ public void store(String absoluteFilePath,
* @return Blob instance if file was found, null - otherwise
*/
public Blob load(String filePath) {
return blobStore.getBlob(bucketName, filePath);
String storageLocation = getStorageLocation(filePath);
return blobStore.getBlob(bucketName, storageLocation);
}

public boolean exists(String filePath) {
return blobStore.blobExists(bucketName, filePath);
String storageLocation = getStorageLocation(filePath);
return blobStore.blobExists(bucketName, storageLocation);
}

public BlobMetadata meta(String filePath) {
return blobStore.blobMetadata(bucketName, filePath);
String storageLocation = getStorageLocation(filePath);
return blobStore.blobMetadata(bucketName, storageLocation);
}

/**
Expand All @@ -167,14 +185,16 @@ public BlobMetadata meta(String filePath) {
* @param filePath absolute file path, for example: Users/user1/files/inputs/data.csv
*/
public void delete(String filePath) {
blobStore.removeBlob(bucketName, filePath);
String storageLocation = getStorageLocation(filePath);
blobStore.removeBlob(bucketName, storageLocation);
}

/**
* List all files/folder metadata for a given resource
*/
public MetadataBase listMetadata(ResourceDescription resource) {
ListContainerOptions options = buildListContainerOptions(resource.getAbsoluteFilePath());
String storageLocation = getStorageLocation(resource.getAbsoluteFilePath());
ListContainerOptions options = buildListContainerOptions(storageLocation);
PageSet<? extends StorageMetadata> list = blobStore.list(this.bucketName, options);
List<MetadataBase> filesMetadata = list.stream().map(meta -> buildFileMetadata(resource, meta)).toList();

Expand All @@ -192,8 +212,9 @@ public MetadataBase listMetadata(ResourceDescription resource) {
}

public PageSet<? extends StorageMetadata> list(String absoluteFilePath, String afterMarker, int maxResults, boolean recursive) {
String storageLocation = getStorageLocation(absoluteFilePath);
ListContainerOptions options = new ListContainerOptions()
.prefix(absoluteFilePath)
.prefix(storageLocation)
.maxResults(maxResults);

if (recursive) {
Expand All @@ -206,7 +227,28 @@ public PageSet<? extends StorageMetadata> list(String absoluteFilePath, String a
options.afterMarker(afterMarker);
}

return blobStore.list(bucketName, options);
PageSet<? extends StorageMetadata> originalSet = blobStore.list(bucketName, options);
if (prefix == null) {
return originalSet;
}
// if prefix defined - subtract it from blob key
String nextMarker = originalSet.getNextMarker();
Set<MutableStorageMetadataImpl> resultSet = originalSet.stream()
.map(MutableStorageMetadataImpl::new)
.map(metadata -> {
metadata.setName(removePrefix(metadata.getName()));
return metadata;
})
.collect(Collectors.toSet());

return new PageSetImpl<>(resultSet, nextMarker);
}

private String removePrefix(String path) {
if (prefix == null) {
return path;
}
return path.substring(prefix.length() + 1);
}

@Override
Expand All @@ -220,7 +262,7 @@ private static ListContainerOptions buildListContainerOptions(String prefix) {
.delimiter(BlobStorageUtil.PATH_SEPARATOR);
}

private static MetadataBase buildFileMetadata(ResourceDescription resource, StorageMetadata metadata) {
private MetadataBase buildFileMetadata(ResourceDescription resource, StorageMetadata metadata) {
String bucketName = resource.getBucketName();
ResourceDescription resultResource = getResourceDescription(resource.getType(), bucketName,
resource.getBucketLocation(), metadata.getName());
Expand All @@ -239,8 +281,12 @@ private static MetadataBase buildFileMetadata(ResourceDescription resource, Stor
};
}

private static ResourceDescription getResourceDescription(ResourceType resourceType, String bucketName, String bucketLocation, String absoluteFilePath) {
String relativeFilePath = absoluteFilePath.substring(bucketLocation.length() + resourceType.getGroup().length() + 1);
private ResourceDescription getResourceDescription(ResourceType resourceType, String bucketName, String bucketLocation, String absoluteFilePath) {
// bucketLocation + resourceType + /
int bucketAndResourceCharsLength = bucketLocation.length() + resourceType.getGroup().length() + 1;
// bucketAndResourceCharsLength or bucketAndResourceCharsLength + prefix + /
int charsToSkip = prefix == null ? bucketAndResourceCharsLength : prefix.length() + 1 + bucketAndResourceCharsLength;
String relativeFilePath = absoluteFilePath.substring(charsToSkip);
return ResourceDescription.fromDecoded(resourceType, bucketName, bucketLocation, relativeFilePath);
}

Expand Down Expand Up @@ -275,4 +321,14 @@ private void createBucketIfNeeded(Storage config) {
storeContext.getBlobStore().createContainerInLocation(null, bucketName);
}
}

/**
* Adds a storage prefix if any.
*
* @param absoluteFilePath - absolute file path that contains a user bucket location, resource type and relative resource path
* @return a full storage path
*/
private String getStorageLocation(String absoluteFilePath) {
return prefix == null ? absoluteFilePath : prefix + BlobStorageUtil.PATH_SEPARATOR + absoluteFilePath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ public ResourceDescription getParent() {
}

String parentFolderName = parentFolders.get(parentFolders.size() - 1);
return new ResourceDescription(type, parentFolderName, parentFolders.subList(0, parentFolders.size() - 1), originalPath, bucketName, bucketLocation, true);
return new ResourceDescription(type, parentFolderName,
parentFolders.subList(0, parentFolders.size() - 1), originalPath, bucketName, bucketLocation, true);
}

public boolean isRootFolder() {
Expand Down
1 change: 1 addition & 0 deletions src/test/java/com/epam/aidial/core/FileUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public static BlobStorage buildFsBlobStorage(Path baseDir) {
storageConfig.setIdentity("access-key");
storageConfig.setCredential("secret-key");
storageConfig.setOverrides(properties);
storageConfig.setPrefix("test");
return new BlobStorage(storageConfig);
}
}
1 change: 1 addition & 0 deletions src/test/java/com/epam/aidial/core/ResourceBaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ void init() throws Exception {
"provider": "filesystem",
"identity": "access-key",
"credential": "secret-key",
"prefix": "test-2",
"overrides": {
"jclouds.filesystem.basedir": "%s"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void beforeEach() {
"compressionMinSize": 256
}
""";
ResourceService resourceService = new ResourceService(vertx, redissonClient, blobStorage, lockService, new JsonObject(resourceConfig));
ResourceService resourceService = new ResourceService(vertx, redissonClient, blobStorage, lockService, new JsonObject(resourceConfig), null);
rateLimiter = new RateLimiter(vertx, resourceService);
}

Expand Down

0 comments on commit 22b3146

Please sign in to comment.