diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java index ccfb0edbf4..9dd8e3eb5c 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java @@ -28,9 +28,12 @@ public class CloudEventOverridesMutator implements CloudEventMutator { private final DataPlaneContract.CloudEventOverrides cloudEventOverrides; + private final String ceMetadataExtensionPrefix; - public CloudEventOverridesMutator(final DataPlaneContract.CloudEventOverrides cloudEventOverrides) { + public CloudEventOverridesMutator( + final DataPlaneContract.CloudEventOverrides cloudEventOverrides, final String ceMetadataExtensionPrefix) { this.cloudEventOverrides = cloudEventOverrides; + this.ceMetadataExtensionPrefix = ceMetadataExtensionPrefix; } @Override @@ -49,7 +52,7 @@ private void applyCloudEventOverrides(CloudEventBuilder builder) { } private void applyKafkaMetadata(CloudEventBuilder builder, Number partition, Number offset) { - builder.withExtension("knativekafkapartition", partition); - builder.withExtension("knativekafkaoffset", offset); + builder.withExtension(ceMetadataExtensionPrefix + "partition", partition); + builder.withExtension(ceMetadataExtensionPrefix + "offset", offset); } } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java index 436ff73059..d833a2730e 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java @@ -115,7 +115,8 @@ private void build(final Vertx vertx, final ConsumerVerticle consumerVerticle, f TracingPolicy.PROPAGATE), Metrics.getRegistry()), new CloudEventOverridesMutator( - consumerVerticleContext.getResource().getCloudEventOverrides())); + consumerVerticleContext.getResource().getCloudEventOverrides(), + consumerVerticleContext.getCeMetadataExtensionPrefix())); consumerVerticle.setRecordDispatcher(recordDispatcher); final var partitionRevokedHandlers = diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleContext.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleContext.java index bbff86d576..ef8c140707 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleContext.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleContext.java @@ -75,6 +75,8 @@ public class ConsumerVerticleContext { private Tags tags; + private String ceMetadataExtensionPrefix; + public ConsumerVerticleContext withConsumerConfigs(final Map consumerConfigs) { this.consumerConfigs = new HashMap<>(consumerConfigs); return this; @@ -170,6 +172,11 @@ public ConsumerVerticleContext withProducerFactory( return this; } + public ConsumerVerticleContext withCeMetadataExtensionPrefix(final String ceMetadataExtensionPrefix) { + this.ceMetadataExtensionPrefix = ceMetadataExtensionPrefix; + return this; + } + public ConsumerVerticleContext withEventTypeListerFactory(EventTypeListerFactory eventTypeListerFactory) { this.eventTypeListerFactory = eventTypeListerFactory; return this; @@ -247,6 +254,10 @@ public ReactiveProducerFactory getProducerFactory() { return this.producerFactory; } + public String getCeMetadataExtensionPrefix() { + return this.ceMetadataExtensionPrefix; + } + public EventTypeCreator getEventTypeCreator() { return this.eventTypeCreator; } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleFactoryImpl.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleFactoryImpl.java index 6d35c8415c..91b01c7d64 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleFactoryImpl.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleFactoryImpl.java @@ -43,6 +43,7 @@ public class ConsumerVerticleFactoryImpl implements ConsumerVerticleFactory { private final ReactiveProducerFactory reactiveProducerFactory; private final EventTypeCreator eventTypeCreator; private final EventTypeListerFactory eventTypeListerFactory; + private final String ceMetadataExtensionPrefix; /** * All args constructor. @@ -62,7 +63,8 @@ public ConsumerVerticleFactoryImpl( final ReactiveConsumerFactory reactiveConsumerFactory, final ReactiveProducerFactory reactiveProducerFactory, EventTypeCreator eventTypeCreator, - EventTypeListerFactory eventTypeListerFactory) { + EventTypeListerFactory eventTypeListerFactory, + final String ceMetadataExtensionPrefix) { this.eventTypeCreator = eventTypeCreator; this.eventTypeListerFactory = eventTypeListerFactory; @@ -83,6 +85,7 @@ public ConsumerVerticleFactoryImpl( this.metricsRegistry = metricsRegistry; this.reactiveConsumerFactory = reactiveConsumerFactory; this.reactiveProducerFactory = reactiveProducerFactory; + this.ceMetadataExtensionPrefix = ceMetadataExtensionPrefix; } /** @@ -103,7 +106,8 @@ public ConsumerVerticle get(final EgressContext egressContext) { .withConsumerFactory(reactiveConsumerFactory) .withProducerFactory(reactiveProducerFactory) .withEventTypeCreator(eventTypeCreator) - .withEventTypeListerFactory(eventTypeListerFactory)) + .withEventTypeListerFactory(eventTypeListerFactory) + .withCeMetadataExtensionPrefix(ceMetadataExtensionPrefix)) .build(); } } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/DispatcherEnv.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/DispatcherEnv.java index 6436d4253b..d5b1506174 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/DispatcherEnv.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/DispatcherEnv.java @@ -16,6 +16,7 @@ package dev.knative.eventing.kafka.broker.dispatcher.main; import static java.util.Objects.requireNonNull; +import static java.util.Objects.requireNonNullElse; import dev.knative.eventing.kafka.broker.core.utils.BaseEnv; import java.util.function.Function; @@ -31,12 +32,17 @@ public class DispatcherEnv extends BaseEnv { public static final String EGRESSES_INITIAL_CAPACITY = "EGRESSES_INITIAL_CAPACITY"; private final int egressesInitialCapacity; + public static final String CE_METADATA_EXTENSION_PREFIX = "CE_METADATA_EXTENSION_PREFIX"; + private final String ceMetadataExtensionPrefix; + public DispatcherEnv(Function envProvider) { super(envProvider); this.consumerConfigFilePath = requireNonNull(envProvider.apply(CONSUMER_CONFIG_FILE_PATH)); this.webClientConfigFilePath = requireNonNull(envProvider.apply(WEBCLIENT_CONFIG_FILE_PATH)); this.egressesInitialCapacity = Integer.parseInt(requireNonNull(envProvider.apply(EGRESSES_INITIAL_CAPACITY))); + this.ceMetadataExtensionPrefix = + requireNonNullElse(envProvider.apply(CE_METADATA_EXTENSION_PREFIX), "knativekafka"); } public String getConsumerConfigFilePath() { @@ -51,6 +57,10 @@ public int getEgressesInitialCapacity() { return egressesInitialCapacity; } + public String getCeMetadataExtensionPrefix() { + return ceMetadataExtensionPrefix; + } + @Override public String toString() { return "DispatcherEnv{" + "consumerConfigFilePath='" diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/Main.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/Main.java index dea0e210c8..9e3aea5957 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/Main.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/Main.java @@ -138,7 +138,8 @@ public static void start( reactiveConsumerFactory, reactiveProducerFactory, eventTypeCreator, - eventTypeListerFactory), + eventTypeListerFactory, + env.getCeMetadataExtensionPrefix()), env.getEgressesInitialCapacity()); // Deploy the consumer deployer diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutatorTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutatorTest.java index a0661cb77c..d1f5aadffa 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutatorTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutatorTest.java @@ -38,8 +38,9 @@ public void shouldAddExtensions() { final var ceOverrides = DataPlaneContract.CloudEventOverrides.newBuilder() .putAllExtensions(extensions) .build(); + final var ceMetadataExtensionPrefix = "knativekafka"; - final var mutator = new CloudEventOverridesMutator(ceOverrides); + final var mutator = new CloudEventOverridesMutator(ceOverrides, ceMetadataExtensionPrefix); final var given = CloudEventBuilder.v1() .withId(UUID.randomUUID().toString()) @@ -50,8 +51,8 @@ public void shouldAddExtensions() { final var expected = CloudEventBuilder.from(given); extensions.forEach(expected::withExtension); - expected.withExtension("knativekafkaoffset", 1L); - expected.withExtension("knativekafkapartition", 1); + expected.withExtension(ceMetadataExtensionPrefix + "offset", 1L); + expected.withExtension(ceMetadataExtensionPrefix + "partition", 1); final var got = mutator.apply(new ConsumerRecord<>("test-topic", 1, 1, "key", given)); @@ -67,7 +68,7 @@ public void shouldNotThrowOnInvalidCloudEvent() { .putAllExtensions(extensions) .build(); - final var mutator = new CloudEventOverridesMutator(ceOverrides); + final var mutator = new CloudEventOverridesMutator(ceOverrides, "knativekafka"); final var given = new InvalidCloudEvent(null); @@ -87,8 +88,9 @@ public void shouldAddKafkaExtensionsWhenNoOverrides() { final var ceOverrides = DataPlaneContract.CloudEventOverrides.newBuilder() .putAllExtensions(Map.of()) .build(); + final var ceMetadataExtensionPrefix = "knativekafka"; - final var mutator = new CloudEventOverridesMutator(ceOverrides); + final var mutator = new CloudEventOverridesMutator(ceOverrides, ceMetadataExtensionPrefix); final var given = CloudEventBuilder.v1() .withId(UUID.randomUUID().toString()) @@ -98,8 +100,8 @@ public void shouldAddKafkaExtensionsWhenNoOverrides() { .build(); final var expected = CloudEventBuilder.from(given) - .withExtension("knativekafkaoffset", 1L) - .withExtension("knativekafkapartition", 1) + .withExtension(ceMetadataExtensionPrefix + "offset", 1L) + .withExtension(ceMetadataExtensionPrefix + "partition", 1) .build(); final var got = mutator.apply(new ConsumerRecord<>("test-topic", 1, 1, "key", given)); diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleFactoryImplTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleFactoryImplTest.java index 4ae7abe10a..e4de6dd21c 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleFactoryImplTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleFactoryImplTest.java @@ -70,6 +70,8 @@ public void shouldAlwaysSucceed() { producerConfigs.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class.getName()); producerConfigs.setProperty(INTERCEPTOR_CLASSES_CONFIG, PartitionKeyExtensionInterceptor.class.getName()); + final var ceMetadataExtensionPrefix = "knativekafka"; + final var verticleFactory = new ConsumerVerticleFactoryImpl( consumerProperties, new WebClientOptions(), @@ -79,7 +81,8 @@ public void shouldAlwaysSucceed() { new MockReactiveConsumerFactory<>(), new dev.knative.eventing.kafka.broker.receiver.MockReactiveProducerFactory<>(), mock(EventTypeCreator.class), - mock(EventTypeListerFactory.class)); + mock(EventTypeListerFactory.class), + ceMetadataExtensionPrefix); final var egress = DataPlaneContract.Egress.newBuilder() .setConsumerGroup("1234") @@ -117,6 +120,8 @@ public void shouldAlwaysSucceedWhenPassingResourceReference() { producerConfigs.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class.getName()); producerConfigs.setProperty(INTERCEPTOR_CLASSES_CONFIG, PartitionKeyExtensionInterceptor.class.getName()); + final var ceMetadataExtensionPrefix = "knativekafka"; + final var verticleFactory = new ConsumerVerticleFactoryImpl( consumerProperties, new WebClientOptions(), @@ -126,7 +131,8 @@ public void shouldAlwaysSucceedWhenPassingResourceReference() { new MockReactiveConsumerFactory<>(), new MockReactiveProducerFactory<>(), mock(EventTypeCreator.class), - mock(EventTypeListerFactory.class)); + mock(EventTypeListerFactory.class), + ceMetadataExtensionPrefix); final var egress = DataPlaneContract.Egress.newBuilder() .setConsumerGroup("1234") @@ -168,6 +174,8 @@ public void shouldNotThrowIllegalArgumentExceptionIfNotDeadLetterSink() { producerConfigs.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class.getName()); producerConfigs.setProperty(INTERCEPTOR_CLASSES_CONFIG, PartitionKeyExtensionInterceptor.class.getName()); + final var ceMetadataExtensionPrefix = "knativekafka"; + final var verticleFactory = new ConsumerVerticleFactoryImpl( consumerProperties, new WebClientOptions(), @@ -177,7 +185,8 @@ public void shouldNotThrowIllegalArgumentExceptionIfNotDeadLetterSink() { new MockReactiveConsumerFactory<>(), new MockReactiveProducerFactory<>(), mock(EventTypeCreator.class), - mock(EventTypeListerFactory.class)); + mock(EventTypeListerFactory.class), + ceMetadataExtensionPrefix); final var egress = DataPlaneContract.Egress.newBuilder() .setConsumerGroup("1234") diff --git a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java index 056b2a4482..97cdb908a5 100644 --- a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java +++ b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java @@ -353,6 +353,7 @@ private ConsumerDeployerVerticle setUpDispatcher(final Vertx vertx, final VertxT NullCloudEventInterceptor.class.getName() + "," + InvalidCloudEventInterceptor.class.getName()); final var producerConfigs = producerConfigs(); + final var ceMetadataExtensionPrefix = "knativekafka"; final var consumerVerticleFactory = new ConsumerVerticleFactoryImpl( consumerConfigs, @@ -363,7 +364,8 @@ private ConsumerDeployerVerticle setUpDispatcher(final Vertx vertx, final VertxT getReactiveConsumerFactory(), getReactiveProducerFactory(), mock(EventTypeCreator.class), - mock(EventTypeListerFactory.class)); + mock(EventTypeListerFactory.class), + ceMetadataExtensionPrefix); final var verticle = new ConsumerDeployerVerticle(consumerVerticleFactory, 10);