Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: initial implementation of Files API #26

Merged
merged 12 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ Static settings are used on startup and cannot be changed while application is r
|vertx.* |- |Vertx settings.
|server.* |- |Vertx HTTP server settings for incoming requests.
|client.* |- |Vertx HTTP client settings for outbound requests.
|storage.provider |- |Specifies blob storage provider. Supported providers: s3, aws-s3, azureblob, google-cloud-storage
|storage.endpoint |- |Optional. Specifies endpoint url for s3 compatible storages
|storage.identity |- |Blob storage access key
|storage.credential |- |Blob storage secret key
|storage.bucket |- |Blob storage bucket
|storage.createBucket |false |Indicates whether bucket should be created on start-up

### Dynamic settings
Dynamic settings are stored in JSON files, specified via "config.files" static setting, and reloaded at interval, specified via "config.reload" static setting.
Expand Down
5 changes: 5 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,15 @@ dependencies {
implementation 'com.fasterxml.jackson.core:jackson-databind:2.15.2'
implementation 'com.auth0:java-jwt:4.4.0'
implementation 'com.auth0:jwks-rsa:0.22.1'
implementation 'org.apache.jclouds:jclouds-allblobstore:2.5.0'

runtimeOnly 'com.epam.deltix:gflog-slf4j:3.0.0'
testImplementation 'org.mockito:mockito-core:5.4.0'
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.9.3'
testImplementation 'commons-io:commons-io:2.11.0'
testImplementation 'io.vertx:vertx-web-client:4.4.6'
testImplementation 'io.vertx:vertx-junit5:4.4.6'
testImplementation 'org.apache.jclouds.api:filesystem:2.5.0'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.9.3'
}

Expand Down
41 changes: 36 additions & 5 deletions src/main/java/com/epam/aidial/core/AiDial.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

import com.epam.aidial.core.config.ConfigStore;
import com.epam.aidial.core.config.FileConfigStore;
import com.epam.aidial.core.config.Storage;
import com.epam.aidial.core.limiter.RateLimiter;
import com.epam.aidial.core.log.GfLogStore;
import com.epam.aidial.core.log.LogStore;
import com.epam.aidial.core.security.IdentityProvider;
import com.epam.aidial.core.storage.BlobStorage;
import com.epam.aidial.core.upstream.UpstreamBalancer;
import com.epam.deltix.gflog.core.LogConfigurator;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.registry.otlp.OtlpMeterRegistry;
import io.vertx.config.spi.utils.JsonObjectHelper;
import io.vertx.core.Future;
Expand All @@ -17,11 +20,13 @@
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import io.vertx.core.metrics.MetricsOptions;
import io.vertx.micrometer.MicrometerMetricsOptions;
import lombok.extern.slf4j.Slf4j;

import java.io.Closeable;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -40,7 +45,10 @@ public class AiDial {
private HttpServer server;
private HttpClient client;

private void start() throws Exception {
private BlobStorage storage;

@VisibleForTesting
void start() throws Exception {
try {
settings = settings();

Expand All @@ -56,25 +64,30 @@ private void start() throws Exception {
UpstreamBalancer upstreamBalancer = new UpstreamBalancer();

IdentityProvider identityProvider = new IdentityProvider(settings("identityProvider"), vertx);
Proxy proxy = new Proxy(client, configStore, logStore, rateLimiter, upstreamBalancer, identityProvider);
if (storage == null) {
Storage storageConfig = Json.decodeValue(settings("storage").toBuffer(), Storage.class);
storage = new BlobStorage(storageConfig);
}
Proxy proxy = new Proxy(vertx, client, configStore, logStore, rateLimiter, upstreamBalancer, identityProvider, storage);

server = vertx.createHttpServer(new HttpServerOptions(settings("server"))).requestHandler(proxy);
open(server, HttpServer::listen);

log.info("Proxy started on {}", server.actualPort());
Runtime.getRuntime().addShutdownHook(new Thread(this::stop, "shutdown-hook"));
} catch (Throwable e) {
log.warn("Proxy failed to start:", e);
stop();
throw e;
}
}

private void stop() {
@VisibleForTesting
void stop() {
try {
close(server, HttpServer::close);
close(client, HttpClient::close);
close(vertx, Vertx::close);
close(storage);
log.info("Proxy stopped");
LogConfigurator.unconfigure();
} catch (Throwable e) {
Expand All @@ -84,6 +97,16 @@ private void stop() {
}
}

@VisibleForTesting
HttpServer getServer() {
return server;
}

@VisibleForTesting
void setStorage(BlobStorage storage) {
this.storage = storage;
}

private JsonObject settings(String key) {
return settings.getJsonObject(key, new JsonObject());
}
Expand Down Expand Up @@ -150,6 +173,12 @@ private static <R> void close(R resource, AsyncCloser<R> closer) throws Exceptio
}
}

private static void close(Closeable resource) throws IOException {
if (resource != null) {
resource.close();
}
}

private interface AsyncOpener<R> {
Future<R> open(R resource);
}
Expand All @@ -159,7 +188,9 @@ private interface AsyncCloser<R> {
}

public static void main(String[] args) throws Exception {
new AiDial().start();
AiDial dial = new AiDial();
dial.start();
Runtime.getRuntime().addShutdownHook(new Thread(dial::stop, "shutdown-hook"));
}

private static void setupMetrics(VertxOptions options) {
Expand Down
33 changes: 25 additions & 8 deletions src/main/java/com/epam/aidial/core/Proxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
import com.epam.aidial.core.log.LogStore;
import com.epam.aidial.core.security.ExtractedClaims;
import com.epam.aidial.core.security.IdentityProvider;
import com.epam.aidial.core.storage.BlobStorage;
import com.epam.aidial.core.upstream.UpstreamBalancer;
import com.epam.aidial.core.util.HttpStatus;
import com.epam.aidial.core.util.ProxyUtil;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
Expand Down Expand Up @@ -42,14 +44,17 @@ public class Proxy implements Handler<HttpServerRequest> {
public static final String HEADER_UPSTREAM_ATTEMPTS = "X-UPSTREAM-ATTEMPTS";
public static final String HEADER_CONTENT_TYPE_APPLICATION_JSON = "application/json";

public static final int REQUEST_BODY_MAX_SIZE = 16 * 1024 * 1024;
public static final int REQUEST_BODY_MAX_SIZE_BYTES = 16 * 1024 * 1024;
public static final int FILES_REQUEST_BODY_MAX_SIZE_BYTES = 512 * 1024 * 1024;

private final Vertx vertx;
private final HttpClient client;
private final ConfigStore configStore;
private final LogStore logStore;
private final RateLimiter rateLimiter;
private final UpstreamBalancer upstreamBalancer;
private final IdentityProvider identityProvider;
private final BlobStorage storage;

@Override
public void handle(HttpServerRequest request) {
Expand All @@ -74,15 +79,25 @@ private void handleRequest(HttpServerRequest request) throws Exception {
return;
}

if (request.method() != HttpMethod.GET && request.method() != HttpMethod.POST) {
HttpMethod requestMethod = request.method();
if (requestMethod != HttpMethod.GET && requestMethod != HttpMethod.POST && requestMethod != HttpMethod.DELETE) {
respond(request, HttpStatus.METHOD_NOT_ALLOWED);
return;
}

// not only the case, Content-Length can be missing when Transfer-Encoding: chunked
if (ProxyUtil.contentLength(request, 1024) > REQUEST_BODY_MAX_SIZE) {
respond(request, HttpStatus.REQUEST_ENTITY_TOO_LARGE, "Request body is too large");
return;
String contentType = request.getHeader(HttpHeaders.CONTENT_TYPE);
int contentLength = ProxyUtil.contentLength(request, 1024);
if (contentType != null && contentType.startsWith("multipart/form-data")) {
if (contentLength > FILES_REQUEST_BODY_MAX_SIZE_BYTES) {
respond(request, HttpStatus.REQUEST_ENTITY_TOO_LARGE, "Request body is too large");
return;
}
} else {
// not only the case, Content-Length can be missing when Transfer-Encoding: chunked
if (contentLength > REQUEST_BODY_MAX_SIZE_BYTES) {
respond(request, HttpStatus.REQUEST_ENTITY_TOO_LARGE, "Request body is too large");
return;
}
}

String path = URLDecoder.decode(request.path(), StandardCharsets.UTF_8);
Expand Down Expand Up @@ -141,7 +156,8 @@ private void handleRequest(HttpServerRequest request) throws Exception {
});
}

private void onExtractClaimsFailure(Throwable error, Config config, HttpServerRequest request, Key key) throws Exception {
private void onExtractClaimsFailure(Throwable error, Config config, HttpServerRequest request, Key key)
throws Exception {
if (key.getUserAuth() == UserAuth.ENABLED) {
log.error("Can't extract claims from authorization header", error);
respond(request, HttpStatus.UNAUTHORIZED, "Bad Authorization header");
Expand All @@ -152,7 +168,8 @@ private void onExtractClaimsFailure(Throwable error, Config config, HttpServerRe
}
}

private void onExtractClaimsSuccess(ExtractedClaims extractedClaims, Config config, HttpServerRequest request, Key key) throws Exception {
private void onExtractClaimsSuccess(ExtractedClaims extractedClaims, Config config,
HttpServerRequest request, Key key) throws Exception {
ProxyContext context = new ProxyContext(config, request, key, extractedClaims);
Controller controller = ControllerSelector.select(this, context);
controller.handle();
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/epam/aidial/core/ProxyContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class ProxyContext {
private final HttpServerResponse response;

private Deployment deployment;
private String userSub;
private List<String> userRoles;
private String userHash;
private TokenUsage tokenUsage;
Expand All @@ -59,6 +60,7 @@ public ProxyContext(Config config, HttpServerRequest request, Key key, Extracted
if (extractedClaims != null) {
this.userRoles = extractedClaims.userRoles();
this.userHash = extractedClaims.userHash();
this.userSub = extractedClaims.sub();
}
}

Expand Down
35 changes: 35 additions & 0 deletions src/main/java/com/epam/aidial/core/config/Storage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.epam.aidial.core.config;

import lombok.Data;

import javax.annotation.Nullable;

@Data
public class Storage {
artsiomkorzun marked this conversation as resolved.
Show resolved Hide resolved
/**
* Specifies storage provider. Supported providers: s3, aws-s3, azureblob, google-cloud-storage
*/
String provider;
/**
* Optional. Specifies endpoint url for s3 compatible storages
*/
@Nullable
String endpoint;
/**
* api key
*/
String identity;
/**
* secret key
*/
String credential;
/**
* container name/root bucket
*/
String bucket;

/**
* Indicates whether bucket should be created on start up
*/
boolean createBucket;
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public class ControllerSelector {
private static final Pattern PATTERN_APPLICATION = Pattern.compile("/+openai/applications/([-.@a-zA-Z0-9]+)");
private static final Pattern PATTERN_APPLICATIONS = Pattern.compile("/+openai/applications");


private static final Pattern PATTERN_FILES = Pattern.compile("/v1/files(.*)");

private static final Pattern PATTERN_RATE_RESPONSE = Pattern.compile("/+v1/([-.@a-zA-Z0-9]+)/rate");

public Controller select(Proxy proxy, ProxyContext context) {
Expand All @@ -37,15 +40,17 @@ public Controller select(Proxy proxy, ProxyContext context) {
Controller controller = null;

if (method == HttpMethod.GET) {
controller = selectGet(context, path);
controller = selectGet(proxy, context, path);
} else if (method == HttpMethod.POST) {
controller = selectPost(proxy, context, path);
} else if (method == HttpMethod.DELETE) {
controller = selectDelete(proxy, context, path);
}

return (controller == null) ? new RouteController(proxy, context) : controller;
}

private static Controller selectGet(ProxyContext context, String path) {
private static Controller selectGet(Proxy proxy, ProxyContext context, String path) {
Matcher match;

match = match(PATTERN_DEPLOYMENT, path);
Expand Down Expand Up @@ -113,6 +118,19 @@ private static Controller selectGet(ProxyContext context, String path) {
return controller::getApplications;
}

match = match(PATTERN_FILES, path);
if (match != null) {
String filePath = match.group(1);
String purpose = context.getRequest().params().get(DownloadFileController.PURPOSE_FILE_QUERY_PARAMETER);
if (DownloadFileController.QUERY_METADATA_QUERY_PARAMETER_VALUE.equals(purpose)) {
FileMetadataController controller = new FileMetadataController(proxy, context);
return () -> controller.list(filePath);
} else {
DownloadFileController controller = new DownloadFileController(proxy, context);
return () -> controller.download(filePath);
}
}

return null;
}

Expand All @@ -124,6 +142,23 @@ private static Controller selectPost(Proxy proxy, ProxyContext context, String p
DeploymentPostController controller = new DeploymentPostController(proxy, context);
return () -> controller.handle(deploymentId, deploymentApi);
}
match = match(PATTERN_FILES, path);
if (match != null) {
String relativeFilePath = match.group(1);
UploadFileController controller = new UploadFileController(proxy, context);
return () -> controller.upload(relativeFilePath);
}

return null;
}

private static Controller selectDelete(Proxy proxy, ProxyContext context, String path) {
Matcher match = match(PATTERN_FILES, path);
if (match != null) {
String relativeFilePath = match.group(1);
DeleteFileController controller = new DeleteFileController(proxy, context);
return () -> controller.delete(relativeFilePath);
}

match = match(PATTERN_RATE_RESPONSE, path);
if (match != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.epam.aidial.core.controller;

import com.epam.aidial.core.Proxy;
import com.epam.aidial.core.ProxyContext;
import com.epam.aidial.core.storage.BlobStorage;
import com.epam.aidial.core.storage.BlobStorageUtil;
import com.epam.aidial.core.util.HttpStatus;
import io.vertx.core.Future;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@AllArgsConstructor
public class DeleteFileController {
private final Proxy proxy;
private final ProxyContext context;

/**
* Deletes file from storage.
* Current API implementation requires a relative path, absolute path will be calculated based on authentication context
*
* @param path relative path, for example: /inputs/data.csv
*/
public Future<?> delete(String path) {
String absoluteFilePath = BlobStorageUtil.buildAbsoluteFilePath(context, path);
BlobStorage storage = proxy.getStorage();
Future<Void> result = proxy.getVertx().executeBlocking(() -> {
try {
storage.delete(absoluteFilePath);
return null;
} catch (Exception ex) {
log.error("Failed to delete file " + absoluteFilePath, ex);
throw new RuntimeException(ex);
}
});

return result
.onSuccess(success -> context.respond(HttpStatus.OK))
.onFailure(error -> context.respond(HttpStatus.INTERNAL_SERVER_ERROR, error.getMessage()));
}
}
Loading