Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add configurable CE metadata extension prefix #4164

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@
public class CloudEventOverridesMutator implements CloudEventMutator {

private final DataPlaneContract.CloudEventOverrides cloudEventOverrides;
private final String ceMetadataExtensionPrefix;

public CloudEventOverridesMutator(final DataPlaneContract.CloudEventOverrides cloudEventOverrides) {
public CloudEventOverridesMutator(
final DataPlaneContract.CloudEventOverrides cloudEventOverrides, final String ceMetadataExtensionPrefix) {
this.cloudEventOverrides = cloudEventOverrides;
this.ceMetadataExtensionPrefix = ceMetadataExtensionPrefix;
}

@Override
Expand All @@ -49,7 +52,7 @@ private void applyCloudEventOverrides(CloudEventBuilder builder) {
}

private void applyKafkaMetadata(CloudEventBuilder builder, Number partition, Number offset) {
builder.withExtension("knativekafkapartition", partition);
builder.withExtension("knativekafkaoffset", offset);
builder.withExtension(ceMetadataExtensionPrefix + "partition", partition);
builder.withExtension(ceMetadataExtensionPrefix + "offset", offset);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ private void build(final Vertx vertx, final ConsumerVerticle consumerVerticle, f
TracingPolicy.PROPAGATE),
Metrics.getRegistry()),
new CloudEventOverridesMutator(
consumerVerticleContext.getResource().getCloudEventOverrides()));
consumerVerticleContext.getResource().getCloudEventOverrides(),
consumerVerticleContext.getCeMetadataExtensionPrefix()));
consumerVerticle.setRecordDispatcher(recordDispatcher);

final var partitionRevokedHandlers =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public class ConsumerVerticleContext {

private Tags tags;

private String ceMetadataExtensionPrefix;

public ConsumerVerticleContext withConsumerConfigs(final Map<String, Object> consumerConfigs) {
this.consumerConfigs = new HashMap<>(consumerConfigs);
return this;
Expand Down Expand Up @@ -170,6 +172,11 @@ public ConsumerVerticleContext withProducerFactory(
return this;
}

public ConsumerVerticleContext withCeMetadataExtensionPrefix(final String ceMetadataExtensionPrefix) {
this.ceMetadataExtensionPrefix = ceMetadataExtensionPrefix;
return this;
}

public ConsumerVerticleContext withEventTypeListerFactory(EventTypeListerFactory eventTypeListerFactory) {
this.eventTypeListerFactory = eventTypeListerFactory;
return this;
Expand Down Expand Up @@ -247,6 +254,10 @@ public ReactiveProducerFactory<String, CloudEvent> getProducerFactory() {
return this.producerFactory;
}

public String getCeMetadataExtensionPrefix() {
return this.ceMetadataExtensionPrefix;
}

public EventTypeCreator getEventTypeCreator() {
return this.eventTypeCreator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class ConsumerVerticleFactoryImpl implements ConsumerVerticleFactory {
private final ReactiveProducerFactory reactiveProducerFactory;
private final EventTypeCreator eventTypeCreator;
private final EventTypeListerFactory eventTypeListerFactory;
private final String ceMetadataExtensionPrefix;

/**
* All args constructor.
Expand All @@ -62,7 +63,8 @@ public ConsumerVerticleFactoryImpl(
final ReactiveConsumerFactory reactiveConsumerFactory,
final ReactiveProducerFactory reactiveProducerFactory,
EventTypeCreator eventTypeCreator,
EventTypeListerFactory eventTypeListerFactory) {
EventTypeListerFactory eventTypeListerFactory,
final String ceMetadataExtensionPrefix) {
this.eventTypeCreator = eventTypeCreator;
this.eventTypeListerFactory = eventTypeListerFactory;

Expand All @@ -83,6 +85,7 @@ public ConsumerVerticleFactoryImpl(
this.metricsRegistry = metricsRegistry;
this.reactiveConsumerFactory = reactiveConsumerFactory;
this.reactiveProducerFactory = reactiveProducerFactory;
this.ceMetadataExtensionPrefix = ceMetadataExtensionPrefix;
}

/**
Expand All @@ -103,7 +106,8 @@ public ConsumerVerticle get(final EgressContext egressContext) {
.withConsumerFactory(reactiveConsumerFactory)
.withProducerFactory(reactiveProducerFactory)
.withEventTypeCreator(eventTypeCreator)
.withEventTypeListerFactory(eventTypeListerFactory))
.withEventTypeListerFactory(eventTypeListerFactory)
.withCeMetadataExtensionPrefix(ceMetadataExtensionPrefix))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package dev.knative.eventing.kafka.broker.dispatcher.main;

import static java.util.Objects.requireNonNull;
import static java.util.Objects.requireNonNullElse;

import dev.knative.eventing.kafka.broker.core.utils.BaseEnv;
import java.util.function.Function;
Expand All @@ -31,12 +32,17 @@ public class DispatcherEnv extends BaseEnv {
public static final String EGRESSES_INITIAL_CAPACITY = "EGRESSES_INITIAL_CAPACITY";
private final int egressesInitialCapacity;

public static final String CE_METADATA_EXTENSION_PREFIX = "CE_METADATA_EXTENSION_PREFIX";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was there any feedback from the node.js library on why they fail - even the spec says should ?

CC @Cali0707

Asking here, instead of adding all this code for pluggable prefixes

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The node.js library was updated successfully. We are in a tough position to update some of our legacy services so were hoping this change could be considered regardless.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we fixed it in the SDK since as you said @matzew this was a "should" requirement. But, I guess that doesn't fix it if people are using a non-fixed version of the SDK

private final String ceMetadataExtensionPrefix;

public DispatcherEnv(Function<String, String> envProvider) {
super(envProvider);

this.consumerConfigFilePath = requireNonNull(envProvider.apply(CONSUMER_CONFIG_FILE_PATH));
this.webClientConfigFilePath = requireNonNull(envProvider.apply(WEBCLIENT_CONFIG_FILE_PATH));
this.egressesInitialCapacity = Integer.parseInt(requireNonNull(envProvider.apply(EGRESSES_INITIAL_CAPACITY)));
this.ceMetadataExtensionPrefix =
requireNonNullElse(envProvider.apply(CE_METADATA_EXTENSION_PREFIX), "knativekafka");
}

public String getConsumerConfigFilePath() {
Expand All @@ -51,6 +57,10 @@ public int getEgressesInitialCapacity() {
return egressesInitialCapacity;
}

public String getCeMetadataExtensionPrefix() {
return ceMetadataExtensionPrefix;
}

@Override
public String toString() {
return "DispatcherEnv{" + "consumerConfigFilePath='"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ public static void start(
reactiveConsumerFactory,
reactiveProducerFactory,
eventTypeCreator,
eventTypeListerFactory),
eventTypeListerFactory,
env.getCeMetadataExtensionPrefix()),
env.getEgressesInitialCapacity());

// Deploy the consumer deployer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ public void shouldAddExtensions() {
final var ceOverrides = DataPlaneContract.CloudEventOverrides.newBuilder()
.putAllExtensions(extensions)
.build();
final var ceMetadataExtensionPrefix = "knativekafka";

final var mutator = new CloudEventOverridesMutator(ceOverrides);
final var mutator = new CloudEventOverridesMutator(ceOverrides, ceMetadataExtensionPrefix);

final var given = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
Expand All @@ -50,8 +51,8 @@ public void shouldAddExtensions() {

final var expected = CloudEventBuilder.from(given);
extensions.forEach(expected::withExtension);
expected.withExtension("knativekafkaoffset", 1L);
expected.withExtension("knativekafkapartition", 1);
expected.withExtension(ceMetadataExtensionPrefix + "offset", 1L);
expected.withExtension(ceMetadataExtensionPrefix + "partition", 1);

final var got = mutator.apply(new ConsumerRecord<>("test-topic", 1, 1, "key", given));

Expand All @@ -67,7 +68,7 @@ public void shouldNotThrowOnInvalidCloudEvent() {
.putAllExtensions(extensions)
.build();

final var mutator = new CloudEventOverridesMutator(ceOverrides);
final var mutator = new CloudEventOverridesMutator(ceOverrides, "knativekafka");

final var given = new InvalidCloudEvent(null);

Expand All @@ -87,8 +88,9 @@ public void shouldAddKafkaExtensionsWhenNoOverrides() {
final var ceOverrides = DataPlaneContract.CloudEventOverrides.newBuilder()
.putAllExtensions(Map.of())
.build();
final var ceMetadataExtensionPrefix = "knativekafka";

final var mutator = new CloudEventOverridesMutator(ceOverrides);
final var mutator = new CloudEventOverridesMutator(ceOverrides, ceMetadataExtensionPrefix);

final var given = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
Expand All @@ -98,8 +100,8 @@ public void shouldAddKafkaExtensionsWhenNoOverrides() {
.build();

final var expected = CloudEventBuilder.from(given)
.withExtension("knativekafkaoffset", 1L)
.withExtension("knativekafkapartition", 1)
.withExtension(ceMetadataExtensionPrefix + "offset", 1L)
.withExtension(ceMetadataExtensionPrefix + "partition", 1)
.build();

final var got = mutator.apply(new ConsumerRecord<>("test-topic", 1, 1, "key", given));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public void shouldAlwaysSucceed() {
producerConfigs.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class.getName());
producerConfigs.setProperty(INTERCEPTOR_CLASSES_CONFIG, PartitionKeyExtensionInterceptor.class.getName());

final var ceMetadataExtensionPrefix = "knativekafka";

final var verticleFactory = new ConsumerVerticleFactoryImpl(
consumerProperties,
new WebClientOptions(),
Expand All @@ -79,7 +81,8 @@ public void shouldAlwaysSucceed() {
new MockReactiveConsumerFactory<>(),
new dev.knative.eventing.kafka.broker.receiver.MockReactiveProducerFactory<>(),
mock(EventTypeCreator.class),
mock(EventTypeListerFactory.class));
mock(EventTypeListerFactory.class),
ceMetadataExtensionPrefix);

final var egress = DataPlaneContract.Egress.newBuilder()
.setConsumerGroup("1234")
Expand Down Expand Up @@ -117,6 +120,8 @@ public void shouldAlwaysSucceedWhenPassingResourceReference() {
producerConfigs.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class.getName());
producerConfigs.setProperty(INTERCEPTOR_CLASSES_CONFIG, PartitionKeyExtensionInterceptor.class.getName());

final var ceMetadataExtensionPrefix = "knativekafka";

final var verticleFactory = new ConsumerVerticleFactoryImpl(
consumerProperties,
new WebClientOptions(),
Expand All @@ -126,7 +131,8 @@ public void shouldAlwaysSucceedWhenPassingResourceReference() {
new MockReactiveConsumerFactory<>(),
new MockReactiveProducerFactory<>(),
mock(EventTypeCreator.class),
mock(EventTypeListerFactory.class));
mock(EventTypeListerFactory.class),
ceMetadataExtensionPrefix);

final var egress = DataPlaneContract.Egress.newBuilder()
.setConsumerGroup("1234")
Expand Down Expand Up @@ -168,6 +174,8 @@ public void shouldNotThrowIllegalArgumentExceptionIfNotDeadLetterSink() {
producerConfigs.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class.getName());
producerConfigs.setProperty(INTERCEPTOR_CLASSES_CONFIG, PartitionKeyExtensionInterceptor.class.getName());

final var ceMetadataExtensionPrefix = "knativekafka";

final var verticleFactory = new ConsumerVerticleFactoryImpl(
consumerProperties,
new WebClientOptions(),
Expand All @@ -177,7 +185,8 @@ public void shouldNotThrowIllegalArgumentExceptionIfNotDeadLetterSink() {
new MockReactiveConsumerFactory<>(),
new MockReactiveProducerFactory<>(),
mock(EventTypeCreator.class),
mock(EventTypeListerFactory.class));
mock(EventTypeListerFactory.class),
ceMetadataExtensionPrefix);

final var egress = DataPlaneContract.Egress.newBuilder()
.setConsumerGroup("1234")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ private ConsumerDeployerVerticle setUpDispatcher(final Vertx vertx, final VertxT
NullCloudEventInterceptor.class.getName() + "," + InvalidCloudEventInterceptor.class.getName());

final var producerConfigs = producerConfigs();
final var ceMetadataExtensionPrefix = "knativekafka";

final var consumerVerticleFactory = new ConsumerVerticleFactoryImpl(
consumerConfigs,
Expand All @@ -363,7 +364,8 @@ private ConsumerDeployerVerticle setUpDispatcher(final Vertx vertx, final VertxT
getReactiveConsumerFactory(),
getReactiveProducerFactory(),
mock(EventTypeCreator.class),
mock(EventTypeListerFactory.class));
mock(EventTypeListerFactory.class),
ceMetadataExtensionPrefix);

final var verticle = new ConsumerDeployerVerticle(consumerVerticleFactory, 10);

Expand Down
Loading