-
Notifications
You must be signed in to change notification settings - Fork 454
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
#1149 - [Search][Recommendation] Introduce Kafka Stream to aggregate …
…all product data. -implement kafka stream application to capture all changes related to product
- Loading branch information
Phuoc Nguyen
committed
Oct 25, 2024
1 parent
8b4c8f9
commit 0a2c88b
Showing
24 changed files
with
813 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
{ | ||
"connector.class": "io.debezium.connector.postgresql.PostgresConnector", | ||
"topic.prefix": "dbproduct", | ||
"database.user": "admin", | ||
"database.dbname": "product", | ||
"database.hostname": "postgres", | ||
"database.password": "admin", | ||
"database.port": "5432", | ||
"key.converter.schemas.enable": "false", | ||
"value.converter.schemas.enable": "false", | ||
"value.converter": "org.apache.kafka.connect.json.JsonConverter", | ||
"key.converter": "org.apache.kafka.connect.json.JsonConverter", | ||
"schema.include.list": "public", | ||
"table.include.list": "public.product_attribute, public.product_attribute_value, public.product_category, public.category, public.brand, public.product", | ||
"include.toasts": "true", | ||
"slot.name": "product_relation_slot" | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
66 changes: 66 additions & 0 deletions
66
recommendation/src/main/java/com/yas/recommendation/configuration/KafkaStreamAppConfig.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
package com.yas.recommendation.configuration; | ||
|
||
import org.apache.kafka.clients.admin.NewTopic; | ||
import org.springframework.beans.factory.annotation.Autowired; | ||
import org.springframework.boot.context.properties.EnableConfigurationProperties; | ||
import org.springframework.context.annotation.Bean; | ||
import org.springframework.context.annotation.Configuration; | ||
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; | ||
import org.springframework.kafka.config.KafkaStreamsConfiguration; | ||
import org.springframework.kafka.config.TopicBuilder; | ||
import org.springframework.kafka.core.KafkaAdmin; | ||
import org.springframework.kafka.support.serializer.JsonDeserializer; | ||
|
||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
import static org.apache.kafka.streams.StreamsConfig.*; | ||
|
||
@Configuration | ||
@EnableConfigurationProperties({KafkaStreamConfig.class, KafkaTopicConfig.class}) | ||
public class KafkaStreamAppConfig { | ||
|
||
private final KafkaStreamConfig kafkaStreamConfig; | ||
private final KafkaTopicConfig kafkaTopicConfig; | ||
|
||
@Autowired | ||
public KafkaStreamAppConfig(KafkaStreamConfig kafkaStreamConfig, KafkaTopicConfig kafkaTopicConfig) { | ||
this.kafkaStreamConfig = kafkaStreamConfig; | ||
this.kafkaTopicConfig = kafkaTopicConfig; | ||
} | ||
|
||
|
||
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) | ||
KafkaStreamsConfiguration kafkaStreamAppConfig() { | ||
Map<String, Object> props = new HashMap<>(); | ||
props.put(APPLICATION_ID_CONFIG, kafkaStreamConfig.applicationId()); | ||
props.put(BOOTSTRAP_SERVERS_CONFIG, kafkaStreamConfig.bootstrapServers()); | ||
props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, kafkaStreamConfig.defaultKeySerdeClass()); | ||
props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, kafkaStreamConfig.defaultValueSerdeClass()); | ||
props.put(JsonDeserializer.TRUSTED_PACKAGES, kafkaStreamConfig.trustedPackages()); | ||
props.put(COMMIT_INTERVAL_MS_CONFIG, kafkaStreamConfig.commitIntervalMs()); | ||
return new KafkaStreamsConfiguration(props); | ||
} | ||
|
||
@Bean | ||
public KafkaAdmin.NewTopics createTopics() { | ||
return new KafkaAdmin.NewTopics( | ||
createTopic(kafkaTopicConfig.product()), | ||
createTopic(kafkaTopicConfig.brand()), | ||
createTopic(kafkaTopicConfig.category()), | ||
createTopic(kafkaTopicConfig.productCategory()), | ||
createTopic(kafkaTopicConfig.productAttribute()), | ||
createTopic(kafkaTopicConfig.productAttributeValue()), | ||
createTopic(kafkaTopicConfig.productSink()) | ||
); | ||
} | ||
|
||
private NewTopic createTopic(String topicName) { | ||
return TopicBuilder.name(topicName) | ||
.partitions(kafkaTopicConfig.defaultPartitions()) | ||
.replicas(kafkaTopicConfig.defaultReplicas()) | ||
.build(); | ||
} | ||
|
||
|
||
} |
14 changes: 14 additions & 0 deletions
14
recommendation/src/main/java/com/yas/recommendation/configuration/KafkaStreamConfig.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
package com.yas.recommendation.configuration; | ||
|
||
import org.springframework.boot.context.properties.ConfigurationProperties; | ||
|
||
@ConfigurationProperties(prefix = "yas.kafka.stream") | ||
public record KafkaStreamConfig( | ||
String applicationId, | ||
String bootstrapServers, | ||
String defaultKeySerdeClass, | ||
String defaultValueSerdeClass, | ||
String trustedPackages, | ||
int commitIntervalMs | ||
) { | ||
} |
17 changes: 17 additions & 0 deletions
17
recommendation/src/main/java/com/yas/recommendation/configuration/KafkaTopicConfig.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
package com.yas.recommendation.configuration; | ||
|
||
import org.springframework.boot.context.properties.ConfigurationProperties; | ||
|
||
@ConfigurationProperties(prefix = "yas.kafka.topic") | ||
public record KafkaTopicConfig( | ||
String product, | ||
String brand, | ||
String category, | ||
String productCategory, | ||
String productAttribute, | ||
String productAttributeValue, | ||
String productSink, | ||
int defaultPartitions, | ||
int defaultReplicas | ||
) { | ||
} |
29 changes: 29 additions & 0 deletions
29
recommendation/src/main/java/com/yas/recommendation/dto/AggregationDTO.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
package com.yas.recommendation.dto; | ||
|
||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; | ||
import com.fasterxml.jackson.annotation.JsonInclude; | ||
import lombok.*; | ||
|
||
import java.util.HashSet; | ||
import java.util.Set; | ||
|
||
@JsonInclude(JsonInclude.Include.NON_NULL) | ||
@JsonIgnoreProperties(ignoreUnknown = true) | ||
@Getter | ||
@Setter | ||
@AllArgsConstructor | ||
@EqualsAndHashCode | ||
@ToString | ||
public class AggregationDTO<ID, T> { | ||
private ID joinId; | ||
private Set<T> aggregationContents; | ||
|
||
public AggregationDTO() { | ||
aggregationContents = new HashSet<>(); | ||
} | ||
|
||
public void add(T aggregationContent) { | ||
aggregationContents.add(aggregationContent); | ||
} | ||
|
||
} |
15 changes: 15 additions & 0 deletions
15
recommendation/src/main/java/com/yas/recommendation/dto/BaseMessageDTO.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package com.yas.recommendation.dto; | ||
|
||
import lombok.AllArgsConstructor; | ||
import lombok.Getter; | ||
import lombok.NoArgsConstructor; | ||
import lombok.Setter; | ||
|
||
@Getter | ||
@Setter | ||
@AllArgsConstructor | ||
@NoArgsConstructor | ||
public abstract class BaseMessageDTO { | ||
protected String op; | ||
protected Long comingTs = System.currentTimeMillis(); | ||
} |
14 changes: 14 additions & 0 deletions
14
recommendation/src/main/java/com/yas/recommendation/dto/BaseMetaDataEntity.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
package com.yas.recommendation.dto; | ||
|
||
import lombok.AllArgsConstructor; | ||
import lombok.Getter; | ||
import lombok.NoArgsConstructor; | ||
import lombok.Setter; | ||
|
||
@Getter | ||
@Setter | ||
@AllArgsConstructor | ||
@NoArgsConstructor | ||
public abstract class BaseMetaDataEntity { | ||
protected MetaData metaData; | ||
} |
19 changes: 19 additions & 0 deletions
19
recommendation/src/main/java/com/yas/recommendation/dto/BrandDTO.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
package com.yas.recommendation.dto; | ||
|
||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; | ||
import com.fasterxml.jackson.annotation.JsonInclude; | ||
import lombok.AllArgsConstructor; | ||
import lombok.Getter; | ||
import lombok.NoArgsConstructor; | ||
import lombok.Setter; | ||
|
||
@JsonInclude(JsonInclude.Include.NON_NULL) | ||
@JsonIgnoreProperties(ignoreUnknown = true) | ||
@Getter | ||
@Setter | ||
@AllArgsConstructor | ||
@NoArgsConstructor | ||
public class BrandDTO extends BaseMetaDataEntity { | ||
private Long id; | ||
private String name; | ||
} |
26 changes: 26 additions & 0 deletions
26
recommendation/src/main/java/com/yas/recommendation/dto/CategoryDTO.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
package com.yas.recommendation.dto; | ||
|
||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; | ||
import com.fasterxml.jackson.annotation.JsonInclude; | ||
import lombok.*; | ||
|
||
import java.util.Objects; | ||
|
||
@JsonInclude(JsonInclude.Include.NON_NULL) | ||
@JsonIgnoreProperties(ignoreUnknown = true) | ||
@Getter | ||
@Setter | ||
@AllArgsConstructor | ||
@NoArgsConstructor | ||
@EqualsAndHashCode(callSuper = false) | ||
@ToString | ||
public class CategoryDTO extends BaseMetaDataEntity { | ||
private Long id; | ||
private String name; | ||
|
||
public CategoryDTO(MetaData metaData, Long id, String name) { | ||
super(metaData); | ||
this.id = id; | ||
this.name = name; | ||
} | ||
} |
18 changes: 18 additions & 0 deletions
18
recommendation/src/main/java/com/yas/recommendation/dto/KeyDTO.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
package com.yas.recommendation.dto; | ||
|
||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; | ||
import com.fasterxml.jackson.annotation.JsonInclude; | ||
import lombok.AllArgsConstructor; | ||
import lombok.Getter; | ||
import lombok.NoArgsConstructor; | ||
import lombok.Setter; | ||
|
||
@JsonInclude(JsonInclude.Include.NON_NULL) | ||
@JsonIgnoreProperties(ignoreUnknown = true) | ||
@Getter | ||
@Setter | ||
@AllArgsConstructor | ||
@NoArgsConstructor | ||
public class KeyDTO { | ||
private Long id; | ||
} |
19 changes: 19 additions & 0 deletions
19
recommendation/src/main/java/com/yas/recommendation/dto/MessageDTO.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
package com.yas.recommendation.dto; | ||
|
||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; | ||
import com.fasterxml.jackson.annotation.JsonInclude; | ||
import lombok.AllArgsConstructor; | ||
import lombok.Getter; | ||
import lombok.NoArgsConstructor; | ||
import lombok.Setter; | ||
|
||
@JsonInclude(JsonInclude.Include.NON_NULL) | ||
@JsonIgnoreProperties(ignoreUnknown = true) | ||
@Getter | ||
@Setter | ||
@AllArgsConstructor | ||
@NoArgsConstructor | ||
public class MessageDTO<T> extends BaseMessageDTO { | ||
private T before; | ||
private T after; | ||
} |
16 changes: 16 additions & 0 deletions
16
recommendation/src/main/java/com/yas/recommendation/dto/MetaData.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
package com.yas.recommendation.dto; | ||
|
||
|
||
import lombok.AllArgsConstructor; | ||
import lombok.Getter; | ||
import lombok.NoArgsConstructor; | ||
import lombok.Setter; | ||
|
||
@Getter | ||
@Setter | ||
@NoArgsConstructor | ||
public class MetaData extends BaseMessageDTO { | ||
public MetaData(String op, Long comingTs) { | ||
super(op, comingTs); | ||
} | ||
} |
25 changes: 25 additions & 0 deletions
25
recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeDTO.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
package com.yas.recommendation.dto; | ||
|
||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; | ||
import com.fasterxml.jackson.annotation.JsonInclude; | ||
import lombok.*; | ||
|
||
@JsonInclude(JsonInclude.Include.NON_NULL) | ||
@JsonIgnoreProperties(ignoreUnknown = true) | ||
@Getter | ||
@Setter | ||
@AllArgsConstructor | ||
@NoArgsConstructor | ||
@EqualsAndHashCode(callSuper = false) | ||
public class ProductAttributeDTO extends BaseMetaDataEntity { | ||
private Long id; | ||
private String name; | ||
private String value; | ||
|
||
public ProductAttributeDTO(MetaData metaData, Long id, String name, String value) { | ||
super(metaData); | ||
this.id = id; | ||
this.name = name; | ||
this.value = value; | ||
} | ||
} |
28 changes: 28 additions & 0 deletions
28
recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeValueDTO.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
package com.yas.recommendation.dto; | ||
|
||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; | ||
import com.fasterxml.jackson.annotation.JsonInclude; | ||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import lombok.*; | ||
|
||
@JsonInclude(JsonInclude.Include.NON_NULL) | ||
@JsonIgnoreProperties(ignoreUnknown = true) | ||
@Getter | ||
@Setter | ||
@AllArgsConstructor | ||
@NoArgsConstructor | ||
@EqualsAndHashCode(callSuper = false) | ||
public class ProductAttributeValueDTO extends BaseMetaDataEntity { | ||
private Long id; | ||
private String value; | ||
@JsonProperty("product_id") | ||
private Long productId; | ||
@JsonProperty("product_attribute_id") | ||
private Long productAttributeId; | ||
private String productAttributeName; | ||
private boolean isDeleted; | ||
|
||
public ProductAttributeValueDTO(Long id) { | ||
this.id = id; | ||
} | ||
} |
Oops, something went wrong.