From 71c0f2088e378a9fa026dbabb6c3d284af9ee01e Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Thu, 26 Dec 2024 16:12:46 -0500 Subject: [PATCH] Fix JDBC & MongoDB suppliers to deal with a new version of Splitter function --- .../jdbc/JdbcSupplierConfiguration.java | 15 ++++----------- .../supplier/jdbc/DefaultJdbcSupplierTests.java | 14 +++++++------- .../mongo/MongodbSupplierConfiguration.java | 17 +++++------------ .../mongo/MongodbSupplierApplicationTests.java | 4 +--- 4 files changed, 17 insertions(+), 33 deletions(-) diff --git a/supplier/spring-jdbc-supplier/src/main/java/org/springframework/cloud/fn/supplier/jdbc/JdbcSupplierConfiguration.java b/supplier/spring-jdbc-supplier/src/main/java/org/springframework/cloud/fn/supplier/jdbc/JdbcSupplierConfiguration.java index 3f157e10..6a490e68 100644 --- a/supplier/spring-jdbc-supplier/src/main/java/org/springframework/cloud/fn/supplier/jdbc/JdbcSupplierConfiguration.java +++ b/supplier/spring-jdbc-supplier/src/main/java/org/springframework/cloud/fn/supplier/jdbc/JdbcSupplierConfiguration.java @@ -16,7 +16,6 @@ package org.springframework.cloud.fn.supplier.jdbc; -import java.util.List; import java.util.function.Function; import java.util.function.Supplier; @@ -30,7 +29,6 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.fn.common.config.ComponentCustomizer; import org.springframework.cloud.fn.splitter.SplitterFunctionConfiguration; -import org.springframework.cloud.function.context.PollableBean; import org.springframework.context.annotation.Bean; import org.springframework.integration.core.MessageSource; import org.springframework.integration.jdbc.JdbcPollingChannelAdapter; @@ -71,21 +69,16 @@ public MessageSource jdbcMessageSource( } @Bean(name = "jdbcSupplier") - @PollableBean @ConditionalOnProperty(prefix = "jdbc.supplier", name = "split", matchIfMissing = true) public Supplier>> splittedSupplier(MessageSource jdbcMessageSource, - Function, List>> splitterFunction) { + Function>, Flux>> splitterFunction) { - return () -> { + return () -> Flux.>create(sink -> { Message received = jdbcMessageSource.receive(); if (received != null) { - // multiple Message> - return Flux.fromIterable(splitterFunction.apply(received)); + sink.next(received); } - else { - return Flux.empty(); - } - }; + }).transform(splitterFunction); } @Bean diff --git a/supplier/spring-jdbc-supplier/src/test/java/org/springframework/cloud/fn/supplier/jdbc/DefaultJdbcSupplierTests.java b/supplier/spring-jdbc-supplier/src/test/java/org/springframework/cloud/fn/supplier/jdbc/DefaultJdbcSupplierTests.java index 87b29f1b..0b13387b 100644 --- a/supplier/spring-jdbc-supplier/src/test/java/org/springframework/cloud/fn/supplier/jdbc/DefaultJdbcSupplierTests.java +++ b/supplier/spring-jdbc-supplier/src/test/java/org/springframework/cloud/fn/supplier/jdbc/DefaultJdbcSupplierTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 the original author or authors. + * Copyright 2020-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,6 +32,10 @@ import static org.assertj.core.api.Assertions.assertThat; +/** + * @author Soby Chacko + * @author Artem Bilan + */ @SpringBootTest(properties = "jdbc.supplier.query=select id, name from test order by id") @DirtiesContext public class DefaultJdbcSupplierTests { @@ -39,14 +43,11 @@ public class DefaultJdbcSupplierTests { @Autowired Supplier>> jdbcSupplier; - @Autowired - JdbcTemplate jdbcTemplate; - @Test @SuppressWarnings("rawtypes") void testExtraction() { final Flux> messageFlux = jdbcSupplier.get(); - StepVerifier stepVerifier = StepVerifier.create(messageFlux) + StepVerifier.create(messageFlux) .assertNext((message) -> assertThat(message) .satisfies((msg) -> assertThat(msg).extracting(Message::getPayload).matches((o) -> { Map map = (Map) o; @@ -63,8 +64,7 @@ void testExtraction() { return map.get("ID").equals(3L) && map.get("NAME").equals("John"); }))) .thenCancel() - .verifyLater(); - stepVerifier.verify(); + .verify(); } @SpringBootApplication diff --git a/supplier/spring-mongodb-supplier/src/main/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierConfiguration.java b/supplier/spring-mongodb-supplier/src/main/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierConfiguration.java index 693a461b..034b4d9a 100644 --- a/supplier/spring-mongodb-supplier/src/main/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierConfiguration.java +++ b/supplier/spring-mongodb-supplier/src/main/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2024 the original author or authors. + * Copyright 2019-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,6 @@ package org.springframework.cloud.fn.supplier.mongo; -import java.util.List; import java.util.function.Function; import java.util.function.Supplier; @@ -28,7 +27,6 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.fn.common.config.ComponentCustomizer; import org.springframework.cloud.fn.splitter.SplitterFunctionConfiguration; -import org.springframework.cloud.function.context.PollableBean; import org.springframework.context.annotation.Bean; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.expression.Expression; @@ -59,21 +57,16 @@ public MongodbSupplierConfiguration(MongodbSupplierProperties properties, MongoT } @Bean(name = "mongodbSupplier") - @PollableBean @ConditionalOnProperty(prefix = "mongodb", name = "split", matchIfMissing = true) public Supplier>> splittedSupplier(MongoDbMessageSource mongoDbSource, - Function, List>> splitterFunction) { + Function>, Flux>> splitterFunction) { - return () -> { + return () -> Flux.>create(sink -> { Message received = mongoDbSource.receive(); if (received != null) { - // multiple Message> - return Flux.fromIterable(splitterFunction.apply(received)); + sink.next(received); } - else { - return Flux.empty(); - } - }; + }).transform(splitterFunction); } @Bean diff --git a/supplier/spring-mongodb-supplier/src/test/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierApplicationTests.java b/supplier/spring-mongodb-supplier/src/test/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierApplicationTests.java index efc98c56..0a9dc9be 100644 --- a/supplier/spring-mongodb-supplier/src/test/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierApplicationTests.java +++ b/supplier/spring-mongodb-supplier/src/test/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierApplicationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2024 the original author or authors. + * Copyright 2019-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -76,8 +76,6 @@ void testMongodbSupplier() { (message) -> assertThat(toMap(message)).contains(entry("greeting", "hola"), entry("name", "bar"))) .thenCancel() .verify(); - - assertThat(this.mongodbSupplier.get().collectList().block()).isEmpty(); } @SuppressWarnings("unchecked")