Skip to content

Commit

Permalink
feat: DIAL Interceptor should check responses (#434)
Browse files Browse the repository at this point in the history
  • Loading branch information
astsiapanay authored Aug 9, 2024
1 parent e4f5dbd commit f8daaf3
Showing 1 changed file with 40 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import com.epam.aidial.core.config.Deployment;
import com.epam.aidial.core.function.BaseRequestFunction;
import com.epam.aidial.core.function.CollectRequestAttachmentsFn;
import com.epam.aidial.core.function.CollectRequestDataFn;
import com.epam.aidial.core.function.CollectResponseAttachmentsFn;
import com.epam.aidial.core.util.BufferingReadStream;
import com.epam.aidial.core.util.HttpStatus;
import com.epam.aidial.core.util.ProxyUtil;
Expand Down Expand Up @@ -37,7 +39,7 @@ public class InterceptorController {
public InterceptorController(Proxy proxy, ProxyContext context) {
this.proxy = proxy;
this.context = context;
this.enhancementFunctions = List.of(new CollectRequestAttachmentsFn(proxy, context));
this.enhancementFunctions = List.of(new CollectRequestAttachmentsFn(proxy, context), new CollectRequestDataFn(proxy, context));
}

public Future<?> handle() {
Expand Down Expand Up @@ -169,8 +171,10 @@ private void handleProxyResponse(HttpClientResponse proxyResponse) {
context.getDeployment().getEndpoint(),
proxyResponse.statusCode(), proxyResponse.headers().size());

CollectResponseAttachmentsFn handler = context.isStreamingRequest() ? new CollectResponseAttachmentsFn(proxy, context) : null;

BufferingReadStream responseStream = new BufferingReadStream(proxyResponse,
ProxyUtil.contentLength(proxyResponse, 1024));
ProxyUtil.contentLength(proxyResponse, 1024), handler);

context.setProxyResponse(proxyResponse);
context.setProxyResponseTimestamp(System.currentTimeMillis());
Expand All @@ -185,11 +189,44 @@ private void handleProxyResponse(HttpClientResponse proxyResponse) {

responseStream.pipe()
.endOnFailure(false)
.endOnSuccess(false)
.to(response)
.onSuccess(ignore -> finalizeRequest())
.onSuccess(ignore -> handleResponse(responseStream))
.onFailure(this::handleResponseError);
}

void handleResponse(BufferingReadStream responseStream) {
Buffer responseBody = context.getResponseStream().getContent();
collectResponseAttachments(responseBody).onComplete(result -> {
if (result.failed()) {
log.warn("Failed to collect attachments from response. Trace: {}. Span: {}",
context.getTraceId(), context.getSpanId(), result.cause());
}
completeProxyResponse(responseStream);
});
}

private Future<Void> collectResponseAttachments(Buffer responseBody) {
if (context.isStreamingRequest()) {
return Future.succeededFuture();
}
try (InputStream stream = new ByteBufInputStream(responseBody.getByteBuf())) {
ObjectNode tree = (ObjectNode) ProxyUtil.MAPPER.readTree(stream);
var fn = new CollectResponseAttachmentsFn(proxy, context);
return fn.apply(tree);
} catch (Throwable e) {
log.warn("Can't parse JSON response body. Trace: {}. Span: {}. Error:",
context.getTraceId(), context.getSpanId(), e);
return Future.failedFuture(e);
}
}

private void completeProxyResponse(BufferingReadStream responseStream) {
HttpServerResponse response = context.getResponse();
responseStream.end(response);
finalizeRequest();
}

/**
* Called when proxy failed to send response to the client.
*/
Expand Down

0 comments on commit f8daaf3

Please sign in to comment.