Skip to content

Commit

Permalink
GH-120: Fix auto-wiring ambiguity in the AwsS3SupplierConfiguration
Browse files Browse the repository at this point in the history
Fixes: #120
  • Loading branch information
artembilan committed Dec 19, 2024
1 parent 85a33ea commit 6b0f058
Showing 1 changed file with 8 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -96,7 +95,7 @@ Supplier<Flux<Message<?>>> s3Supplier(Publisher<Message<?>> s3SupplierFlow) {
}

@Bean
ChainFileListFilter<S3Object> filter(ConcurrentMetadataStore metadataStore) {
ChainFileListFilter<S3Object> s3SupplierFileListFilter(ConcurrentMetadataStore metadataStore) {
ChainFileListFilter<S3Object> chainFilter = new ChainFileListFilter<>();
if (StringUtils.hasText(this.awsS3SupplierProperties.getFilenamePattern())) {
chainFilter
Expand Down Expand Up @@ -129,17 +128,15 @@ Publisher<Message<Object>> s3SupplierFlow(S3InboundFileSynchronizingMessageSourc
}

@Bean
S3InboundFileSynchronizer s3InboundFileSynchronizer(ChainFileListFilter<S3Object> filter) {

S3InboundFileSynchronizer s3InboundFileSynchronizer(ChainFileListFilter<S3Object> s3SupplierFileListFilter) {
S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(this.s3SessionFactory);
synchronizer.setDeleteRemoteFiles(this.awsS3SupplierProperties.isDeleteRemoteFiles());
synchronizer.setPreserveTimestamp(this.awsS3SupplierProperties.isPreserveTimestamp());
String remoteDir = this.awsS3SupplierProperties.getRemoteDir();
synchronizer.setRemoteDirectory(remoteDir);
synchronizer.setRemoteFileSeparator(this.awsS3SupplierProperties.getRemoteFileSeparator());
synchronizer.setTemporaryFileSuffix(this.awsS3SupplierProperties.getTmpFileSuffix());
synchronizer.setFilter(filter);

synchronizer.setFilter(s3SupplierFileListFilter);
return synchronizer;
}

Expand Down Expand Up @@ -178,8 +175,8 @@ Supplier<Flux<Message<Object>>> s3Supplier(Publisher<Message<Object>> s3Supplier
}

@Bean
Publisher<Message<Object>> s3SupplierFlow(ReactiveMessageSourceProducer s3ListingProducer) {
return IntegrationFlow.from(s3ListingProducer).split().toReactivePublisher(true);
Publisher<Message<Object>> s3SupplierFlow(ReactiveMessageSourceProducer s3ListingMessageProducer) {
return IntegrationFlow.from(s3ListingMessageProducer).split().toReactivePublisher(true);
}

@Bean
Expand Down Expand Up @@ -210,13 +207,13 @@ else if (this.awsS3SupplierProperties.getFilenameRegex() != null) {

@Bean
ReactiveMessageSourceProducer s3ListingMessageProducer(S3Client amazonS3, ObjectMapper objectMapper,
AwsS3SupplierProperties awsS3SupplierProperties, Predicate<S3Object> filter) {
AwsS3SupplierProperties awsS3SupplierProperties, Predicate<S3Object> listOnlyFilter) {
return new ReactiveMessageSourceProducer((MessageSource<List<String>>) () -> {
List<String> summaryList = amazonS3
.listObjects(ListObjectsRequest.builder().bucket(awsS3SupplierProperties.getRemoteDir()).build())
.contents()
.stream()
.filter(filter)
.filter(listOnlyFilter)
.map((s3Object) -> {
try {
return objectMapper.writeValueAsString(s3Object.toBuilder());
Expand All @@ -225,7 +222,7 @@ ReactiveMessageSourceProducer s3ListingMessageProducer(S3Client amazonS3, Object
throw new RuntimeException(ex);
}
})
.collect(Collectors.toList());
.toList();
return summaryList.isEmpty() ? null : new GenericMessage<>(summaryList);
});
}
Expand Down

0 comments on commit 6b0f058

Please sign in to comment.