diff --git a/.github/workflows/knative-java-test.yaml b/.github/workflows/knative-java-test.yaml index dbe2fc5bb0..90a95ca94a 100644 --- a/.github/workflows/knative-java-test.yaml +++ b/.github/workflows/knative-java-test.yaml @@ -20,10 +20,10 @@ name: Test on: push: - branches: [ 'master' ] + branches: [ 'main' ] pull_request: - branches: [ 'master', 'release-*' ] + branches: [ 'main', 'release-*' ] jobs: diff --git a/.github/workflows/knative-profile-data-plane.yaml b/.github/workflows/knative-profile-data-plane.yaml index 6089126ca8..3c358bd733 100644 --- a/.github/workflows/knative-profile-data-plane.yaml +++ b/.github/workflows/knative-profile-data-plane.yaml @@ -20,10 +20,10 @@ name: Profiling on: push: - branches: [ 'master' ] + branches: [ 'main' ] pull_request: - branches: [ 'master', 'release-*' ] + branches: [ 'main', 'release-*' ] jobs: diff --git a/data-plane/THIRD-PARTY.txt b/data-plane/THIRD-PARTY.txt index cfe4498ec3..180a83a417 100644 --- a/data-plane/THIRD-PARTY.txt +++ b/data-plane/THIRD-PARTY.txt @@ -16,12 +16,12 @@ Lists of 153 third-party dependencies. (Apache 2.0) Gson (com.google.code.gson:gson:2.8.6 - https://github.com/google/gson/gson) (Apache 2.0) error-prone annotations (com.google.errorprone:error_prone_annotations:2.3.4 - http://nexus.sonatype.org/oss-repository-hosting.html/error_prone_parent/error_prone_annotations) (The Apache Software License, Version 2.0) Guava InternalFutureFailureAccess and InternalFutures (com.google.guava:failureaccess:1.0.1 - https://github.com/google/guava/failureaccess) - (Apache License, Version 2.0) Guava: Google Core Libraries for Java (com.google.guava:guava:29.0-android - https://github.com/google/guava/guava) + (Apache License, Version 2.0) Guava: Google Core Libraries for Java (com.google.guava:guava:30.0-android - https://github.com/google/guava/guava) (Apache License, Version 2.0) Guava: Google Core Libraries for Java (com.google.guava:guava:30.0-jre - https://github.com/google/guava/guava) (The Apache Software License, Version 2.0) Guava ListenableFuture only (com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava - https://github.com/google/guava/listenablefuture) (The Apache Software License, Version 2.0) J2ObjC Annotations (com.google.j2objc:j2objc-annotations:1.3 - https://github.com/google/j2objc/) - (3-Clause BSD License) Protocol Buffers [Core] (com.google.protobuf:protobuf-java:3.14.0 - https://developers.google.com/protocol-buffers/protobuf-java/) - (3-Clause BSD License) Protocol Buffers [Util] (com.google.protobuf:protobuf-java-util:3.14.0 - https://developers.google.com/protocol-buffers/protobuf-java-util/) + (3-Clause BSD License) Protocol Buffers [Core] (com.google.protobuf:protobuf-java:3.15.1 - https://developers.google.com/protocol-buffers/protobuf-java/) + (3-Clause BSD License) Protocol Buffers [Util] (com.google.protobuf:protobuf-java-util:3.15.1 - https://developers.google.com/protocol-buffers/protobuf-java-util/) (Apache 2.0) OkHttp Logging Interceptor (com.squareup.okhttp3:logging-interceptor:3.12.12 - https://github.com/square/okhttp/logging-interceptor) (Apache 2.0) MockWebServer (com.squareup.okhttp3:mockwebserver:3.12.6 - https://github.com/square/okhttp/mockwebserver) (Apache 2.0) OkHttp (com.squareup.okhttp3:okhttp:3.14.9 - https://github.com/square/okhttp/okhttp) @@ -65,8 +65,8 @@ Lists of 153 third-party dependencies. (Apache License, Version 2.0) Fabric8 :: Kubernetes :: Server Mock (io.fabric8:kubernetes-server-mock:5.0.2 - http://fabric8.io/kubernetes-server-mock/) (Apache License, Version 2.0) Fabric8 :: Mock Web Server (io.fabric8:mockwebserver:0.1.8 - http://fabric8.io/) (The Apache Software License, Version 2.0) zjsonpatch (io.fabric8:zjsonpatch:0.3.0 - https://github.com/fabric8io/zjsonpatch/) - (The Apache Software License, Version 2.0) micrometer-core (io.micrometer:micrometer-core:1.6.3 - https://github.com/micrometer-metrics/micrometer) - (The Apache Software License, Version 2.0) micrometer-registry-prometheus (io.micrometer:micrometer-registry-prometheus:1.6.3 - https://github.com/micrometer-metrics/micrometer) + (The Apache Software License, Version 2.0) micrometer-core (io.micrometer:micrometer-core:1.6.4 - https://github.com/micrometer-metrics/micrometer) + (The Apache Software License, Version 2.0) micrometer-registry-prometheus (io.micrometer:micrometer-registry-prometheus:1.6.4 - https://github.com/micrometer-metrics/micrometer) (Apache License, Version 2.0) Netty/Buffer (io.netty:netty-buffer:4.1.52.Final - https://netty.io/netty-buffer/) (Apache License, Version 2.0) Netty/Codec (io.netty:netty-codec:4.1.52.Final - https://netty.io/netty-codec/) (Apache License, Version 2.0) Netty/Codec/DNS (io.netty:netty-codec-dns:4.1.52.Final - https://netty.io/netty-codec-dns/) @@ -109,8 +109,8 @@ Lists of 153 third-party dependencies. (The Apache Software License, Version 2.0) Zipkin Sender: OkHttp 3 (io.zipkin.reporter2:zipkin-sender-okhttp3:2.16.0 - https://github.com/openzipkin/zipkin-reporter-java/zipkin-sender-okhttp3) (The Apache Software License, Version 2.0) Zipkin Core Library (io.zipkin.zipkin2:zipkin:2.22.2 - https://github.com/openzipkin/zipkin/zipkin) (Eclipse Public License 1.0) JUnit (junit:junit:4.13.1 - http://junit.org) - (Apache License, Version 2.0) Byte Buddy (without dependencies) (net.bytebuddy:byte-buddy:1.10.19 - https://bytebuddy.net/byte-buddy) - (Apache License, Version 2.0) Byte Buddy agent (net.bytebuddy:byte-buddy-agent:1.10.19 - https://bytebuddy.net/byte-buddy-agent) + (Apache License, Version 2.0) Byte Buddy (without dependencies) (net.bytebuddy:byte-buddy:1.10.20 - https://bytebuddy.net/byte-buddy) + (Apache License, Version 2.0) Byte Buddy agent (net.bytebuddy:byte-buddy-agent:1.10.20 - https://bytebuddy.net/byte-buddy-agent) (Apache License, Version 2.0) (MIT License) Logstash Logback Encoder (net.logstash.logback:logstash-logback-encoder:6.6 - https://github.com/logstash/logstash-logback-encoder) (The MIT License) JOpt Simple (net.sf.jopt-simple:jopt-simple:4.6 - http://pholser.github.com/jopt-simple) (The MIT License) JOpt Simple (net.sf.jopt-simple:jopt-simple:5.0.4 - http://jopt-simple.github.io/jopt-simple) @@ -138,11 +138,11 @@ Lists of 153 third-party dependencies. (Eclipse Public License v2.0) JUnit Platform Engine API (org.junit.platform:junit-platform-engine:1.7.0 - https://junit.org/junit5/) (Public Domain, per Creative Commons CC0) LatencyUtils (org.latencyutils:LatencyUtils:2.0.3 - http://latencyutils.github.io/LatencyUtils/) (The Apache Software License, Version 2.0) LZ4 and xxHash (org.lz4:lz4-java:1.7.1 - https://github.com/lz4/lz4-java) - (The MIT License) mockito-core (org.mockito:mockito-core:3.7.7 - https://github.com/mockito/mockito) - (The MIT License) mockito-junit-jupiter (org.mockito:mockito-junit-jupiter:3.7.7 - https://github.com/mockito/mockito) + (The MIT License) mockito-core (org.mockito:mockito-core:3.8.0 - https://github.com/mockito/mockito) + (The MIT License) mockito-junit-jupiter (org.mockito:mockito-junit-jupiter:3.8.0 - https://github.com/mockito/mockito) (Apache License, Version 2.0) Objenesis (org.objenesis:objenesis:3.1 - http://objenesis.org) - (GNU General Public License (GPL), version 2, with the Classpath exception) JMH Core (org.openjdk.jmh:jmh-core:1.23 - http://openjdk.java.net/projects/code-tools/jmh/jmh-core/) - (GNU General Public License (GPL), version 2, with the Classpath exception) JMH Generators: Annotation Processors (org.openjdk.jmh:jmh-generator-annprocess:1.23 - http://openjdk.java.net/projects/code-tools/jmh/jmh-generator-annprocess/) + (GNU General Public License (GPL), version 2, with the Classpath exception) JMH Core (org.openjdk.jmh:jmh-core:1.27 - http://openjdk.java.net/projects/code-tools/jmh/jmh-core/) + (GNU General Public License (GPL), version 2, with the Classpath exception) JMH Generators: Annotation Processors (org.openjdk.jmh:jmh-generator-annprocess:1.27 - http://openjdk.java.net/projects/code-tools/jmh/jmh-generator-annprocess/) (The Apache License, Version 2.0) org.opentest4j:opentest4j (org.opentest4j:opentest4j:1.2.0 - https://github.com/ota4j-team/opentest4j) (The New BSD License) (WTFPL) Reflections (org.reflections:reflections:0.9.12 - http://github.com/ronmamo/reflections) (Apache-2.0) Scala Library (org.scala-lang:scala-library:2.12.12 - https://www.scala-lang.org/) diff --git a/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/testing/CoreObjects.java b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/testing/CoreObjects.java index 30ed49ff0a..2345eb89b5 100644 --- a/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/testing/CoreObjects.java +++ b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/testing/CoreObjects.java @@ -74,6 +74,10 @@ public static DataPlaneContract.Egress egress1() { .setConsumerGroup("1-1234567") .setDestination(DESTINATION) .setFilter(DataPlaneContract.Filter.newBuilder().putAttributes("type", "dev.knative")) + .setEgressConfig(DataPlaneContract.EgressConfig.newBuilder() + .setRetry(1) + .setBackoffDelay(1000) + .build()) .build(); } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/http/HttpConsumerVerticleFactory.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/http/HttpConsumerVerticleFactory.java index 317730b161..02ebd457a8 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/http/HttpConsumerVerticleFactory.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/http/HttpConsumerVerticleFactory.java @@ -47,6 +47,11 @@ import io.vertx.kafka.client.common.KafkaClientOptions; import io.vertx.kafka.client.consumer.KafkaConsumer; import io.vertx.kafka.client.producer.KafkaProducer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.AbstractMap.SimpleImmutableEntry; import java.util.HashMap; import java.util.HashSet; @@ -58,11 +63,13 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; + +import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue; public class HttpConsumerVerticleFactory implements ConsumerVerticleFactory { + private static final Logger logger = LoggerFactory.getLogger(HttpConsumerVerticleFactory.class); + private final static ConsumerRecordSender NO_DLQ_SENDER = ConsumerRecordSender.create(Future.failedFuture("No DLQ set"), Future.succeededFuture()); @@ -200,7 +207,10 @@ private ConsumerRecordSender createConsumerRecordSender( final var circuitBreaker = CircuitBreaker .create(target, vertx, createCircuitBreakerOptions(egress)) - .retryPolicy(computeRetryPolicy(egress)); + .retryPolicy(computeRetryPolicy(egress)) + .openHandler(r -> logger.info("Circuit breaker opened {}", keyValue("target", target))) + .halfOpenHandler(r -> logger.info("Circuit breaker half-opened {}", keyValue("target", target))) + .closeHandler(r -> logger.info("Circuit breaker closed {}", keyValue("target", target))); return new HttpConsumerRecordSender( vertx, @@ -212,7 +222,16 @@ private ConsumerRecordSender createConsumerRecordSender( private static CircuitBreakerOptions createCircuitBreakerOptions(final DataPlaneContract.EgressConfig egressConfig) { if (egressConfig != null && egressConfig.getRetry() > 0) { - return new CircuitBreakerOptions().setMaxRetries(egressConfig.getRetry()); + return new CircuitBreakerOptions() + // TODO reset timeout should be configurable or, at least, set by the control plane + .setResetTimeout( + egressConfig.getBackoffDelay() > 0 ? + egressConfig.getBackoffDelay() : + CircuitBreakerOptions.DEFAULT_RESET_TIMEOUT + ) + // TODO max failures should be configurable or, at least, set by the control plane + .setMaxFailures(egressConfig.getRetry() * 2) + .setMaxRetries(egressConfig.getRetry()); } return new CircuitBreakerOptions(); } @@ -230,10 +249,9 @@ static Function computeRetryPolicy(final EgressConfig egress) { } private static boolean hasDeadLetterSink(final EgressConfig egressConfig) { - return !(egressConfig == null || egressConfig.getDeadLetter() == null || egressConfig.getDeadLetter().isEmpty()); + return !(egressConfig == null || egressConfig.getDeadLetter().isEmpty()); } - private static OffsetManager getOffsetManager(final DeliveryGuarantee type, final KafkaConsumer consumer, Consumer commitHandler) { return switch (type) { diff --git a/test/config/sacura/101-broker.yaml b/test/config/sacura/101-broker.yaml index 6a59dd280c..40cdb3e25b 100644 --- a/test/config/sacura/101-broker.yaml +++ b/test/config/sacura/101-broker.yaml @@ -21,6 +21,6 @@ spec: name: config-broker delivery: - retry: 3 + retry: 5 backoffPolicy: exponential - backoffDelay: PT0.5S + backoffDelay: PT5S diff --git a/test/config/sacura/200-config.yaml b/test/config/sacura/200-config.yaml index f93996a073..f5d2607619 100644 --- a/test/config/sacura/200-config.yaml +++ b/test/config/sacura/200-config.yaml @@ -7,8 +7,8 @@ data: sacura.yaml: | sender: target: http://kafka-broker-ingress.knative-eventing.svc.cluster.local/sacura/broker - frequency: 200 - workers: 15 + frequency: 100 + workers: 4 keepAlive: true receiver: port: 8080