From f8daaf37c1e996a4840092668fb1f290d524cbc5 Mon Sep 17 00:00:00 2001 From: Aliaksandr Stsiapanay Date: Fri, 9 Aug 2024 10:55:39 +0300 Subject: [PATCH] feat: DIAL Interceptor should check responses (#434) --- .../controller/InterceptorController.java | 43 +++++++++++++++++-- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/epam/aidial/core/controller/InterceptorController.java b/src/main/java/com/epam/aidial/core/controller/InterceptorController.java index 43a8028bb..e704f64f5 100644 --- a/src/main/java/com/epam/aidial/core/controller/InterceptorController.java +++ b/src/main/java/com/epam/aidial/core/controller/InterceptorController.java @@ -6,6 +6,8 @@ import com.epam.aidial.core.config.Deployment; import com.epam.aidial.core.function.BaseRequestFunction; import com.epam.aidial.core.function.CollectRequestAttachmentsFn; +import com.epam.aidial.core.function.CollectRequestDataFn; +import com.epam.aidial.core.function.CollectResponseAttachmentsFn; import com.epam.aidial.core.util.BufferingReadStream; import com.epam.aidial.core.util.HttpStatus; import com.epam.aidial.core.util.ProxyUtil; @@ -37,7 +39,7 @@ public class InterceptorController { public InterceptorController(Proxy proxy, ProxyContext context) { this.proxy = proxy; this.context = context; - this.enhancementFunctions = List.of(new CollectRequestAttachmentsFn(proxy, context)); + this.enhancementFunctions = List.of(new CollectRequestAttachmentsFn(proxy, context), new CollectRequestDataFn(proxy, context)); } public Future handle() { @@ -169,8 +171,10 @@ private void handleProxyResponse(HttpClientResponse proxyResponse) { context.getDeployment().getEndpoint(), proxyResponse.statusCode(), proxyResponse.headers().size()); + CollectResponseAttachmentsFn handler = context.isStreamingRequest() ? new CollectResponseAttachmentsFn(proxy, context) : null; + BufferingReadStream responseStream = new BufferingReadStream(proxyResponse, - ProxyUtil.contentLength(proxyResponse, 1024)); + ProxyUtil.contentLength(proxyResponse, 1024), handler); context.setProxyResponse(proxyResponse); context.setProxyResponseTimestamp(System.currentTimeMillis()); @@ -185,11 +189,44 @@ private void handleProxyResponse(HttpClientResponse proxyResponse) { responseStream.pipe() .endOnFailure(false) + .endOnSuccess(false) .to(response) - .onSuccess(ignore -> finalizeRequest()) + .onSuccess(ignore -> handleResponse(responseStream)) .onFailure(this::handleResponseError); } + void handleResponse(BufferingReadStream responseStream) { + Buffer responseBody = context.getResponseStream().getContent(); + collectResponseAttachments(responseBody).onComplete(result -> { + if (result.failed()) { + log.warn("Failed to collect attachments from response. Trace: {}. Span: {}", + context.getTraceId(), context.getSpanId(), result.cause()); + } + completeProxyResponse(responseStream); + }); + } + + private Future collectResponseAttachments(Buffer responseBody) { + if (context.isStreamingRequest()) { + return Future.succeededFuture(); + } + try (InputStream stream = new ByteBufInputStream(responseBody.getByteBuf())) { + ObjectNode tree = (ObjectNode) ProxyUtil.MAPPER.readTree(stream); + var fn = new CollectResponseAttachmentsFn(proxy, context); + return fn.apply(tree); + } catch (Throwable e) { + log.warn("Can't parse JSON response body. Trace: {}. Span: {}. Error:", + context.getTraceId(), context.getSpanId(), e); + return Future.failedFuture(e); + } + } + + private void completeProxyResponse(BufferingReadStream responseStream) { + HttpServerResponse response = context.getResponse(); + responseStream.end(response); + finalizeRequest(); + } + /** * Called when proxy failed to send response to the client. */