Skip to content

Commit

Permalink
Make MQTT modules as auto-config
Browse files Browse the repository at this point in the history
* Fix all the Checkstyle violations for those modules
* Introduce a `MosquittoContainerTest` contract into the `spring-function-test-support`
and use it in the MQTT modules
* Fix READMEs for previously migrated modules to auto-configuration
  • Loading branch information
artembilan committed Jan 5, 2024
1 parent 3f349ca commit fbe0d22
Show file tree
Hide file tree
Showing 24 changed files with 163 additions and 145 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.fn.test.support.mqtt;

import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Testcontainers;

/**
* The base contract for JUnit tests based on the container for MQTT Mosquitto broker. The
* Testcontainers 'reuse' option must be disabled,so, Ryuk container is started and will
* clean all the containers up from this test suite after JVM exit. Since the Mosquitto
* container instance is shared via static property, it is going to be started only once
* per JVM, therefore the target Docker container is reused automatically.
*
* @author Artem Bilan
*/
@Testcontainers(disabledWithoutDocker = true)
public interface MosquittoContainerTest {

GenericContainer<?> MOSQUITTO_CONTAINER = new GenericContainer<>("eclipse-mosquitto:2.0.13")
.withCommand("mosquitto -c /mosquitto-no-auth.conf")
.withExposedPorts(1883);

@BeforeAll
static void startContainer() {
MOSQUITTO_CONTAINER.start();
}

static String mqttUrl() {
return "tcp://localhost:" + MOSQUITTO_CONTAINER.getFirstMappedPort();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* The MQTT protocol testing support.
*/
package org.springframework.cloud.fn.test.support.mqtt;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2020 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,28 +23,25 @@
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.util.ObjectUtils;

/**
* Generic mqtt configuration.
* The MQTT client auto-configuration.
*
* @author Janne Valkealahti
* @author Artem Bilan
*/
@Configuration
@AutoConfiguration
@EnableConfigurationProperties(MqttProperties.class)
public class MqttConfiguration {

@Autowired
private MqttProperties mqttProperties;

@Bean
public MqttPahoClientFactory mqttClientFactory() {

public MqttPahoClientFactory mqttClientFactory(MqttProperties mqttProperties) {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setServerURIs(mqttProperties.getUrl());
mqttConnectOptions.setUserName(mqttProperties.getUsername());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2020 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,7 +25,7 @@
import org.springframework.validation.annotation.Validated;

/**
* Generic mqtt connection properties.
* The MQTT client properties.
*
* @author Janne Valkealahti
* @author Artem Bilan
Expand Down Expand Up @@ -81,63 +81,63 @@ public class MqttProperties {

@Size(min = 1)
public String[] getUrl() {
return url;
return this.url;
}

public void setUrl(String[] url) {
this.url = url;
}

public String getUsername() {
return username;
return this.username;
}

public void setUsername(String username) {
this.username = username;
}

public String getPassword() {
return password;
return this.password;
}

public void setPassword(String password) {
this.password = password;
}

public boolean isCleanSession() {
return cleanSession;
return this.cleanSession;
}

public void setCleanSession(boolean cleanSession) {
this.cleanSession = cleanSession;
}

public int getKeepAliveInterval() {
return keepAliveInterval;
return this.keepAliveInterval;
}

public void setKeepAliveInterval(int keepAliveInterval) {
this.keepAliveInterval = keepAliveInterval;
}

public int getConnectionTimeout() {
return connectionTimeout;
return this.connectionTimeout;
}

public void setConnectionTimeout(int connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}

public String getPersistence() {
return persistence;
return this.persistence;
}

public void setPersistence(String persistence) {
this.persistence = persistence;
}

public String getPersistenceDirectory() {
return persistenceDirectory;
return this.persistenceDirectory;
}

public void setPersistenceDirectory(String persistenceDirectory) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* The MQTT client auto-configuration support.
*/
package org.springframework.cloud.fn.common.mqtt;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.springframework.cloud.fn.common.mqtt.MqttConfiguration
2 changes: 1 addition & 1 deletion consumer/spring-file-consumer/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ The consumer uses the `FileWritingMessageHandler` from Spring Integration.

## Beans for injection

You can import `FileConsumerConfiguration` in the application and then inject the following bean.
The `FileConsumerConfiguration` auto-configuration provides the following bean:

`Consumer<Message<?>> fileConsumer`

Expand Down
2 changes: 1 addition & 1 deletion consumer/spring-ftp-consumer/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ The consumer uses the `FtpMessageHandler` from Spring Integration.

## Beans for injection

You can import `FtpConsumerConfiguration` in the application and then inject the following bean.
The `FtpConsumerConfiguration` auto-configuration provides the following bean:

`Consumer<Message<?>> ftpConsumer`

Expand Down
2 changes: 1 addition & 1 deletion consumer/spring-mqtt-consumer/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ A consumer that allows you to send messages using the MQTT protocol.

## Beans for injection

You can import `MqttConsumerConfiguration` in the application and then inject the following bean.
The `MqttConsumerConfiguration` auto-configuration provides the following bean:

`Consumer<Message<?>> mqttConsumer`

Expand Down
2 changes: 2 additions & 0 deletions consumer/spring-mqtt-consumer/build.gradle
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
dependencies {
api project(':spring-mqtt-common')

testImplementation project(':spring-function-test-support')
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2022 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,14 +19,11 @@
import java.util.function.Consumer;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.fn.common.config.ComponentCustomizer;
import org.springframework.cloud.fn.common.mqtt.MqttConfiguration;
import org.springframework.cloud.fn.common.mqtt.MqttProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
Expand All @@ -35,45 +32,37 @@
import org.springframework.messaging.MessageHandler;

/**
* A consumer that sends data to Mqtt.
* A consumer that sends data to MQTT.
*
* @author Janne Valkealahti
*
* @author Artem Bilan
*/
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties({ MqttProperties.class, MqttConsumerProperties.class })
@Import(MqttConfiguration.class)
@EnableConfigurationProperties(MqttConsumerProperties.class)
@AutoConfiguration(after = MqttConfiguration.class)
public class MqttConsumerConfiguration {

@Autowired
private MqttConsumerProperties properties;

@Autowired
private MqttPahoClientFactory mqttClientFactory;

@Autowired
private BeanFactory beanFactory;

@Bean
public Consumer<Message<?>> mqttConsumer(MessageHandler mqttOutbound) {
return mqttOutbound::handleMessage;
}

@Bean
public MessageHandler mqttOutbound(
public MessageHandler mqttOutbound(MqttConsumerProperties properties, MqttPahoClientFactory mqttClientFactory,
BeanFactory beanFactory,
@Nullable ComponentCustomizer<MqttPahoMessageHandler> mqttMessageHandlerCustomizer) {

MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(properties.getClientId(), mqttClientFactory);
messageHandler.setAsync(properties.isAsync());
messageHandler.setDefaultTopic(properties.getTopic());
messageHandler.setConverter(pahoMessageConverter());
messageHandler.setConverter(pahoMessageConverter(properties, beanFactory));
if (mqttMessageHandlerCustomizer != null) {
mqttMessageHandlerCustomizer.customize(messageHandler);
}
return messageHandler;
}

public DefaultPahoMessageConverter pahoMessageConverter() {
private DefaultPahoMessageConverter pahoMessageConverter(MqttConsumerProperties properties,
BeanFactory beanFactory) {
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(properties.getQos(),
properties.isRetained(), properties.getCharset());
converter.setBeanFactory(beanFactory);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2020 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,7 +24,7 @@
import org.springframework.validation.annotation.Validated;

/**
* Properties for the Mqtt Consumer.
* Properties for the MQTT Consumer.
*
* @author Janne Valkealahti
*
Expand All @@ -34,32 +34,32 @@
public class MqttConsumerProperties {

/**
* identifies the client.
* Identifies the client.
*/
private String clientId = "stream.client.id.sink";

/**
* the topic to which the sink will publish.
* The topic to which the sink will publish.
*/
private String topic = "stream.mqtt";

/**
* the quality of service to use.
* The quality of service to use.
*/
private int qos = 1;

/**
* whether to set the 'retained' flag.
* Whether to set the 'retained' flag.
*/
private boolean retained = false;

/**
* the charset used to convert a String payload to byte[].
* The charset used to convert a String payload to byte[].
*/
private String charset = "UTF-8";

/**
* whether or not to use async sends.
* Whether to use async sends.
*/
private boolean async = false;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* The MQTT consumer auto-configuration support.
*/
package org.springframework.cloud.fn.consumer.mqtt;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.springframework.cloud.fn.consumer.mqtt.MqttConsumerConfiguration
Loading

0 comments on commit fbe0d22

Please sign in to comment.