Skip to content

Commit

Permalink
Fix JDBC & MongoDB suppliers to deal with a new version of Splitter f…
Browse files Browse the repository at this point in the history
…unction
  • Loading branch information
artembilan committed Dec 26, 2024
1 parent d2f7637 commit 71c0f20
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package org.springframework.cloud.fn.supplier.jdbc;

import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

Expand All @@ -30,7 +29,6 @@
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.fn.common.config.ComponentCustomizer;
import org.springframework.cloud.fn.splitter.SplitterFunctionConfiguration;
import org.springframework.cloud.function.context.PollableBean;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
Expand Down Expand Up @@ -71,21 +69,16 @@ public MessageSource<Object> jdbcMessageSource(
}

@Bean(name = "jdbcSupplier")
@PollableBean
@ConditionalOnProperty(prefix = "jdbc.supplier", name = "split", matchIfMissing = true)
public Supplier<Flux<Message<?>>> splittedSupplier(MessageSource<Object> jdbcMessageSource,
Function<Message<?>, List<Message<?>>> splitterFunction) {
Function<Flux<Message<?>>, Flux<Message<?>>> splitterFunction) {

return () -> {
return () -> Flux.<Message<?>>create(sink -> {
Message<?> received = jdbcMessageSource.receive();
if (received != null) {
// multiple Message<Map<String, Object>>
return Flux.fromIterable(splitterFunction.apply(received));
sink.next(received);
}
else {
return Flux.empty();
}
};
}).transform(splitterFunction);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2024 the original author or authors.
* Copyright 2020-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,21 +32,22 @@

import static org.assertj.core.api.Assertions.assertThat;

/**
* @author Soby Chacko
* @author Artem Bilan
*/
@SpringBootTest(properties = "jdbc.supplier.query=select id, name from test order by id")
@DirtiesContext
public class DefaultJdbcSupplierTests {

@Autowired
Supplier<Flux<Message<?>>> jdbcSupplier;

@Autowired
JdbcTemplate jdbcTemplate;

@Test
@SuppressWarnings("rawtypes")
void testExtraction() {
final Flux<Message<?>> messageFlux = jdbcSupplier.get();
StepVerifier stepVerifier = StepVerifier.create(messageFlux)
StepVerifier.create(messageFlux)
.assertNext((message) -> assertThat(message)
.satisfies((msg) -> assertThat(msg).extracting(Message::getPayload).matches((o) -> {
Map map = (Map) o;
Expand All @@ -63,8 +64,7 @@ void testExtraction() {
return map.get("ID").equals(3L) && map.get("NAME").equals("John");
})))
.thenCancel()
.verifyLater();
stepVerifier.verify();
.verify();
}

@SpringBootApplication
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2024 the original author or authors.
* Copyright 2019-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,6 @@

package org.springframework.cloud.fn.supplier.mongo;

import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

Expand All @@ -28,7 +27,6 @@
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.fn.common.config.ComponentCustomizer;
import org.springframework.cloud.fn.splitter.SplitterFunctionConfiguration;
import org.springframework.cloud.function.context.PollableBean;
import org.springframework.context.annotation.Bean;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.expression.Expression;
Expand Down Expand Up @@ -59,21 +57,16 @@ public MongodbSupplierConfiguration(MongodbSupplierProperties properties, MongoT
}

@Bean(name = "mongodbSupplier")
@PollableBean
@ConditionalOnProperty(prefix = "mongodb", name = "split", matchIfMissing = true)
public Supplier<Flux<Message<?>>> splittedSupplier(MongoDbMessageSource mongoDbSource,
Function<Message<?>, List<Message<?>>> splitterFunction) {
Function<Flux<Message<?>>, Flux<Message<?>>> splitterFunction) {

return () -> {
return () -> Flux.<Message<?>>create(sink -> {
Message<?> received = mongoDbSource.receive();
if (received != null) {
// multiple Message<Map<String, Object>>
return Flux.fromIterable(splitterFunction.apply(received));
sink.next(received);
}
else {
return Flux.empty();
}
};
}).transform(splitterFunction);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2024 the original author or authors.
* Copyright 2019-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -76,8 +76,6 @@ void testMongodbSupplier() {
(message) -> assertThat(toMap(message)).contains(entry("greeting", "hola"), entry("name", "bar")))
.thenCancel()
.verify();

assertThat(this.mongodbSupplier.get().collectList().block()).isEmpty();
}

@SuppressWarnings("unchecked")
Expand Down

0 comments on commit 71c0f20

Please sign in to comment.