From b32ba6c23fbab390b9597a5c76300330e6b76ab8 Mon Sep 17 00:00:00 2001 From: Phuoc Nguyen Date: Fri, 11 Oct 2024 00:02:29 +0700 Subject: [PATCH] #1149 - [Search][Recommendation] Introduce Kafka Stream to aggregate all product data. -implement kafka stream application to capture all changes related to product --- kafka/connects/debezium-product-group.json | 17 ++ kafka/connects/debezium-product.json | 3 +- recommendation/pom.xml | 4 + .../RecommendationApplication.java | 2 + .../configuration/KafkaStreamAppConfig.java | 66 +++++ .../configuration/KafkaStreamConfig.java | 14 + .../configuration/KafkaTopicConfig.java | 17 ++ .../recommendation/dto/AggregationDTO.java | 29 ++ .../recommendation/dto/BaseMessageDTO.java | 15 + .../dto/BaseMetaDataEntity.java | 14 + .../com/yas/recommendation/dto/BrandDTO.java | 19 ++ .../yas/recommendation/dto/CategoryDTO.java | 26 ++ .../com/yas/recommendation/dto/KeyDTO.java | 18 ++ .../yas/recommendation/dto/MessageDTO.java | 19 ++ .../com/yas/recommendation/dto/MetaData.java | 16 + .../dto/ProductAttributeDTO.java | 25 ++ .../dto/ProductAttributeValueDTO.java | 28 ++ .../dto/ProductCategoryDTO.java | 33 +++ .../yas/recommendation/dto/ProductDTO.java | 44 +++ .../recommendation/dto/ProductResultDTO.java | 22 ++ .../topology/AbstractTopology.java | 78 +++++ .../topology/ProductTopology.java | 280 ++++++++++++++++++ .../src/main/resources/application.properties | 22 ++ start-source-connectors.sh | 9 +- 24 files changed, 814 insertions(+), 6 deletions(-) create mode 100644 kafka/connects/debezium-product-group.json create mode 100644 recommendation/src/main/java/com/yas/recommendation/configuration/KafkaStreamAppConfig.java create mode 100644 recommendation/src/main/java/com/yas/recommendation/configuration/KafkaStreamConfig.java create mode 100644 recommendation/src/main/java/com/yas/recommendation/configuration/KafkaTopicConfig.java create mode 100644 recommendation/src/main/java/com/yas/recommendation/dto/AggregationDTO.java create mode 100644 recommendation/src/main/java/com/yas/recommendation/dto/BaseMessageDTO.java create mode 100644 recommendation/src/main/java/com/yas/recommendation/dto/BaseMetaDataEntity.java create mode 100644 recommendation/src/main/java/com/yas/recommendation/dto/BrandDTO.java create mode 100644 recommendation/src/main/java/com/yas/recommendation/dto/CategoryDTO.java create mode 100644 recommendation/src/main/java/com/yas/recommendation/dto/KeyDTO.java create mode 100644 recommendation/src/main/java/com/yas/recommendation/dto/MessageDTO.java create mode 100644 recommendation/src/main/java/com/yas/recommendation/dto/MetaData.java create mode 100644 recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeDTO.java create mode 100644 recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeValueDTO.java create mode 100644 recommendation/src/main/java/com/yas/recommendation/dto/ProductCategoryDTO.java create mode 100644 recommendation/src/main/java/com/yas/recommendation/dto/ProductDTO.java create mode 100644 recommendation/src/main/java/com/yas/recommendation/dto/ProductResultDTO.java create mode 100644 recommendation/src/main/java/com/yas/recommendation/topology/AbstractTopology.java create mode 100644 recommendation/src/main/java/com/yas/recommendation/topology/ProductTopology.java diff --git a/kafka/connects/debezium-product-group.json b/kafka/connects/debezium-product-group.json new file mode 100644 index 0000000000..7fd0e5cbe6 --- /dev/null +++ b/kafka/connects/debezium-product-group.json @@ -0,0 +1,17 @@ +{ + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "topic.prefix": "dbproduct", + "database.user": "admin", + "database.dbname": "product", + "database.hostname": "postgres", + "database.password": "admin", + "database.port": "5432", + "key.converter.schemas.enable": "false", + "value.converter.schemas.enable": "false", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "schema.include.list": "public", + "table.include.list": "public.product_attribute, public.product_attribute_value, public.product_category, public.category, public.brand, public.product", + "include.toasts": "true", + "slot.name": "product_relation_slot" +} diff --git a/kafka/connects/debezium-product.json b/kafka/connects/debezium-product.json index f228d1bb53..651221bda2 100644 --- a/kafka/connects/debezium-product.json +++ b/kafka/connects/debezium-product.json @@ -12,5 +12,6 @@ "key.converter": "org.apache.kafka.connect.json.JsonConverter", "schema.include.list": "public", "table.include.list": "public.product", - "slot.name": "product_slot" + "slot.name": "product_slot", + "topic.creation.default.cleanup.policy": "compact" } diff --git a/recommendation/pom.xml b/recommendation/pom.xml index 63e18caa1f..b1023a000e 100644 --- a/recommendation/pom.xml +++ b/recommendation/pom.xml @@ -55,6 +55,10 @@ org.springframework.kafka spring-kafka + + org.apache.kafka + kafka-streams + diff --git a/recommendation/src/main/java/com/yas/recommendation/RecommendationApplication.java b/recommendation/src/main/java/com/yas/recommendation/RecommendationApplication.java index 81c47e19e2..a109e1aac2 100644 --- a/recommendation/src/main/java/com/yas/recommendation/RecommendationApplication.java +++ b/recommendation/src/main/java/com/yas/recommendation/RecommendationApplication.java @@ -4,8 +4,10 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.kafka.annotation.EnableKafkaStreams; @SpringBootApplication +@EnableKafkaStreams @EnableConfigurationProperties(EmbeddingSearchConfiguration.class) public class RecommendationApplication { diff --git a/recommendation/src/main/java/com/yas/recommendation/configuration/KafkaStreamAppConfig.java b/recommendation/src/main/java/com/yas/recommendation/configuration/KafkaStreamAppConfig.java new file mode 100644 index 0000000000..630b8711ec --- /dev/null +++ b/recommendation/src/main/java/com/yas/recommendation/configuration/KafkaStreamAppConfig.java @@ -0,0 +1,66 @@ +package com.yas.recommendation.configuration; + +import org.apache.kafka.clients.admin.NewTopic; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; +import org.springframework.kafka.config.KafkaStreamsConfiguration; +import org.springframework.kafka.config.TopicBuilder; +import org.springframework.kafka.core.KafkaAdmin; +import org.springframework.kafka.support.serializer.JsonDeserializer; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.kafka.streams.StreamsConfig.*; + +@Configuration +@EnableConfigurationProperties({KafkaStreamConfig.class, KafkaTopicConfig.class}) +public class KafkaStreamAppConfig { + + private final KafkaStreamConfig kafkaStreamConfig; + private final KafkaTopicConfig kafkaTopicConfig; + + @Autowired + public KafkaStreamAppConfig(KafkaStreamConfig kafkaStreamConfig, KafkaTopicConfig kafkaTopicConfig) { + this.kafkaStreamConfig = kafkaStreamConfig; + this.kafkaTopicConfig = kafkaTopicConfig; + } + + + @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) + KafkaStreamsConfiguration kafkaStreamAppConfig() { + Map props = new HashMap<>(); + props.put(APPLICATION_ID_CONFIG, kafkaStreamConfig.applicationId()); + props.put(BOOTSTRAP_SERVERS_CONFIG, kafkaStreamConfig.bootstrapServers()); + props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, kafkaStreamConfig.defaultKeySerdeClass()); + props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, kafkaStreamConfig.defaultValueSerdeClass()); + props.put(JsonDeserializer.TRUSTED_PACKAGES, kafkaStreamConfig.trustedPackages()); + props.put(COMMIT_INTERVAL_MS_CONFIG, kafkaStreamConfig.commitIntervalMs()); + return new KafkaStreamsConfiguration(props); + } + + @Bean + public KafkaAdmin.NewTopics createTopics() { + return new KafkaAdmin.NewTopics( + createTopic(kafkaTopicConfig.product()), + createTopic(kafkaTopicConfig.brand()), + createTopic(kafkaTopicConfig.category()), + createTopic(kafkaTopicConfig.productCategory()), + createTopic(kafkaTopicConfig.productAttribute()), + createTopic(kafkaTopicConfig.productAttributeValue()), + createTopic(kafkaTopicConfig.productSink()) + ); + } + + private NewTopic createTopic(String topicName) { + return TopicBuilder.name(topicName) + .partitions(kafkaTopicConfig.defaultPartitions()) + .replicas(kafkaTopicConfig.defaultReplicas()) + .build(); + } + + +} diff --git a/recommendation/src/main/java/com/yas/recommendation/configuration/KafkaStreamConfig.java b/recommendation/src/main/java/com/yas/recommendation/configuration/KafkaStreamConfig.java new file mode 100644 index 0000000000..d48bc75180 --- /dev/null +++ b/recommendation/src/main/java/com/yas/recommendation/configuration/KafkaStreamConfig.java @@ -0,0 +1,14 @@ +package com.yas.recommendation.configuration; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties(prefix = "yas.kafka.stream") +public record KafkaStreamConfig( + String applicationId, + String bootstrapServers, + String defaultKeySerdeClass, + String defaultValueSerdeClass, + String trustedPackages, + int commitIntervalMs +) { +} diff --git a/recommendation/src/main/java/com/yas/recommendation/configuration/KafkaTopicConfig.java b/recommendation/src/main/java/com/yas/recommendation/configuration/KafkaTopicConfig.java new file mode 100644 index 0000000000..acd1db186d --- /dev/null +++ b/recommendation/src/main/java/com/yas/recommendation/configuration/KafkaTopicConfig.java @@ -0,0 +1,17 @@ +package com.yas.recommendation.configuration; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties(prefix = "yas.kafka.topic") +public record KafkaTopicConfig( + String product, + String brand, + String category, + String productCategory, + String productAttribute, + String productAttributeValue, + String productSink, + int defaultPartitions, + int defaultReplicas +) { +} diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/AggregationDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/AggregationDTO.java new file mode 100644 index 0000000000..51a4b264fd --- /dev/null +++ b/recommendation/src/main/java/com/yas/recommendation/dto/AggregationDTO.java @@ -0,0 +1,29 @@ +package com.yas.recommendation.dto; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import lombok.*; + +import java.util.HashSet; +import java.util.Set; + +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +@Getter +@Setter +@AllArgsConstructor +@EqualsAndHashCode +@ToString +public class AggregationDTO { + private ID joinId; + private Set aggregationContents; + + public AggregationDTO() { + aggregationContents = new HashSet<>(); + } + + public void add(T aggregationContent) { + aggregationContents.add(aggregationContent); + } + +} diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/BaseMessageDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/BaseMessageDTO.java new file mode 100644 index 0000000000..ae239d14ce --- /dev/null +++ b/recommendation/src/main/java/com/yas/recommendation/dto/BaseMessageDTO.java @@ -0,0 +1,15 @@ +package com.yas.recommendation.dto; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +public abstract class BaseMessageDTO { + protected String op; + protected Long comingTs = System.currentTimeMillis(); +} diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/BaseMetaDataEntity.java b/recommendation/src/main/java/com/yas/recommendation/dto/BaseMetaDataEntity.java new file mode 100644 index 0000000000..7ba5c5ecfd --- /dev/null +++ b/recommendation/src/main/java/com/yas/recommendation/dto/BaseMetaDataEntity.java @@ -0,0 +1,14 @@ +package com.yas.recommendation.dto; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +public abstract class BaseMetaDataEntity { + protected MetaData metaData; +} diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/BrandDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/BrandDTO.java new file mode 100644 index 0000000000..cd29bb0c10 --- /dev/null +++ b/recommendation/src/main/java/com/yas/recommendation/dto/BrandDTO.java @@ -0,0 +1,19 @@ +package com.yas.recommendation.dto; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +public class BrandDTO extends BaseMetaDataEntity { + private Long id; + private String name; +} diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/CategoryDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/CategoryDTO.java new file mode 100644 index 0000000000..68aa020d4f --- /dev/null +++ b/recommendation/src/main/java/com/yas/recommendation/dto/CategoryDTO.java @@ -0,0 +1,26 @@ +package com.yas.recommendation.dto; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import lombok.*; + +import java.util.Objects; + +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +@EqualsAndHashCode(callSuper = false) +@ToString +public class CategoryDTO extends BaseMetaDataEntity { + private Long id; + private String name; + + public CategoryDTO(MetaData metaData, Long id, String name) { + super(metaData); + this.id = id; + this.name = name; + } +} diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/KeyDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/KeyDTO.java new file mode 100644 index 0000000000..9ad16edc0b --- /dev/null +++ b/recommendation/src/main/java/com/yas/recommendation/dto/KeyDTO.java @@ -0,0 +1,18 @@ +package com.yas.recommendation.dto; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +public class KeyDTO { + private Long id; +} diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/MessageDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/MessageDTO.java new file mode 100644 index 0000000000..29788d740f --- /dev/null +++ b/recommendation/src/main/java/com/yas/recommendation/dto/MessageDTO.java @@ -0,0 +1,19 @@ +package com.yas.recommendation.dto; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +public class MessageDTO extends BaseMessageDTO { + private T before; + private T after; +} diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/MetaData.java b/recommendation/src/main/java/com/yas/recommendation/dto/MetaData.java new file mode 100644 index 0000000000..33fc0792ed --- /dev/null +++ b/recommendation/src/main/java/com/yas/recommendation/dto/MetaData.java @@ -0,0 +1,16 @@ +package com.yas.recommendation.dto; + + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Getter +@Setter +@NoArgsConstructor +public class MetaData extends BaseMessageDTO { + public MetaData(String op, Long comingTs) { + super(op, comingTs); + } +} diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeDTO.java new file mode 100644 index 0000000000..9a605496f0 --- /dev/null +++ b/recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeDTO.java @@ -0,0 +1,25 @@ +package com.yas.recommendation.dto; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import lombok.*; + +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +@EqualsAndHashCode(callSuper = false) +public class ProductAttributeDTO extends BaseMetaDataEntity { + private Long id; + private String name; + private String value; + + public ProductAttributeDTO(MetaData metaData, Long id, String name, String value) { + super(metaData); + this.id = id; + this.name = name; + this.value = value; + } +} diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeValueDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeValueDTO.java new file mode 100644 index 0000000000..d30f276ea5 --- /dev/null +++ b/recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeValueDTO.java @@ -0,0 +1,28 @@ +package com.yas.recommendation.dto; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.*; + +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +@EqualsAndHashCode(callSuper = false) +public class ProductAttributeValueDTO extends BaseMetaDataEntity { + private Long id; + private String value; + @JsonProperty("product_id") + private Long productId; + @JsonProperty("product_attribute_id") + private Long productAttributeId; + private String productAttributeName; + private boolean isDeleted; + + public ProductAttributeValueDTO(Long id) { + this.id = id; + } +} diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/ProductCategoryDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/ProductCategoryDTO.java new file mode 100644 index 0000000000..a91fce7070 --- /dev/null +++ b/recommendation/src/main/java/com/yas/recommendation/dto/ProductCategoryDTO.java @@ -0,0 +1,33 @@ +package com.yas.recommendation.dto; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +public class ProductCategoryDTO extends BaseMetaDataEntity { + private Long id; + + @JsonProperty("category_id") + private Long categoryId; + + @JsonProperty("product_id") + private Long productId; + + private String categoryName; + + private boolean isDeleted; + + public ProductCategoryDTO(Long id) { + this.id = id; + } +} diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/ProductDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/ProductDTO.java new file mode 100644 index 0000000000..4627e626a7 --- /dev/null +++ b/recommendation/src/main/java/com/yas/recommendation/dto/ProductDTO.java @@ -0,0 +1,44 @@ +package com.yas.recommendation.dto; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.*; + +import java.util.List; + +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +@ToString +public class ProductDTO extends BaseMetaDataEntity { + + private Long id; + + @JsonProperty("brand_id") + private Long brandId; + + private String name; + + @JsonProperty("short_description") + private String shortDescription; + + private String specification; + + @JsonProperty("is_published") + private Boolean published; + + @JsonProperty("meta_description") + private String metaDescription; + + @JsonProperty("meta_title") + private String metaTitle; + + @JsonProperty("meta_keyword") + private String metaKeyword; + + private Double price; +} diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/ProductResultDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/ProductResultDTO.java new file mode 100644 index 0000000000..898bfcd2b2 --- /dev/null +++ b/recommendation/src/main/java/com/yas/recommendation/dto/ProductResultDTO.java @@ -0,0 +1,22 @@ +package com.yas.recommendation.dto; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import lombok.*; + +import java.util.HashSet; +import java.util.Set; + +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +@ToString +public class ProductResultDTO extends ProductDTO { + private BrandDTO brand; + private Set categories = new HashSet<>(); + private Set productAttributes = new HashSet<>(); + +} diff --git a/recommendation/src/main/java/com/yas/recommendation/topology/AbstractTopology.java b/recommendation/src/main/java/com/yas/recommendation/topology/AbstractTopology.java new file mode 100644 index 0000000000..e39d49d21c --- /dev/null +++ b/recommendation/src/main/java/com/yas/recommendation/topology/AbstractTopology.java @@ -0,0 +1,78 @@ +package com.yas.recommendation.topology; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.type.TypeFactory; +import com.yas.recommendation.configuration.KafkaTopicConfig; +import com.yas.recommendation.dto.AggregationDTO; +import com.yas.recommendation.dto.BaseMetaDataEntity; +import com.yas.recommendation.dto.MessageDTO; +import com.yas.recommendation.dto.MetaData; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.state.KeyValueStore; +import org.springframework.kafka.support.serializer.JsonSerde; + +import java.lang.reflect.Type; + +public abstract class AbstractTopology { + + protected final KafkaTopicConfig kafkaTopicConfig; + + public AbstractTopology(KafkaTopicConfig kafkaTopicConfig) { + this.kafkaTopicConfig = kafkaTopicConfig; + } + + protected abstract void process(StreamsBuilder streamsBuilder); + + protected Serde getSerde(Class clazz) { + return new JsonSerde<>(clazz); + } + + protected Serde getSerde(TypeReference typeReference) { + return new JsonSerde<>(typeReference); + } + + protected Serde> getMessageDTOSerde(Class clazz) { + TypeReference> typeReference = new TypeReference<>() { + @Override + public Type getType() { + return TypeFactory.defaultInstance() + .constructParametricType(MessageDTO.class, clazz); + } + }; + return getSerde(typeReference); + } + + protected Serde> getAggregationDTOSerde(Class clazzID, Class clazzT) { + TypeReference> typeReference = new TypeReference<>() { + @Override + public Type getType() { + return TypeFactory + .defaultInstance() + .constructParametricType(AggregationDTO.class, clazzID, clazzT); + } + }; + return getSerde(typeReference); + } + + + protected Materialized> createMaterializedStore( + String storeName, Serde keySerde, Serde valueSerde) { + + return Materialized.>as(storeName) + .withKeySerde(keySerde) + .withValueSerde(valueSerde) + .withCachingDisabled(); + } + + protected T extractModelFromMessage(MessageDTO message) { + if (message == null || message.getAfter() == null) { + return null; + } + T model = message.getAfter(); + model.setMetaData(new MetaData(message.getOp(), message.getComingTs())); + return model; + } +} diff --git a/recommendation/src/main/java/com/yas/recommendation/topology/ProductTopology.java b/recommendation/src/main/java/com/yas/recommendation/topology/ProductTopology.java new file mode 100644 index 0000000000..9dab6d6837 --- /dev/null +++ b/recommendation/src/main/java/com/yas/recommendation/topology/ProductTopology.java @@ -0,0 +1,280 @@ +package com.yas.recommendation.topology; + +import com.yas.recommendation.configuration.KafkaTopicConfig; +import com.yas.recommendation.dto.*; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Produced; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class ProductTopology extends AbstractTopology { + + private static final String PRODUCT_MATERIALIZED_VIEW = "product-mv"; + private static final String BRAND_MATERIALIZED_VIEW = "brand-mv"; + private static final String PRODUCT_BRAND_MATERIALIZED_VIEW = "product-brand-mv"; + private static final String CATEGORY_MATERIALIZED_VIEW = "category-mv"; + private static final String PRODUCT_CATEGORY_MATERIALIZED_VIEW = "product-category-mv"; + private static final String PRODUCT_CATEGORY_NON_NULL_MATERIALIZED_VIEW = "product-category-nonnull-mv"; + private static final String PRODUCT_CATEGORY_WITH_NAME_MATERIALIZED_VIEW = "product-category-with-name-mv"; + private static final String PRODUCT_CATEGORY_AGGREGATION_MATERIALIZED_VIEW = "product-category-agg-mv"; + private static final String PRODUCT_BRAND_CATEGORY_DETAIL_MATERIALIZED_VIEW = "product-brand-category-detail-mv"; + private static final String PRODUCT_ATTRIBUTE_MATERIALIZED_VIEW = "product-attribute-mv"; + private static final String PRODUCT_ATTRIBUTE_VALUE_NONNULL_MATERIALIZED_VIEW = "product-attribute-value-nonnull-mv"; + private static final String PRODUCT_ATTRIBUTE_VALUE_MATERIALIZED_VIEW = "product-attribute-value-mv"; + private static final String PRODUCT_ATTRIBUTE_VALUE_WITH_NAME_MATERIALIZED_VIEW = "product-attribute-value-with-name-mv"; + private static final String PRODUCT_ATTRIBUTE_AGGREGATION_MATERIALIZED_VIEW = "product-attribute-agg-mv"; + private static final String PRODUCT_BRAND_CATEGORY_ATTRIBUTE_DETAIL_MATERIALIZED_VIEW = "product-brand-category-attribute-detail-mv"; + + @Autowired + public ProductTopology(KafkaTopicConfig kafkaTopicConfig) { + super(kafkaTopicConfig); + } + + @Override + @Autowired + protected void process(StreamsBuilder streamsBuilder) { + KTable productTable = createProductTable(streamsBuilder); + KTable brandTable = createBrandTable(streamsBuilder); + KTable productBrandTable = joinProductAndBrand(productTable, brandTable); + + KTable categoryTable = createCategoryTable(streamsBuilder); + KTable productCategoryTable = createProductCategoryTable(streamsBuilder); + KTable nonNullProductCategoryTable = filterNonNullProductCategory(productCategoryTable); + KTable productCategoryDetailTable = enrichProductCategoryWithDetails(productCategoryTable, categoryTable); + + KTable> productCategoryAgg = aggregateProductCategoryDetails(productCategoryDetailTable, nonNullProductCategoryTable); + + KTable addingCategoryDetailTable = joinProductWithCategoryDetails(productBrandTable, productCategoryAgg); + + KTable productAttributeTable = createProductAttributeTable(streamsBuilder); + KTable productAttributeValueTable = createProductAttributeValueTable(streamsBuilder); + KTable nonNullProductAttributeValueTable = filterNonNullProductAttributeValue(productAttributeValueTable); + KTable productAttributeValueDetailTable = enrichProductAttributeValueDetails(productAttributeValueTable, productAttributeTable); + + KTable> productAttributeAgg = aggregateProductAttributeDetails(productAttributeValueDetailTable, nonNullProductAttributeValueTable); + + KTable resultTable = joinProductWithAttributeDetails(addingCategoryDetailTable, productAttributeAgg); + + writeToSink(resultTable); + } + + private KTable createProductTable(StreamsBuilder streamsBuilder) { + return streamsBuilder.stream( + kafkaTopicConfig.product(), + Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(ProductDTO.class))) + .selectKey((key, value) -> key.getId()) + .mapValues(this::extractModelFromMessage) + .toTable(createMaterializedStore(PRODUCT_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductDTO.class))); + } + + private KTable createBrandTable(StreamsBuilder streamsBuilder) { + return streamsBuilder.stream( + kafkaTopicConfig.brand(), + Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(BrandDTO.class))) + .selectKey((key, value) -> key.getId()) + .mapValues(this::extractModelFromMessage) + .toTable(createMaterializedStore(BRAND_MATERIALIZED_VIEW, Serdes.Long(), getSerde(BrandDTO.class))); + } + + private KTable joinProductAndBrand(KTable productTable, KTable brandTable) { + return productTable.leftJoin( + brandTable, + ProductDTO::getBrandId, + this::enrichWithProductAndBrandData, + createMaterializedStore(PRODUCT_BRAND_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductResultDTO.class)) + ); + } + + private KTable createCategoryTable(StreamsBuilder streamsBuilder) { + return streamsBuilder.stream( + kafkaTopicConfig.category(), + Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(CategoryDTO.class))) + .selectKey((key, value) -> key.getId()) + .mapValues(this::extractModelFromMessage) + .toTable(createMaterializedStore(CATEGORY_MATERIALIZED_VIEW, Serdes.Long(), getSerde(CategoryDTO.class))); + } + + private KTable createProductCategoryTable(StreamsBuilder streamsBuilder) { + return streamsBuilder.stream( + kafkaTopicConfig.productCategory(), + Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(ProductCategoryDTO.class))) + .selectKey((key, value) -> key.getId()) + .mapValues(this::extractModelFromMessage) + .toTable(createMaterializedStore(PRODUCT_CATEGORY_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductCategoryDTO.class))); + } + + private KTable filterNonNullProductCategory(KTable productCategoryTable) { + return productCategoryTable.toStream() + .filter((key, value) -> value != null) + .toTable(createMaterializedStore(PRODUCT_CATEGORY_NON_NULL_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductCategoryDTO.class))); + } + + private KTable enrichProductCategoryWithDetails(KTable productCategoryTable, KTable categoryTable) { + return productCategoryTable.leftJoin( + categoryTable, + ProductCategoryDTO::getCategoryId, + (productCategory, category) -> { + productCategory.setCategoryName(category.getName()); + return productCategory; + }, + createMaterializedStore(PRODUCT_CATEGORY_WITH_NAME_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductCategoryDTO.class)) + ); + } + + private KTable> aggregateProductCategoryDetails(KTable productCategoryDetailTable, KTable nonNullProductCategoryTable) { + return productCategoryDetailTable.toStream() + .mapValues(v -> v != null ? v : new ProductCategoryDTO(-1L)) + .join( + nonNullProductCategoryTable, + (productCategoryDetail, nonNullProductCategoryDetail) -> { + if (productCategoryDetail.getId().equals(-1L)) { + nonNullProductCategoryDetail.setDeleted(true); + return nonNullProductCategoryDetail; + } + return productCategoryDetail; + } + ) + .selectKey((key, value) -> value.getProductId()) + .groupByKey() + .aggregate( + AggregationDTO::new, + (productId, productCategoryDetail, agg) -> { + agg.setJoinId(productId); + agg.getAggregationContents().removeIf(category -> productCategoryDetail.getCategoryId().equals(category.getId())); + + if (!productCategoryDetail.isDeleted()) { + agg.add(new CategoryDTO(productCategoryDetail.getMetaData(), productCategoryDetail.getCategoryId(), productCategoryDetail.getCategoryName())); + } + return agg; + }, + createMaterializedStore(PRODUCT_CATEGORY_AGGREGATION_MATERIALIZED_VIEW, Serdes.Long(), getAggregationDTOSerde(Long.class, CategoryDTO.class)) + ); + } + + private KTable joinProductWithCategoryDetails(KTable productBrandTable, KTable> productCategoryAgg) { + return productBrandTable.leftJoin( + productCategoryAgg, + (productResult, agg) -> { + if (agg != null) { + productResult.setCategories(agg.getAggregationContents()); + } + return productResult; + }, + createMaterializedStore(PRODUCT_BRAND_CATEGORY_DETAIL_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductResultDTO.class)) + ); + } + + private KTable createProductAttributeTable(StreamsBuilder streamsBuilder) { + return streamsBuilder.stream( + kafkaTopicConfig.productAttribute(), + Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(ProductAttributeDTO.class))) + .selectKey((key, value) -> key.getId()) + .mapValues(this::extractModelFromMessage) + .toTable(createMaterializedStore(PRODUCT_ATTRIBUTE_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductAttributeDTO.class))); + } + + private KTable createProductAttributeValueTable(StreamsBuilder streamsBuilder) { + return streamsBuilder.stream( + kafkaTopicConfig.productAttributeValue(), + Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(ProductAttributeValueDTO.class))) + .selectKey((key, value) -> key.getId()) + .mapValues(this::extractModelFromMessage) + .toTable(createMaterializedStore(PRODUCT_ATTRIBUTE_VALUE_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductAttributeValueDTO.class))); + } + + private KTable filterNonNullProductAttributeValue(KTable productAttributeValueTable) { + return productAttributeValueTable.toStream() + .filter((key, value) -> value != null) + .toTable(createMaterializedStore(PRODUCT_ATTRIBUTE_VALUE_NONNULL_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductAttributeValueDTO.class))); + } + + private KTable enrichProductAttributeValueDetails(KTable productAttributeValueTable, KTable productAttributeTable) { + return productAttributeValueTable.leftJoin( + productAttributeTable, + ProductAttributeValueDTO::getProductAttributeId, + (productAttributeValue, productAttribute) -> { + productAttributeValue.setProductAttributeName(productAttribute.getName()); + return productAttributeValue; + }, + createMaterializedStore(PRODUCT_ATTRIBUTE_VALUE_WITH_NAME_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductAttributeValueDTO.class)) + ); + } + + private KTable> aggregateProductAttributeDetails(KTable productAttributeValueDetailTable, KTable nonNullProductAttributeValueTable) { + return productAttributeValueDetailTable.toStream() + .mapValues(v -> v != null ? v : new ProductAttributeValueDTO(-1L)) + .join( + nonNullProductAttributeValueTable, + (productAttributeValueDetail, nonNullProductAttributeValueDetail) -> { + if (productAttributeValueDetail.getId().equals(-1L)) { + nonNullProductAttributeValueDetail.setDeleted(true); + return nonNullProductAttributeValueDetail; + } + return productAttributeValueDetail; + } + ) + .selectKey((key, value) -> value.getProductId()) + .groupByKey() + .aggregate( + AggregationDTO::new, + (productId, productAttributeValueDetail, agg) -> { + agg.setJoinId(productId); + agg.getAggregationContents().removeIf(attribute -> productAttributeValueDetail.getProductAttributeId().equals(attribute.getId())); + + if (!productAttributeValueDetail.isDeleted()) { + agg.add(new ProductAttributeDTO( + productAttributeValueDetail.getMetaData(), + productAttributeValueDetail.getProductAttributeId(), + productAttributeValueDetail.getProductAttributeName(), + productAttributeValueDetail.getValue() + )); + } + return agg; + }, + createMaterializedStore(PRODUCT_ATTRIBUTE_AGGREGATION_MATERIALIZED_VIEW, Serdes.Long(), getAggregationDTOSerde(Long.class, ProductAttributeDTO.class)) + ); + } + + private KTable joinProductWithAttributeDetails(KTable addingCategoryDetailTable, KTable> productAttributeAgg) { + return addingCategoryDetailTable.leftJoin( + productAttributeAgg, + ProductResultDTO::getId, + (productResult, agg) -> { + if (agg != null) { + productResult.setProductAttributes(agg.getAggregationContents()); + } + return productResult; + }, + createMaterializedStore(PRODUCT_BRAND_CATEGORY_ATTRIBUTE_DETAIL_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductResultDTO.class)) + ); + } + + private void writeToSink(KTable resultTable) { + resultTable + .toStream() + .to(kafkaTopicConfig.productSink(), Produced.with(Serdes.Long(), getSerde(ProductResultDTO.class))); + } + + private ProductResultDTO enrichWithProductAndBrandData(ProductDTO productRecord, BrandDTO brandRecord) { + ProductResultDTO productResultDTO = new ProductResultDTO(); + productResultDTO.setBrand(brandRecord); + productResultDTO.setId(productRecord.getId()); + productResultDTO.setName(productRecord.getName()); + productResultDTO.setMetaDescription(productRecord.getMetaDescription()); + productResultDTO.setShortDescription(productRecord.getShortDescription()); + productResultDTO.setPrice(productRecord.getPrice()); + productResultDTO.setMetaTitle(productRecord.getMetaTitle()); + productResultDTO.setSpecification(productRecord.getSpecification()); + productResultDTO.setBrandId(productRecord.getBrandId()); + productResultDTO.setMetaData(productRecord.getMetaData()); + productResultDTO.setPublished(productRecord.getPublished()); + return productResultDTO; + } + +} diff --git a/recommendation/src/main/resources/application.properties b/recommendation/src/main/resources/application.properties index 9ed2599991..a2716093ca 100644 --- a/recommendation/src/main/resources/application.properties +++ b/recommendation/src/main/resources/application.properties @@ -52,3 +52,25 @@ yas.recommendation.embedding-based.search.topK=10 yas.recommendation.embedding-based.search.initDefaultData=false yas.recommendation.embedding-based.search.similarityThreshold=0 +# Kafka Topic Config +yas.kafka.topic.product=dbproduct.public.product +yas.kafka.topic.brand=dbproduct.public.brand +yas.kafka.topic.category=dbproduct.public.category +yas.kafka.topic.productCategory=dbproduct.public.product_category +yas.kafka.topic.productAttribute=dbproduct.public.product_attribute +yas.kafka.topic.productAttributeValue=dbproduct.public.product_attribute_value +yas.kafka.topic.productSink=product_sink +yas.kafka.topic.default-partitions=1 +yas.kafka.topic.default-replicas=1 + +#Kafka Stream Config +yas.kafka.stream.application-id=yas-streams-application +yas.kafka.stream.bootstrap-servers=kafka:9092 +yas.kafka.stream.default-key-serde-class=org.springframework.kafka.support.serializer.JsonSerde +yas.kafka.stream.default-value-serde-class=org.springframework.kafka.support.serializer.JsonSerde +yas.kafka.stream.trusted-packages=* +yas.kafka.stream.commit-interval-ms=0 + + + + diff --git a/start-source-connectors.sh b/start-source-connectors.sh index 8cc7fcafde..c8058cb10d 100644 --- a/start-source-connectors.sh +++ b/start-source-connectors.sh @@ -1,8 +1,7 @@ - curl -i -X PUT -H "Content-Type:application/json" \ - http://localhost:8083/connectors/product-connector/config \ - -d @kafka/connects/debezium-product.json + http://localhost:8083/connectors/order-connector/config \ + -d @kafka/connects/debezium-order.json curl -i -X PUT -H "Content-Type:application/json" \ - http://localhost:8083/connectors/order-connector/config \ - -d @kafka/connects/debezium-order.json \ No newline at end of file + http://localhost:8083/connectors/product-group-connector/config \ + -d @kafka/connects/debezium-product-group.json \ No newline at end of file