Skip to content

Commit

Permalink
feat: implement etag and file read-write caching (#389)
Browse files Browse the repository at this point in the history
  • Loading branch information
Oleksii-Klimov authored Jul 19, 2024
1 parent d72cfae commit 82afbab
Show file tree
Hide file tree
Showing 37 changed files with 1,313 additions and 557 deletions.
6 changes: 3 additions & 3 deletions src/main/java/com/epam/aidial/core/AiDial.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ void start() throws Exception {
LockService lockService = new LockService(redis, storage.getPrefix());
resourceService = new ResourceService(vertx, redis, storage, lockService, settings("resources"), storage.getPrefix());
InvitationService invitationService = new InvitationService(resourceService, encryptionService, settings("invitations"));
ShareService shareService = new ShareService(resourceService, invitationService, encryptionService, storage);
ResourceOperationService resourceOperationService = new ResourceOperationService(resourceService, storage, invitationService, shareService);
ShareService shareService = new ShareService(resourceService, invitationService, encryptionService);
ResourceOperationService resourceOperationService = new ResourceOperationService(resourceService, invitationService, shareService);
RuleService ruleService = new RuleService(resourceService);
AccessService accessService = new AccessService(encryptionService, shareService, ruleService, settings("access"));
NotificationService notificationService = new NotificationService(resourceService, encryptionService);
PublicationService publicationService = new PublicationService(encryptionService, resourceService, accessService,
ruleService, notificationService, storage, generator, clock);
ruleService, notificationService, generator, clock);
RateLimiter rateLimiter = new RateLimiter(vertx, resourceService);

ApiKeyStore apiKeyStore = new ApiKeyStore(resourceService, vertx);
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/com/epam/aidial/core/ProxyContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.epam.aidial.core.token.TokenUsage;
import com.epam.aidial.core.upstream.UpstreamRoute;
import com.epam.aidial.core.util.BufferingReadStream;
import com.epam.aidial.core.util.HttpException;
import com.epam.aidial.core.util.HttpStatus;
import com.epam.aidial.core.util.ProxyUtil;
import io.vertx.core.Future;
Expand Down Expand Up @@ -129,7 +130,14 @@ public Future<?> respond(HttpStatus status, String body) {
body.length() > LOG_MAX_ERROR_LENGTH ? body.substring(0, LOG_MAX_ERROR_LENGTH) : body);
}

return response.setStatusCode(status.getCode()).end(body);
response.setStatusCode(status.getCode()).end(body);
return Future.succeededFuture();
}

public Future<?> respond(Throwable error, String fallbackError) {
return error instanceof HttpException exception
? respond(exception.getStatus(), exception.getMessage())
: respond(HttpStatus.INTERNAL_SERVER_ERROR, fallbackError);
}

public String getProject() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
import com.epam.aidial.core.ProxyContext;
import com.epam.aidial.core.service.InvitationService;
import com.epam.aidial.core.service.LockService;
import com.epam.aidial.core.service.ResourceService;
import com.epam.aidial.core.service.ShareService;
import com.epam.aidial.core.storage.BlobStorage;
import com.epam.aidial.core.storage.ResourceDescription;
import com.epam.aidial.core.util.EtagHeader;
import com.epam.aidial.core.util.HttpStatus;
import io.vertx.core.Future;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -17,12 +18,14 @@ public class DeleteFileController extends AccessControlBaseController {
private final ShareService shareService;
private final InvitationService invitationService;
private final LockService lockService;
private final ResourceService resourceService;

public DeleteFileController(Proxy proxy, ProxyContext context) {
super(proxy, context, true);
this.shareService = proxy.getShareService();
this.invitationService = proxy.getInvitationService();
this.lockService = proxy.getLockService();
this.resourceService = proxy.getResourceService();
}

@Override
Expand All @@ -31,26 +34,23 @@ protected Future<?> handle(ResourceDescription resource, boolean hasWriteAccess)
return context.respond(HttpStatus.BAD_REQUEST, "Can't delete a folder");
}

String absoluteFilePath = resource.getAbsoluteFilePath();

BlobStorage storage = proxy.getStorage();
proxy.getVertx().executeBlocking(() -> {
EtagHeader etag = EtagHeader.fromRequest(context.getRequest());
String bucketName = resource.getBucketName();
String bucketLocation = resource.getBucketLocation();
try {
return lockService.underBucketLock(bucketLocation, () -> {
invitationService.cleanUpResourceLink(bucketName, bucketLocation, resource);
shareService.revokeSharedResource(bucketName, bucketLocation, resource);
storage.delete(absoluteFilePath);
return null;
});
} catch (Exception ex) {
log.error("Failed to delete file %s/%s".formatted(bucketName, resource.getOriginalPath()), ex);
throw new RuntimeException(ex);
}
return lockService.underBucketLock(bucketLocation, () -> {
invitationService.cleanUpResourceLink(bucketName, bucketLocation, resource);
shareService.revokeSharedResource(bucketName, bucketLocation, resource);
resourceService.deleteResource(resource, etag);

return null;
});
}, false)
.onSuccess(success -> context.respond(HttpStatus.OK))
.onFailure(error -> context.respond(HttpStatus.INTERNAL_SERVER_ERROR, error.getMessage()));
.onFailure(error -> {
log.error("Failed to delete file {}/{}", resource.getBucketName(), resource.getOriginalPath(), error);
context.respond(error, error.getMessage());
});

return Future.succeededFuture();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,9 @@
import com.epam.aidial.core.storage.ResourceDescription;
import com.epam.aidial.core.util.HttpStatus;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerResponse;
import lombok.extern.slf4j.Slf4j;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.io.MutableContentMetadata;
import org.jclouds.io.Payload;

import java.io.IOException;

@Slf4j
public class DownloadFileController extends AccessControlBaseController {
Expand All @@ -29,37 +23,27 @@ protected Future<?> handle(ResourceDescription resource, boolean hasWriteAccess)
return context.respond(HttpStatus.BAD_REQUEST, "Can't download a folder");
}

proxy.getVertx().executeBlocking(() -> proxy.getStorage().load(resource.getAbsoluteFilePath()), false)
.compose(blob -> {
if (blob == null) {
proxy.getVertx().executeBlocking(() -> proxy.getResourceService().getResourceStream(resource), false)
.compose(resourceStream -> {
if (resourceStream == null) {
return context.respond(HttpStatus.NOT_FOUND);
}

Payload payload = blob.getPayload();
MutableContentMetadata metadata = payload.getContentMetadata();
String contentType = metadata.getContentType();
Long length = metadata.getContentLength();

HttpServerResponse response = context.getResponse()
.putHeader(HttpHeaders.CONTENT_TYPE, contentType)
.putHeader(HttpHeaders.CONTENT_TYPE, resourceStream.contentType())
// content-length removed by vertx
.putHeader(HttpHeaders.CONTENT_LENGTH, length.toString());

try {
InputStreamReader stream = new InputStreamReader(proxy.getVertx(), payload.openStream());
stream.pipeTo(response)
.onFailure(error -> {
stream.close();
context.getResponse().reset();
});

return Future.succeededFuture();
} catch (IOException e) {
throw new RuntimeException(e);
}
.putHeader(HttpHeaders.CONTENT_LENGTH, Long.toString(resourceStream.contentLength()))
.putHeader(HttpHeaders.ETAG, resourceStream.etag());

InputStreamReader stream = new InputStreamReader(proxy.getVertx(), resourceStream.inputStream());
stream.pipeTo(response)
.onFailure(error -> {
stream.close();
response.reset();
});
return Future.succeededFuture();
}).onFailure(error -> context.respond(HttpStatus.INTERNAL_SERVER_ERROR,
"Failed to fetch file with path %s/%s".formatted(resource.getBucketName(),
resource.getOriginalPath())));
"Failed to fetch file with path %s/%s".formatted(resource.getBucketName(), resource.getOriginalPath())));

return Future.succeededFuture();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import com.epam.aidial.core.ProxyContext;
import com.epam.aidial.core.data.MetadataBase;
import com.epam.aidial.core.security.AccessService;
import com.epam.aidial.core.storage.BlobStorage;
import com.epam.aidial.core.service.ResourceService;
import com.epam.aidial.core.storage.ResourceDescription;
import com.epam.aidial.core.util.HttpStatus;
import io.vertx.core.Future;
Expand All @@ -15,11 +15,13 @@

@Slf4j
public class FileMetadataController extends AccessControlBaseController {
private final ResourceService resourceService;
private final AccessService accessService;

public FileMetadataController(Proxy proxy, ProxyContext context) {
super(proxy, context, false);
accessService = proxy.getAccessService();
this.resourceService = proxy.getResourceService();
this.accessService = proxy.getAccessService();
}

private String getContentType() {
Expand All @@ -31,7 +33,6 @@ private String getContentType() {

@Override
protected Future<?> handle(ResourceDescription resource, boolean hasWriteAccess) {
BlobStorage storage = proxy.getStorage();
boolean recursive = Boolean.parseBoolean(context.getRequest().getParam("recursive", "false"));
String token = context.getRequest().getParam("token");
int limit = Integer.parseInt(context.getRequest().getParam("limit", "100"));
Expand All @@ -41,7 +42,7 @@ protected Future<?> handle(ResourceDescription resource, boolean hasWriteAccess)

proxy.getVertx().executeBlocking(() -> {
try {
MetadataBase metadata = storage.listMetadata(resource, token, limit, recursive);
MetadataBase metadata = resourceService.getMetadata(resource, token, limit, recursive);
if (metadata != null) {
accessService.filterForbidden(context, resource, metadata);
if (context.getBooleanRequestQueryParam("permissions")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@
import com.epam.aidial.core.config.ApiKeyData;
import com.epam.aidial.core.function.BaseFunction;
import com.epam.aidial.core.function.CollectAttachmentsFn;
import com.epam.aidial.core.function.enhancement.ApplyDefaultDeploymentSettingsFn;
import com.epam.aidial.core.function.enhancement.EnhanceAssistantRequestFn;
import com.epam.aidial.core.function.enhancement.EnhanceModelRequestFn;
import com.epam.aidial.core.util.BufferingReadStream;
import com.epam.aidial.core.util.HttpStatus;
import com.epam.aidial.core.util.ProxyUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.epam.aidial.core.service.ResourceService;
import com.epam.aidial.core.service.ShareService;
import com.epam.aidial.core.storage.ResourceDescription;
import com.epam.aidial.core.util.EtagHeader;
import com.epam.aidial.core.util.HttpException;
import com.epam.aidial.core.util.HttpStatus;
import com.epam.aidial.core.util.ProxyUtil;
Expand Down Expand Up @@ -168,15 +169,17 @@ private Future<?> putResource(ResourceDescription descriptor) {
throw new HttpException(HttpStatus.REQUEST_ENTITY_TOO_LARGE, message);
}

EtagHeader etag = EtagHeader.fromRequest(context.getRequest());
ResourceType resourceType = descriptor.getType();
String body = validateRequestBody(descriptor, resourceType, bytes.toString(StandardCharsets.UTF_8));

return vertx.executeBlocking(() -> service.putResource(descriptor, body, overwrite), false);
return vertx.executeBlocking(() -> service.putResource(descriptor, body, etag, overwrite), false);
})
.onSuccess((metadata) -> {
if (metadata == null) {
context.respond(HttpStatus.CONFLICT, "Resource already exists: " + descriptor.getUrl());
} else {
context.getResponse().putHeader(HttpHeaders.ETAG, metadata.getEtag());
context.respond(HttpStatus.OK, metadata);
}
})
Expand Down Expand Up @@ -221,12 +224,13 @@ private Future<?> deleteResource(ResourceDescription descriptor) {
}

vertx.executeBlocking(() -> {
EtagHeader etag = EtagHeader.fromRequest(context.getRequest());
String bucketName = descriptor.getBucketName();
String bucketLocation = descriptor.getBucketLocation();
return lockService.underBucketLock(bucketLocation, () -> {
invitationService.cleanUpResourceLink(bucketName, bucketLocation, descriptor);
shareService.revokeSharedResource(bucketName, bucketLocation, descriptor);
return service.deleteResource(descriptor);
return service.deleteResource(descriptor, etag);
});
}, false)
.onSuccess(deleted -> {
Expand All @@ -238,7 +242,7 @@ private Future<?> deleteResource(ResourceDescription descriptor) {
})
.onFailure(error -> {
log.warn("Can't delete resource: {}", descriptor.getUrl(), error);
context.respond(HttpStatus.INTERNAL_SERVER_ERROR);
context.respond(error, error.getMessage());
});

return Future.succeededFuture();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
public class ResourceOperationController {

private static final Set<ResourceType> SUBSCRIPTION_ALLOWED_TYPES = Set.of(
/*ResourceType.FILE,*/ ResourceType.CONVERSATION, ResourceType.PROMPT);
ResourceType.FILE, ResourceType.CONVERSATION, ResourceType.PROMPT, ResourceType.APPLICATION);

private final ProxyContext context;
private final Vertx vertx;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,26 @@

import com.epam.aidial.core.Proxy;
import com.epam.aidial.core.ProxyContext;
import com.epam.aidial.core.data.FileMetadata;
import com.epam.aidial.core.service.ResourceService;
import com.epam.aidial.core.storage.BlobWriteStream;
import com.epam.aidial.core.storage.ResourceDescription;
import com.epam.aidial.core.util.EtagHeader;
import com.epam.aidial.core.util.HttpStatus;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.streams.Pipe;
import io.vertx.core.streams.impl.PipeImpl;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class UploadFileController extends AccessControlBaseController {
private final ResourceService resourceService;

public UploadFileController(Proxy proxy, ProxyContext context) {
super(proxy, context, true);
this.resourceService = proxy.getResourceService();
}

@Override
Expand All @@ -29,26 +34,33 @@ protected Future<?> handle(ResourceDescription resource, boolean hasWriteAccess)
return context.respond(HttpStatus.BAD_REQUEST, "Resource name and/or parent folders must not end with .(dot)");
}

context.getRequest()
.setExpectMultipart(true)
.uploadHandler(upload -> {
String contentType = upload.contentType();
Pipe<Buffer> pipe = new PipeImpl<>(upload).endOnFailure(false);
BlobWriteStream writeStream = new BlobWriteStream(
proxy.getVertx(),
proxy.getStorage(),
resource,
contentType);
return proxy.getVertx().executeBlocking(() -> {
EtagHeader etag = EtagHeader.fromRequest(context.getRequest());
etag.validate(() -> proxy.getResourceService().getEtag(resource));
context.getRequest()
.setExpectMultipart(true)
.uploadHandler(upload -> {
String contentType = upload.contentType();
Pipe<Buffer> pipe = new PipeImpl<>(upload).endOnFailure(false);
BlobWriteStream writeStream = resourceService.beginFileUpload(resource, etag, contentType);
pipe.to(writeStream)
.onSuccess(success -> {
FileMetadata metadata = writeStream.getMetadata();
context.getResponse().putHeader(HttpHeaders.ETAG, metadata.getEtag());
context.respond(HttpStatus.OK, metadata);
})
.onFailure(error -> {
writeStream.abortUpload(error);
context.respond(error,
"Failed to upload file by path %s/%s".formatted(resource.getBucketName(), resource.getOriginalPath()));
});
});

pipe.to(writeStream)
.onSuccess(success -> context.respond(HttpStatus.OK, writeStream.getMetadata()))
.onFailure(error -> {
writeStream.abortUpload(error);
context.respond(HttpStatus.INTERNAL_SERVER_ERROR,
"Failed to upload file by path %s/%s".formatted(resource.getBucketName(), resource.getOriginalPath()));
});
return Future.succeededFuture();
}, false)
.otherwise(error -> {
context.respond(error, error.getMessage());
return null;
});

return Future.succeededFuture();
}
}
2 changes: 2 additions & 0 deletions src/main/java/com/epam/aidial/core/data/MetadataBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;

import java.util.Set;

@AllArgsConstructor
@Data
@NoArgsConstructor
@Accessors(chain = true)
public abstract class MetadataBase {
public static final String MIME_TYPE = "application/vnd.dial.metadata+json";

Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/epam/aidial/core/data/Publication.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class Publication {
List<Resource> resources;
Set<ResourceType> resourceTypes;
List<Rule> rules;
String etag;

public enum Status {
PENDING, APPROVED, REJECTED
Expand Down
Loading

0 comments on commit 82afbab

Please sign in to comment.