Skip to content

Commit

Permalink
Make Cassandra consumer as auto-config
Browse files Browse the repository at this point in the history
* Fix all its Checkstyle violations
* Expose also a `cassandraConsumer` bean for convenience
* Reflect the changes in the README
  • Loading branch information
artembilan committed Jan 9, 2024
1 parent 84aa306 commit 0e7e010
Show file tree
Hide file tree
Showing 19 changed files with 91 additions and 60 deletions.
7 changes: 4 additions & 3 deletions consumer/spring-cassandra-consumer/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@ Internally it uses the `CassandraMessageHandler` from Spring Integration.

## Beans for injection

You can import the `CassnadraConsumerConfiguration` in the application and then inject the following bean.
The `CassnadraConsumerConfiguration` auto-configuration provides the following beans:

`cassandraConsumerFunction`

You can use `cassandraConsumerFunction` as a qualifier when injecting.

Type for injection: `Function<Object, Mono<? extends WriteResult>>`

You can ignore the return value from the function as this is a consumer and simply will send the data to Cassandra.
You have to subscribe to the returned `Mono` to trigger a communication with Cassandra.
Or use `Consumer<Object> cassandraConsumer` instead which ignores the result and performs just `Mono.block()` before returning.

## Configuration Options

All configuration properties are prefixed with either `cassandra` or `cassandra.cluster`.
All configuration properties are prefixed with either `cassandra.consumer` or `cassandra.cluster`.

For more information on the various options available, please see link:src/main/java/org/springframework/cloud/fn/consumer/cassandra/CassandraConsumerProperties.java[CassandraConsumerProperties].
See link:src/main/java/org/springframework/cloud/fn/consumer/cassandra/cluster/CassandraClusterProperties.java[this] also.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;

import com.datastax.oss.driver.api.core.ConsistencyLevel;
Expand All @@ -32,58 +33,65 @@
import reactor.core.publisher.Mono;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.data.cassandra.CassandraReactiveDataAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.fn.consumer.cassandra.cluster.CassandraAppClusterConfiguration;
import org.springframework.cloud.fn.consumer.cassandra.query.ColumnNameExtractor;
import org.springframework.cloud.fn.consumer.cassandra.query.InsertQueryColumnNameExtractor;
import org.springframework.cloud.fn.consumer.cassandra.query.UpdateQueryColumnNameExtractor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.data.cassandra.core.InsertOptions;
import org.springframework.data.cassandra.core.ReactiveCassandraOperations;
import org.springframework.data.cassandra.core.UpdateOptions;
import org.springframework.data.cassandra.core.WriteResult;
import org.springframework.data.cassandra.core.cql.WriteOptions;
import org.springframework.integration.JavaUtils;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.cassandra.outbound.CassandraMessageHandler;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlowBuilder;
import org.springframework.integration.support.json.Jackson2JsonObjectMapper;
import org.springframework.integration.transformer.AbstractPayloadTransformer;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.StringUtils;

/**
* Apache Cassandra consumer auto-configuration.
*
* @author Artem Bilan
* @author Thomas Risberg
* @author Ashu Gairola
* @author Akos Ratku
*/
@AutoConfiguration
@AutoConfiguration(after = CassandraReactiveDataAutoConfiguration.class)
@EnableConfigurationProperties(CassandraConsumerProperties.class)
@Import(CassandraAppClusterConfiguration.class)
public class CassandraConsumerConfiguration {

@Autowired
private CassandraConsumerProperties cassandraSinkProperties;

@Bean
public IntegrationFlow cassandraConsumerFlow(MessageHandler cassandraSinkMessageHandler,
ObjectMapper objectMapper) {

IntegrationFlowBuilder integrationFlowBuilder = IntegrationFlow.from(CassandraConsumerFunction.class);
String ingestQuery = this.cassandraSinkProperties.getIngestQuery();
if (StringUtils.hasText(ingestQuery)) {
integrationFlowBuilder.transform(new PayloadToMatrixTransformer(objectMapper, ingestQuery,
CassandraMessageHandler.Type.UPDATE == this.cassandraSinkProperties.getQueryType()
? new UpdateQueryColumnNameExtractor() : new InsertQueryColumnNameExtractor()));
}
return integrationFlowBuilder.handle(cassandraSinkMessageHandler).get();
public Consumer<Object> cassandraConsumer(CassandraConsumerFunction cassandraConsumerFunction) {
return (payload) -> cassandraConsumerFunction.apply(payload).block();
}

@Bean
public IntegrationFlow cassandraConsumerFlow(
@Qualifier("cassandraMessageHandler") MessageHandler cassandraMessageHandler, ObjectMapper objectMapper) {

return (flow) -> {
String ingestQuery = this.cassandraSinkProperties.getIngestQuery();
if (StringUtils.hasText(ingestQuery)) {
flow.transform(new PayloadToMatrixTransformer(objectMapper, ingestQuery,
(CassandraMessageHandler.Type.UPDATE == this.cassandraSinkProperties.getQueryType())
? new UpdateQueryColumnNameExtractor() : new InsertQueryColumnNameExtractor()));
}
flow.handle(cassandraMessageHandler);
};
}

@Bean
public MessageHandler cassandraSinkMessageHandler(ReactiveCassandraOperations cassandraOperations) {
public MessageHandler cassandraMessageHandler(ReactiveCassandraOperations cassandraOperations) {
CassandraMessageHandler.Type queryType = Optional.ofNullable(this.cassandraSinkProperties.getQueryType())
.orElse(CassandraMessageHandler.Type.INSERT);

Expand Down Expand Up @@ -190,6 +198,7 @@ protected boolean looksLikeISO8601(String dateStr) {

}

@MessagingGateway(name = "cassandraConsumerFunction", defaultRequestChannel = "cassandraConsumerFlow.input")
interface CassandraConsumerFunction extends Function<Object, Mono<? extends WriteResult>> {

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2020 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,10 +23,12 @@
import org.springframework.integration.cassandra.outbound.CassandraMessageHandler;

/**
* Apache Cassandra consumer configuration properties.
*
* @author Artem Bilan
* @author Thomas Risberg
*/
@ConfigurationProperties("cassandra")
@ConfigurationProperties("cassandra.consumer")
public class CassandraConsumerProperties {

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2022 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,6 +29,8 @@

import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.cassandra.CassandraAutoConfiguration;
import org.springframework.boot.autoconfigure.cassandra.CassandraProperties;
import org.springframework.boot.autoconfigure.cassandra.CqlSessionBuilderCustomizer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
Expand All @@ -38,11 +40,11 @@
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.data.cassandra.config.CqlSessionFactoryBean;
Expand All @@ -53,11 +55,13 @@
import org.springframework.util.StringUtils;

/**
* The auto-configuration for Apache Cassandra cluster initialization.
*
* @author Artem Bilan
* @author Thomas Risberg
* @author Rob Hardt
*/
@Configuration
@AutoConfiguration(before = CassandraAutoConfiguration.class)
@EnableConfigurationProperties(CassandraClusterProperties.class)
@Import(CassandraAppClusterConfiguration.CassandraPackageRegistrar.class)
public class CassandraAppClusterConfiguration {
Expand All @@ -66,7 +70,7 @@ public class CassandraAppClusterConfiguration {
public CqlSessionBuilderCustomizer clusterBuilderCustomizer(CassandraClusterProperties cassandraClusterProperties) {

PropertyMapper map = PropertyMapper.get();
return builder -> map.from(cassandraClusterProperties::isSkipSslValidation).whenTrue().toCall(() -> {
return (builder) -> map.from(cassandraClusterProperties::isSkipSslValidation).whenTrue().toCall(() -> {
try {
builder.withSslContext(TrustAllSSLContextFactory.getSslContext());
}
Expand Down Expand Up @@ -96,6 +100,7 @@ public Object keyspaceCreator(CassandraProperties cassandraProperties, CqlSessio

@Bean
@Lazy
@Primary
@DependsOn("keyspaceCreator")
public CqlSession cassandraSession(CqlSessionBuilder cqlSessionBuilder) {
return cqlSessionBuilder.build();
Expand All @@ -115,7 +120,7 @@ public Object keyspaceInitializer(CassandraClusterProperties cassandraClusterPro

Flux.fromArray(StringUtils.delimitedListToStringArray(scripts, ";", "\r\n\f"))
.filter(StringUtils::hasText) // an empty String after the last ';'
.concatMap(script -> reactiveCqlOperations.execute(script + ";"))
.concatMap((script) -> reactiveCqlOperations.execute(script + ";"))
.blockLast();

return null;
Expand All @@ -138,7 +143,7 @@ public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata,
Binder.get(this.environment)
.bind("cassandra.cluster.entity-base-packages", String[].class)
.map(Arrays::asList)
.ifBound(packagesToScan -> EntityScanPackages.register(registry, packagesToScan));
.ifBound((packagesToScan) -> EntityScanPackages.register(registry, packagesToScan));
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,7 +20,7 @@
import org.springframework.core.io.Resource;

/**
* Common properties for the cassandra modules.
* Apache Cassandra cluster configuration properties.
*
* @author Artem Bilan
* @author Thomas Risberg
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2020 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,7 +28,7 @@
/**
* Helper to provide an SSL Context that does not validate certificates presented in the
* SSL handshake.
*
* <p>
* The usual caveats apply.
*
* @author Rob Hardt
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* The Apache Cassandra cluster support.
*/
package org.springframework.cloud.fn.consumer.cassandra.cluster;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* The Apache Cassandra consumer auto-configuration.
*/
package org.springframework.cloud.fn.consumer.cassandra;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2020 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,8 @@
import java.util.List;

/**
* The contact to extract column names from the Apache Cassandra query.
*
* @author Akos Ratku
* @author Artem Bilan
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2020 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,12 +24,14 @@
import org.springframework.util.StringUtils;

/**
* The {@link ColumnNameExtractor} implementation for {@code INSERT} queries.
*
* @author Akos Ratku
* @author Artem Bilan
*/
public class InsertQueryColumnNameExtractor implements ColumnNameExtractor {

private static final Pattern PATTERN = Pattern.compile(".+\\((.+)\\).+(?:values\\s*\\((.+)\\))");
private static final Pattern PATTERN = Pattern.compile(".+\\((.+)\\).+values\\s*\\((.+)\\)");

@Override
public List<String> extract(String query) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2020 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,8 @@
import org.springframework.util.StringUtils;

/**
* The {@link ColumnNameExtractor} implementation for {@code UPDATE} queries.
*
* @author Akos Ratku
* @author Artem Bilan
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* The Apache Cassandra query utilities.
*/
package org.springframework.cloud.fn.consumer.cassandra.query;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
org.springframework.cloud.fn.consumer.cassandra.CassandraConsumerConfiguration
org.springframework.cloud.fn.consumer.cassandra.cluster.CassandraAppClusterConfiguration

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2022 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,11 +27,9 @@
import reactor.core.publisher.Mono;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.fn.consumer.cassandra.domain.Book;
import org.springframework.context.annotation.Import;
import org.springframework.data.cassandra.core.CassandraOperations;
import org.springframework.data.cassandra.core.WriteResult;
import org.springframework.test.annotation.DirtiesContext;
Expand All @@ -41,11 +39,10 @@
/**
* @author Artem Bilan
*/
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE,
properties = { "spring.cassandra.keyspace-name=" + CassandraConsumerApplicationTests.CASSANDRA_KEYSPACE,
"cassandra.cluster.createKeyspace=true" })
@SpringBootTest(properties = { "spring.cassandra.keyspace-name=" + CassandraConsumerApplicationTests.CASSANDRA_KEYSPACE,
"cassandra.cluster.createKeyspace=true" })
@DirtiesContext
abstract class CassandraConsumerApplicationTests implements CassandraContainerTest {
abstract class CassandraConsumerApplicationTests implements CassandraTestContainer {

static final String CASSANDRA_KEYSPACE = "test";

Expand All @@ -57,10 +54,10 @@ abstract class CassandraConsumerApplicationTests implements CassandraContainerTe

@DynamicPropertySource
static void registerConfigurationProperties(DynamicPropertyRegistry registry) {
registry.add("spring.cassandra.localDatacenter", () -> CASSANDRA_CONTAINER.getLocalDatacenter());
registry.add("spring.cassandra.localDatacenter", CASSANDRA_CONTAINER::getLocalDatacenter);
registry.add("spring.cassandra.contactPoints",
() -> Optional.of(CASSANDRA_CONTAINER.getContactPoint())
.map(contactPoint -> contactPoint.getAddress().getHostAddress() + ':' + contactPoint.getPort())
.map((contactPoint) -> contactPoint.getAddress().getHostAddress() + ':' + contactPoint.getPort())
.get());
}

Expand All @@ -80,9 +77,7 @@ protected static List<Book> getBookList(int numBooks) {
return books;
}

@SpringBootConfiguration
@EnableAutoConfiguration
@Import(CassandraConsumerConfiguration.class)
@SpringBootApplication
static class CassandraConsumerTestApplication {

}
Expand Down
Loading

0 comments on commit 0e7e010

Please sign in to comment.