Skip to content

Commit

Permalink
#1149 - [Search][Recommendation] Introduce Kafka Stream to aggregate …
Browse files Browse the repository at this point in the history
…all product data.

-implement kafka stream application to capture all changes related to product
  • Loading branch information
Phuoc Nguyen committed Oct 25, 2024
1 parent 8b4c8f9 commit b32ba6c
Show file tree
Hide file tree
Showing 24 changed files with 814 additions and 6 deletions.
17 changes: 17 additions & 0 deletions kafka/connects/debezium-product-group.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"topic.prefix": "dbproduct",
"database.user": "admin",
"database.dbname": "product",
"database.hostname": "postgres",
"database.password": "admin",
"database.port": "5432",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"schema.include.list": "public",
"table.include.list": "public.product_attribute, public.product_attribute_value, public.product_category, public.category, public.brand, public.product",
"include.toasts": "true",
"slot.name": "product_relation_slot"
}
3 changes: 2 additions & 1 deletion kafka/connects/debezium-product.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"schema.include.list": "public",
"table.include.list": "public.product",
"slot.name": "product_slot"
"slot.name": "product_slot",
"topic.creation.default.cleanup.policy": "compact"
}
4 changes: 4 additions & 0 deletions recommendation/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>

<!-- Spring AI Dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.kafka.annotation.EnableKafkaStreams;

@SpringBootApplication
@EnableKafkaStreams
@EnableConfigurationProperties(EmbeddingSearchConfiguration.class)
public class RecommendationApplication {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.yas.recommendation.configuration;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;

Check warning on line 14 in recommendation/src/main/java/com/yas/recommendation/configuration/KafkaStreamAppConfig.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Extra separation in import group before 'java.util.HashMap'

Check warning on line 14 in recommendation/src/main/java/com/yas/recommendation/configuration/KafkaStreamAppConfig.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Wrong lexicographical order for 'java.util.HashMap' import. Should be before 'org.springframework.kafka.support.serializer.JsonDeserializer'.
import java.util.Map;

Check warning on line 15 in recommendation/src/main/java/com/yas/recommendation/configuration/KafkaStreamAppConfig.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Wrong lexicographical order for 'java.util.Map' import. Should be before 'org.springframework.kafka.support.serializer.JsonDeserializer'.

import static org.apache.kafka.streams.StreamsConfig.*;

Check warning on line 17 in recommendation/src/main/java/com/yas/recommendation/configuration/KafkaStreamAppConfig.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Import statement for 'org.apache.kafka.streams.StreamsConfig.*' is in the wrong order. Should be in the 'STATIC' group, expecting not assigned imports on this line.

Check warning on line 17 in recommendation/src/main/java/com/yas/recommendation/configuration/KafkaStreamAppConfig.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.imports.AvoidStarImportCheck

Using the '.*' form of import should be avoided - org.apache.kafka.streams.StreamsConfig.*.

@Configuration
@EnableConfigurationProperties({KafkaStreamConfig.class, KafkaTopicConfig.class})
public class KafkaStreamAppConfig {

private final KafkaStreamConfig kafkaStreamConfig;
private final KafkaTopicConfig kafkaTopicConfig;

@Autowired
public KafkaStreamAppConfig(KafkaStreamConfig kafkaStreamConfig, KafkaTopicConfig kafkaTopicConfig) {
this.kafkaStreamConfig = kafkaStreamConfig;
this.kafkaTopicConfig = kafkaTopicConfig;
}


@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
KafkaStreamsConfiguration kafkaStreamAppConfig() {
Map<String, Object> props = new HashMap<>();
props.put(APPLICATION_ID_CONFIG, kafkaStreamConfig.applicationId());
props.put(BOOTSTRAP_SERVERS_CONFIG, kafkaStreamConfig.bootstrapServers());
props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, kafkaStreamConfig.defaultKeySerdeClass());
props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, kafkaStreamConfig.defaultValueSerdeClass());
props.put(JsonDeserializer.TRUSTED_PACKAGES, kafkaStreamConfig.trustedPackages());
props.put(COMMIT_INTERVAL_MS_CONFIG, kafkaStreamConfig.commitIntervalMs());
return new KafkaStreamsConfiguration(props);
}

@Bean
public KafkaAdmin.NewTopics createTopics() {
return new KafkaAdmin.NewTopics(
createTopic(kafkaTopicConfig.product()),
createTopic(kafkaTopicConfig.brand()),
createTopic(kafkaTopicConfig.category()),
createTopic(kafkaTopicConfig.productCategory()),
createTopic(kafkaTopicConfig.productAttribute()),
createTopic(kafkaTopicConfig.productAttributeValue()),
createTopic(kafkaTopicConfig.productSink())
);
}

private NewTopic createTopic(String topicName) {
return TopicBuilder.name(topicName)
.partitions(kafkaTopicConfig.defaultPartitions())
.replicas(kafkaTopicConfig.defaultReplicas())
.build();
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.yas.recommendation.configuration;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "yas.kafka.stream")
public record KafkaStreamConfig(
String applicationId,
String bootstrapServers,
String defaultKeySerdeClass,
String defaultValueSerdeClass,
String trustedPackages,
int commitIntervalMs
) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.yas.recommendation.configuration;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "yas.kafka.topic")
public record KafkaTopicConfig(
String product,
String brand,
String category,
String productCategory,
String productAttribute,
String productAttributeValue,
String productSink,
int defaultPartitions,
int defaultReplicas
) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.yas.recommendation.dto;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.*;

Check warning on line 5 in recommendation/src/main/java/com/yas/recommendation/dto/AggregationDTO.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.imports.AvoidStarImportCheck

Using the '.*' form of import should be avoided - lombok.*.

import java.util.HashSet;

Check warning on line 7 in recommendation/src/main/java/com/yas/recommendation/dto/AggregationDTO.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Extra separation in import group before 'java.util.HashSet'

Check warning on line 7 in recommendation/src/main/java/com/yas/recommendation/dto/AggregationDTO.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Wrong lexicographical order for 'java.util.HashSet' import. Should be before 'lombok.*'.
import java.util.Set;

Check warning on line 8 in recommendation/src/main/java/com/yas/recommendation/dto/AggregationDTO.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Wrong lexicographical order for 'java.util.Set' import. Should be before 'lombok.*'.

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
@AllArgsConstructor
@EqualsAndHashCode
@ToString
public class AggregationDTO<ID, T> {

Check warning on line 17 in recommendation/src/main/java/com/yas/recommendation/dto/AggregationDTO.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck

Abbreviation in name 'AggregationDTO' must contain no more than '1' consecutive capital letters.

Check warning on line 17 in recommendation/src/main/java/com/yas/recommendation/dto/AggregationDTO.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.naming.ClassTypeParameterNameCheck

Class type name 'ID' must match pattern '(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)'.
private ID joinId;
private Set<T> aggregationContents;

public AggregationDTO() {
aggregationContents = new HashSet<>();
}

public void add(T aggregationContent) {
aggregationContents.add(aggregationContent);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.yas.recommendation.dto;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public abstract class BaseMessageDTO {

Check warning on line 12 in recommendation/src/main/java/com/yas/recommendation/dto/BaseMessageDTO.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck

Abbreviation in name 'BaseMessageDTO' must contain no more than '1' consecutive capital letters.
protected String op;
protected Long comingTs = System.currentTimeMillis();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.yas.recommendation.dto;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public abstract class BaseMetaDataEntity {
protected MetaData metaData;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.yas.recommendation.dto;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class BrandDTO extends BaseMetaDataEntity {

Check warning on line 16 in recommendation/src/main/java/com/yas/recommendation/dto/BrandDTO.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck

Abbreviation in name 'BrandDTO' must contain no more than '1' consecutive capital letters.
private Long id;
private String name;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.yas.recommendation.dto;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.*;

Check warning on line 5 in recommendation/src/main/java/com/yas/recommendation/dto/CategoryDTO.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.imports.AvoidStarImportCheck

Using the '.*' form of import should be avoided - lombok.*.

import java.util.Objects;

Check warning on line 7 in recommendation/src/main/java/com/yas/recommendation/dto/CategoryDTO.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Extra separation in import group before 'java.util.Objects'

Check warning on line 7 in recommendation/src/main/java/com/yas/recommendation/dto/CategoryDTO.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Wrong lexicographical order for 'java.util.Objects' import. Should be before 'lombok.*'.

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@EqualsAndHashCode(callSuper = false)
@ToString
public class CategoryDTO extends BaseMetaDataEntity {

Check warning on line 17 in recommendation/src/main/java/com/yas/recommendation/dto/CategoryDTO.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck

Abbreviation in name 'CategoryDTO' must contain no more than '1' consecutive capital letters.
private Long id;
private String name;

public CategoryDTO(MetaData metaData, Long id, String name) {
super(metaData);
this.id = id;
this.name = name;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.yas.recommendation.dto;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class KeyDTO {

Check warning on line 16 in recommendation/src/main/java/com/yas/recommendation/dto/KeyDTO.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck

Abbreviation in name 'KeyDTO' must contain no more than '1' consecutive capital letters.
private Long id;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.yas.recommendation.dto;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class MessageDTO<T> extends BaseMessageDTO {

Check warning on line 16 in recommendation/src/main/java/com/yas/recommendation/dto/MessageDTO.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck

Abbreviation in name 'MessageDTO' must contain no more than '1' consecutive capital letters.
private T before;
private T after;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.yas.recommendation.dto;


import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@NoArgsConstructor
public class MetaData extends BaseMessageDTO {
public MetaData(String op, Long comingTs) {
super(op, comingTs);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.yas.recommendation.dto;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.*;

Check warning on line 5 in recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeDTO.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.imports.AvoidStarImportCheck

Using the '.*' form of import should be avoided - lombok.*.

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@EqualsAndHashCode(callSuper = false)
public class ProductAttributeDTO extends BaseMetaDataEntity {

Check warning on line 14 in recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeDTO.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck

Abbreviation in name 'ProductAttributeDTO' must contain no more than '1' consecutive capital letters.
private Long id;
private String name;
private String value;

public ProductAttributeDTO(MetaData metaData, Long id, String name, String value) {
super(metaData);
this.id = id;
this.name = name;
this.value = value;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.yas.recommendation.dto;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.*;

Check warning on line 6 in recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeValueDTO.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.imports.AvoidStarImportCheck

Using the '.*' form of import should be avoided - lombok.*.

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@EqualsAndHashCode(callSuper = false)
public class ProductAttributeValueDTO extends BaseMetaDataEntity {

Check warning on line 15 in recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeValueDTO.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck

Abbreviation in name 'ProductAttributeValueDTO' must contain no more than '1' consecutive capital letters.
private Long id;
private String value;
@JsonProperty("product_id")
private Long productId;
@JsonProperty("product_attribute_id")
private Long productAttributeId;
private String productAttributeName;
private boolean isDeleted;

public ProductAttributeValueDTO(Long id) {
this.id = id;
}
}
Loading

0 comments on commit b32ba6c

Please sign in to comment.