Skip to content

Commit

Permalink
GH-112: Fix JdbcConsumerConfiguration for auto-wire ambiguity
Browse files Browse the repository at this point in the history
Fixes: #112
  • Loading branch information
artembilan committed Dec 19, 2024
1 parent a0d01bf commit d5a77d1
Showing 1 changed file with 7 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.springframework.integration.gateway.AnnotationGatewayProxyFactoryBean;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.integration.jdbc.SqlParameterSourceFactory;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.store.SimpleMessageStore;
import org.springframework.integration.support.MutableMessage;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
Expand Down Expand Up @@ -120,8 +119,8 @@ private static String generateSql(String tableName, Set<String> columns) {
}

@Bean
IntegrationFlow jdbcConsumerFlow(@Qualifier("aggregator") MessageHandler aggregator,
JdbcMessageHandler jdbcMessageHandler) {
IntegrationFlow jdbcConsumerFlow(@Qualifier("jdbcConsumerAggregator") MessageHandler aggregator,
@Qualifier("jdbcConsumerMessageHandler") JdbcMessageHandler jdbcMessageHandler) {

return (flow) -> {
if (this.properties.getBatchSize() > 1 || this.properties.getIdleTimeout() > 0) {
Expand All @@ -140,13 +139,16 @@ AnnotationGatewayProxyFactoryBean<Consumer<Message<?>>> jdbcConsumer() {
}

@Bean
FactoryBean<MessageHandler> aggregator(MessageGroupStore messageGroupStore) {
FactoryBean<MessageHandler> jdbcConsumerAggregator() {
AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
aggregatorFactoryBean.setCorrelationStrategy((message) -> message.getPayload().getClass().getName());
aggregatorFactoryBean.setReleaseStrategy(new MessageCountReleaseStrategy(this.properties.getBatchSize()));
if (this.properties.getIdleTimeout() >= 0) {
aggregatorFactoryBean.setGroupTimeoutExpression(new ValueExpression<>(this.properties.getIdleTimeout()));
}
SimpleMessageStore messageGroupStore = new SimpleMessageStore();
messageGroupStore.setTimeoutOnIdle(true);
messageGroupStore.setCopyOnGet(false);
aggregatorFactoryBean.setMessageStore(messageGroupStore);
aggregatorFactoryBean.setProcessorBean(new DefaultAggregatingMessageGroupProcessor());
aggregatorFactoryBean.setExpireGroupsUponCompletion(true);
Expand All @@ -155,15 +157,7 @@ FactoryBean<MessageHandler> aggregator(MessageGroupStore messageGroupStore) {
}

@Bean
MessageGroupStore messageGroupStore() {
SimpleMessageStore messageGroupStore = new SimpleMessageStore();
messageGroupStore.setTimeoutOnIdle(true);
messageGroupStore.setCopyOnGet(false);
return messageGroupStore;
}

@Bean
public JdbcMessageHandler jdbcMessageHandler(DataSource dataSource,
public JdbcMessageHandler jdbcConsumerMessageHandler(DataSource dataSource,
@Qualifier(IntegrationContextUtils.INTEGRATION_EVALUATION_CONTEXT_BEAN_NAME) EvaluationContext evaluationContext) {

final MultiValueMap<String, Expression> columnExpressionVariations = new LinkedMultiValueMap<>();
Expand Down

0 comments on commit d5a77d1

Please sign in to comment.