diff --git a/data-plane/config/100-config-kafka-broker-data-plane.yaml b/data-plane/config/100-config-kafka-broker-data-plane.yaml
index e2bf84152f..88ed785968 100644
--- a/data-plane/config/100-config-kafka-broker-data-plane.yaml
+++ b/data-plane/config/100-config-kafka-broker-data-plane.yaml
@@ -27,7 +27,6 @@ data:
     retries=2147483647
     batch.size=16384
     client.dns.lookup=use_all_dns_ips
-    client.id=KKBD
     connections.max.idle.ms=600000
     delivery.timeout.ms=120000
     linger.ms=0
@@ -87,7 +86,6 @@ data:
     # ssl.provider=
     auto.commit.interval.ms=5000
     check.crcs=true
-    client.id=KKBD
     # client.rack=
     fetch.max.wait.ms=500
     # interceptor.classes=
diff --git a/data-plane/config/sink/100-config-kafka-sink-data-plane.yaml b/data-plane/config/sink/100-config-kafka-sink-data-plane.yaml
index db21f666cf..42fbd0ce18 100644
--- a/data-plane/config/sink/100-config-kafka-sink-data-plane.yaml
+++ b/data-plane/config/sink/100-config-kafka-sink-data-plane.yaml
@@ -27,7 +27,6 @@ data:
     retries=2147483647
     batch.size=16384
     client.dns.lookup=use_all_dns_ips
-    client.id=KKBD
     connections.max.idle.ms=600000
     delivery.timeout.ms=120000
     linger.ms=0
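Editor's note on the two config hunks above: dropping the hardcoded client.id=KKBD lets each producer and consumer instance fall back to the Kafka Java client's auto-generated ids (producer-1, consumer-1, ...), so replicas no longer share a single id for metrics and quota accounting. Below is a hedged sketch of what loading one of these property files then looks like; the mount path, file name, and bootstrap address are hypothetical, not taken from this repo.

import java.io.FileInputStream;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ClientIdSketch {
  public static void main(String[] args) throws Exception {
    final var props = new Properties();
    // Hypothetical mount point of the data-plane ConfigMap above.
    try (var in = new FileInputStream("/etc/config/config-kafka-broker-producer.properties")) {
      props.load(in);
    }
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap.kafka:9092"); // hypothetical
    try (var producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
      // With client.id absent from the file, metric names embed the generated
      // id (producer-1, producer-2, ...) instead of a shared "KKBD".
      producer.metrics().keySet().stream().findFirst().ifPresent(System.out::println);
    }
  }
}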
diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/ConsumerRecordHandler.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/ConsumerRecordHandler.java
index 587f6b53fb..9fa3c6f038 100644
--- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/ConsumerRecordHandler.java
+++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/ConsumerRecordHandler.java
@@ -35,14 +35,9 @@
  * @param <R> type of the response of given senders.
  * @see ConsumerRecordHandler#handle(KafkaConsumerRecord)
  */
-public final class ConsumerRecordHandler<K, V, R> implements
-    Handler<KafkaConsumerRecord<K, V>> {
+public final class ConsumerRecordHandler<K, V, R> implements Handler<KafkaConsumerRecord<K, V>> {
 
-  private static final Logger logger = LoggerFactory
-      .getLogger(ConsumerRecordHandler.class);
-
-  private static final String SUBSCRIBER = "subscriber";
-  private static final String DLQ = "dead letter queue";
+  private static final Logger logger = LoggerFactory.getLogger(ConsumerRecordHandler.class);
 
   private final Filter<V> filter;
   private final ConsumerRecordSender<K, V, R> subscriberSender;
@@ -55,9 +50,8 @@ public final class ConsumerRecordHandler<K, V, R> implements
    *
    * @param subscriberSender      sender to trigger subscriber
    * @param filter                event filter
-   * @param receiver              hook receiver {@link ConsumerRecordOffsetStrategy}. It allows to
-   *                              plug in custom offset management depending on the success/failure
-   *                              during the algorithm.
+   * @param receiver              hook receiver {@link ConsumerRecordOffsetStrategy}. It allows to plug in custom offset
+   *                              management depending on the success/failure during the algorithm.
    * @param sinkResponseHandler   handler of the response from {@code subscriberSender}
    * @param deadLetterQueueSender sender to DLQ
    */
@@ -86,9 +80,8 @@ public ConsumerRecordHandler(
    *
    * @param subscriberSender    sender to trigger subscriber
    * @param filter              event filter
-   * @param receiver            hook receiver {@link ConsumerRecordOffsetStrategy}. It allows to
-   *                            plug in custom offset management depending on the success/failure
-   *                            during the algorithm.
+   * @param receiver            hook receiver {@link ConsumerRecordOffsetStrategy}. It allows to plug in custom offset
+   *                            management depending on the success/failure during the algorithm.
    * @param sinkResponseHandler handler of the response
    */
   public ConsumerRecordHandler(
@@ -116,77 +109,81 @@ record -> Future.failedFuture("no DLQ configured")
 
   @Override
   public void handle(final KafkaConsumerRecord<K, V> record) {
-    logger.debug("handling record {}", record);
+    logDebug("handling record", record);
 
     receiver.recordReceived(record);
 
     if (filter.match(record.value())) {
-      logger.debug("record match filtering {}", record);
-
-      subscriberSender.send(record)
-        .onSuccess(response -> onSuccessfullySentToSubscriber(record))
-        .onFailure(cause -> onFailedToSendToSubscriber(record, cause))
-        .compose(sinkResponseHandler::handle)
-        .onFailure(
-          t -> logger.error("Failed to send the subscriber response to the broker topic", t));
+      logDebug("record match filtering", record);
+      send(record);
     } else {
-      logger.debug("record doesn't match filtering {}", record);
-
+      logDebug("record doesn't match filtering", record);
       receiver.recordDiscarded(record);
     }
   }
 
-  private void onSuccessfullySentToSubscriber(final KafkaConsumerRecord<K, V> record) {
-    logSuccessfulSendTo(SUBSCRIBER, record);
-
-    receiver.successfullySentToSubscriber(record);
+  private void send(final KafkaConsumerRecord<K, V> record) {
+    subscriberSender.send(record)
+      .onSuccess(response -> sinkResponseHandler.handle(response)
+        .onSuccess(ignored -> {
+          logDebug("Successfully send response to the broker", record);
+          receiver.successfullySentToSubscriber(record);
+        })
+        .onFailure(cause -> {
+          logError("Failed to handle response", record, cause);
+          sendToDLS(record);
+        }))
+      .onFailure(cause -> {
+        logError("Failed to send event to subscriber", record, cause);
+        sendToDLS(record);
+      });
   }
 
-  private void onFailedToSendToSubscriber(
-      final KafkaConsumerRecord<K, V> record,
-      final Throwable cause) {
-
-    logFailedSendTo(SUBSCRIBER, record, cause);
-
+  private void sendToDLS(KafkaConsumerRecord<K, V> record) {
     deadLetterQueueSender.send(record)
-      .onSuccess(ignored -> onSuccessfullySentToDLQ(record))
-      .onFailure(ex -> onFailedToSendToDLQ(record, ex))
-      .compose(sinkResponseHandler::handle)
-      .onFailure(
-        t -> logger.error("Failed to send the subscriber response to the broker topic", t));
-  }
-
-  private void onSuccessfullySentToDLQ(final KafkaConsumerRecord<K, V> record) {
-    logSuccessfulSendTo(DLQ, record);
-
-    receiver.successfullySentToDLQ(record);
+      .onFailure(ex -> {
+        logError("Failed to send record to dead letter sink", record, ex);
+        receiver.failedToSendToDLQ(record, ex);
+      })
+      .onSuccess(response -> sinkResponseHandler.handle(response)
+        .onSuccess(ignored -> {
+          logDebug("Successfully send response to the broker", record);
+          receiver.successfullySentToDLQ(record);
+        })
+        .onFailure(cause -> {
+          logError("Failed to handle response", record, cause);
+          receiver.failedToSendToDLQ(record, cause);
+        }));
   }
 
-  private void onFailedToSendToDLQ(KafkaConsumerRecord<K, V> record, Throwable ex) {
-    logFailedSendTo(DLQ, record, ex);
-
-    receiver.failedToSendToDLQ(record, ex);
-  }
-
-  private static void logFailedSendTo(
-      final String component,
+  private static void logError(
+      final String msg,
       final KafkaConsumerRecord<?, ?> record,
       final Throwable cause) {
 
-    logger.error(component + " sender failed to send record {} {} {}",
-      keyValue("topic", record.topic()),
-      keyValue("partition", record.partition()),
-      keyValue("offset", record.offset()),
-      keyValue("event", record.value()),
-      cause
-    );
+    if (logger.isDebugEnabled()) {
+      logger.error(msg + " {} {} {}",
+        keyValue("topic", record.topic()),
+        keyValue("partition", record.partition()),
+        keyValue("offset", record.offset()),
+        keyValue("event", record.value()),
+        cause
+      );
+    } else {
+      logger.error(msg + " {} {} {}",
+        keyValue("topic", record.topic()),
+        keyValue("partition", record.partition()),
+        keyValue("offset", record.offset()),
+        cause
+      );
+    }
   }
 
-  private static void logSuccessfulSendTo(
-      final String component,
+  private static void logDebug(
+      final String msg,
       final KafkaConsumerRecord<?, ?> record) {
 
-    logger.debug("record successfully handled by " + component + " {} {} {}",
+    logger.debug(msg + " {} {} {}",
       keyValue("topic", record.topic()),
       keyValue("partition", record.partition()),
       keyValue("offset", record.offset()),
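Editor's note on the ConsumerRecordHandler hunks above: the four on*/log* callbacks collapse into two private methods, send() and sendToDLS(), with sinkResponseHandler.handle(...) now chained inside each send rather than composed afterwards. The stand-alone sketch below re-expresses that delivery-then-fallback shape using Vert.x compose/recover; every type and name in it is an illustrative stand-in, not this project's API.

import io.vertx.core.Future;

public class FallbackFlowSketch {

  // Stand-ins for the project's sender and response-handler abstractions.
  interface Sender<R> {
    Future<R> send(String record);
  }

  interface ResponseHandler<R> {
    Future<Void> handle(R response);
  }

  // Try the subscriber first; any failure (send or response handling)
  // diverts the record to the dead letter sink.
  static <R> Future<Void> dispatch(
      final String record,
      final Sender<R> subscriber,
      final Sender<R> deadLetterSink,
      final ResponseHandler<R> responseHandler) {

    return subscriber.send(record)
      .compose(responseHandler::handle)
      .recover(cause -> deadLetterSink.send(record)
        .compose(responseHandler::handle));
  }
}

The PR's version deliberately does not fail the returned future: it fires the receiver.* offset-strategy callbacks as side effects, which is why it nests onSuccess/onFailure instead of using recover as above.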
keyValue("event", record.value()), + cause + ); + } else { + logger.error(msg + " {} {} {}", + keyValue("topic", record.topic()), + keyValue("partition", record.partition()), + keyValue("offset", record.offset()), + cause + ); + } } - private static void logSuccessfulSendTo( - final String component, + private static void logDebug( + final String msg, final KafkaConsumerRecord record) { - logger.debug("record successfully handled by " + component + " {} {} {}", + logger.debug(msg + " {} {} {}", keyValue("topic", record.topic()), keyValue("partition", record.partition()), keyValue("offset", record.offset()), diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/http/HttpSinkResponseHandler.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/http/HttpSinkResponseHandler.java index f3968670c7..0c465b9f7a 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/http/HttpSinkResponseHandler.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/http/HttpSinkResponseHandler.java @@ -60,11 +60,12 @@ public HttpSinkResponseHandler( */ @Override public Future handle(final HttpResponse response) { + // TODO if it's in structured, the SDK just calls getBytes on the response.body() which might lead to NPE. MessageReader messageReader = VertxMessageFactory.createReader(response); if (messageReader.getEncoding() == Encoding.UNKNOWN) { // When the sink returns a malformed event we return a failed future to avoid committing the message to Kafka. - if (response.body().length() > 0) { + if (response.body() != null && response.body().length() > 0) { return Future.failedFuture( new IllegalResponseException("Unable to decode response: unknown encoding and non empty response") ); diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/http/HttpSinkResponseHandlerTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/http/HttpSinkResponseHandlerTest.java index 80910b7003..afa84d955a 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/http/HttpSinkResponseHandlerTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/http/HttpSinkResponseHandlerTest.java @@ -16,6 +16,7 @@ package dev.knative.eventing.kafka.broker.dispatcher.http; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -70,6 +71,30 @@ public void shouldSucceedOnUnknownEncodingAndEmptyResponse(final Vertx vertx, fi .onSuccess(v -> context.completeNow()); } + @Test + public void shouldSucceedOnUnknownEncodingAndNullResponseBody(final Vertx vertx, final VertxTestContext context) { + final var producer = new MockProducer<>( + true, + new StringSerializer(), + new CloudEventSerializer() + ); + final var handler = new HttpSinkResponseHandler( + TOPIC, + KafkaProducer.create(vertx, producer) + ); + + // Empty response + final HttpResponse response = mock(HttpResponse.class); + when(response.statusCode()).thenReturn(202); + when(response.body()).thenReturn(null); + when(response.headers()).thenReturn(MultiMap.caseInsensitiveMultiMap()); + + context + .assertComplete(handler.handle(response)) + .onSuccess(v -> context.completeNow()); + } + + @Test public void shouldFailOnUnknownEncodingAndNonEmptyResponse(final Vertx vertx, final VertxTestContext context) { diff --git 
diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/UnorderedConsumerTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/UnorderedConsumerTest.java
index c0b45dde0f..adab484c98 100644
--- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/UnorderedConsumerTest.java
+++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/UnorderedConsumerTest.java
@@ -123,8 +123,7 @@ public void testUnorderedConsumer(final Vertx vertx, final VertxTestContext cont
 
     Thread.sleep(6000); // reduce flakiness
 
-    assertThat(vertx.deploymentIDs())
-      .hasSize(numEgresses);
+    assertThat(vertx.deploymentIDs()).hasSize(numEgresses);
 
     waitEvents.await();
 
@@ -144,10 +143,8 @@ public void testUnorderedConsumer(final Vertx vertx, final VertxTestContext cont
       final var history = producerEntry.getValue().history();
       assertThat(history).hasSameSizeAs(consumerRecords);
 
-      assertThat(history.stream().map(ProducerRecord::value))
-        .containsExactlyInAnyOrder(events);
-      assertThat(history.stream().map(ProducerRecord::key))
-        .containsAnyElementsOf(partitionKeys);
+      assertThat(history.stream().map(ProducerRecord::value)).containsExactlyInAnyOrder(events);
+      assertThat(history.stream().map(ProducerRecord::key)).containsAnyElementsOf(partitionKeys);
     }
 
     executorService.shutdown();
diff --git a/go.sum b/go.sum
index 690c4329e9..4654072503 100644
--- a/go.sum
+++ b/go.sum
@@ -490,6 +490,7 @@ github.com/go-toolsmith/strparse v1.0.0/go.mod h1:YI2nUKP9YGZnL/L1/DLFBfixrcjslW
 github.com/go-toolsmith/typep v1.0.0/go.mod h1:JSQCQMUPdRlMZFswiq3TGpNp1GMktqkR2Ns5AIQkATU=
 github.com/go-toolsmith/typep v1.0.2/go.mod h1:JSQCQMUPdRlMZFswiq3TGpNp1GMktqkR2Ns5AIQkATU=
 github.com/go-xmlfmt/xmlfmt v0.0.0-20191208150333-d5b6f63a941b/go.mod h1:aUCEOzzezBEjDBbFBoSiya/gduyIiWYRP6CnSFIV8AM=
+github.com/go-yaml/yaml v2.1.0+incompatible h1:RYi2hDdss1u4YE7GwixGzWwVo47T8UQwnTLB6vQiq+o=
 github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0=
 github.com/gobuffalo/envy v1.6.5/go.mod h1:N+GkhhZ/93bGZc6ZKhJLP6+m+tCNPKwgSpH9kaifseQ=
 github.com/gobuffalo/envy v1.7.0/go.mod h1:n7DRkBerg/aorDM8kbduw5dN3oXGswK5liaSCx4T5NI=
diff --git a/test/pkg/config/100-config-logging.yaml b/test/pkg/config/100-config-logging.yaml
index 46fe39748f..5d702d0ecf 100644
--- a/test/pkg/config/100-config-logging.yaml
+++ b/test/pkg/config/100-config-logging.yaml
@@ -25,7 +25,10 @@ data:
-
+
+
+
+