From 5282490801c1cca281cb23c51f3e2654afdab95a Mon Sep 17 00:00:00 2001 From: Maksim_Hadalau Date: Tue, 24 Oct 2023 17:24:38 +0200 Subject: [PATCH 01/10] initial implementation of Files API --- build.gradle | 1 + .../java/com/epam/aidial/core/AiDial.java | 12 +- src/main/java/com/epam/aidial/core/Proxy.java | 15 +- .../com/epam/aidial/core/config/Storage.java | 30 +++ .../core/controller/ControllerSelector.java | 43 +++- .../core/controller/DeleteFileController.java | 33 +++ .../controller/DownloadFileController.java | 64 ++++++ .../controller/FileMetadataController.java | 35 ++++ .../core/controller/UploadFileController.java | 56 +++++ .../epam/aidial/core/data/FileMetadata.java | 19 ++ .../aidial/core/data/FileMetadataBase.java | 11 + .../com/epam/aidial/core/data/FileType.java | 5 + .../epam/aidial/core/data/FolderMetadata.java | 8 + .../epam/aidial/core/storage/BlobStorage.java | 192 ++++++++++++++++++ .../aidial/core/storage/BlobStorageUtil.java | 48 +++++ .../aidial/core/storage/BlobWriteStream.java | 184 +++++++++++++++++ .../aidial/core/storage/BufferPayload.java | 34 ++++ .../core/storage/InputStreamReader.java | 150 ++++++++++++++ .../aidial/core/storage/StorageCache.java | 23 +++ src/main/resources/aidial.settings.json | 7 + 20 files changed, 963 insertions(+), 7 deletions(-) create mode 100644 src/main/java/com/epam/aidial/core/config/Storage.java create mode 100644 src/main/java/com/epam/aidial/core/controller/DeleteFileController.java create mode 100644 src/main/java/com/epam/aidial/core/controller/DownloadFileController.java create mode 100644 src/main/java/com/epam/aidial/core/controller/FileMetadataController.java create mode 100644 src/main/java/com/epam/aidial/core/controller/UploadFileController.java create mode 100644 src/main/java/com/epam/aidial/core/data/FileMetadata.java create mode 100644 src/main/java/com/epam/aidial/core/data/FileMetadataBase.java create mode 100644 src/main/java/com/epam/aidial/core/data/FileType.java create mode 100644 src/main/java/com/epam/aidial/core/data/FolderMetadata.java create mode 100644 src/main/java/com/epam/aidial/core/storage/BlobStorage.java create mode 100644 src/main/java/com/epam/aidial/core/storage/BlobStorageUtil.java create mode 100644 src/main/java/com/epam/aidial/core/storage/BlobWriteStream.java create mode 100644 src/main/java/com/epam/aidial/core/storage/BufferPayload.java create mode 100644 src/main/java/com/epam/aidial/core/storage/InputStreamReader.java create mode 100644 src/main/java/com/epam/aidial/core/storage/StorageCache.java diff --git a/build.gradle b/build.gradle index f56bc859b..4fdb0e192 100644 --- a/build.gradle +++ b/build.gradle @@ -39,6 +39,7 @@ dependencies { implementation 'com.fasterxml.jackson.core:jackson-databind:2.15.2' implementation 'com.auth0:java-jwt:4.4.0' implementation 'com.auth0:jwks-rsa:0.22.1' + implementation 'org.apache.jclouds:jclouds-allblobstore:2.5.0' runtimeOnly 'com.epam.deltix:gflog-slf4j:3.0.0' testImplementation 'org.mockito:mockito-core:5.4.0' diff --git a/src/main/java/com/epam/aidial/core/AiDial.java b/src/main/java/com/epam/aidial/core/AiDial.java index 6b2fe2723..503595abf 100644 --- a/src/main/java/com/epam/aidial/core/AiDial.java +++ b/src/main/java/com/epam/aidial/core/AiDial.java @@ -2,10 +2,12 @@ import com.epam.aidial.core.config.ConfigStore; import com.epam.aidial.core.config.FileConfigStore; +import com.epam.aidial.core.config.Storage; import com.epam.aidial.core.limiter.RateLimiter; import com.epam.aidial.core.log.GfLogStore; import com.epam.aidial.core.log.LogStore; import com.epam.aidial.core.security.IdentityProvider; +import com.epam.aidial.core.storage.BlobStorage; import com.epam.aidial.core.upstream.UpstreamBalancer; import com.epam.deltix.gflog.core.LogConfigurator; import io.micrometer.registry.otlp.OtlpMeterRegistry; @@ -17,6 +19,7 @@ import io.vertx.core.http.HttpClientOptions; import io.vertx.core.http.HttpServer; import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.json.Json; import io.vertx.core.json.JsonObject; import io.vertx.core.metrics.MetricsOptions; import io.vertx.micrometer.MicrometerMetricsOptions; @@ -40,6 +43,8 @@ public class AiDial { private HttpServer server; private HttpClient client; + private BlobStorage storage; + private void start() throws Exception { try { settings = settings(); @@ -56,7 +61,9 @@ private void start() throws Exception { UpstreamBalancer upstreamBalancer = new UpstreamBalancer(); IdentityProvider identityProvider = new IdentityProvider(settings("identityProvider"), vertx); - Proxy proxy = new Proxy(client, configStore, logStore, rateLimiter, upstreamBalancer, identityProvider); + Storage storageConfig = Json.decodeValue(settings("storage").toBuffer(), Storage.class); + storage = new BlobStorage(storageConfig); + Proxy proxy = new Proxy(vertx, client, configStore, logStore, rateLimiter, upstreamBalancer, identityProvider, storage); server = vertx.createHttpServer(new HttpServerOptions(settings("server"))).requestHandler(proxy); open(server, HttpServer::listen); @@ -75,6 +82,9 @@ private void stop() { close(server, HttpServer::close); close(client, HttpClient::close); close(vertx, Vertx::close); + if (storage != null) { + storage.close(); + } log.info("Proxy stopped"); LogConfigurator.unconfigure(); } catch (Throwable e) { diff --git a/src/main/java/com/epam/aidial/core/Proxy.java b/src/main/java/com/epam/aidial/core/Proxy.java index c4baf6e82..af8adeccd 100644 --- a/src/main/java/com/epam/aidial/core/Proxy.java +++ b/src/main/java/com/epam/aidial/core/Proxy.java @@ -10,11 +10,13 @@ import com.epam.aidial.core.log.LogStore; import com.epam.aidial.core.security.ExtractedClaims; import com.epam.aidial.core.security.IdentityProvider; +import com.epam.aidial.core.storage.BlobStorage; import com.epam.aidial.core.upstream.UpstreamBalancer; import com.epam.aidial.core.util.HttpStatus; import com.epam.aidial.core.util.ProxyUtil; import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Vertx; import io.vertx.core.http.HttpClient; import io.vertx.core.http.HttpHeaders; import io.vertx.core.http.HttpMethod; @@ -42,14 +44,16 @@ public class Proxy implements Handler { public static final String HEADER_UPSTREAM_ATTEMPTS = "X-UPSTREAM-ATTEMPTS"; public static final String HEADER_CONTENT_TYPE_APPLICATION_JSON = "application/json"; - public static final int REQUEST_BODY_MAX_SIZE = 16 * 1024 * 1024; + public static final int REQUEST_BODY_MAX_SIZE = 512 * 1024 * 1024; + private final Vertx vertx; private final HttpClient client; private final ConfigStore configStore; private final LogStore logStore; private final RateLimiter rateLimiter; private final UpstreamBalancer upstreamBalancer; private final IdentityProvider identityProvider; + private final BlobStorage storage; @Override public void handle(HttpServerRequest request) { @@ -74,7 +78,8 @@ private void handleRequest(HttpServerRequest request) throws Exception { return; } - if (request.method() != HttpMethod.GET && request.method() != HttpMethod.POST) { + HttpMethod requestMethod = request.method(); + if (requestMethod != HttpMethod.GET && requestMethod != HttpMethod.POST && requestMethod != HttpMethod.DELETE) { respond(request, HttpStatus.METHOD_NOT_ALLOWED); return; } @@ -141,7 +146,8 @@ private void handleRequest(HttpServerRequest request) throws Exception { }); } - private void onExtractClaimsFailure(Throwable error, Config config, HttpServerRequest request, Key key) throws Exception { + private void onExtractClaimsFailure(Throwable error, Config config, HttpServerRequest request, Key key) + throws Exception { if (key.getUserAuth() == UserAuth.ENABLED) { log.error("Can't extract claims from authorization header", error); respond(request, HttpStatus.UNAUTHORIZED, "Bad Authorization header"); @@ -152,7 +158,8 @@ private void onExtractClaimsFailure(Throwable error, Config config, HttpServerRe } } - private void onExtractClaimsSuccess(ExtractedClaims extractedClaims, Config config, HttpServerRequest request, Key key) throws Exception { + private void onExtractClaimsSuccess(ExtractedClaims extractedClaims, Config config, + HttpServerRequest request, Key key) throws Exception { ProxyContext context = new ProxyContext(config, request, key, extractedClaims); Controller controller = ControllerSelector.select(this, context); controller.handle(); diff --git a/src/main/java/com/epam/aidial/core/config/Storage.java b/src/main/java/com/epam/aidial/core/config/Storage.java new file mode 100644 index 000000000..f954b48a5 --- /dev/null +++ b/src/main/java/com/epam/aidial/core/config/Storage.java @@ -0,0 +1,30 @@ +package com.epam.aidial.core.config; + +import lombok.Data; + +import javax.annotation.Nullable; + +@Data +public class Storage { + /** + * Specifies storage provider. Supported providers: s3, aws-s3, azureblob, google-cloud-storage + */ + String provider; + /** + * Optional. Specifies endpoint url for s3 compatible storages + */ + @Nullable + String endpoint; + /** + * api key + */ + String identity; + /** + * secret key + */ + String credential; + /** + * container name/root bucket + */ + String bucket; +} diff --git a/src/main/java/com/epam/aidial/core/controller/ControllerSelector.java b/src/main/java/com/epam/aidial/core/controller/ControllerSelector.java index b43456604..f77cd478c 100644 --- a/src/main/java/com/epam/aidial/core/controller/ControllerSelector.java +++ b/src/main/java/com/epam/aidial/core/controller/ControllerSelector.java @@ -29,21 +29,29 @@ public class ControllerSelector { private static final Pattern PATTERN_APPLICATION = Pattern.compile("/+openai/applications/([-.@a-zA-Z0-9]+)"); private static final Pattern PATTERN_APPLICATIONS = Pattern.compile("/+openai/applications"); + private static final Pattern PATTERN_FILES = Pattern.compile("/v1/files(.*)"); + + private static final Pattern PATTERN_FILE = Pattern.compile("/v1/files/([-a-zA-Z0-9]+)"); + + private static final Pattern PATTERN_FILES_METADATA = Pattern.compile("/v1/files/metadata([-a-zA-Z0-9/]*)"); + public Controller select(Proxy proxy, ProxyContext context) { String path = URLDecoder.decode(context.getRequest().path(), StandardCharsets.UTF_8); HttpMethod method = context.getRequest().method(); Controller controller = null; if (method == HttpMethod.GET) { - controller = selectGet(context, path); + controller = selectGet(proxy, context, path); } else if (method == HttpMethod.POST) { controller = selectPost(proxy, context, path); + } else if (method == HttpMethod.DELETE) { + controller = selectDelete(proxy, context, path); } return (controller == null) ? new RouteController(proxy, context) : controller; } - private static Controller selectGet(ProxyContext context, String path) { + private static Controller selectGet(Proxy proxy, ProxyContext context, String path) { Matcher match; match = match(PATTERN_DEPLOYMENT, path); @@ -111,6 +119,20 @@ private static Controller selectGet(ProxyContext context, String path) { return controller::getApplications; } + match = match(PATTERN_FILES_METADATA, path); + if (match != null) { + String filePath = match.group(1); + FileMetadataController controller = new FileMetadataController(proxy, context); + return () -> controller.list(filePath); + } + + match = match(PATTERN_FILE, path); + if (match != null) { + DownloadFileController controller = new DownloadFileController(proxy, context); + String fileId = match.group(1); + return () -> controller.download(fileId); + } + return null; } @@ -122,6 +144,23 @@ private static Controller selectPost(Proxy proxy, ProxyContext context, String p DeploymentPostController controller = new DeploymentPostController(proxy, context); return () -> controller.handle(deploymentId, deploymentApi); } + match = match(PATTERN_FILES, path); + if (match != null) { + String filePath = match.group(1); + UploadFileController controller = new UploadFileController(proxy, context); + return () -> controller.upload(filePath); + } + + return null; + } + + private static Controller selectDelete(Proxy proxy, ProxyContext context, String path) { + Matcher match = match(PATTERN_FILE, path); + if (match != null) { + String fileId = match.group(1); + DeleteFileController controller = new DeleteFileController(proxy, context); + return () -> controller.delete(fileId); + } return null; } diff --git a/src/main/java/com/epam/aidial/core/controller/DeleteFileController.java b/src/main/java/com/epam/aidial/core/controller/DeleteFileController.java new file mode 100644 index 000000000..2341fee76 --- /dev/null +++ b/src/main/java/com/epam/aidial/core/controller/DeleteFileController.java @@ -0,0 +1,33 @@ +package com.epam.aidial.core.controller; + +import com.epam.aidial.core.Proxy; +import com.epam.aidial.core.ProxyContext; +import com.epam.aidial.core.storage.BlobStorage; +import com.epam.aidial.core.util.HttpStatus; +import io.vertx.core.Future; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@AllArgsConstructor +public class DeleteFileController { + private final Proxy proxy; + private final ProxyContext context; + + public Future delete(String fileId) { + BlobStorage storage = proxy.getStorage(); + Future result = proxy.getVertx().executeBlocking(future -> { + try { + storage.delete(fileId); + future.complete(); + } catch (Exception ex) { + log.error("Failed to delete file " + fileId, ex); + future.fail(ex); + } + }); + + return result + .onSuccess(success -> context.respond(HttpStatus.OK)) + .onFailure(error -> context.respond(HttpStatus.INTERNAL_SERVER_ERROR, error.getMessage())); + } +} diff --git a/src/main/java/com/epam/aidial/core/controller/DownloadFileController.java b/src/main/java/com/epam/aidial/core/controller/DownloadFileController.java new file mode 100644 index 000000000..d1192afc0 --- /dev/null +++ b/src/main/java/com/epam/aidial/core/controller/DownloadFileController.java @@ -0,0 +1,64 @@ +package com.epam.aidial.core.controller; + +import com.epam.aidial.core.Proxy; +import com.epam.aidial.core.ProxyContext; +import com.epam.aidial.core.storage.InputStreamReader; +import com.epam.aidial.core.util.HttpStatus; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.http.HttpHeaders; +import io.vertx.core.http.HttpServerResponse; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.io.MutableContentMetadata; +import org.jclouds.io.Payload; + +import java.io.IOException; + +@Slf4j +@AllArgsConstructor +public class DownloadFileController { + + private final Proxy proxy; + private final ProxyContext context; + + public Future download(String fileId) throws Exception { + Future blobFuture = proxy.getVertx().executeBlocking(result -> { + Blob blob = proxy.getStorage().load(fileId); + result.complete(blob); + }); + + Promise result = Promise.promise(); + blobFuture.onSuccess(blob -> { + if (blob == null) { + context.respond(HttpStatus.NOT_FOUND); + result.complete(); + return; + } + + Payload payload = blob.getPayload(); + MutableContentMetadata metadata = payload.getContentMetadata(); + String contentType = metadata.getContentType(); + Long length = metadata.getContentLength(); + + HttpServerResponse response = context.getResponse() + .putHeader(HttpHeaders.CONTENT_TYPE, contentType) + // content-length removed by vertx + .putHeader(HttpHeaders.CONTENT_LENGTH, length.toString()); + + try { + InputStreamReader stream = new InputStreamReader(proxy.getVertx(), payload.openStream()); + stream.pipeTo(response, result); + result.future().onFailure(error -> { + stream.close(); + context.getResponse().close(); + }); + } catch (IOException e) { + result.fail(e); + } + }).onFailure(error -> context.respond(HttpStatus.INTERNAL_SERVER_ERROR, "Failed to fetch file with ID " + fileId)); + + return result.future(); + } +} diff --git a/src/main/java/com/epam/aidial/core/controller/FileMetadataController.java b/src/main/java/com/epam/aidial/core/controller/FileMetadataController.java new file mode 100644 index 000000000..cb63b15b8 --- /dev/null +++ b/src/main/java/com/epam/aidial/core/controller/FileMetadataController.java @@ -0,0 +1,35 @@ +package com.epam.aidial.core.controller; + +import com.epam.aidial.core.Proxy; +import com.epam.aidial.core.ProxyContext; +import com.epam.aidial.core.data.FileMetadataBase; +import com.epam.aidial.core.storage.BlobStorage; +import com.epam.aidial.core.util.HttpStatus; +import io.vertx.core.Future; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +@AllArgsConstructor +@Slf4j +public class FileMetadataController { + private final Proxy proxy; + private final ProxyContext context; + + public Future list(String path) { + BlobStorage storage = proxy.getStorage(); + return proxy.getVertx().executeBlocking(future -> { + try { + String recursive = context.getRequest().params().get("recursive"); + List metadata = storage.listMetadata(path, Boolean.parseBoolean(recursive)); + context.respond(HttpStatus.OK, metadata); + } catch (Exception ex) { + log.error("Failed to list files", ex); + context.respond(HttpStatus.INTERNAL_SERVER_ERROR, "Failed to list files metadata"); + } + + future.complete(); + }); + } +} diff --git a/src/main/java/com/epam/aidial/core/controller/UploadFileController.java b/src/main/java/com/epam/aidial/core/controller/UploadFileController.java new file mode 100644 index 000000000..c76958625 --- /dev/null +++ b/src/main/java/com/epam/aidial/core/controller/UploadFileController.java @@ -0,0 +1,56 @@ +package com.epam.aidial.core.controller; + +import com.epam.aidial.core.Proxy; +import com.epam.aidial.core.ProxyContext; +import com.epam.aidial.core.storage.BlobStorageUtil; +import com.epam.aidial.core.storage.BlobWriteStream; +import com.epam.aidial.core.util.HttpStatus; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.impl.MimeMapping; +import io.vertx.core.streams.Pipe; +import io.vertx.core.streams.impl.PipeImpl; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.UUID; + +@Slf4j +@RequiredArgsConstructor +public class UploadFileController { + + private final Proxy proxy; + private final ProxyContext context; + + public Future upload(String path) { + String fileId = UUID.randomUUID().toString(); + Promise result = Promise.promise(); + context.getRequest() + .setExpectMultipart(true) + .uploadHandler(upload -> { + String filename = upload.filename(); + String mimeType = MimeMapping.getMimeTypeForFilename(filename); + String contentType = mimeType == null ? "application/octet-stream" : mimeType; + Pipe pipe = new PipeImpl<>(upload).endOnFailure(false); + BlobWriteStream writeStream = new BlobWriteStream( + proxy.getVertx(), + proxy.getStorage(), + filename, + contentType, + fileId, + BlobStorageUtil.removeLeadingAndTrailingPathSeparators(path)); + pipe.to(writeStream, result); + + result.future() + .onSuccess(success -> context.respond(HttpStatus.OK, writeStream.getMetadata())) + .onFailure(error -> { + writeStream.abortUpload(error); + context.respond(HttpStatus.INTERNAL_SERVER_ERROR, + "Failed to upload file: " + error.getMessage()); + }); + }); + + return result.future(); + } +} diff --git a/src/main/java/com/epam/aidial/core/data/FileMetadata.java b/src/main/java/com/epam/aidial/core/data/FileMetadata.java new file mode 100644 index 000000000..09bed437e --- /dev/null +++ b/src/main/java/com/epam/aidial/core/data/FileMetadata.java @@ -0,0 +1,19 @@ +package com.epam.aidial.core.data; + +import lombok.Getter; + +@Getter +public class FileMetadata extends FileMetadataBase { + String id; + String path; + long contentLength; + String contentType; + + public FileMetadata(String id, String name, String path, long contentLength, String contentType) { + super(name, FileType.FILE); + this.id = id; + this.path = path; + this.contentLength = contentLength; + this.contentType = contentType; + } +} diff --git a/src/main/java/com/epam/aidial/core/data/FileMetadataBase.java b/src/main/java/com/epam/aidial/core/data/FileMetadataBase.java new file mode 100644 index 000000000..bac27802a --- /dev/null +++ b/src/main/java/com/epam/aidial/core/data/FileMetadataBase.java @@ -0,0 +1,11 @@ +package com.epam.aidial.core.data; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@AllArgsConstructor +@Data +public abstract class FileMetadataBase { + private String name; + private FileType type; +} diff --git a/src/main/java/com/epam/aidial/core/data/FileType.java b/src/main/java/com/epam/aidial/core/data/FileType.java new file mode 100644 index 000000000..23c330ceb --- /dev/null +++ b/src/main/java/com/epam/aidial/core/data/FileType.java @@ -0,0 +1,5 @@ +package com.epam.aidial.core.data; + +public enum FileType { + FILE, FOLDER +} diff --git a/src/main/java/com/epam/aidial/core/data/FolderMetadata.java b/src/main/java/com/epam/aidial/core/data/FolderMetadata.java new file mode 100644 index 000000000..3db92fbf9 --- /dev/null +++ b/src/main/java/com/epam/aidial/core/data/FolderMetadata.java @@ -0,0 +1,8 @@ +package com.epam.aidial.core.data; + +public class FolderMetadata extends FileMetadataBase { + + public FolderMetadata(String name) { + super(name, FileType.FOLDER); + } +} diff --git a/src/main/java/com/epam/aidial/core/storage/BlobStorage.java b/src/main/java/com/epam/aidial/core/storage/BlobStorage.java new file mode 100644 index 000000000..c134a068b --- /dev/null +++ b/src/main/java/com/epam/aidial/core/storage/BlobStorage.java @@ -0,0 +1,192 @@ +package com.epam.aidial.core.storage; + +import com.epam.aidial.core.config.Storage; +import com.epam.aidial.core.data.FileMetadata; +import com.epam.aidial.core.data.FileMetadataBase; +import com.epam.aidial.core.data.FolderMetadata; +import io.vertx.core.buffer.Buffer; +import lombok.extern.slf4j.Slf4j; +import org.jclouds.ContextBuilder; +import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.BlobStoreContext; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.domain.BlobMetadata; +import org.jclouds.blobstore.domain.MultipartPart; +import org.jclouds.blobstore.domain.MultipartUpload; +import org.jclouds.blobstore.domain.PageSet; +import org.jclouds.blobstore.domain.StorageMetadata; +import org.jclouds.blobstore.domain.StorageType; +import org.jclouds.blobstore.domain.Tier; +import org.jclouds.blobstore.domain.internal.BlobMetadataImpl; +import org.jclouds.blobstore.options.ListContainerOptions; +import org.jclouds.blobstore.options.PutOptions; +import org.jclouds.io.ContentMetadata; +import org.jclouds.io.ContentMetadataBuilder; +import org.jclouds.io.payloads.BaseMutableContentMetadata; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +@Slf4j +public class BlobStorage { + + private static final String FILE_NAME_METADATA_KEY = "file_name"; + private static final String CONTENT_TYPE_METADATA_KEY = "content_type"; + + private final BlobStoreContext storeContext; + private final BlobStore blobStore; + private final StorageCache cache; + private final String bucketName; + + public BlobStorage(Storage config) { + ContextBuilder builder = ContextBuilder.newBuilder(config.getProvider()); + if (config.getEndpoint() != null) { + builder.endpoint(config.getEndpoint()); + } + builder.credentials(config.getIdentity(), config.getCredential()); + this.storeContext = builder.buildView(BlobStoreContext.class); + this.blobStore = storeContext.getBlobStore(); + this.bucketName = config.getBucket(); + this.cache = new StorageCache(); + createBucketIfNeeded(config); + } + + public MultipartUpload initMultipartUpload(String fileId, String path, String resourceName, String contentType) { + BlobMetadata metadata = buildBlobMetadata(fileId, path, resourceName, contentType, bucketName); + return blobStore.initiateMultipartUpload(bucketName, metadata, PutOptions.NONE); + } + + public MultipartPart storeMultipartPart(MultipartUpload multipart, int part, Buffer buffer) { + return blobStore.uploadMultipartPart(multipart, part, new BufferPayload(buffer)); + } + + public void completeMultipartUpload(FileMetadata metadata, MultipartUpload multipart, List parts) { + String etag = blobStore.completeMultipartUpload(multipart, parts); + // temp fileId to resource linking + cache.cache(metadata.getId(), metadata); + log.info("Stored etag: " + etag); + } + + public void abortMultipartUpload(MultipartUpload multipart) { + blobStore.abortMultipartUpload(multipart); + } + + public void store(FileMetadata metadata, Buffer data) { + String fileName = metadata.getName(); + String fileId = metadata.getId(); + String contentType = metadata.getContentType(); + String parentPath = metadata.getPath(); + Map userMetadata = buildUserMetadata(fileName, contentType); + String filePath = BlobStorageUtil.buildFilePath(parentPath, fileId); + Blob blob = blobStore.blobBuilder(filePath) + .payload(new BufferPayload(data)) + .contentLength(data.length()) + .contentType(contentType) + .userMetadata(userMetadata) + .build(); + + String etag = blobStore.putBlob(bucketName, blob); + // temp fileId to resource linking + cache.cache(fileId, metadata); + log.info("Stored etag: " + etag); + } + + public Blob load(String fileId) { + FileMetadata metadata = cache.load(fileId); + if (metadata == null) { + return null; + } + String parentPath = metadata.getPath(); + String id = metadata.getId(); + String resourceLocation = BlobStorageUtil.buildFilePath(parentPath, id); + return blobStore.getBlob(bucketName, resourceLocation); + } + + public void delete(String fileId) { + FileMetadata metadata = cache.load(fileId); + if (metadata == null) { + throw new IllegalArgumentException("File with ID %s not found".formatted(fileId)); + } + String parentPath = metadata.getPath(); + String id = metadata.getId(); + String resourceLocation = BlobStorageUtil.buildFilePath(parentPath, id); + blobStore.removeBlob(bucketName, resourceLocation); + cache.remove(fileId); + } + + public List listMetadata(String path, boolean recursive) { + List metadata = new ArrayList<>(); + ListContainerOptions options = buildListContainerOptions(BlobStorageUtil.normalizePathForQuery(path), recursive); + PageSet list = blobStore.list(bucketName, options); + list.forEach(meta -> { + StorageType objectType = meta.getType(); + switch (objectType) { + case BLOB -> metadata.add(buildFileMetadata(meta)); + case FOLDER, RELATIVE_PATH -> + metadata.add(new FolderMetadata(BlobStorageUtil.removeLeadingAndTrailingPathSeparators(meta.getName()))); + default -> throw new IllegalArgumentException("Can't list container"); + } + }); + + return metadata; + } + + private static ListContainerOptions buildListContainerOptions(String prefix, boolean recursive) { + ListContainerOptions options = new ListContainerOptions() + .withDetails() + .delimiter(BlobStorageUtil.PATH_SEPARATOR); + if (prefix != null) { + options.prefix(prefix); + } + if (recursive) { + options.recursive(); + } + + return options; + } + + private static FileMetadata buildFileMetadata(StorageMetadata metadata) { + Map userMetadata = metadata.getUserMetadata(); + String fullFileName = metadata.getName(); + String fileName = userMetadata.get(FILE_NAME_METADATA_KEY); + String contentType = userMetadata.get(CONTENT_TYPE_METADATA_KEY); + String[] elements = fullFileName.split(BlobStorageUtil.PATH_SEPARATOR); + String fileId = elements[elements.length - 1]; + // strip /UUID if needed + String parentPath = elements.length > 1 ? fullFileName.substring(0, fullFileName.length() - 37) : null; + return new FileMetadata(fileId, fileName, parentPath, metadata.getSize(), contentType); + } + + private static Map buildUserMetadata(String fileName, String contentType) { + return Map.of(FILE_NAME_METADATA_KEY, fileName, CONTENT_TYPE_METADATA_KEY, contentType); + } + + public void close() { + storeContext.close(); + } + + private static BlobMetadata buildBlobMetadata(String fileId, String path, String resourceName, String contentType, String bucketName) { + Map userMetadata = buildUserMetadata(resourceName, contentType); + String filePath = BlobStorageUtil.buildFilePath(path, fileId); + ContentMetadata contentMetadata = buildContentMetadata(contentType); + return new BlobMetadataImpl(fileId, filePath, null, null, null, null, null, userMetadata, null, bucketName, contentMetadata, null, Tier.STANDARD); + } + + private static ContentMetadata buildContentMetadata(String contentType) { + ContentMetadata contentMetadata = ContentMetadataBuilder.create() + .contentType(contentType) + .build(); + return BaseMutableContentMetadata.fromContentMetadata(contentMetadata); + } + + private void createBucketIfNeeded(Storage config) { + if (config.getProvider().equals("google-cloud-storage")) { + // GCP service account do not have permissions to get bucket :( + return; + } + if (!storeContext.getBlobStore().containerExists(bucketName)) { + storeContext.getBlobStore().createContainerInLocation(null, bucketName); + } + } +} diff --git a/src/main/java/com/epam/aidial/core/storage/BlobStorageUtil.java b/src/main/java/com/epam/aidial/core/storage/BlobStorageUtil.java new file mode 100644 index 000000000..25c40a596 --- /dev/null +++ b/src/main/java/com/epam/aidial/core/storage/BlobStorageUtil.java @@ -0,0 +1,48 @@ +package com.epam.aidial.core.storage; + +import lombok.experimental.UtilityClass; + +@UtilityClass +public class BlobStorageUtil { + + public static final String PATH_SEPARATOR = "/"; + private static final char DELIMITER = PATH_SEPARATOR.charAt(0); + + + public String normalizePathForQuery(String path) { + if (path == null || path.isBlank()) { + return null; + } + + // remove leading separator + if (path.charAt(0) == DELIMITER) { + path = path.substring(1); + } + + // add trailing separator if needed + return path.charAt(path.length() - 1) == DELIMITER ? path : path + PATH_SEPARATOR; + } + + public String removeLeadingAndTrailingPathSeparators(String path) { + if (path == null || path.isBlank() || path.equals("/")) { + return null; + } + if (path.charAt(0) == DELIMITER) { + path = path.substring(1); + } + + if (path.charAt(path.length() - 1) == DELIMITER) { + path = path.substring(0, path.length() - 1); + } + + return path; + } + + public String buildFilePath(String parentPath, String fileId) { + if (parentPath == null) { + return fileId; + } + + return parentPath + PATH_SEPARATOR + fileId; + } +} diff --git a/src/main/java/com/epam/aidial/core/storage/BlobWriteStream.java b/src/main/java/com/epam/aidial/core/storage/BlobWriteStream.java new file mode 100644 index 000000000..d71b3e18b --- /dev/null +++ b/src/main/java/com/epam/aidial/core/storage/BlobWriteStream.java @@ -0,0 +1,184 @@ +package com.epam.aidial.core.storage; + +import com.epam.aidial.core.data.FileMetadata; +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.streams.WriteStream; +import lombok.extern.slf4j.Slf4j; +import org.jclouds.blobstore.domain.MultipartPart; +import org.jclouds.blobstore.domain.MultipartUpload; + +import java.util.ArrayList; +import java.util.List; + +@Slf4j +public class BlobWriteStream implements WriteStream { + + private static final int MIN_PART_SIZE = 5 * 1024 * 1024; + + private final Vertx vertx; + private final BlobStorage storage; + private final String fileId; + private final String parentPath; + private final String fileName; + private final String contentType; + + private final Buffer chunkBuffer = Buffer.buffer(); + private int chunkSize = MIN_PART_SIZE; + private int position; + private MultipartUpload mpu; + private int chunkNumber = 0; + private FileMetadata metadata; + + private Throwable exception; + + private Handler errorHandler; + + private final List parts = new ArrayList<>(); + + private boolean isBufferFull; + + private long bytesHandled; + + public BlobWriteStream(Vertx vertx, + BlobStorage storage, + String fileName, + String contentType, + String fileId, + String parentPath) { + this.vertx = vertx; + this.storage = storage; + this.fileName = fileName; + this.contentType = contentType; + this.fileId = fileId; + this.parentPath = parentPath; + } + + @Override + public synchronized WriteStream exceptionHandler(Handler handler) { + this.errorHandler = handler; + return this; + } + + @Override + public synchronized Future write(Buffer data) { + Promise promise = Promise.promise(); + write(data, promise); + return promise.future(); + } + + @Override + public synchronized void write(Buffer data, Handler> handler) { + // exception might be thrown during drain handling, if so we need to stop processing chunks + // upload abortion will be handled in the end + if (exception != null) { + handler.handle(Future.failedFuture(exception)); + return; + } + + int length = data.length(); + chunkBuffer.setBuffer(position, data); + position += length; + bytesHandled += length; + if (position > chunkSize) { + isBufferFull = true; + } + + handler.handle(Future.succeededFuture()); + } + + @Override + public void end(Handler> handler) { + Future result = vertx.executeBlocking(promise -> { + synchronized (this) { + if (exception != null) { + promise.fail(exception); + return; + } + + Buffer lastChunk = chunkBuffer.slice(0, position); + metadata = new FileMetadata(fileId, fileName, parentPath, bytesHandled, contentType); + if (mpu == null) { + log.info("Resource is too small for multipart upload, sending as a regular blob"); + storage.store(metadata, lastChunk); + } else { + if (position != 0) { + MultipartPart part = storage.storeMultipartPart(mpu, ++chunkNumber, lastChunk); + parts.add(part); + } + + storage.completeMultipartUpload(metadata, mpu, parts); + log.info("Multipart upload committed, bytes handled {}", bytesHandled); + } + + promise.complete(); + } + }); + result.onSuccess(success -> { + if (handler != null) { + handler.handle(Future.succeededFuture()); + } + }).onFailure(error -> { + if (handler != null) { + handler.handle(Future.failedFuture(error)); + } + }); + } + + @Override + public synchronized WriteStream setWriteQueueMaxSize(int maxSize) { + assert maxSize > MIN_PART_SIZE; + chunkSize = maxSize; + return this; + } + + @Override + public synchronized boolean writeQueueFull() { + return isBufferFull; + } + + @Override + public WriteStream drainHandler(Handler handler) { + vertx.executeBlocking((promise) -> { + synchronized (this) { + try { + if (mpu == null) { + mpu = storage.initMultipartUpload(fileId, parentPath, fileName, contentType); + } + MultipartPart part = storage.storeMultipartPart(mpu, ++chunkNumber, chunkBuffer.slice(0, position)); + parts.add(part); + position = 0; + isBufferFull = false; + } catch (Throwable ex) { + exception = ex; + } finally { + if (handler != null) { + handler.handle(null); + } + } + } + }); + + return this; + } + + public synchronized FileMetadata getMetadata() { + return metadata; + } + + public synchronized void abortUpload(Throwable ex) { + if (mpu != null) { + storage.abortMultipartUpload(mpu); + } + + if (errorHandler != null) { + errorHandler.handle(ex); + } + + log.info("Multipart upload aborted"); + } +} diff --git a/src/main/java/com/epam/aidial/core/storage/BufferPayload.java b/src/main/java/com/epam/aidial/core/storage/BufferPayload.java new file mode 100644 index 000000000..73182d2ad --- /dev/null +++ b/src/main/java/com/epam/aidial/core/storage/BufferPayload.java @@ -0,0 +1,34 @@ +package com.epam.aidial.core.storage; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.vertx.core.buffer.Buffer; +import org.jclouds.io.payloads.BasePayload; + +import java.io.InputStream; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class BufferPayload extends BasePayload { + + public BufferPayload(Buffer content) { + this(content, null); + } + + public BufferPayload(Buffer content, byte[] md5) { + super(content.getByteBuf()); + getContentMetadata().setContentLength((long) checkNotNull(content, "content").length()); + getContentMetadata().setContentMD5(md5); + } + + + @Override + public InputStream openStream() { + return new ByteBufInputStream(content); + } + + @Override + public boolean isRepeatable() { + return false; + } +} diff --git a/src/main/java/com/epam/aidial/core/storage/InputStreamReader.java b/src/main/java/com/epam/aidial/core/storage/InputStreamReader.java new file mode 100644 index 000000000..558c917ce --- /dev/null +++ b/src/main/java/com/epam/aidial/core/storage/InputStreamReader.java @@ -0,0 +1,150 @@ +package com.epam.aidial.core.storage; + +import io.netty.buffer.Unpooled; +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.streams.Pipe; +import io.vertx.core.streams.ReadStream; +import io.vertx.core.streams.WriteStream; +import io.vertx.core.streams.impl.InboundBuffer; +import io.vertx.core.streams.impl.PipeImpl; +import lombok.extern.slf4j.Slf4j; + +import java.io.InputStream; + +@Slf4j +public class InputStreamReader implements ReadStream { + + private static final int DEFAULT_READ_BUFFER_SIZE = 32768; + + private final Vertx vertx; + private final InputStream in; + private final InboundBuffer queue; + private final int bufferSize; + + private Handler dataHandler; + private Handler endHandler; + private Handler exceptionHandler; + + public InputStreamReader(Vertx vertx, InputStream stream) { + this(vertx, stream, DEFAULT_READ_BUFFER_SIZE); + } + + public InputStreamReader(Vertx vertx, InputStream in, int bufferSize) { + this.vertx = vertx; + this.in = in; + this.queue = new InboundBuffer<>(vertx.getOrCreateContext(), 32); + this.bufferSize = bufferSize; + queue.handler(buff -> { + if (buff.length() > 0) { + handleData(buff); + } else { + handleEnd(); + } + }); + queue.drainHandler(v -> readDataFromStream()); + readDataFromStream(); + } + + @Override + public synchronized InputStreamReader endHandler(Handler endHandler) { + this.endHandler = endHandler; + return this; + } + + @Override + public synchronized InputStreamReader exceptionHandler(Handler exceptionHandler) { + this.exceptionHandler = exceptionHandler; + return this; + } + + @Override + public void pipeTo(WriteStream dst, Handler> handler) { + Pipe pipe = new PipeImpl<>(this).endOnFailure(false); + pipe.to(dst, handler); + } + + @Override + public synchronized InputStreamReader handler(Handler handler) { + this.dataHandler = handler; + return this; + } + + @Override + public synchronized InputStreamReader pause() { + queue.pause(); + return this; + } + + @Override + public synchronized InputStreamReader resume() { + queue.resume(); + return this; + } + + @Override + public synchronized ReadStream fetch(long amount) { + queue.fetch(amount); + return this; + } + + private synchronized void readDataFromStream() { + Future fetchResult = vertx.executeBlocking(future -> { + try { + byte[] data = in.readNBytes(bufferSize); + Buffer chunk = Buffer.buffer(Unpooled.wrappedBuffer(data)); + future.complete(chunk); + } catch (Exception e) { + future.fail(e); + } + }); + fetchResult.onSuccess(buf -> { + if (queue.write(buf) && buf.length() > 0) { + // load more data + readDataFromStream(); + } + }).onFailure(error -> { + log.info("Failed to read data from InputStream", error); + close(); + synchronized (this) { + if (exceptionHandler != null) { + exceptionHandler.handle(error); + } + } + }); + } + + private void handleData(Buffer buff) { + Handler handler; + synchronized (this) { + handler = dataHandler; + } + if (handler != null) { + handler.handle(buff); + } + } + + private synchronized void handleEnd() { + close(); + Handler endHandler; + synchronized (this) { + this.dataHandler.handle(Buffer.buffer()); + dataHandler = null; + endHandler = this.endHandler; + } + endHandler.handle(null); + } + + public synchronized void close() { + try { + in.close(); + queue.clear(); + } catch (Exception ex) { + // ignore + log.warn("Failed to close InputStream", ex); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/epam/aidial/core/storage/StorageCache.java b/src/main/java/com/epam/aidial/core/storage/StorageCache.java new file mode 100644 index 000000000..88a70862e --- /dev/null +++ b/src/main/java/com/epam/aidial/core/storage/StorageCache.java @@ -0,0 +1,23 @@ +package com.epam.aidial.core.storage; + +import com.epam.aidial.core.data.FileMetadata; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class StorageCache { + + private final ConcurrentMap cache = new ConcurrentHashMap<>(); + + public void cache(String fileId, FileMetadata metadata) { + cache.put(fileId, metadata); + } + + public FileMetadata load(String fileId) { + return cache.get(fileId); + } + + public void remove(String fileId) { + cache.remove(fileId); + } +} diff --git a/src/main/resources/aidial.settings.json b/src/main/resources/aidial.settings.json index 6295c728a..f16acac9d 100644 --- a/src/main/resources/aidial.settings.json +++ b/src/main/resources/aidial.settings.json @@ -41,5 +41,12 @@ "identityProvider": { "jwksUrl": null, "appName": "dial" + }, + "storage": { + "provider" : "s3", + "endpoint" : "http://localhost:9000", + "identity": "access-key", + "credential": "secret-key", + "bucket": "dail" } } From 2fcc535264021c3c87c72f31c076fa26a7dc3b44 Mon Sep 17 00:00:00 2001 From: Maksim_Hadalau Date: Thu, 16 Nov 2023 13:40:58 +0100 Subject: [PATCH 02/10] rework Files API --- .../com/epam/aidial/core/ProxyContext.java | 2 + .../core/controller/ControllerSelector.java | 33 +++--- .../core/controller/DeleteFileController.java | 14 ++- .../controller/DownloadFileController.java | 27 +++-- .../controller/FileMetadataController.java | 9 +- .../core/controller/UploadFileController.java | 11 +- .../epam/aidial/core/data/FileMetadata.java | 8 +- .../aidial/core/data/FileMetadataBase.java | 1 + .../epam/aidial/core/data/FolderMetadata.java | 4 +- .../aidial/core/security/ExtractedClaims.java | 2 +- .../core/security/IdentityProvider.java | 9 +- .../epam/aidial/core/storage/BlobStorage.java | 103 ++++++------------ .../aidial/core/storage/BlobStorageUtil.java | 36 +++++- .../aidial/core/storage/BlobWriteStream.java | 22 ++-- .../aidial/core/storage/StorageCache.java | 23 ---- 15 files changed, 136 insertions(+), 168 deletions(-) delete mode 100644 src/main/java/com/epam/aidial/core/storage/StorageCache.java diff --git a/src/main/java/com/epam/aidial/core/ProxyContext.java b/src/main/java/com/epam/aidial/core/ProxyContext.java index 677166338..5f1f32fd6 100644 --- a/src/main/java/com/epam/aidial/core/ProxyContext.java +++ b/src/main/java/com/epam/aidial/core/ProxyContext.java @@ -33,6 +33,7 @@ public class ProxyContext { private final HttpServerResponse response; private Deployment deployment; + private String userSub; private List userRoles; private String userHash; private TokenUsage tokenUsage; @@ -59,6 +60,7 @@ public ProxyContext(Config config, HttpServerRequest request, Key key, Extracted if (extractedClaims != null) { this.userRoles = extractedClaims.userRoles(); this.userHash = extractedClaims.userHash(); + this.userSub = extractedClaims.sub(); } } diff --git a/src/main/java/com/epam/aidial/core/controller/ControllerSelector.java b/src/main/java/com/epam/aidial/core/controller/ControllerSelector.java index f77cd478c..cc4e7f822 100644 --- a/src/main/java/com/epam/aidial/core/controller/ControllerSelector.java +++ b/src/main/java/com/epam/aidial/core/controller/ControllerSelector.java @@ -31,10 +31,6 @@ public class ControllerSelector { private static final Pattern PATTERN_FILES = Pattern.compile("/v1/files(.*)"); - private static final Pattern PATTERN_FILE = Pattern.compile("/v1/files/([-a-zA-Z0-9]+)"); - - private static final Pattern PATTERN_FILES_METADATA = Pattern.compile("/v1/files/metadata([-a-zA-Z0-9/]*)"); - public Controller select(Proxy proxy, ProxyContext context) { String path = URLDecoder.decode(context.getRequest().path(), StandardCharsets.UTF_8); HttpMethod method = context.getRequest().method(); @@ -119,18 +115,17 @@ private static Controller selectGet(Proxy proxy, ProxyContext context, String pa return controller::getApplications; } - match = match(PATTERN_FILES_METADATA, path); + match = match(PATTERN_FILES, path); if (match != null) { String filePath = match.group(1); - FileMetadataController controller = new FileMetadataController(proxy, context); - return () -> controller.list(filePath); - } - - match = match(PATTERN_FILE, path); - if (match != null) { - DownloadFileController controller = new DownloadFileController(proxy, context); - String fileId = match.group(1); - return () -> controller.download(fileId); + String purpose = context.getRequest().params().get(DownloadFileController.PURPOSE_FILE_QUERY_PARAMETER); + if (DownloadFileController.QUERY_METADATA_QUERY_PARAMETER_VALUE.equals(purpose)) { + FileMetadataController controller = new FileMetadataController(proxy, context); + return () -> controller.list(filePath); + } else { + DownloadFileController controller = new DownloadFileController(proxy, context); + return () -> controller.download(filePath); + } } return null; @@ -146,20 +141,20 @@ private static Controller selectPost(Proxy proxy, ProxyContext context, String p } match = match(PATTERN_FILES, path); if (match != null) { - String filePath = match.group(1); + String relativeFilePath = match.group(1); UploadFileController controller = new UploadFileController(proxy, context); - return () -> controller.upload(filePath); + return () -> controller.upload(relativeFilePath); } return null; } private static Controller selectDelete(Proxy proxy, ProxyContext context, String path) { - Matcher match = match(PATTERN_FILE, path); + Matcher match = match(PATTERN_FILES, path); if (match != null) { - String fileId = match.group(1); + String relativeFilePath = match.group(1); DeleteFileController controller = new DeleteFileController(proxy, context); - return () -> controller.delete(fileId); + return () -> controller.delete(relativeFilePath); } return null; diff --git a/src/main/java/com/epam/aidial/core/controller/DeleteFileController.java b/src/main/java/com/epam/aidial/core/controller/DeleteFileController.java index 2341fee76..f99597c55 100644 --- a/src/main/java/com/epam/aidial/core/controller/DeleteFileController.java +++ b/src/main/java/com/epam/aidial/core/controller/DeleteFileController.java @@ -3,6 +3,7 @@ import com.epam.aidial.core.Proxy; import com.epam.aidial.core.ProxyContext; import com.epam.aidial.core.storage.BlobStorage; +import com.epam.aidial.core.storage.BlobStorageUtil; import com.epam.aidial.core.util.HttpStatus; import io.vertx.core.Future; import lombok.AllArgsConstructor; @@ -14,15 +15,16 @@ public class DeleteFileController { private final Proxy proxy; private final ProxyContext context; - public Future delete(String fileId) { + public Future delete(String filePath) { + String absoluteFilePath = BlobStorageUtil.buildAbsoluteFilePath(context, filePath); BlobStorage storage = proxy.getStorage(); - Future result = proxy.getVertx().executeBlocking(future -> { + Future result = proxy.getVertx().executeBlocking(() -> { try { - storage.delete(fileId); - future.complete(); + storage.delete(absoluteFilePath); + return null; } catch (Exception ex) { - log.error("Failed to delete file " + fileId, ex); - future.fail(ex); + log.error("Failed to delete file " + absoluteFilePath, ex); + throw new RuntimeException(ex); } }); diff --git a/src/main/java/com/epam/aidial/core/controller/DownloadFileController.java b/src/main/java/com/epam/aidial/core/controller/DownloadFileController.java index d1192afc0..59d711b36 100644 --- a/src/main/java/com/epam/aidial/core/controller/DownloadFileController.java +++ b/src/main/java/com/epam/aidial/core/controller/DownloadFileController.java @@ -2,6 +2,7 @@ import com.epam.aidial.core.Proxy; import com.epam.aidial.core.ProxyContext; +import com.epam.aidial.core.storage.BlobStorageUtil; import com.epam.aidial.core.storage.InputStreamReader; import com.epam.aidial.core.util.HttpStatus; import io.vertx.core.Future; @@ -20,14 +21,25 @@ @AllArgsConstructor public class DownloadFileController { + private static final String PATH_TYPE_QUERY_PARAMETER = "path"; + private static final String ABSOLUTE_PATH_TYPE = "absolute"; + + static final String PURPOSE_FILE_QUERY_PARAMETER = "purpose"; + + static final String QUERY_METADATA_QUERY_PARAMETER_VALUE = "metadata"; + private final Proxy proxy; private final ProxyContext context; - public Future download(String fileId) throws Exception { - Future blobFuture = proxy.getVertx().executeBlocking(result -> { - Blob blob = proxy.getStorage().load(fileId); - result.complete(blob); - }); + public Future download(String filePath) { + String pathType = context.getRequest().params().get(PATH_TYPE_QUERY_PARAMETER); + String absoluteFilePath; + if (!ABSOLUTE_PATH_TYPE.equals(pathType)) { + absoluteFilePath = BlobStorageUtil.buildAbsoluteFilePath(context, filePath); + } else { + absoluteFilePath = filePath; + } + Future blobFuture = proxy.getVertx().executeBlocking(() -> proxy.getStorage().load(absoluteFilePath)); Promise result = Promise.promise(); blobFuture.onSuccess(blob -> { @@ -52,12 +64,13 @@ public Future download(String fileId) throws Exception { stream.pipeTo(response, result); result.future().onFailure(error -> { stream.close(); - context.getResponse().close(); + context.getResponse().reset(); }); } catch (IOException e) { result.fail(e); } - }).onFailure(error -> context.respond(HttpStatus.INTERNAL_SERVER_ERROR, "Failed to fetch file with ID " + fileId)); + }).onFailure(error -> context.respond(HttpStatus.INTERNAL_SERVER_ERROR, + "Failed to fetch file with ID " + filePath)); return result.future(); } diff --git a/src/main/java/com/epam/aidial/core/controller/FileMetadataController.java b/src/main/java/com/epam/aidial/core/controller/FileMetadataController.java index cb63b15b8..28953fdea 100644 --- a/src/main/java/com/epam/aidial/core/controller/FileMetadataController.java +++ b/src/main/java/com/epam/aidial/core/controller/FileMetadataController.java @@ -4,6 +4,7 @@ import com.epam.aidial.core.ProxyContext; import com.epam.aidial.core.data.FileMetadataBase; import com.epam.aidial.core.storage.BlobStorage; +import com.epam.aidial.core.storage.BlobStorageUtil; import com.epam.aidial.core.util.HttpStatus; import io.vertx.core.Future; import lombok.AllArgsConstructor; @@ -19,17 +20,17 @@ public class FileMetadataController { public Future list(String path) { BlobStorage storage = proxy.getStorage(); - return proxy.getVertx().executeBlocking(future -> { + return proxy.getVertx().executeBlocking(() -> { try { - String recursive = context.getRequest().params().get("recursive"); - List metadata = storage.listMetadata(path, Boolean.parseBoolean(recursive)); + String absolutePath = BlobStorageUtil.buildAbsoluteFilePath(context, path); + List metadata = storage.listMetadata(absolutePath); context.respond(HttpStatus.OK, metadata); } catch (Exception ex) { log.error("Failed to list files", ex); context.respond(HttpStatus.INTERNAL_SERVER_ERROR, "Failed to list files metadata"); } - future.complete(); + return null; }); } } diff --git a/src/main/java/com/epam/aidial/core/controller/UploadFileController.java b/src/main/java/com/epam/aidial/core/controller/UploadFileController.java index c76958625..b6f8af504 100644 --- a/src/main/java/com/epam/aidial/core/controller/UploadFileController.java +++ b/src/main/java/com/epam/aidial/core/controller/UploadFileController.java @@ -8,14 +8,11 @@ import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; -import io.vertx.core.http.impl.MimeMapping; import io.vertx.core.streams.Pipe; import io.vertx.core.streams.impl.PipeImpl; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import java.util.UUID; - @Slf4j @RequiredArgsConstructor public class UploadFileController { @@ -24,22 +21,18 @@ public class UploadFileController { private final ProxyContext context; public Future upload(String path) { - String fileId = UUID.randomUUID().toString(); + String absoluteFilePath = BlobStorageUtil.buildAbsoluteFilePath(context, path); Promise result = Promise.promise(); context.getRequest() .setExpectMultipart(true) .uploadHandler(upload -> { String filename = upload.filename(); - String mimeType = MimeMapping.getMimeTypeForFilename(filename); - String contentType = mimeType == null ? "application/octet-stream" : mimeType; Pipe pipe = new PipeImpl<>(upload).endOnFailure(false); BlobWriteStream writeStream = new BlobWriteStream( proxy.getVertx(), proxy.getStorage(), filename, - contentType, - fileId, - BlobStorageUtil.removeLeadingAndTrailingPathSeparators(path)); + absoluteFilePath); pipe.to(writeStream, result); result.future() diff --git a/src/main/java/com/epam/aidial/core/data/FileMetadata.java b/src/main/java/com/epam/aidial/core/data/FileMetadata.java index 09bed437e..8f2b90273 100644 --- a/src/main/java/com/epam/aidial/core/data/FileMetadata.java +++ b/src/main/java/com/epam/aidial/core/data/FileMetadata.java @@ -4,15 +4,11 @@ @Getter public class FileMetadata extends FileMetadataBase { - String id; - String path; long contentLength; String contentType; - public FileMetadata(String id, String name, String path, long contentLength, String contentType) { - super(name, FileType.FILE); - this.id = id; - this.path = path; + public FileMetadata(String name, String path, long contentLength, String contentType) { + super(name, path, FileType.FILE); this.contentLength = contentLength; this.contentType = contentType; } diff --git a/src/main/java/com/epam/aidial/core/data/FileMetadataBase.java b/src/main/java/com/epam/aidial/core/data/FileMetadataBase.java index bac27802a..095a26cb8 100644 --- a/src/main/java/com/epam/aidial/core/data/FileMetadataBase.java +++ b/src/main/java/com/epam/aidial/core/data/FileMetadataBase.java @@ -7,5 +7,6 @@ @Data public abstract class FileMetadataBase { private String name; + private String path; private FileType type; } diff --git a/src/main/java/com/epam/aidial/core/data/FolderMetadata.java b/src/main/java/com/epam/aidial/core/data/FolderMetadata.java index 3db92fbf9..16d060d51 100644 --- a/src/main/java/com/epam/aidial/core/data/FolderMetadata.java +++ b/src/main/java/com/epam/aidial/core/data/FolderMetadata.java @@ -2,7 +2,7 @@ public class FolderMetadata extends FileMetadataBase { - public FolderMetadata(String name) { - super(name, FileType.FOLDER); + public FolderMetadata(String name, String path) { + super(name, path, FileType.FOLDER); } } diff --git a/src/main/java/com/epam/aidial/core/security/ExtractedClaims.java b/src/main/java/com/epam/aidial/core/security/ExtractedClaims.java index 3ced3d7e4..65f3407bf 100644 --- a/src/main/java/com/epam/aidial/core/security/ExtractedClaims.java +++ b/src/main/java/com/epam/aidial/core/security/ExtractedClaims.java @@ -2,5 +2,5 @@ import java.util.List; -public record ExtractedClaims(List userRoles, String userHash) { +public record ExtractedClaims(String sub, List userRoles, String userHash) { } diff --git a/src/main/java/com/epam/aidial/core/security/IdentityProvider.java b/src/main/java/com/epam/aidial/core/security/IdentityProvider.java index 0ef61b491..e1740a2ec 100644 --- a/src/main/java/com/epam/aidial/core/security/IdentityProvider.java +++ b/src/main/java/com/epam/aidial/core/security/IdentityProvider.java @@ -27,7 +27,7 @@ @Slf4j public class IdentityProvider { - public static final ExtractedClaims CLAIMS_WITH_EMPTY_ROLES = new ExtractedClaims(Collections.emptyList(), null); + public static final ExtractedClaims CLAIMS_WITH_EMPTY_ROLES = new ExtractedClaims(null, Collections.emptyList(), null); private final String appName; @@ -147,6 +147,10 @@ private DecodedJWT verifyJwt(String encodedToken, JwkResult jwkResult) { } } + private String extractUserSub(DecodedJWT decodedJWT) { + return decodedJWT.getClaim("sub").asString(); + } + private String extractUserHash(DecodedJWT decodedJwt) { String keyClaim = decodedJwt.getClaim(loggingKey).asString(); if (keyClaim != null && obfuscateUserEmail) { @@ -180,7 +184,8 @@ public Future extractClaimsFromEncodedToken(String encodedToken } Future decodedJwt = isJwtMustBeVerified ? decodeAndVerifyJwtToken(encodedToken) : Future.succeededFuture(decodeJwtToken(encodedToken)); - return decodedJwt.map(jwt -> new ExtractedClaims(extractUserRoles(jwt), extractUserHash(jwt))); + return decodedJwt.map(jwt -> new ExtractedClaims(extractUserSub(jwt), extractUserRoles(jwt), + extractUserHash(jwt))); } private record JwkResult(Jwk jwk, Exception error, long expirationTime) { diff --git a/src/main/java/com/epam/aidial/core/storage/BlobStorage.java b/src/main/java/com/epam/aidial/core/storage/BlobStorage.java index c134a068b..3da85d4d9 100644 --- a/src/main/java/com/epam/aidial/core/storage/BlobStorage.java +++ b/src/main/java/com/epam/aidial/core/storage/BlobStorage.java @@ -31,12 +31,8 @@ @Slf4j public class BlobStorage { - private static final String FILE_NAME_METADATA_KEY = "file_name"; - private static final String CONTENT_TYPE_METADATA_KEY = "content_type"; - private final BlobStoreContext storeContext; private final BlobStore blobStore; - private final StorageCache cache; private final String bucketName; public BlobStorage(Storage config) { @@ -48,12 +44,11 @@ public BlobStorage(Storage config) { this.storeContext = builder.buildView(BlobStoreContext.class); this.blobStore = storeContext.getBlobStore(); this.bucketName = config.getBucket(); - this.cache = new StorageCache(); createBucketIfNeeded(config); } - public MultipartUpload initMultipartUpload(String fileId, String path, String resourceName, String contentType) { - BlobMetadata metadata = buildBlobMetadata(fileId, path, resourceName, contentType, bucketName); + public MultipartUpload initMultipartUpload(String fileName, String path, String contentType) { + BlobMetadata metadata = buildBlobMetadata(fileName, path, contentType, bucketName); return blobStore.initiateMultipartUpload(bucketName, metadata, PutOptions.NONE); } @@ -61,10 +56,8 @@ public MultipartPart storeMultipartPart(MultipartUpload multipart, int part, Buf return blobStore.uploadMultipartPart(multipart, part, new BufferPayload(buffer)); } - public void completeMultipartUpload(FileMetadata metadata, MultipartUpload multipart, List parts) { + public void completeMultipartUpload(MultipartUpload multipart, List parts) { String etag = blobStore.completeMultipartUpload(multipart, parts); - // temp fileId to resource linking - cache.cache(metadata.getId(), metadata); log.info("Stored etag: " + etag); } @@ -74,57 +67,36 @@ public void abortMultipartUpload(MultipartUpload multipart) { public void store(FileMetadata metadata, Buffer data) { String fileName = metadata.getName(); - String fileId = metadata.getId(); String contentType = metadata.getContentType(); - String parentPath = metadata.getPath(); - Map userMetadata = buildUserMetadata(fileName, contentType); - String filePath = BlobStorageUtil.buildFilePath(parentPath, fileId); + String path = metadata.getPath(); + String filePath = BlobStorageUtil.buildFilePath(fileName, path); Blob blob = blobStore.blobBuilder(filePath) .payload(new BufferPayload(data)) .contentLength(data.length()) .contentType(contentType) - .userMetadata(userMetadata) .build(); String etag = blobStore.putBlob(bucketName, blob); - // temp fileId to resource linking - cache.cache(fileId, metadata); log.info("Stored etag: " + etag); } - public Blob load(String fileId) { - FileMetadata metadata = cache.load(fileId); - if (metadata == null) { - return null; - } - String parentPath = metadata.getPath(); - String id = metadata.getId(); - String resourceLocation = BlobStorageUtil.buildFilePath(parentPath, id); - return blobStore.getBlob(bucketName, resourceLocation); + public Blob load(String absoluteFilePath) { + return blobStore.getBlob(bucketName, absoluteFilePath); } - public void delete(String fileId) { - FileMetadata metadata = cache.load(fileId); - if (metadata == null) { - throw new IllegalArgumentException("File with ID %s not found".formatted(fileId)); - } - String parentPath = metadata.getPath(); - String id = metadata.getId(); - String resourceLocation = BlobStorageUtil.buildFilePath(parentPath, id); - blobStore.removeBlob(bucketName, resourceLocation); - cache.remove(fileId); + public void delete(String absoluteFilePath) { + blobStore.removeBlob(bucketName, absoluteFilePath); } - public List listMetadata(String path, boolean recursive) { + public List listMetadata(String path) { List metadata = new ArrayList<>(); - ListContainerOptions options = buildListContainerOptions(BlobStorageUtil.normalizePathForQuery(path), recursive); + ListContainerOptions options = buildListContainerOptions(BlobStorageUtil.normalizePathForQuery(path)); PageSet list = blobStore.list(bucketName, options); list.forEach(meta -> { StorageType objectType = meta.getType(); switch (objectType) { case BLOB -> metadata.add(buildFileMetadata(meta)); - case FOLDER, RELATIVE_PATH -> - metadata.add(new FolderMetadata(BlobStorageUtil.removeLeadingAndTrailingPathSeparators(meta.getName()))); + case FOLDER, RELATIVE_PATH -> metadata.add(buildFolderMetadata(meta)); default -> throw new IllegalArgumentException("Can't list container"); } }); @@ -132,45 +104,36 @@ public List listMetadata(String path, boolean recursive) { return metadata; } - private static ListContainerOptions buildListContainerOptions(String prefix, boolean recursive) { - ListContainerOptions options = new ListContainerOptions() - .withDetails() - .delimiter(BlobStorageUtil.PATH_SEPARATOR); - if (prefix != null) { - options.prefix(prefix); - } - if (recursive) { - options.recursive(); - } - - return options; + public void close() { + storeContext.close(); } - private static FileMetadata buildFileMetadata(StorageMetadata metadata) { - Map userMetadata = metadata.getUserMetadata(); - String fullFileName = metadata.getName(); - String fileName = userMetadata.get(FILE_NAME_METADATA_KEY); - String contentType = userMetadata.get(CONTENT_TYPE_METADATA_KEY); - String[] elements = fullFileName.split(BlobStorageUtil.PATH_SEPARATOR); - String fileId = elements[elements.length - 1]; - // strip /UUID if needed - String parentPath = elements.length > 1 ? fullFileName.substring(0, fullFileName.length() - 37) : null; - return new FileMetadata(fileId, fileName, parentPath, metadata.getSize(), contentType); + private static ListContainerOptions buildListContainerOptions(String prefix) { + return new ListContainerOptions() + .prefix(prefix) + .delimiter(BlobStorageUtil.PATH_SEPARATOR); } - private static Map buildUserMetadata(String fileName, String contentType) { - return Map.of(FILE_NAME_METADATA_KEY, fileName, CONTENT_TYPE_METADATA_KEY, contentType); + private static FileMetadata buildFileMetadata(StorageMetadata metadata) { + String absoluteFilePath = metadata.getName(); + String[] elements = absoluteFilePath.split(BlobStorageUtil.PATH_SEPARATOR); + String fileName = elements[elements.length - 1]; + String path = absoluteFilePath.substring(0, absoluteFilePath.length() - fileName.length() - 1); + return new FileMetadata(fileName, path, metadata.getSize(), BlobStorageUtil.getContentType(fileName)); } - public void close() { - storeContext.close(); + private static FolderMetadata buildFolderMetadata(StorageMetadata metadata) { + String absoluteFolderPath = metadata.getName(); + String[] elements = absoluteFolderPath.split(BlobStorageUtil.PATH_SEPARATOR); + String lastFolderName = elements[elements.length - 1]; + String path = absoluteFolderPath.substring(0, absoluteFolderPath.length() - lastFolderName.length() - 1); + return new FolderMetadata(BlobStorageUtil.removeLeadingAndTrailingPathSeparators(lastFolderName), path); } - private static BlobMetadata buildBlobMetadata(String fileId, String path, String resourceName, String contentType, String bucketName) { - Map userMetadata = buildUserMetadata(resourceName, contentType); - String filePath = BlobStorageUtil.buildFilePath(path, fileId); + private static BlobMetadata buildBlobMetadata(String fileName, String path, String contentType, String bucketName) { + String filePath = BlobStorageUtil.buildFilePath(fileName, path); ContentMetadata contentMetadata = buildContentMetadata(contentType); - return new BlobMetadataImpl(fileId, filePath, null, null, null, null, null, userMetadata, null, bucketName, contentMetadata, null, Tier.STANDARD); + return new BlobMetadataImpl(null, filePath, null, null, null, null, null, Map.of(), null, bucketName, contentMetadata, null, Tier.STANDARD); } private static ContentMetadata buildContentMetadata(String contentType) { diff --git a/src/main/java/com/epam/aidial/core/storage/BlobStorageUtil.java b/src/main/java/com/epam/aidial/core/storage/BlobStorageUtil.java index 25c40a596..2f7fdd36c 100644 --- a/src/main/java/com/epam/aidial/core/storage/BlobStorageUtil.java +++ b/src/main/java/com/epam/aidial/core/storage/BlobStorageUtil.java @@ -1,10 +1,16 @@ package com.epam.aidial.core.storage; +import com.epam.aidial.core.ProxyContext; +import io.vertx.core.http.impl.MimeMapping; import lombok.experimental.UtilityClass; @UtilityClass public class BlobStorageUtil { + private static final String USER_ROOT_DIR_PATTERN = "Users/%s/files/%s"; + + private static final String API_KEY_ROOT_DIR_PATTERN = "Keys/%s/files/%s"; + public static final String PATH_SEPARATOR = "/"; private static final char DELIMITER = PATH_SEPARATOR.charAt(0); @@ -24,8 +30,8 @@ public String normalizePathForQuery(String path) { } public String removeLeadingAndTrailingPathSeparators(String path) { - if (path == null || path.isBlank() || path.equals("/")) { - return null; + if (path == null || path.isBlank() || path.equals(PATH_SEPARATOR)) { + return ""; } if (path.charAt(0) == DELIMITER) { path = path.substring(1); @@ -38,11 +44,29 @@ public String removeLeadingAndTrailingPathSeparators(String path) { return path; } - public String buildFilePath(String parentPath, String fileId) { - if (parentPath == null) { - return fileId; + public String buildFilePath(String fileName, String path) { + if (path.charAt(path.length() - 1) == DELIMITER) { + return path + fileName; } - return parentPath + PATH_SEPARATOR + fileId; + return path + PATH_SEPARATOR + fileName; + } + + public String getContentType(String fileName) { + String mimeType = MimeMapping.getMimeTypeForFilename(fileName); + return mimeType == null ? "application/octet-stream" : mimeType; + } + + public String buildAbsoluteFilePath(ProxyContext context, String path) { + return buildAbsoluteFilePath(context.getUserSub(), context.getKey().getProject(), path); + } + + private String buildAbsoluteFilePath(String userSub, String apiKeyId, String path) { + path = removeLeadingAndTrailingPathSeparators(path); + if (userSub != null) { + return USER_ROOT_DIR_PATTERN.formatted(userSub, path); + } else { + return API_KEY_ROOT_DIR_PATTERN.formatted(apiKeyId, path); + } } } diff --git a/src/main/java/com/epam/aidial/core/storage/BlobWriteStream.java b/src/main/java/com/epam/aidial/core/storage/BlobWriteStream.java index d71b3e18b..9f6c6686f 100644 --- a/src/main/java/com/epam/aidial/core/storage/BlobWriteStream.java +++ b/src/main/java/com/epam/aidial/core/storage/BlobWriteStream.java @@ -22,7 +22,6 @@ public class BlobWriteStream implements WriteStream { private final Vertx vertx; private final BlobStorage storage; - private final String fileId; private final String parentPath; private final String fileName; private final String contentType; @@ -47,15 +46,12 @@ public class BlobWriteStream implements WriteStream { public BlobWriteStream(Vertx vertx, BlobStorage storage, String fileName, - String contentType, - String fileId, String parentPath) { this.vertx = vertx; this.storage = storage; this.fileName = fileName; - this.contentType = contentType; - this.fileId = fileId; this.parentPath = parentPath; + this.contentType = BlobStorageUtil.getContentType(fileName); } @Override @@ -93,15 +89,14 @@ public synchronized void write(Buffer data, Handler> handler) @Override public void end(Handler> handler) { - Future result = vertx.executeBlocking(promise -> { + Future result = vertx.executeBlocking(() -> { synchronized (this) { if (exception != null) { - promise.fail(exception); - return; + throw new RuntimeException(exception); } Buffer lastChunk = chunkBuffer.slice(0, position); - metadata = new FileMetadata(fileId, fileName, parentPath, bytesHandled, contentType); + metadata = new FileMetadata(fileName, parentPath, bytesHandled, contentType); if (mpu == null) { log.info("Resource is too small for multipart upload, sending as a regular blob"); storage.store(metadata, lastChunk); @@ -111,11 +106,11 @@ public void end(Handler> handler) { parts.add(part); } - storage.completeMultipartUpload(metadata, mpu, parts); + storage.completeMultipartUpload(mpu, parts); log.info("Multipart upload committed, bytes handled {}", bytesHandled); } - promise.complete(); + return null; } }); result.onSuccess(success -> { @@ -143,11 +138,11 @@ public synchronized boolean writeQueueFull() { @Override public WriteStream drainHandler(Handler handler) { - vertx.executeBlocking((promise) -> { + vertx.executeBlocking(() -> { synchronized (this) { try { if (mpu == null) { - mpu = storage.initMultipartUpload(fileId, parentPath, fileName, contentType); + mpu = storage.initMultipartUpload(parentPath, fileName, contentType); } MultipartPart part = storage.storeMultipartPart(mpu, ++chunkNumber, chunkBuffer.slice(0, position)); parts.add(part); @@ -161,6 +156,7 @@ public WriteStream drainHandler(Handler handler) { } } } + return null; }); return this; diff --git a/src/main/java/com/epam/aidial/core/storage/StorageCache.java b/src/main/java/com/epam/aidial/core/storage/StorageCache.java deleted file mode 100644 index 88a70862e..000000000 --- a/src/main/java/com/epam/aidial/core/storage/StorageCache.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.epam.aidial.core.storage; - -import com.epam.aidial.core.data.FileMetadata; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -public class StorageCache { - - private final ConcurrentMap cache = new ConcurrentHashMap<>(); - - public void cache(String fileId, FileMetadata metadata) { - cache.put(fileId, metadata); - } - - public FileMetadata load(String fileId) { - return cache.get(fileId); - } - - public void remove(String fileId) { - cache.remove(fileId); - } -} From 4f4370dd265aa5c0ab5503809c711eeb940e82a8 Mon Sep 17 00:00:00 2001 From: Maksim_Hadalau Date: Thu, 16 Nov 2023 13:57:56 +0100 Subject: [PATCH 03/10] fix checkstyle issue --- .../java/com/epam/aidial/core/security/IdentityProvider.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/epam/aidial/core/security/IdentityProvider.java b/src/main/java/com/epam/aidial/core/security/IdentityProvider.java index e1740a2ec..c5f90aa30 100644 --- a/src/main/java/com/epam/aidial/core/security/IdentityProvider.java +++ b/src/main/java/com/epam/aidial/core/security/IdentityProvider.java @@ -147,8 +147,8 @@ private DecodedJWT verifyJwt(String encodedToken, JwkResult jwkResult) { } } - private String extractUserSub(DecodedJWT decodedJWT) { - return decodedJWT.getClaim("sub").asString(); + private String extractUserSub(DecodedJWT decodedJwt) { + return decodedJwt.getClaim("sub").asString(); } private String extractUserHash(DecodedJWT decodedJwt) { From ae6267d662fed2a807bae23c993890dcb193cca8 Mon Sep 17 00:00:00 2001 From: Maksim_Hadalau Date: Thu, 16 Nov 2023 17:18:28 +0100 Subject: [PATCH 04/10] fix download file endpoint --- .../epam/aidial/core/controller/DownloadFileController.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/epam/aidial/core/controller/DownloadFileController.java b/src/main/java/com/epam/aidial/core/controller/DownloadFileController.java index 59d711b36..8ab55a7b1 100644 --- a/src/main/java/com/epam/aidial/core/controller/DownloadFileController.java +++ b/src/main/java/com/epam/aidial/core/controller/DownloadFileController.java @@ -39,7 +39,8 @@ public Future download(String filePath) { } else { absoluteFilePath = filePath; } - Future blobFuture = proxy.getVertx().executeBlocking(() -> proxy.getStorage().load(absoluteFilePath)); + Future blobFuture = proxy.getVertx().executeBlocking(() -> + proxy.getStorage().load(BlobStorageUtil.removeLeadingAndTrailingPathSeparators(absoluteFilePath))); Promise result = Promise.promise(); blobFuture.onSuccess(blob -> { From 33c1871a2afef0a45637fb849fca5c7f80169e1f Mon Sep 17 00:00:00 2001 From: Maksim_Hadalau Date: Fri, 17 Nov 2023 13:44:47 +0100 Subject: [PATCH 05/10] add javadocs --- .../core/controller/DeleteFileController.java | 10 +++- .../controller/DownloadFileController.java | 15 +++-- .../controller/FileMetadataController.java | 6 ++ .../core/controller/UploadFileController.java | 7 +++ .../epam/aidial/core/storage/BlobStorage.java | 59 ++++++++++++++++--- .../aidial/core/storage/BlobStorageUtil.java | 25 ++++---- .../aidial/core/storage/BlobWriteStream.java | 12 +++- .../core/storage/InputStreamReader.java | 25 ++++---- 8 files changed, 119 insertions(+), 40 deletions(-) diff --git a/src/main/java/com/epam/aidial/core/controller/DeleteFileController.java b/src/main/java/com/epam/aidial/core/controller/DeleteFileController.java index f99597c55..ae9945cf6 100644 --- a/src/main/java/com/epam/aidial/core/controller/DeleteFileController.java +++ b/src/main/java/com/epam/aidial/core/controller/DeleteFileController.java @@ -15,8 +15,14 @@ public class DeleteFileController { private final Proxy proxy; private final ProxyContext context; - public Future delete(String filePath) { - String absoluteFilePath = BlobStorageUtil.buildAbsoluteFilePath(context, filePath); + /** + * Deletes file from storage. + * Current API implementation requires a relative path, absolute path will be calculated based on authentication context + * + * @param path relative path, for example: /inputs/data.csv + */ + public Future delete(String path) { + String absoluteFilePath = BlobStorageUtil.buildAbsoluteFilePath(context, path); BlobStorage storage = proxy.getStorage(); Future result = proxy.getVertx().executeBlocking(() -> { try { diff --git a/src/main/java/com/epam/aidial/core/controller/DownloadFileController.java b/src/main/java/com/epam/aidial/core/controller/DownloadFileController.java index 8ab55a7b1..13d06331c 100644 --- a/src/main/java/com/epam/aidial/core/controller/DownloadFileController.java +++ b/src/main/java/com/epam/aidial/core/controller/DownloadFileController.java @@ -31,13 +31,20 @@ public class DownloadFileController { private final Proxy proxy; private final ProxyContext context; - public Future download(String filePath) { + /** + * Downloads file content from provided path. + * Path can be either absolute or relative. + * Path type determined by "path" query parameter which can be "absolute" or "relative"(default value) + * + * @param path file path; absolute or relative + */ + public Future download(String path) { String pathType = context.getRequest().params().get(PATH_TYPE_QUERY_PARAMETER); String absoluteFilePath; if (!ABSOLUTE_PATH_TYPE.equals(pathType)) { - absoluteFilePath = BlobStorageUtil.buildAbsoluteFilePath(context, filePath); + absoluteFilePath = BlobStorageUtil.buildAbsoluteFilePath(context, path); } else { - absoluteFilePath = filePath; + absoluteFilePath = path; } Future blobFuture = proxy.getVertx().executeBlocking(() -> proxy.getStorage().load(BlobStorageUtil.removeLeadingAndTrailingPathSeparators(absoluteFilePath))); @@ -71,7 +78,7 @@ public Future download(String filePath) { result.fail(e); } }).onFailure(error -> context.respond(HttpStatus.INTERNAL_SERVER_ERROR, - "Failed to fetch file with ID " + filePath)); + "Failed to fetch file with ID " + path)); return result.future(); } diff --git a/src/main/java/com/epam/aidial/core/controller/FileMetadataController.java b/src/main/java/com/epam/aidial/core/controller/FileMetadataController.java index 28953fdea..6df275d46 100644 --- a/src/main/java/com/epam/aidial/core/controller/FileMetadataController.java +++ b/src/main/java/com/epam/aidial/core/controller/FileMetadataController.java @@ -18,6 +18,12 @@ public class FileMetadataController { private final Proxy proxy; private final ProxyContext context; + /** + * Lists all files and folders that belong to the provided path. + * Current API implementation requires a relative path, absolute path will be calculated based on authentication context + * + * @param path relative path, for example: /inputs + */ public Future list(String path) { BlobStorage storage = proxy.getStorage(); return proxy.getVertx().executeBlocking(() -> { diff --git a/src/main/java/com/epam/aidial/core/controller/UploadFileController.java b/src/main/java/com/epam/aidial/core/controller/UploadFileController.java index b6f8af504..d2ac80ca1 100644 --- a/src/main/java/com/epam/aidial/core/controller/UploadFileController.java +++ b/src/main/java/com/epam/aidial/core/controller/UploadFileController.java @@ -20,6 +20,13 @@ public class UploadFileController { private final Proxy proxy; private final ProxyContext context; + /** + * Uploads file to the storage. + * Current API implementation requires a relative path, absolute path will be calculated based on authentication context. + * File name defined in multipart/form-data context + * + * @param path relative path, for example: /inputs + */ public Future upload(String path) { String absoluteFilePath = BlobStorageUtil.buildAbsoluteFilePath(context, path); Promise result = Promise.promise(); diff --git a/src/main/java/com/epam/aidial/core/storage/BlobStorage.java b/src/main/java/com/epam/aidial/core/storage/BlobStorage.java index 3da85d4d9..a5076f20b 100644 --- a/src/main/java/com/epam/aidial/core/storage/BlobStorage.java +++ b/src/main/java/com/epam/aidial/core/storage/BlobStorage.java @@ -47,28 +47,55 @@ public BlobStorage(Storage config) { createBucketIfNeeded(config); } + /** + * Initialize s3 multipart upload + * + * @param fileName name of the file, for example: data.csv + * @param path absolute path according to the bucket, for example: Users/user1/files/input data + * @param contentType MIME type of the content, for example: text/csv + */ public MultipartUpload initMultipartUpload(String fileName, String path, String contentType) { BlobMetadata metadata = buildBlobMetadata(fileName, path, contentType, bucketName); return blobStore.initiateMultipartUpload(bucketName, metadata, PutOptions.NONE); } + /** + * Upload part/chunk of the file + * + * @param multipart MultipartUpload that chunk related to + * @param part chunk number, starting from 1 + * @param buffer data + */ public MultipartPart storeMultipartPart(MultipartUpload multipart, int part, Buffer buffer) { return blobStore.uploadMultipartPart(multipart, part, new BufferPayload(buffer)); } + /** + * Commit multipart upload. + * This method must be called after all parts/chunks uploaded + */ public void completeMultipartUpload(MultipartUpload multipart, List parts) { String etag = blobStore.completeMultipartUpload(multipart, parts); log.info("Stored etag: " + etag); } + /** + * Abort multipart upload. + * This method must be called if something was wrong during upload to clean up uploaded parts/chunks + */ public void abortMultipartUpload(MultipartUpload multipart) { blobStore.abortMultipartUpload(multipart); } - public void store(FileMetadata metadata, Buffer data) { - String fileName = metadata.getName(); - String contentType = metadata.getContentType(); - String path = metadata.getPath(); + /** + * Upload file in a single request + * + * @param fileName name of the file, for example: data.csv + * @param path absolute path according to the bucket, for example: Users/user1/files/input data + * @param contentType MIME type of the content, for example: text/csv + * @param data whole content data + */ + public void store(String fileName, String path, String contentType, Buffer data) { String filePath = BlobStorageUtil.buildFilePath(fileName, path); Blob blob = blobStore.blobBuilder(filePath) .payload(new BufferPayload(data)) @@ -80,14 +107,30 @@ public void store(FileMetadata metadata, Buffer data) { log.info("Stored etag: " + etag); } - public Blob load(String absoluteFilePath) { - return blobStore.getBlob(bucketName, absoluteFilePath); + /** + * Load file content from blob store + * + * @param filePath absolute file path, for example: Users/user1/files/inputs/data.csv + * @return Blob instance if file was found, null - otherwise + */ + public Blob load(String filePath) { + return blobStore.getBlob(bucketName, filePath); } - public void delete(String absoluteFilePath) { - blobStore.removeBlob(bucketName, absoluteFilePath); + /** + * Delete file content from blob store + * + * @param filePath absolute file path, for example: Users/user1/files/inputs/data.csv + */ + public void delete(String filePath) { + blobStore.removeBlob(bucketName, filePath); } + /** + * List all files/folder metadata for a given path + * + * @param path absolute path for a folder, for example: Users/user1/files + */ public List listMetadata(String path) { List metadata = new ArrayList<>(); ListContainerOptions options = buildListContainerOptions(BlobStorageUtil.normalizePathForQuery(path)); diff --git a/src/main/java/com/epam/aidial/core/storage/BlobStorageUtil.java b/src/main/java/com/epam/aidial/core/storage/BlobStorageUtil.java index 2f7fdd36c..87c4a2ee6 100644 --- a/src/main/java/com/epam/aidial/core/storage/BlobStorageUtil.java +++ b/src/main/java/com/epam/aidial/core/storage/BlobStorageUtil.java @@ -33,22 +33,27 @@ public String removeLeadingAndTrailingPathSeparators(String path) { if (path == null || path.isBlank() || path.equals(PATH_SEPARATOR)) { return ""; } - if (path.charAt(0) == DELIMITER) { - path = path.substring(1); - } + path = removeLeadingPathSeparator(path); + return removeTrailingPathSeparator(path); + } - if (path.charAt(path.length() - 1) == DELIMITER) { - path = path.substring(0, path.length() - 1); + public String removeLeadingPathSeparator(String path) { + if (path == null) { + return null; } - - return path; + return path.charAt(0) == DELIMITER ? path.substring(1) : path; } - public String buildFilePath(String fileName, String path) { - if (path.charAt(path.length() - 1) == DELIMITER) { - return path + fileName; + public String removeTrailingPathSeparator(String path) { + if (path == null) { + return null; } + int length = path.length(); + return path.charAt(length - 1) == DELIMITER ? path.substring(0, length - 1) : path; + } + public String buildFilePath(String fileName, String path) { + path = removeTrailingPathSeparator(path); return path + PATH_SEPARATOR + fileName; } diff --git a/src/main/java/com/epam/aidial/core/storage/BlobWriteStream.java b/src/main/java/com/epam/aidial/core/storage/BlobWriteStream.java index 9f6c6686f..f0e4f7fd8 100644 --- a/src/main/java/com/epam/aidial/core/storage/BlobWriteStream.java +++ b/src/main/java/com/epam/aidial/core/storage/BlobWriteStream.java @@ -15,6 +15,12 @@ import java.util.ArrayList; import java.util.List; +/** + * Implementation of vertx {@link io.vertx.core.streams.WriteStream} that handles data chunks (from {@link io.vertx.core.streams.ReadStream}) and writes them to the blob storage. + * If file content is bigger than 5MB - multipart upload will be used. + * Chunk size can be configured via {@link #setWriteQueueMaxSize(int)} method, but should be no less than 5 MB according to the s3 specification. + * If any exception is caught in between - multipart upload will be aborted. + */ @Slf4j public class BlobWriteStream implements WriteStream { @@ -90,7 +96,7 @@ public synchronized void write(Buffer data, Handler> handler) @Override public void end(Handler> handler) { Future result = vertx.executeBlocking(() -> { - synchronized (this) { + synchronized (BlobWriteStream.this) { if (exception != null) { throw new RuntimeException(exception); } @@ -99,7 +105,7 @@ public void end(Handler> handler) { metadata = new FileMetadata(fileName, parentPath, bytesHandled, contentType); if (mpu == null) { log.info("Resource is too small for multipart upload, sending as a regular blob"); - storage.store(metadata, lastChunk); + storage.store(fileName, parentPath, contentType, lastChunk); } else { if (position != 0) { MultipartPart part = storage.storeMultipartPart(mpu, ++chunkNumber, lastChunk); @@ -139,7 +145,7 @@ public synchronized boolean writeQueueFull() { @Override public WriteStream drainHandler(Handler handler) { vertx.executeBlocking(() -> { - synchronized (this) { + synchronized (BlobWriteStream.this) { try { if (mpu == null) { mpu = storage.initMultipartUpload(parentPath, fileName, contentType); diff --git a/src/main/java/com/epam/aidial/core/storage/InputStreamReader.java b/src/main/java/com/epam/aidial/core/storage/InputStreamReader.java index 558c917ce..f7f91062a 100644 --- a/src/main/java/com/epam/aidial/core/storage/InputStreamReader.java +++ b/src/main/java/com/epam/aidial/core/storage/InputStreamReader.java @@ -6,6 +6,7 @@ import io.vertx.core.Handler; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; +import io.vertx.core.buffer.impl.BufferImpl; import io.vertx.core.streams.Pipe; import io.vertx.core.streams.ReadStream; import io.vertx.core.streams.WriteStream; @@ -15,6 +16,9 @@ import java.io.InputStream; +/** + * Implementation of vertx {@link io.vertx.core.streams.ReadStream} that wraps {@link java.io.InputStream}. + */ @Slf4j public class InputStreamReader implements ReadStream { @@ -91,14 +95,13 @@ public synchronized ReadStream fetch(long amount) { return this; } - private synchronized void readDataFromStream() { - Future fetchResult = vertx.executeBlocking(future -> { + private void readDataFromStream() { + Future fetchResult = vertx.executeBlocking(() -> { try { byte[] data = in.readNBytes(bufferSize); - Buffer chunk = Buffer.buffer(Unpooled.wrappedBuffer(data)); - future.complete(chunk); + return BufferImpl.buffer(Unpooled.wrappedBuffer(data)); } catch (Exception e) { - future.fail(e); + throw new RuntimeException(e); } }); fetchResult.onSuccess(buf -> { @@ -109,7 +112,7 @@ private synchronized void readDataFromStream() { }).onFailure(error -> { log.info("Failed to read data from InputStream", error); close(); - synchronized (this) { + synchronized (InputStreamReader.this) { if (exceptionHandler != null) { exceptionHandler.handle(error); } @@ -129,13 +132,9 @@ private void handleData(Buffer buff) { private synchronized void handleEnd() { close(); - Handler endHandler; - synchronized (this) { - this.dataHandler.handle(Buffer.buffer()); - dataHandler = null; - endHandler = this.endHandler; - } - endHandler.handle(null); + this.dataHandler.handle(Buffer.buffer()); + dataHandler = null; + this.endHandler.handle(null); } public synchronized void close() { From eff239b46608a9ef3cd183809e326c3f26428879 Mon Sep 17 00:00:00 2001 From: Maksim_Hadalau Date: Fri, 17 Nov 2023 17:47:16 +0100 Subject: [PATCH 06/10] fix multipart upload issue --- src/main/java/com/epam/aidial/core/storage/BlobStorage.java | 3 ++- .../java/com/epam/aidial/core/storage/BlobWriteStream.java | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/epam/aidial/core/storage/BlobStorage.java b/src/main/java/com/epam/aidial/core/storage/BlobStorage.java index a5076f20b..c2dff75a3 100644 --- a/src/main/java/com/epam/aidial/core/storage/BlobStorage.java +++ b/src/main/java/com/epam/aidial/core/storage/BlobStorage.java @@ -170,7 +170,8 @@ private static FolderMetadata buildFolderMetadata(StorageMetadata metadata) { String[] elements = absoluteFolderPath.split(BlobStorageUtil.PATH_SEPARATOR); String lastFolderName = elements[elements.length - 1]; String path = absoluteFolderPath.substring(0, absoluteFolderPath.length() - lastFolderName.length() - 1); - return new FolderMetadata(BlobStorageUtil.removeLeadingAndTrailingPathSeparators(lastFolderName), path); + return new FolderMetadata(BlobStorageUtil.removeLeadingAndTrailingPathSeparators(lastFolderName), + BlobStorageUtil.removeTrailingPathSeparator(path)); } private static BlobMetadata buildBlobMetadata(String fileName, String path, String contentType, String bucketName) { diff --git a/src/main/java/com/epam/aidial/core/storage/BlobWriteStream.java b/src/main/java/com/epam/aidial/core/storage/BlobWriteStream.java index f0e4f7fd8..65a4e340e 100644 --- a/src/main/java/com/epam/aidial/core/storage/BlobWriteStream.java +++ b/src/main/java/com/epam/aidial/core/storage/BlobWriteStream.java @@ -102,7 +102,8 @@ public void end(Handler> handler) { } Buffer lastChunk = chunkBuffer.slice(0, position); - metadata = new FileMetadata(fileName, parentPath, bytesHandled, contentType); + metadata = new FileMetadata(fileName, BlobStorageUtil.removeTrailingPathSeparator(parentPath), + bytesHandled, contentType); if (mpu == null) { log.info("Resource is too small for multipart upload, sending as a regular blob"); storage.store(fileName, parentPath, contentType, lastChunk); @@ -148,7 +149,7 @@ public WriteStream drainHandler(Handler handler) { synchronized (BlobWriteStream.this) { try { if (mpu == null) { - mpu = storage.initMultipartUpload(parentPath, fileName, contentType); + mpu = storage.initMultipartUpload(fileName, parentPath, contentType); } MultipartPart part = storage.storeMultipartPart(mpu, ++chunkNumber, chunkBuffer.slice(0, position)); parts.add(part); From cd90ce0df3b4a14b43bc53f69770aaf0c21f6caa Mon Sep 17 00:00:00 2001 From: Maksim_Hadalau Date: Tue, 21 Nov 2023 04:21:47 +0100 Subject: [PATCH 07/10] add file API tests --- build.gradle | 4 + .../java/com/epam/aidial/core/AiDial.java | 56 +++- src/main/java/com/epam/aidial/core/Proxy.java | 19 +- .../com/epam/aidial/core/config/Storage.java | 5 + .../controller/DownloadFileController.java | 2 +- .../controller/FileMetadataController.java | 2 +- .../core/controller/UploadFileController.java | 2 +- .../epam/aidial/core/data/FileMetadata.java | 4 + .../aidial/core/data/FileMetadataBase.java | 2 + .../core/security/IdentityProvider.java | 2 +- .../epam/aidial/core/storage/BlobStorage.java | 67 +++-- .../aidial/core/storage/BlobWriteStream.java | 8 +- .../com/epam/aidial/core/FileApiTest.java | 269 ++++++++++++++++++ .../java/com/epam/aidial/core/TestFiles.java | 37 +++ src/test/resources/aidial.config.json | 94 ++++++ src/test/resources/aidial.settings.json | 45 +++ 16 files changed, 564 insertions(+), 54 deletions(-) create mode 100644 src/test/java/com/epam/aidial/core/FileApiTest.java create mode 100644 src/test/java/com/epam/aidial/core/TestFiles.java create mode 100644 src/test/resources/aidial.config.json create mode 100644 src/test/resources/aidial.settings.json diff --git a/build.gradle b/build.gradle index 4fdb0e192..411876b3c 100644 --- a/build.gradle +++ b/build.gradle @@ -44,6 +44,10 @@ dependencies { runtimeOnly 'com.epam.deltix:gflog-slf4j:3.0.0' testImplementation 'org.mockito:mockito-core:5.4.0' testImplementation 'org.junit.jupiter:junit-jupiter-api:5.9.3' + testImplementation 'commons-io:commons-io:2.11.0' + testImplementation 'io.vertx:vertx-web-client:4.4.6' + testImplementation 'io.vertx:vertx-junit5:4.4.6' + testImplementation 'org.apache.jclouds.api:filesystem:2.5.0' testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.9.3' } diff --git a/src/main/java/com/epam/aidial/core/AiDial.java b/src/main/java/com/epam/aidial/core/AiDial.java index 503595abf..08939e99a 100644 --- a/src/main/java/com/epam/aidial/core/AiDial.java +++ b/src/main/java/com/epam/aidial/core/AiDial.java @@ -10,6 +10,7 @@ import com.epam.aidial.core.storage.BlobStorage; import com.epam.aidial.core.upstream.UpstreamBalancer; import com.epam.deltix.gflog.core.LogConfigurator; +import com.google.common.annotations.VisibleForTesting; import io.micrometer.registry.otlp.OtlpMeterRegistry; import io.vertx.config.spi.utils.JsonObjectHelper; import io.vertx.core.Future; @@ -25,6 +26,7 @@ import io.vertx.micrometer.MicrometerMetricsOptions; import lombok.extern.slf4j.Slf4j; +import java.io.Closeable; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; @@ -69,7 +71,6 @@ private void start() throws Exception { open(server, HttpServer::listen); log.info("Proxy started on {}", server.actualPort()); - Runtime.getRuntime().addShutdownHook(new Thread(this::stop, "shutdown-hook")); } catch (Throwable e) { log.warn("Proxy failed to start:", e); stop(); @@ -77,14 +78,44 @@ private void start() throws Exception { } } - private void stop() { + @VisibleForTesting + void start(BlobStorage storage) throws Exception { + try { + settings = settings(); + + VertxOptions vertxOptions = new VertxOptions(settings("vertx")); + setupMetrics(vertxOptions); + + vertx = Vertx.vertx(vertxOptions); + client = vertx.createHttpClient(new HttpClientOptions(settings("client"))); + + ConfigStore configStore = new FileConfigStore(vertx, settings("config")); + LogStore logStore = new GfLogStore(vertx); + RateLimiter rateLimiter = new RateLimiter(); + UpstreamBalancer upstreamBalancer = new UpstreamBalancer(); + + IdentityProvider identityProvider = new IdentityProvider(settings("identityProvider"), vertx); + this.storage = storage; + Proxy proxy = new Proxy(vertx, client, configStore, logStore, rateLimiter, upstreamBalancer, identityProvider, storage); + + server = vertx.createHttpServer(new HttpServerOptions(settings("server"))).requestHandler(proxy); + open(server, HttpServer::listen); + + log.info("Proxy started on {}", server.actualPort()); + } catch (Throwable e) { + log.warn("Proxy failed to start:", e); + stop(); + throw e; + } + } + + @VisibleForTesting + void stop() { try { close(server, HttpServer::close); close(client, HttpClient::close); close(vertx, Vertx::close); - if (storage != null) { - storage.close(); - } + close(storage); log.info("Proxy stopped"); LogConfigurator.unconfigure(); } catch (Throwable e) { @@ -94,6 +125,11 @@ private void stop() { } } + @VisibleForTesting + HttpServer getServer() { + return server; + } + private JsonObject settings(String key) { return settings.getJsonObject(key, new JsonObject()); } @@ -160,6 +196,12 @@ private static void close(R resource, AsyncCloser closer) throws Exceptio } } + private static void close(Closeable resource) throws IOException { + if (resource != null) { + resource.close(); + } + } + private interface AsyncOpener { Future open(R resource); } @@ -169,7 +211,9 @@ private interface AsyncCloser { } public static void main(String[] args) throws Exception { - new AiDial().start(); + AiDial dial = new AiDial(); + dial.start(); + Runtime.getRuntime().addShutdownHook(new Thread(dial::stop, "shutdown-hook")); } private static void setupMetrics(VertxOptions options) { diff --git a/src/main/java/com/epam/aidial/core/Proxy.java b/src/main/java/com/epam/aidial/core/Proxy.java index af8adeccd..0c03d2eeb 100644 --- a/src/main/java/com/epam/aidial/core/Proxy.java +++ b/src/main/java/com/epam/aidial/core/Proxy.java @@ -44,7 +44,8 @@ public class Proxy implements Handler { public static final String HEADER_UPSTREAM_ATTEMPTS = "X-UPSTREAM-ATTEMPTS"; public static final String HEADER_CONTENT_TYPE_APPLICATION_JSON = "application/json"; - public static final int REQUEST_BODY_MAX_SIZE = 512 * 1024 * 1024; + public static final int REQUEST_BODY_MAX_SIZE_BYTES = 16 * 1024 * 1024; + public static final int FILES_REQUEST_BODY_MAX_SIZE_BYTES = 512 * 1024 * 1024; private final Vertx vertx; private final HttpClient client; @@ -84,10 +85,18 @@ private void handleRequest(HttpServerRequest request) throws Exception { return; } - // not only the case, Content-Length can be missing when Transfer-Encoding: chunked - if (ProxyUtil.contentLength(request, 1024) > REQUEST_BODY_MAX_SIZE) { - respond(request, HttpStatus.REQUEST_ENTITY_TOO_LARGE, "Request body is too large"); - return; + String contentType = request.getHeader(HttpHeaders.CONTENT_TYPE); + if (contentType != null && contentType.startsWith("multipart/form-data")) { + if (ProxyUtil.contentLength(request, 1024) > FILES_REQUEST_BODY_MAX_SIZE_BYTES) { + respond(request, HttpStatus.REQUEST_ENTITY_TOO_LARGE, "Request body is too large"); + return; + } + } else { + // not only the case, Content-Length can be missing when Transfer-Encoding: chunked + if (ProxyUtil.contentLength(request, 1024) > REQUEST_BODY_MAX_SIZE_BYTES) { + respond(request, HttpStatus.REQUEST_ENTITY_TOO_LARGE, "Request body is too large"); + return; + } } String path = URLDecoder.decode(request.path(), StandardCharsets.UTF_8); diff --git a/src/main/java/com/epam/aidial/core/config/Storage.java b/src/main/java/com/epam/aidial/core/config/Storage.java index f954b48a5..0502ce17f 100644 --- a/src/main/java/com/epam/aidial/core/config/Storage.java +++ b/src/main/java/com/epam/aidial/core/config/Storage.java @@ -27,4 +27,9 @@ public class Storage { * container name/root bucket */ String bucket; + + /** + * Indicates whether bucket should be created on start up + */ + boolean createBucket; } diff --git a/src/main/java/com/epam/aidial/core/controller/DownloadFileController.java b/src/main/java/com/epam/aidial/core/controller/DownloadFileController.java index 13d06331c..e8b217247 100644 --- a/src/main/java/com/epam/aidial/core/controller/DownloadFileController.java +++ b/src/main/java/com/epam/aidial/core/controller/DownloadFileController.java @@ -78,7 +78,7 @@ public Future download(String path) { result.fail(e); } }).onFailure(error -> context.respond(HttpStatus.INTERNAL_SERVER_ERROR, - "Failed to fetch file with ID " + path)); + "Failed to fetch file with path " + path)); return result.future(); } diff --git a/src/main/java/com/epam/aidial/core/controller/FileMetadataController.java b/src/main/java/com/epam/aidial/core/controller/FileMetadataController.java index 6df275d46..aabb1fb0e 100644 --- a/src/main/java/com/epam/aidial/core/controller/FileMetadataController.java +++ b/src/main/java/com/epam/aidial/core/controller/FileMetadataController.java @@ -33,7 +33,7 @@ public Future list(String path) { context.respond(HttpStatus.OK, metadata); } catch (Exception ex) { log.error("Failed to list files", ex); - context.respond(HttpStatus.INTERNAL_SERVER_ERROR, "Failed to list files metadata"); + context.respond(HttpStatus.INTERNAL_SERVER_ERROR, "Failed to list files by path %s".formatted(path)); } return null; diff --git a/src/main/java/com/epam/aidial/core/controller/UploadFileController.java b/src/main/java/com/epam/aidial/core/controller/UploadFileController.java index d2ac80ca1..4419d2451 100644 --- a/src/main/java/com/epam/aidial/core/controller/UploadFileController.java +++ b/src/main/java/com/epam/aidial/core/controller/UploadFileController.java @@ -47,7 +47,7 @@ public Future upload(String path) { .onFailure(error -> { writeStream.abortUpload(error); context.respond(HttpStatus.INTERNAL_SERVER_ERROR, - "Failed to upload file: " + error.getMessage()); + "Failed to upload file by path " + path); }); }); diff --git a/src/main/java/com/epam/aidial/core/data/FileMetadata.java b/src/main/java/com/epam/aidial/core/data/FileMetadata.java index 8f2b90273..1a1fe6403 100644 --- a/src/main/java/com/epam/aidial/core/data/FileMetadata.java +++ b/src/main/java/com/epam/aidial/core/data/FileMetadata.java @@ -1,8 +1,12 @@ package com.epam.aidial.core.data; import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; @Getter +@Setter +@NoArgsConstructor public class FileMetadata extends FileMetadataBase { long contentLength; String contentType; diff --git a/src/main/java/com/epam/aidial/core/data/FileMetadataBase.java b/src/main/java/com/epam/aidial/core/data/FileMetadataBase.java index 095a26cb8..d75dc6b8d 100644 --- a/src/main/java/com/epam/aidial/core/data/FileMetadataBase.java +++ b/src/main/java/com/epam/aidial/core/data/FileMetadataBase.java @@ -2,9 +2,11 @@ import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; @AllArgsConstructor @Data +@NoArgsConstructor public abstract class FileMetadataBase { private String name; private String path; diff --git a/src/main/java/com/epam/aidial/core/security/IdentityProvider.java b/src/main/java/com/epam/aidial/core/security/IdentityProvider.java index c5f90aa30..ae92c8507 100644 --- a/src/main/java/com/epam/aidial/core/security/IdentityProvider.java +++ b/src/main/java/com/epam/aidial/core/security/IdentityProvider.java @@ -147,7 +147,7 @@ private DecodedJWT verifyJwt(String encodedToken, JwkResult jwkResult) { } } - private String extractUserSub(DecodedJWT decodedJwt) { + private static String extractUserSub(DecodedJWT decodedJwt) { return decodedJwt.getClaim("sub").asString(); } diff --git a/src/main/java/com/epam/aidial/core/storage/BlobStorage.java b/src/main/java/com/epam/aidial/core/storage/BlobStorage.java index c2dff75a3..b990538ee 100644 --- a/src/main/java/com/epam/aidial/core/storage/BlobStorage.java +++ b/src/main/java/com/epam/aidial/core/storage/BlobStorage.java @@ -4,6 +4,7 @@ import com.epam.aidial.core.data.FileMetadata; import com.epam.aidial.core.data.FileMetadataBase; import com.epam.aidial.core.data.FolderMetadata; +import com.google.common.annotations.VisibleForTesting; import io.vertx.core.buffer.Buffer; import lombok.extern.slf4j.Slf4j; import org.jclouds.ContextBuilder; @@ -15,7 +16,6 @@ import org.jclouds.blobstore.domain.MultipartUpload; import org.jclouds.blobstore.domain.PageSet; import org.jclouds.blobstore.domain.StorageMetadata; -import org.jclouds.blobstore.domain.StorageType; import org.jclouds.blobstore.domain.Tier; import org.jclouds.blobstore.domain.internal.BlobMetadataImpl; import org.jclouds.blobstore.options.ListContainerOptions; @@ -24,22 +24,31 @@ import org.jclouds.io.ContentMetadataBuilder; import org.jclouds.io.payloads.BaseMutableContentMetadata; -import java.util.ArrayList; +import java.io.Closeable; import java.util.List; import java.util.Map; +import java.util.Properties; @Slf4j -public class BlobStorage { +public class BlobStorage implements Closeable { private final BlobStoreContext storeContext; private final BlobStore blobStore; private final String bucketName; public BlobStorage(Storage config) { + this(config, null); + } + + @VisibleForTesting + public BlobStorage(Storage config, Properties overrides) { ContextBuilder builder = ContextBuilder.newBuilder(config.getProvider()); if (config.getEndpoint() != null) { builder.endpoint(config.getEndpoint()); } + if (overrides != null) { + builder.overrides(overrides); + } builder.credentials(config.getIdentity(), config.getCredential()); this.storeContext = builder.buildView(BlobStoreContext.class); this.blobStore = storeContext.getBlobStore(); @@ -54,6 +63,7 @@ public BlobStorage(Storage config) { * @param path absolute path according to the bucket, for example: Users/user1/files/input data * @param contentType MIME type of the content, for example: text/csv */ + @SuppressWarnings("UnstableApiUsage") // multipart upload uses beta API public MultipartUpload initMultipartUpload(String fileName, String path, String contentType) { BlobMetadata metadata = buildBlobMetadata(fileName, path, contentType, bucketName); return blobStore.initiateMultipartUpload(bucketName, metadata, PutOptions.NONE); @@ -66,6 +76,7 @@ public MultipartUpload initMultipartUpload(String fileName, String path, String * @param part chunk number, starting from 1 * @param buffer data */ + @SuppressWarnings("UnstableApiUsage") // multipart upload uses beta API public MultipartPart storeMultipartPart(MultipartUpload multipart, int part, Buffer buffer) { return blobStore.uploadMultipartPart(multipart, part, new BufferPayload(buffer)); } @@ -74,15 +85,16 @@ public MultipartPart storeMultipartPart(MultipartUpload multipart, int part, Buf * Commit multipart upload. * This method must be called after all parts/chunks uploaded */ + @SuppressWarnings("UnstableApiUsage") // multipart upload uses beta API public void completeMultipartUpload(MultipartUpload multipart, List parts) { - String etag = blobStore.completeMultipartUpload(multipart, parts); - log.info("Stored etag: " + etag); + blobStore.completeMultipartUpload(multipart, parts); } /** * Abort multipart upload. * This method must be called if something was wrong during upload to clean up uploaded parts/chunks */ + @SuppressWarnings("UnstableApiUsage") // multipart upload uses beta API public void abortMultipartUpload(MultipartUpload multipart) { blobStore.abortMultipartUpload(multipart); } @@ -103,8 +115,7 @@ public void store(String fileName, String path, String contentType, Buffer data) .contentType(contentType) .build(); - String etag = blobStore.putBlob(bucketName, blob); - log.info("Stored etag: " + etag); + blobStore.putBlob(bucketName, blob); } /** @@ -132,21 +143,12 @@ public void delete(String filePath) { * @param path absolute path for a folder, for example: Users/user1/files */ public List listMetadata(String path) { - List metadata = new ArrayList<>(); ListContainerOptions options = buildListContainerOptions(BlobStorageUtil.normalizePathForQuery(path)); PageSet list = blobStore.list(bucketName, options); - list.forEach(meta -> { - StorageType objectType = meta.getType(); - switch (objectType) { - case BLOB -> metadata.add(buildFileMetadata(meta)); - case FOLDER, RELATIVE_PATH -> metadata.add(buildFolderMetadata(meta)); - default -> throw new IllegalArgumentException("Can't list container"); - } - }); - - return metadata; + return list.stream().map(BlobStorage::buildFileMetadata).toList(); } + @Override public void close() { storeContext.close(); } @@ -157,21 +159,20 @@ private static ListContainerOptions buildListContainerOptions(String prefix) { .delimiter(BlobStorageUtil.PATH_SEPARATOR); } - private static FileMetadata buildFileMetadata(StorageMetadata metadata) { + private static FileMetadataBase buildFileMetadata(StorageMetadata metadata) { String absoluteFilePath = metadata.getName(); String[] elements = absoluteFilePath.split(BlobStorageUtil.PATH_SEPARATOR); - String fileName = elements[elements.length - 1]; - String path = absoluteFilePath.substring(0, absoluteFilePath.length() - fileName.length() - 1); - return new FileMetadata(fileName, path, metadata.getSize(), BlobStorageUtil.getContentType(fileName)); - } + String lastElement = elements[elements.length - 1]; + String path = absoluteFilePath.substring(0, absoluteFilePath.length() - lastElement.length() - 1); - private static FolderMetadata buildFolderMetadata(StorageMetadata metadata) { - String absoluteFolderPath = metadata.getName(); - String[] elements = absoluteFolderPath.split(BlobStorageUtil.PATH_SEPARATOR); - String lastFolderName = elements[elements.length - 1]; - String path = absoluteFolderPath.substring(0, absoluteFolderPath.length() - lastFolderName.length() - 1); - return new FolderMetadata(BlobStorageUtil.removeLeadingAndTrailingPathSeparators(lastFolderName), - BlobStorageUtil.removeTrailingPathSeparator(path)); + return switch (metadata.getType()) { + case BLOB -> + new FileMetadata(lastElement, path, metadata.getSize(), BlobStorageUtil.getContentType(lastElement)); + case FOLDER, RELATIVE_PATH -> + new FolderMetadata(BlobStorageUtil.removeLeadingAndTrailingPathSeparators(lastElement), + BlobStorageUtil.removeTrailingPathSeparator(path)); + case CONTAINER -> throw new IllegalArgumentException("Can't list container"); + }; } private static BlobMetadata buildBlobMetadata(String fileName, String path, String contentType, String bucketName) { @@ -188,11 +189,7 @@ private static ContentMetadata buildContentMetadata(String contentType) { } private void createBucketIfNeeded(Storage config) { - if (config.getProvider().equals("google-cloud-storage")) { - // GCP service account do not have permissions to get bucket :( - return; - } - if (!storeContext.getBlobStore().containerExists(bucketName)) { + if (config.isCreateBucket() && !storeContext.getBlobStore().containerExists(bucketName)) { storeContext.getBlobStore().createContainerInLocation(null, bucketName); } } diff --git a/src/main/java/com/epam/aidial/core/storage/BlobWriteStream.java b/src/main/java/com/epam/aidial/core/storage/BlobWriteStream.java index 65a4e340e..b965ea700 100644 --- a/src/main/java/com/epam/aidial/core/storage/BlobWriteStream.java +++ b/src/main/java/com/epam/aidial/core/storage/BlobWriteStream.java @@ -24,7 +24,7 @@ @Slf4j public class BlobWriteStream implements WriteStream { - private static final int MIN_PART_SIZE = 5 * 1024 * 1024; + private static final int MIN_PART_SIZE_BYTES = 5 * 1024 * 1024; private final Vertx vertx; private final BlobStorage storage; @@ -33,7 +33,7 @@ public class BlobWriteStream implements WriteStream { private final String contentType; private final Buffer chunkBuffer = Buffer.buffer(); - private int chunkSize = MIN_PART_SIZE; + private int chunkSize = MIN_PART_SIZE_BYTES; private int position; private MultipartUpload mpu; private int chunkNumber = 0; @@ -133,7 +133,7 @@ public void end(Handler> handler) { @Override public synchronized WriteStream setWriteQueueMaxSize(int maxSize) { - assert maxSize > MIN_PART_SIZE; + assert maxSize > MIN_PART_SIZE_BYTES; chunkSize = maxSize; return this; } @@ -182,6 +182,6 @@ public synchronized void abortUpload(Throwable ex) { errorHandler.handle(ex); } - log.info("Multipart upload aborted"); + log.warn("Multipart upload aborted"); } } diff --git a/src/test/java/com/epam/aidial/core/FileApiTest.java b/src/test/java/com/epam/aidial/core/FileApiTest.java new file mode 100644 index 000000000..6c91e6566 --- /dev/null +++ b/src/test/java/com/epam/aidial/core/FileApiTest.java @@ -0,0 +1,269 @@ +package com.epam.aidial.core; + +import com.auth0.jwt.JWT; +import com.auth0.jwt.algorithms.Algorithm; +import com.epam.aidial.core.config.Storage; +import com.epam.aidial.core.data.FileMetadata; +import com.epam.aidial.core.storage.BlobStorage; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.codec.BodyCodec; +import io.vertx.ext.web.multipart.MultipartForm; +import io.vertx.junit5.Checkpoint; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import lombok.extern.slf4j.Slf4j; +import org.jclouds.filesystem.reference.FilesystemConstants; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.nio.file.Path; +import java.util.Properties; + +@ExtendWith(VertxExtension.class) +@Slf4j +public class FileApiTest { + + private static final String TEST_FILE_CONTENT = "Test file content"; + + private static AiDial dial; + private static int serverPort; + private static Path testDir; + + @BeforeAll + public static void init() throws Exception { + // initialize server + dial = new AiDial(); + testDir = TestFiles.baseTestPath(FileApiTest.class); + BlobStorage storage = buildFsBlobStorage(testDir); + dial.start(storage); + serverPort = dial.getServer().actualPort(); + } + + @BeforeEach + public void setUp() { + // prepare test directory + TestFiles.createDir(testDir.resolve("test")); + } + + @AfterEach + public void clean() { + // clean test directory + TestFiles.deleteDir(testDir); + } + + @AfterAll + public static void destroy() { + // stop server + dial.stop(); + } + + @Test + public void testEmptyFilesList(Vertx vertx, VertxTestContext context) { + WebClient client = WebClient.create(vertx); + client.get(serverPort, "localhost", "/v1/files") + .putHeader("Api-key", "proxyKey2") + .bearerTokenAuthentication(generateJwtToken("User1")) + .addQueryParam("purpose", "metadata") + .as(BodyCodec.jsonArray()) + .send(context.succeeding(response -> { + context.verify(() -> { + Assertions.assertEquals(200, response.statusCode()); + Assertions.assertEquals(JsonArray.of(), response.body()); + context.completeNow(); + }); + })); + } + + @Test + public void testFileNotFound(Vertx vertx, VertxTestContext context) { + WebClient client = WebClient.create(vertx); + client.get(serverPort, "localhost", "/v1/files/test_file.txt") + .putHeader("Api-key", "proxyKey2") + .bearerTokenAuthentication(generateJwtToken("User1")) + .as(BodyCodec.buffer()) + .send(context.succeeding(response -> { + context.verify(() -> { + Assertions.assertEquals(404, response.statusCode()); + Assertions.assertNull(response.body()); + context.completeNow(); + }); + })); + } + + @Test + public void testFileUpload(Vertx vertx, VertxTestContext context) { + Checkpoint checkpoint = context.checkpoint(3); + WebClient client = WebClient.create(vertx); + + // verify no files + client.get(serverPort, "localhost", "/v1/files") + .putHeader("Api-key", "proxyKey2") + .bearerTokenAuthentication(generateJwtToken("User1")) + .addQueryParam("purpose", "metadata") + .as(BodyCodec.jsonArray()) + .send(context.succeeding(response -> { + context.verify(() -> { + Assertions.assertEquals(200, response.statusCode()); + Assertions.assertEquals(JsonArray.of(), response.body()); + checkpoint.flag(); + }); + })); + + FileMetadata expectedFileMetadata = new FileMetadata("file.txt", "Users/User1/files", 17, "text/plain"); + + // upload test file + client.post(serverPort, "localhost", "/v1/files") + .putHeader("Api-key", "proxyKey2") + .bearerTokenAuthentication(generateJwtToken("User1")) + .as(BodyCodec.json(FileMetadata.class)) + .sendMultipartForm(generateMultipartForm("file.txt", TEST_FILE_CONTENT), + context.succeeding(response -> { + context.verify(() -> { + Assertions.assertEquals(200, response.statusCode()); + Assertions.assertEquals(expectedFileMetadata, response.body()); + checkpoint.flag(); + }); + }) + ); + + // verify uploaded file can be listed + client.get(serverPort, "localhost", "/v1/files") + .putHeader("Api-key", "proxyKey2") + .bearerTokenAuthentication(generateJwtToken("User1")) + .addQueryParam("purpose", "metadata") + .as(BodyCodec.jsonArray()) + .send(context.succeeding(response -> { + context.verify(() -> { + Assertions.assertEquals(200, response.statusCode()); + Assertions.assertIterableEquals(JsonArray.of(JsonObject.mapFrom(expectedFileMetadata)), response.body()); + checkpoint.flag(); + }); + })); + } + + @Test + public void testFileDownload(Vertx vertx, VertxTestContext context) { + Checkpoint checkpoint = context.checkpoint(3); + WebClient client = WebClient.create(vertx); + + FileMetadata expectedFileMetadata = new FileMetadata("file.txt", "Users/User1/files/folder1", 17, "text/plain"); + + // upload test file + client.post(serverPort, "localhost", "/v1/files/folder1") + .putHeader("Api-key", "proxyKey2") + .bearerTokenAuthentication(generateJwtToken("User1")) + .as(BodyCodec.json(FileMetadata.class)) + .sendMultipartForm(generateMultipartForm("file.txt", TEST_FILE_CONTENT), + context.succeeding(response -> { + context.verify(() -> { + Assertions.assertEquals(200, response.statusCode()); + Assertions.assertEquals(expectedFileMetadata, response.body()); + checkpoint.flag(); + }); + }) + ); + + // download by relative path + client.get(serverPort, "localhost", "/v1/files/folder1/file.txt") + .putHeader("Api-key", "proxyKey2") + .bearerTokenAuthentication(generateJwtToken("User1")) + .as(BodyCodec.string()) + .send(context.succeeding(response -> { + context.verify(() -> { + Assertions.assertEquals(200, response.statusCode()); + Assertions.assertEquals(TEST_FILE_CONTENT, response.body()); + checkpoint.flag(); + }); + })); + + // download by absolute path + client.get(serverPort, "localhost", "/v1/files/Users/User1/files/folder1/file.txt") + .addQueryParam("path", "absolute") + .putHeader("Api-key", "proxyKey2") + .bearerTokenAuthentication(generateJwtToken("User2")) + .as(BodyCodec.string()) + .send(context.succeeding(response -> { + context.verify(() -> { + Assertions.assertEquals(200, response.statusCode()); + Assertions.assertEquals(TEST_FILE_CONTENT, response.body()); + checkpoint.flag(); + }); + })); + } + + @Test + public void testFileDelete(Vertx vertx, VertxTestContext context) { + Checkpoint checkpoint = context.checkpoint(3); + WebClient client = WebClient.create(vertx); + + FileMetadata expectedFileMetadata = new FileMetadata("test_file.txt", "Users/User1/files", 17, "text/plain"); + + // upload test file + client.post(serverPort, "localhost", "/v1/files") + .putHeader("Api-key", "proxyKey2") + .bearerTokenAuthentication(generateJwtToken("User1")) + .as(BodyCodec.json(FileMetadata.class)) + .sendMultipartForm(generateMultipartForm("test_file.txt", TEST_FILE_CONTENT), + context.succeeding(response -> { + context.verify(() -> { + Assertions.assertEquals(200, response.statusCode()); + Assertions.assertEquals(expectedFileMetadata, response.body()); + checkpoint.flag(); + }); + }) + ); + + // delete file + client.delete(serverPort, "localhost", "/v1/files/test_file.txt") + .putHeader("Api-key", "proxyKey2") + .bearerTokenAuthentication(generateJwtToken("User1")) + .as(BodyCodec.string()) + .send(context.succeeding(response -> { + context.verify(() -> { + Assertions.assertEquals(200, response.statusCode()); + checkpoint.flag(); + }); + })); + + // try to download deleted file + client.get(serverPort, "localhost", "/v1/files/test_file.txt") + .putHeader("Api-key", "proxyKey2") + .bearerTokenAuthentication(generateJwtToken("User1")) + .as(BodyCodec.string()) + .send(context.succeeding(response -> { + context.verify(() -> { + Assertions.assertEquals(404, response.statusCode()); + checkpoint.flag(); + }); + })); + } + + private static BlobStorage buildFsBlobStorage(Path baseDir) { + Properties properties = new Properties(); + properties.setProperty(FilesystemConstants.PROPERTY_BASEDIR, baseDir.toAbsolutePath().toString()); + Storage storageConfig = new Storage(); + storageConfig.setBucket("test"); + storageConfig.setProvider("filesystem"); + storageConfig.setIdentity("access-key"); + storageConfig.setCredential("secret-key"); + return new BlobStorage(storageConfig, properties); + } + + private static MultipartForm generateMultipartForm(String fileName, String content) { + return MultipartForm.create().textFileUpload("attachment", fileName, Buffer.buffer(content), "text/plain"); + } + + private static String generateJwtToken(String user) { + Algorithm algorithm = Algorithm.HMAC256("secret_key"); + return JWT.create().withClaim("sub", user).sign(algorithm); + } +} diff --git a/src/test/java/com/epam/aidial/core/TestFiles.java b/src/test/java/com/epam/aidial/core/TestFiles.java new file mode 100644 index 000000000..81c8871e2 --- /dev/null +++ b/src/test/java/com/epam/aidial/core/TestFiles.java @@ -0,0 +1,37 @@ +package com.epam.aidial.core; + +import lombok.SneakyThrows; +import lombok.experimental.UtilityClass; +import org.apache.commons.io.file.PathUtils; + +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Objects; + +@UtilityClass +public class TestFiles { + + public static Path baseTestPath(Class clazz) { + return resolveRes(clazz.getSimpleName()); + } + + @SneakyThrows + public static Path resolveRes(String resourcePath) { + URI resUri = Objects.requireNonNull(TestFiles.class.getClassLoader().getResource(".")).toURI(); + return Paths.get(resUri).getParent().toAbsolutePath().resolve(resourcePath); + } + + @SneakyThrows + public static void createDir(Path dir) { + Files.createDirectories(dir); + } + + @SneakyThrows + public static void deleteDir(Path dir) { + if (Files.exists(dir)) { + PathUtils.deleteDirectory(dir); + } + } +} diff --git a/src/test/resources/aidial.config.json b/src/test/resources/aidial.config.json new file mode 100644 index 000000000..9c257fd22 --- /dev/null +++ b/src/test/resources/aidial.config.json @@ -0,0 +1,94 @@ +{ + "routes" : { + "route-rate" : { + "paths": ["/+v1/rate"], + "methods": ["POST"], + "response" : { + "status": 200 + } + } + }, + "addons": { + "search": { + "endpoint": "http://localhost:7010/search", + "displayName": "Search", + "iconUrl": "http://localhost:7001/search_addon.png", + "description": "Some description of the addon for testing", + "forwardApiKey": false + }, + "forecast": { + "endpoint": "http://localhost:7010/forecast", + "forwardAuthToken": false + }, + "calculator": { + "endpoint": "http://localhost:7010/calculator", + "forwardApiKey": false, + "forwardAuthToken": false + } + }, + "assistant": { + "endpoint": "http://localhost:7001/openai/deployments/assistant/chat/completions", + + "assistants": { + "ass": { + "prompt": "Commands: sit_down, get_up, run_away", + "addons": ["search"], + "displayName": "Search Assistant", + "iconUrl": "http://localhost:7001/search_app.png", + "description": "Some description of the assistant for testing" + } + } + }, + "applications": { + "app": { + "endpoint": "http://localhost:7001/openai/deployments/10k/chat/completions", + "displayName": "10k", + "iconUrl": "http://localhost:7001/logo10k.png", + "description": "Some description of the application for testing" + } + }, + "models": { + "chat-gpt-35-turbo": { + "type": "chat", + "displayName": "GPT 3.5", + "iconUrl": "http://localhost:7001/logo.png", + "description": "Some description of the model for testing", + "endpoint" : "http://localhost:7001/openai/deployments/gpt-35-turbo/chat/completions", + "upstreams": [ + {"endpoint": "http://localhost:7001", "key": "modelKey1"}, + {"endpoint": "http://localhost:7002", "key": "modelKey2"}, + {"endpoint": "http://localhost:7003", "key": "modelKey3"} + ] + }, + "embedding-ada": { + "type": "embedding", + "endpoint" : "http://localhost:7001/openai/deployments/ada/embeddings", + "upstreams": [ + {"endpoint": "http://localhost:7001", "key": "modelKey4"} + ] + } + }, + "keys": { + "proxyKey1": { + "project": "EPM-RTC-GPT", + "role": "default" + }, + "proxyKey2": { + "project": "EPM-RTC-RAIL", + "role": "default" + } + }, + "roles": { + "default": { + "limits": { + "chat-gpt-35-turbo": {"minute": "100000", "day": "10000000"}, + "embedding-ada": {"minute": "100000", "day": "10000000"}, + "search": {}, + "forecast": {}, + "calculator": {}, + "ass": {}, + "app": {} + } + } + } +} \ No newline at end of file diff --git a/src/test/resources/aidial.settings.json b/src/test/resources/aidial.settings.json new file mode 100644 index 000000000..8e6ee825f --- /dev/null +++ b/src/test/resources/aidial.settings.json @@ -0,0 +1,45 @@ +{ + "vertx": { + "metricsOptions": { + "enabled": false, + "jvmMetricsEnabled": false, + "labels": [ + "HTTP_METHOD", + "HTTP_CODE", + "HTTP_PATH" + ], + "prometheusOptions": { + "enabled": false, + "startEmbeddedServer": false, + "embeddedServerOptions": { + "port": 9464 + } + }, + "oltpOptions": { + "enabled": false + } + } + }, + "client": { + "idleTimeoutUnit": "MILLISECONDS", + "idleTimeout": 300000, + "connectTimeout": 10000, + "keepAlive": true, + "maxPoolSize": 128 + }, + "server": { + "port": 0, + "acceptBacklog": 4096, + "idleTimeoutUnit": "MILLISECONDS", + "idleTimeout": 300000, + "compressionSupported": true + }, + "config": { + "files": ["aidial.config.json"], + "reload": 60000 + }, + "identityProvider": { + "jwksUrl": "http://fakeJwksUrl:8080", + "appName": "dial" + } +} From 19a3aebe6af5b5aecf92b64ea1336bc9e1df26fc Mon Sep 17 00:00:00 2001 From: Maksim_Hadalau Date: Tue, 21 Nov 2023 11:38:06 +0100 Subject: [PATCH 08/10] add tests --- .../controller/DownloadFileController.java | 2 +- .../aidial/core/storage/BlobStorageUtil.java | 15 ++++-- .../core/storage/BlobStorageUtilTest.java | 51 +++++++++++++++++++ 3 files changed, 64 insertions(+), 4 deletions(-) create mode 100644 src/test/java/com/epam/aidial/core/storage/BlobStorageUtilTest.java diff --git a/src/main/java/com/epam/aidial/core/controller/DownloadFileController.java b/src/main/java/com/epam/aidial/core/controller/DownloadFileController.java index e8b217247..772a71926 100644 --- a/src/main/java/com/epam/aidial/core/controller/DownloadFileController.java +++ b/src/main/java/com/epam/aidial/core/controller/DownloadFileController.java @@ -47,7 +47,7 @@ public Future download(String path) { absoluteFilePath = path; } Future blobFuture = proxy.getVertx().executeBlocking(() -> - proxy.getStorage().load(BlobStorageUtil.removeLeadingAndTrailingPathSeparators(absoluteFilePath))); + proxy.getStorage().load(BlobStorageUtil.removeLeadingPathSeparator(absoluteFilePath))); Promise result = Promise.promise(); blobFuture.onSuccess(blob -> { diff --git a/src/main/java/com/epam/aidial/core/storage/BlobStorageUtil.java b/src/main/java/com/epam/aidial/core/storage/BlobStorageUtil.java index 87c4a2ee6..bf4a38579 100644 --- a/src/main/java/com/epam/aidial/core/storage/BlobStorageUtil.java +++ b/src/main/java/com/epam/aidial/core/storage/BlobStorageUtil.java @@ -14,12 +14,21 @@ public class BlobStorageUtil { public static final String PATH_SEPARATOR = "/"; private static final char DELIMITER = PATH_SEPARATOR.charAt(0); - + /** + * Normalize provided path for listing files query by removing leading and adding trailing path separator. + * For example, path /Users/User1/files/folders will be transformed to Users/User1/files/folders/ + * + * @return normalized path + */ public String normalizePathForQuery(String path) { if (path == null || path.isBlank()) { return null; } + if (path.equals(PATH_SEPARATOR)) { + return path; + } + // remove leading separator if (path.charAt(0) == DELIMITER) { path = path.substring(1); @@ -38,14 +47,14 @@ public String removeLeadingAndTrailingPathSeparators(String path) { } public String removeLeadingPathSeparator(String path) { - if (path == null) { + if (path == null || path.isBlank()) { return null; } return path.charAt(0) == DELIMITER ? path.substring(1) : path; } public String removeTrailingPathSeparator(String path) { - if (path == null) { + if (path == null || path.isBlank()) { return null; } int length = path.length(); diff --git a/src/test/java/com/epam/aidial/core/storage/BlobStorageUtilTest.java b/src/test/java/com/epam/aidial/core/storage/BlobStorageUtilTest.java new file mode 100644 index 000000000..eccdf6818 --- /dev/null +++ b/src/test/java/com/epam/aidial/core/storage/BlobStorageUtilTest.java @@ -0,0 +1,51 @@ +package com.epam.aidial.core.storage; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import static com.epam.aidial.core.storage.BlobStorageUtil.normalizePathForQuery; +import static com.epam.aidial.core.storage.BlobStorageUtil.removeLeadingAndTrailingPathSeparators; +import static com.epam.aidial.core.storage.BlobStorageUtil.removeLeadingPathSeparator; +import static com.epam.aidial.core.storage.BlobStorageUtil.removeTrailingPathSeparator; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class BlobStorageUtilTest { + + @Test + public void testNormalizePathForQuery() { + assertEquals("Users/User/files/", normalizePathForQuery("/Users/User/files")); + assertEquals("Users/User/files/", normalizePathForQuery("Users/User/files")); + assertEquals("folder/", normalizePathForQuery("folder")); + assertEquals("/", normalizePathForQuery("/")); + assertNull(normalizePathForQuery("")); + } + + @Test + public void testRemoveLeadingPathSeparator() { + assertEquals("Users/User/files/", removeLeadingPathSeparator("/Users/User/files/")); + assertEquals("Users/User/files", removeLeadingPathSeparator("Users/User/files")); + assertEquals("folder/", removeLeadingPathSeparator("/folder/")); + assertEquals("", removeLeadingPathSeparator("/")); + assertNull(removeLeadingPathSeparator("")); + } + + @Test + public void testRemoveTrailingPathSeparator() { + assertEquals("/Users/User/files", removeTrailingPathSeparator("/Users/User/files/")); + assertEquals("Users/User/files", removeTrailingPathSeparator("Users/User/files")); + assertEquals("/folder", removeTrailingPathSeparator("/folder/")); + assertEquals("", removeTrailingPathSeparator("/")); + assertNull(removeTrailingPathSeparator("")); + } + + @Test + public void testRemoveLeadingAndTrailingPathSeparators() { + assertEquals("Users/User/files", removeLeadingAndTrailingPathSeparators("/Users/User/files/")); + assertEquals("Users/User/files", removeLeadingAndTrailingPathSeparators("Users/User/files")); + assertEquals("folder", removeLeadingAndTrailingPathSeparators("/folder/")); + assertEquals("", removeLeadingAndTrailingPathSeparators(null)); + assertEquals("", removeLeadingAndTrailingPathSeparators("/")); + assertEquals("", removeLeadingAndTrailingPathSeparators("")); + } +} From 693edba2cc7ab0f9cf4d853cd2d05c4f225e655e Mon Sep 17 00:00:00 2001 From: Maksim_Hadalau Date: Tue, 21 Nov 2023 12:50:17 +0100 Subject: [PATCH 09/10] address review comments --- .../java/com/epam/aidial/core/AiDial.java | 43 +++++-------------- .../epam/aidial/core/storage/BlobStorage.java | 2 +- .../com/epam/aidial/core/FileApiTest.java | 10 ++--- .../core/{TestFiles.java => FileUtil.java} | 4 +- 4 files changed, 18 insertions(+), 41 deletions(-) rename src/test/java/com/epam/aidial/core/{TestFiles.java => FileUtil.java} (86%) diff --git a/src/main/java/com/epam/aidial/core/AiDial.java b/src/main/java/com/epam/aidial/core/AiDial.java index 08939e99a..76ef99d5f 100644 --- a/src/main/java/com/epam/aidial/core/AiDial.java +++ b/src/main/java/com/epam/aidial/core/AiDial.java @@ -47,39 +47,8 @@ public class AiDial { private BlobStorage storage; - private void start() throws Exception { - try { - settings = settings(); - - VertxOptions vertxOptions = new VertxOptions(settings("vertx")); - setupMetrics(vertxOptions); - - vertx = Vertx.vertx(vertxOptions); - client = vertx.createHttpClient(new HttpClientOptions(settings("client"))); - - ConfigStore configStore = new FileConfigStore(vertx, settings("config")); - LogStore logStore = new GfLogStore(vertx); - RateLimiter rateLimiter = new RateLimiter(); - UpstreamBalancer upstreamBalancer = new UpstreamBalancer(); - - IdentityProvider identityProvider = new IdentityProvider(settings("identityProvider"), vertx); - Storage storageConfig = Json.decodeValue(settings("storage").toBuffer(), Storage.class); - storage = new BlobStorage(storageConfig); - Proxy proxy = new Proxy(vertx, client, configStore, logStore, rateLimiter, upstreamBalancer, identityProvider, storage); - - server = vertx.createHttpServer(new HttpServerOptions(settings("server"))).requestHandler(proxy); - open(server, HttpServer::listen); - - log.info("Proxy started on {}", server.actualPort()); - } catch (Throwable e) { - log.warn("Proxy failed to start:", e); - stop(); - throw e; - } - } - @VisibleForTesting - void start(BlobStorage storage) throws Exception { + void start() throws Exception { try { settings = settings(); @@ -95,7 +64,10 @@ void start(BlobStorage storage) throws Exception { UpstreamBalancer upstreamBalancer = new UpstreamBalancer(); IdentityProvider identityProvider = new IdentityProvider(settings("identityProvider"), vertx); - this.storage = storage; + if (storage == null) { + Storage storageConfig = Json.decodeValue(settings("storage").toBuffer(), Storage.class); + storage = new BlobStorage(storageConfig); + } Proxy proxy = new Proxy(vertx, client, configStore, logStore, rateLimiter, upstreamBalancer, identityProvider, storage); server = vertx.createHttpServer(new HttpServerOptions(settings("server"))).requestHandler(proxy); @@ -130,6 +102,11 @@ HttpServer getServer() { return server; } + @VisibleForTesting + void setStorage(BlobStorage storage) { + this.storage = storage; + } + private JsonObject settings(String key) { return settings.getJsonObject(key, new JsonObject()); } diff --git a/src/main/java/com/epam/aidial/core/storage/BlobStorage.java b/src/main/java/com/epam/aidial/core/storage/BlobStorage.java index b990538ee..4334ddd3d 100644 --- a/src/main/java/com/epam/aidial/core/storage/BlobStorage.java +++ b/src/main/java/com/epam/aidial/core/storage/BlobStorage.java @@ -57,7 +57,7 @@ public BlobStorage(Storage config, Properties overrides) { } /** - * Initialize s3 multipart upload + * Initialize multipart upload * * @param fileName name of the file, for example: data.csv * @param path absolute path according to the bucket, for example: Users/user1/files/input data diff --git a/src/test/java/com/epam/aidial/core/FileApiTest.java b/src/test/java/com/epam/aidial/core/FileApiTest.java index 6c91e6566..8bb0d29bc 100644 --- a/src/test/java/com/epam/aidial/core/FileApiTest.java +++ b/src/test/java/com/epam/aidial/core/FileApiTest.java @@ -42,22 +42,22 @@ public class FileApiTest { public static void init() throws Exception { // initialize server dial = new AiDial(); - testDir = TestFiles.baseTestPath(FileApiTest.class); - BlobStorage storage = buildFsBlobStorage(testDir); - dial.start(storage); + testDir = FileUtil.baseTestPath(FileApiTest.class); + dial.setStorage(buildFsBlobStorage(testDir)); + dial.start(); serverPort = dial.getServer().actualPort(); } @BeforeEach public void setUp() { // prepare test directory - TestFiles.createDir(testDir.resolve("test")); + FileUtil.createDir(testDir.resolve("test")); } @AfterEach public void clean() { // clean test directory - TestFiles.deleteDir(testDir); + FileUtil.deleteDir(testDir); } @AfterAll diff --git a/src/test/java/com/epam/aidial/core/TestFiles.java b/src/test/java/com/epam/aidial/core/FileUtil.java similarity index 86% rename from src/test/java/com/epam/aidial/core/TestFiles.java rename to src/test/java/com/epam/aidial/core/FileUtil.java index 81c8871e2..e374014c2 100644 --- a/src/test/java/com/epam/aidial/core/TestFiles.java +++ b/src/test/java/com/epam/aidial/core/FileUtil.java @@ -11,7 +11,7 @@ import java.util.Objects; @UtilityClass -public class TestFiles { +public class FileUtil { public static Path baseTestPath(Class clazz) { return resolveRes(clazz.getSimpleName()); @@ -19,7 +19,7 @@ public static Path baseTestPath(Class clazz) { @SneakyThrows public static Path resolveRes(String resourcePath) { - URI resUri = Objects.requireNonNull(TestFiles.class.getClassLoader().getResource(".")).toURI(); + URI resUri = Objects.requireNonNull(FileUtil.class.getClassLoader().getResource(".")).toURI(); return Paths.get(resUri).getParent().toAbsolutePath().resolve(resourcePath); } From 87bff54312e21af2b3b9617b33293f1e12c482ab Mon Sep 17 00:00:00 2001 From: Maksim_Hadalau Date: Tue, 21 Nov 2023 13:27:01 +0100 Subject: [PATCH 10/10] address review comments --- README.md | 6 ++++++ src/main/java/com/epam/aidial/core/Proxy.java | 5 +++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 36f5535ef..9a1e4c2ae 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,12 @@ Static settings are used on startup and cannot be changed while application is r |vertx.* |- |Vertx settings. |server.* |- |Vertx HTTP server settings for incoming requests. |client.* |- |Vertx HTTP client settings for outbound requests. +|storage.provider |- |Specifies blob storage provider. Supported providers: s3, aws-s3, azureblob, google-cloud-storage +|storage.endpoint |- |Optional. Specifies endpoint url for s3 compatible storages +|storage.identity |- |Blob storage access key +|storage.credential |- |Blob storage secret key +|storage.bucket |- |Blob storage bucket +|storage.createBucket |false |Indicates whether bucket should be created on start-up ### Dynamic settings Dynamic settings are stored in JSON files, specified via "config.files" static setting, and reloaded at interval, specified via "config.reload" static setting. diff --git a/src/main/java/com/epam/aidial/core/Proxy.java b/src/main/java/com/epam/aidial/core/Proxy.java index 0c03d2eeb..52b31712b 100644 --- a/src/main/java/com/epam/aidial/core/Proxy.java +++ b/src/main/java/com/epam/aidial/core/Proxy.java @@ -86,14 +86,15 @@ private void handleRequest(HttpServerRequest request) throws Exception { } String contentType = request.getHeader(HttpHeaders.CONTENT_TYPE); + int contentLength = ProxyUtil.contentLength(request, 1024); if (contentType != null && contentType.startsWith("multipart/form-data")) { - if (ProxyUtil.contentLength(request, 1024) > FILES_REQUEST_BODY_MAX_SIZE_BYTES) { + if (contentLength > FILES_REQUEST_BODY_MAX_SIZE_BYTES) { respond(request, HttpStatus.REQUEST_ENTITY_TOO_LARGE, "Request body is too large"); return; } } else { // not only the case, Content-Length can be missing when Transfer-Encoding: chunked - if (ProxyUtil.contentLength(request, 1024) > REQUEST_BODY_MAX_SIZE_BYTES) { + if (contentLength > REQUEST_BODY_MAX_SIZE_BYTES) { respond(request, HttpStatus.REQUEST_ENTITY_TOO_LARGE, "Request body is too large"); return; }