diff --git a/kafka/connects/debezium-product-group.json b/kafka/connects/debezium-product-group.json
new file mode 100644
index 0000000000..7fd0e5cbe6
--- /dev/null
+++ b/kafka/connects/debezium-product-group.json
@@ -0,0 +1,17 @@
+{
+ "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
+ "topic.prefix": "dbproduct",
+ "database.user": "admin",
+ "database.dbname": "product",
+ "database.hostname": "postgres",
+ "database.password": "admin",
+ "database.port": "5432",
+ "key.converter.schemas.enable": "false",
+ "value.converter.schemas.enable": "false",
+ "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+ "key.converter": "org.apache.kafka.connect.json.JsonConverter",
+ "schema.include.list": "public",
+ "table.include.list": "public.product_attribute, public.product_attribute_value, public.product_category, public.category, public.brand, public.product",
+ "include.toasts": "true",
+ "slot.name": "product_relation_slot"
+}
diff --git a/kafka/connects/debezium-product.json b/kafka/connects/debezium-product.json
index f228d1bb53..651221bda2 100644
--- a/kafka/connects/debezium-product.json
+++ b/kafka/connects/debezium-product.json
@@ -12,5 +12,6 @@
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"schema.include.list": "public",
"table.include.list": "public.product",
- "slot.name": "product_slot"
+ "slot.name": "product_slot",
+ "topic.creation.default.cleanup.policy": "compact"
}
diff --git a/recommendation/pom.xml b/recommendation/pom.xml
index 63e18caa1f..b1023a000e 100644
--- a/recommendation/pom.xml
+++ b/recommendation/pom.xml
@@ -55,6 +55,10 @@
org.springframework.kafka
spring-kafka
+
+ org.apache.kafka
+ kafka-streams
+
diff --git a/recommendation/src/main/java/com/yas/recommendation/RecommendationApplication.java b/recommendation/src/main/java/com/yas/recommendation/RecommendationApplication.java
index 81c47e19e2..a109e1aac2 100644
--- a/recommendation/src/main/java/com/yas/recommendation/RecommendationApplication.java
+++ b/recommendation/src/main/java/com/yas/recommendation/RecommendationApplication.java
@@ -4,8 +4,10 @@
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.kafka.annotation.EnableKafkaStreams;
@SpringBootApplication
+@EnableKafkaStreams
@EnableConfigurationProperties(EmbeddingSearchConfiguration.class)
public class RecommendationApplication {
diff --git a/recommendation/src/main/java/com/yas/recommendation/configuration/KafkaStreamAppConfig.java b/recommendation/src/main/java/com/yas/recommendation/configuration/KafkaStreamAppConfig.java
new file mode 100644
index 0000000000..630b8711ec
--- /dev/null
+++ b/recommendation/src/main/java/com/yas/recommendation/configuration/KafkaStreamAppConfig.java
@@ -0,0 +1,66 @@
+package com.yas.recommendation.configuration;
+
+import org.apache.kafka.clients.admin.NewTopic;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
+import org.springframework.kafka.config.KafkaStreamsConfiguration;
+import org.springframework.kafka.config.TopicBuilder;
+import org.springframework.kafka.core.KafkaAdmin;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.kafka.streams.StreamsConfig.*;
+
+@Configuration
+@EnableConfigurationProperties({KafkaStreamConfig.class, KafkaTopicConfig.class})
+public class KafkaStreamAppConfig {
+
+ private final KafkaStreamConfig kafkaStreamConfig;
+ private final KafkaTopicConfig kafkaTopicConfig;
+
+ @Autowired
+ public KafkaStreamAppConfig(KafkaStreamConfig kafkaStreamConfig, KafkaTopicConfig kafkaTopicConfig) {
+ this.kafkaStreamConfig = kafkaStreamConfig;
+ this.kafkaTopicConfig = kafkaTopicConfig;
+ }
+
+
+ @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
+ KafkaStreamsConfiguration kafkaStreamAppConfig() {
+ Map props = new HashMap<>();
+ props.put(APPLICATION_ID_CONFIG, kafkaStreamConfig.applicationId());
+ props.put(BOOTSTRAP_SERVERS_CONFIG, kafkaStreamConfig.bootstrapServers());
+ props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, kafkaStreamConfig.defaultKeySerdeClass());
+ props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, kafkaStreamConfig.defaultValueSerdeClass());
+ props.put(JsonDeserializer.TRUSTED_PACKAGES, kafkaStreamConfig.trustedPackages());
+ props.put(COMMIT_INTERVAL_MS_CONFIG, kafkaStreamConfig.commitIntervalMs());
+ return new KafkaStreamsConfiguration(props);
+ }
+
+ @Bean
+ public KafkaAdmin.NewTopics createTopics() {
+ return new KafkaAdmin.NewTopics(
+ createTopic(kafkaTopicConfig.product()),
+ createTopic(kafkaTopicConfig.brand()),
+ createTopic(kafkaTopicConfig.category()),
+ createTopic(kafkaTopicConfig.productCategory()),
+ createTopic(kafkaTopicConfig.productAttribute()),
+ createTopic(kafkaTopicConfig.productAttributeValue()),
+ createTopic(kafkaTopicConfig.productSink())
+ );
+ }
+
+ private NewTopic createTopic(String topicName) {
+ return TopicBuilder.name(topicName)
+ .partitions(kafkaTopicConfig.defaultPartitions())
+ .replicas(kafkaTopicConfig.defaultReplicas())
+ .build();
+ }
+
+
+}
diff --git a/recommendation/src/main/java/com/yas/recommendation/configuration/KafkaStreamConfig.java b/recommendation/src/main/java/com/yas/recommendation/configuration/KafkaStreamConfig.java
new file mode 100644
index 0000000000..d48bc75180
--- /dev/null
+++ b/recommendation/src/main/java/com/yas/recommendation/configuration/KafkaStreamConfig.java
@@ -0,0 +1,14 @@
+package com.yas.recommendation.configuration;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@ConfigurationProperties(prefix = "yas.kafka.stream")
+public record KafkaStreamConfig(
+ String applicationId,
+ String bootstrapServers,
+ String defaultKeySerdeClass,
+ String defaultValueSerdeClass,
+ String trustedPackages,
+ int commitIntervalMs
+) {
+}
diff --git a/recommendation/src/main/java/com/yas/recommendation/configuration/KafkaTopicConfig.java b/recommendation/src/main/java/com/yas/recommendation/configuration/KafkaTopicConfig.java
new file mode 100644
index 0000000000..acd1db186d
--- /dev/null
+++ b/recommendation/src/main/java/com/yas/recommendation/configuration/KafkaTopicConfig.java
@@ -0,0 +1,17 @@
+package com.yas.recommendation.configuration;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@ConfigurationProperties(prefix = "yas.kafka.topic")
+public record KafkaTopicConfig(
+ String product,
+ String brand,
+ String category,
+ String productCategory,
+ String productAttribute,
+ String productAttributeValue,
+ String productSink,
+ int defaultPartitions,
+ int defaultReplicas
+) {
+}
diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/AggregationDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/AggregationDTO.java
new file mode 100644
index 0000000000..51a4b264fd
--- /dev/null
+++ b/recommendation/src/main/java/com/yas/recommendation/dto/AggregationDTO.java
@@ -0,0 +1,29 @@
+package com.yas.recommendation.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import lombok.*;
+
+import java.util.HashSet;
+import java.util.Set;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@Getter
+@Setter
+@AllArgsConstructor
+@EqualsAndHashCode
+@ToString
+public class AggregationDTO {
+ private ID joinId;
+ private Set aggregationContents;
+
+ public AggregationDTO() {
+ aggregationContents = new HashSet<>();
+ }
+
+ public void add(T aggregationContent) {
+ aggregationContents.add(aggregationContent);
+ }
+
+}
diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/BaseMessageDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/BaseMessageDTO.java
new file mode 100644
index 0000000000..ae239d14ce
--- /dev/null
+++ b/recommendation/src/main/java/com/yas/recommendation/dto/BaseMessageDTO.java
@@ -0,0 +1,15 @@
+package com.yas.recommendation.dto;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@Getter
+@Setter
+@AllArgsConstructor
+@NoArgsConstructor
+public abstract class BaseMessageDTO {
+ protected String op;
+ protected Long comingTs = System.currentTimeMillis();
+}
diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/BaseMetaDataEntity.java b/recommendation/src/main/java/com/yas/recommendation/dto/BaseMetaDataEntity.java
new file mode 100644
index 0000000000..7ba5c5ecfd
--- /dev/null
+++ b/recommendation/src/main/java/com/yas/recommendation/dto/BaseMetaDataEntity.java
@@ -0,0 +1,14 @@
+package com.yas.recommendation.dto;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@Getter
+@Setter
+@AllArgsConstructor
+@NoArgsConstructor
+public abstract class BaseMetaDataEntity {
+ protected MetaData metaData;
+}
diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/BrandDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/BrandDTO.java
new file mode 100644
index 0000000000..cd29bb0c10
--- /dev/null
+++ b/recommendation/src/main/java/com/yas/recommendation/dto/BrandDTO.java
@@ -0,0 +1,19 @@
+package com.yas.recommendation.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@Getter
+@Setter
+@AllArgsConstructor
+@NoArgsConstructor
+public class BrandDTO extends BaseMetaDataEntity {
+ private Long id;
+ private String name;
+}
diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/CategoryDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/CategoryDTO.java
new file mode 100644
index 0000000000..68aa020d4f
--- /dev/null
+++ b/recommendation/src/main/java/com/yas/recommendation/dto/CategoryDTO.java
@@ -0,0 +1,26 @@
+package com.yas.recommendation.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import lombok.*;
+
+import java.util.Objects;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@Getter
+@Setter
+@AllArgsConstructor
+@NoArgsConstructor
+@EqualsAndHashCode(callSuper = false)
+@ToString
+public class CategoryDTO extends BaseMetaDataEntity {
+ private Long id;
+ private String name;
+
+ public CategoryDTO(MetaData metaData, Long id, String name) {
+ super(metaData);
+ this.id = id;
+ this.name = name;
+ }
+}
diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/KeyDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/KeyDTO.java
new file mode 100644
index 0000000000..9ad16edc0b
--- /dev/null
+++ b/recommendation/src/main/java/com/yas/recommendation/dto/KeyDTO.java
@@ -0,0 +1,18 @@
+package com.yas.recommendation.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@Getter
+@Setter
+@AllArgsConstructor
+@NoArgsConstructor
+public class KeyDTO {
+ private Long id;
+}
diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/MessageDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/MessageDTO.java
new file mode 100644
index 0000000000..29788d740f
--- /dev/null
+++ b/recommendation/src/main/java/com/yas/recommendation/dto/MessageDTO.java
@@ -0,0 +1,19 @@
+package com.yas.recommendation.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@Getter
+@Setter
+@AllArgsConstructor
+@NoArgsConstructor
+public class MessageDTO extends BaseMessageDTO {
+ private T before;
+ private T after;
+}
diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/MetaData.java b/recommendation/src/main/java/com/yas/recommendation/dto/MetaData.java
new file mode 100644
index 0000000000..33fc0792ed
--- /dev/null
+++ b/recommendation/src/main/java/com/yas/recommendation/dto/MetaData.java
@@ -0,0 +1,16 @@
+package com.yas.recommendation.dto;
+
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@Getter
+@Setter
+@NoArgsConstructor
+public class MetaData extends BaseMessageDTO {
+ public MetaData(String op, Long comingTs) {
+ super(op, comingTs);
+ }
+}
diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeDTO.java
new file mode 100644
index 0000000000..9a605496f0
--- /dev/null
+++ b/recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeDTO.java
@@ -0,0 +1,25 @@
+package com.yas.recommendation.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import lombok.*;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@Getter
+@Setter
+@AllArgsConstructor
+@NoArgsConstructor
+@EqualsAndHashCode(callSuper = false)
+public class ProductAttributeDTO extends BaseMetaDataEntity {
+ private Long id;
+ private String name;
+ private String value;
+
+ public ProductAttributeDTO(MetaData metaData, Long id, String name, String value) {
+ super(metaData);
+ this.id = id;
+ this.name = name;
+ this.value = value;
+ }
+}
diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeValueDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeValueDTO.java
new file mode 100644
index 0000000000..d30f276ea5
--- /dev/null
+++ b/recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeValueDTO.java
@@ -0,0 +1,28 @@
+package com.yas.recommendation.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.*;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@Getter
+@Setter
+@AllArgsConstructor
+@NoArgsConstructor
+@EqualsAndHashCode(callSuper = false)
+public class ProductAttributeValueDTO extends BaseMetaDataEntity {
+ private Long id;
+ private String value;
+ @JsonProperty("product_id")
+ private Long productId;
+ @JsonProperty("product_attribute_id")
+ private Long productAttributeId;
+ private String productAttributeName;
+ private boolean isDeleted;
+
+ public ProductAttributeValueDTO(Long id) {
+ this.id = id;
+ }
+}
diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/ProductCategoryDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/ProductCategoryDTO.java
new file mode 100644
index 0000000000..a91fce7070
--- /dev/null
+++ b/recommendation/src/main/java/com/yas/recommendation/dto/ProductCategoryDTO.java
@@ -0,0 +1,33 @@
+package com.yas.recommendation.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@Getter
+@Setter
+@AllArgsConstructor
+@NoArgsConstructor
+public class ProductCategoryDTO extends BaseMetaDataEntity {
+ private Long id;
+
+ @JsonProperty("category_id")
+ private Long categoryId;
+
+ @JsonProperty("product_id")
+ private Long productId;
+
+ private String categoryName;
+
+ private boolean isDeleted;
+
+ public ProductCategoryDTO(Long id) {
+ this.id = id;
+ }
+}
diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/ProductDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/ProductDTO.java
new file mode 100644
index 0000000000..4627e626a7
--- /dev/null
+++ b/recommendation/src/main/java/com/yas/recommendation/dto/ProductDTO.java
@@ -0,0 +1,44 @@
+package com.yas.recommendation.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.*;
+
+import java.util.List;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@Getter
+@Setter
+@AllArgsConstructor
+@NoArgsConstructor
+@ToString
+public class ProductDTO extends BaseMetaDataEntity {
+
+ private Long id;
+
+ @JsonProperty("brand_id")
+ private Long brandId;
+
+ private String name;
+
+ @JsonProperty("short_description")
+ private String shortDescription;
+
+ private String specification;
+
+ @JsonProperty("is_published")
+ private Boolean published;
+
+ @JsonProperty("meta_description")
+ private String metaDescription;
+
+ @JsonProperty("meta_title")
+ private String metaTitle;
+
+ @JsonProperty("meta_keyword")
+ private String metaKeyword;
+
+ private Double price;
+}
diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/ProductResultDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/ProductResultDTO.java
new file mode 100644
index 0000000000..898bfcd2b2
--- /dev/null
+++ b/recommendation/src/main/java/com/yas/recommendation/dto/ProductResultDTO.java
@@ -0,0 +1,22 @@
+package com.yas.recommendation.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import lombok.*;
+
+import java.util.HashSet;
+import java.util.Set;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@Getter
+@Setter
+@AllArgsConstructor
+@NoArgsConstructor
+@ToString
+public class ProductResultDTO extends ProductDTO {
+ private BrandDTO brand;
+ private Set categories = new HashSet<>();
+ private Set productAttributes = new HashSet<>();
+
+}
diff --git a/recommendation/src/main/java/com/yas/recommendation/topology/AbstractTopology.java b/recommendation/src/main/java/com/yas/recommendation/topology/AbstractTopology.java
new file mode 100644
index 0000000000..e39d49d21c
--- /dev/null
+++ b/recommendation/src/main/java/com/yas/recommendation/topology/AbstractTopology.java
@@ -0,0 +1,78 @@
+package com.yas.recommendation.topology;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.type.TypeFactory;
+import com.yas.recommendation.configuration.KafkaTopicConfig;
+import com.yas.recommendation.dto.AggregationDTO;
+import com.yas.recommendation.dto.BaseMetaDataEntity;
+import com.yas.recommendation.dto.MessageDTO;
+import com.yas.recommendation.dto.MetaData;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.springframework.kafka.support.serializer.JsonSerde;
+
+import java.lang.reflect.Type;
+
+public abstract class AbstractTopology {
+
+ protected final KafkaTopicConfig kafkaTopicConfig;
+
+ public AbstractTopology(KafkaTopicConfig kafkaTopicConfig) {
+ this.kafkaTopicConfig = kafkaTopicConfig;
+ }
+
+ protected abstract void process(StreamsBuilder streamsBuilder);
+
+ protected Serde getSerde(Class clazz) {
+ return new JsonSerde<>(clazz);
+ }
+
+ protected Serde getSerde(TypeReference typeReference) {
+ return new JsonSerde<>(typeReference);
+ }
+
+ protected Serde> getMessageDTOSerde(Class clazz) {
+ TypeReference> typeReference = new TypeReference<>() {
+ @Override
+ public Type getType() {
+ return TypeFactory.defaultInstance()
+ .constructParametricType(MessageDTO.class, clazz);
+ }
+ };
+ return getSerde(typeReference);
+ }
+
+ protected Serde> getAggregationDTOSerde(Class clazzID, Class clazzT) {
+ TypeReference> typeReference = new TypeReference<>() {
+ @Override
+ public Type getType() {
+ return TypeFactory
+ .defaultInstance()
+ .constructParametricType(AggregationDTO.class, clazzID, clazzT);
+ }
+ };
+ return getSerde(typeReference);
+ }
+
+
+ protected Materialized> createMaterializedStore(
+ String storeName, Serde keySerde, Serde valueSerde) {
+
+ return Materialized.>as(storeName)
+ .withKeySerde(keySerde)
+ .withValueSerde(valueSerde)
+ .withCachingDisabled();
+ }
+
+ protected T extractModelFromMessage(MessageDTO message) {
+ if (message == null || message.getAfter() == null) {
+ return null;
+ }
+ T model = message.getAfter();
+ model.setMetaData(new MetaData(message.getOp(), message.getComingTs()));
+ return model;
+ }
+}
diff --git a/recommendation/src/main/java/com/yas/recommendation/topology/ProductTopology.java b/recommendation/src/main/java/com/yas/recommendation/topology/ProductTopology.java
new file mode 100644
index 0000000000..9dab6d6837
--- /dev/null
+++ b/recommendation/src/main/java/com/yas/recommendation/topology/ProductTopology.java
@@ -0,0 +1,280 @@
+package com.yas.recommendation.topology;
+
+import com.yas.recommendation.configuration.KafkaTopicConfig;
+import com.yas.recommendation.dto.*;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+public class ProductTopology extends AbstractTopology {
+
+ private static final String PRODUCT_MATERIALIZED_VIEW = "product-mv";
+ private static final String BRAND_MATERIALIZED_VIEW = "brand-mv";
+ private static final String PRODUCT_BRAND_MATERIALIZED_VIEW = "product-brand-mv";
+ private static final String CATEGORY_MATERIALIZED_VIEW = "category-mv";
+ private static final String PRODUCT_CATEGORY_MATERIALIZED_VIEW = "product-category-mv";
+ private static final String PRODUCT_CATEGORY_NON_NULL_MATERIALIZED_VIEW = "product-category-nonnull-mv";
+ private static final String PRODUCT_CATEGORY_WITH_NAME_MATERIALIZED_VIEW = "product-category-with-name-mv";
+ private static final String PRODUCT_CATEGORY_AGGREGATION_MATERIALIZED_VIEW = "product-category-agg-mv";
+ private static final String PRODUCT_BRAND_CATEGORY_DETAIL_MATERIALIZED_VIEW = "product-brand-category-detail-mv";
+ private static final String PRODUCT_ATTRIBUTE_MATERIALIZED_VIEW = "product-attribute-mv";
+ private static final String PRODUCT_ATTRIBUTE_VALUE_NONNULL_MATERIALIZED_VIEW = "product-attribute-value-nonnull-mv";
+ private static final String PRODUCT_ATTRIBUTE_VALUE_MATERIALIZED_VIEW = "product-attribute-value-mv";
+ private static final String PRODUCT_ATTRIBUTE_VALUE_WITH_NAME_MATERIALIZED_VIEW = "product-attribute-value-with-name-mv";
+ private static final String PRODUCT_ATTRIBUTE_AGGREGATION_MATERIALIZED_VIEW = "product-attribute-agg-mv";
+ private static final String PRODUCT_BRAND_CATEGORY_ATTRIBUTE_DETAIL_MATERIALIZED_VIEW = "product-brand-category-attribute-detail-mv";
+
+ @Autowired
+ public ProductTopology(KafkaTopicConfig kafkaTopicConfig) {
+ super(kafkaTopicConfig);
+ }
+
+ @Override
+ @Autowired
+ protected void process(StreamsBuilder streamsBuilder) {
+ KTable productTable = createProductTable(streamsBuilder);
+ KTable brandTable = createBrandTable(streamsBuilder);
+ KTable productBrandTable = joinProductAndBrand(productTable, brandTable);
+
+ KTable categoryTable = createCategoryTable(streamsBuilder);
+ KTable productCategoryTable = createProductCategoryTable(streamsBuilder);
+ KTable nonNullProductCategoryTable = filterNonNullProductCategory(productCategoryTable);
+ KTable productCategoryDetailTable = enrichProductCategoryWithDetails(productCategoryTable, categoryTable);
+
+ KTable> productCategoryAgg = aggregateProductCategoryDetails(productCategoryDetailTable, nonNullProductCategoryTable);
+
+ KTable addingCategoryDetailTable = joinProductWithCategoryDetails(productBrandTable, productCategoryAgg);
+
+ KTable productAttributeTable = createProductAttributeTable(streamsBuilder);
+ KTable productAttributeValueTable = createProductAttributeValueTable(streamsBuilder);
+ KTable nonNullProductAttributeValueTable = filterNonNullProductAttributeValue(productAttributeValueTable);
+ KTable productAttributeValueDetailTable = enrichProductAttributeValueDetails(productAttributeValueTable, productAttributeTable);
+
+ KTable> productAttributeAgg = aggregateProductAttributeDetails(productAttributeValueDetailTable, nonNullProductAttributeValueTable);
+
+ KTable resultTable = joinProductWithAttributeDetails(addingCategoryDetailTable, productAttributeAgg);
+
+ writeToSink(resultTable);
+ }
+
+ private KTable createProductTable(StreamsBuilder streamsBuilder) {
+ return streamsBuilder.stream(
+ kafkaTopicConfig.product(),
+ Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(ProductDTO.class)))
+ .selectKey((key, value) -> key.getId())
+ .mapValues(this::extractModelFromMessage)
+ .toTable(createMaterializedStore(PRODUCT_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductDTO.class)));
+ }
+
+ private KTable createBrandTable(StreamsBuilder streamsBuilder) {
+ return streamsBuilder.stream(
+ kafkaTopicConfig.brand(),
+ Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(BrandDTO.class)))
+ .selectKey((key, value) -> key.getId())
+ .mapValues(this::extractModelFromMessage)
+ .toTable(createMaterializedStore(BRAND_MATERIALIZED_VIEW, Serdes.Long(), getSerde(BrandDTO.class)));
+ }
+
+ private KTable joinProductAndBrand(KTable productTable, KTable brandTable) {
+ return productTable.leftJoin(
+ brandTable,
+ ProductDTO::getBrandId,
+ this::enrichWithProductAndBrandData,
+ createMaterializedStore(PRODUCT_BRAND_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductResultDTO.class))
+ );
+ }
+
+ private KTable createCategoryTable(StreamsBuilder streamsBuilder) {
+ return streamsBuilder.stream(
+ kafkaTopicConfig.category(),
+ Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(CategoryDTO.class)))
+ .selectKey((key, value) -> key.getId())
+ .mapValues(this::extractModelFromMessage)
+ .toTable(createMaterializedStore(CATEGORY_MATERIALIZED_VIEW, Serdes.Long(), getSerde(CategoryDTO.class)));
+ }
+
+ private KTable createProductCategoryTable(StreamsBuilder streamsBuilder) {
+ return streamsBuilder.stream(
+ kafkaTopicConfig.productCategory(),
+ Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(ProductCategoryDTO.class)))
+ .selectKey((key, value) -> key.getId())
+ .mapValues(this::extractModelFromMessage)
+ .toTable(createMaterializedStore(PRODUCT_CATEGORY_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductCategoryDTO.class)));
+ }
+
+ private KTable filterNonNullProductCategory(KTable productCategoryTable) {
+ return productCategoryTable.toStream()
+ .filter((key, value) -> value != null)
+ .toTable(createMaterializedStore(PRODUCT_CATEGORY_NON_NULL_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductCategoryDTO.class)));
+ }
+
+ private KTable enrichProductCategoryWithDetails(KTable productCategoryTable, KTable categoryTable) {
+ return productCategoryTable.leftJoin(
+ categoryTable,
+ ProductCategoryDTO::getCategoryId,
+ (productCategory, category) -> {
+ productCategory.setCategoryName(category.getName());
+ return productCategory;
+ },
+ createMaterializedStore(PRODUCT_CATEGORY_WITH_NAME_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductCategoryDTO.class))
+ );
+ }
+
+ private KTable> aggregateProductCategoryDetails(KTable productCategoryDetailTable, KTable nonNullProductCategoryTable) {
+ return productCategoryDetailTable.toStream()
+ .mapValues(v -> v != null ? v : new ProductCategoryDTO(-1L))
+ .join(
+ nonNullProductCategoryTable,
+ (productCategoryDetail, nonNullProductCategoryDetail) -> {
+ if (productCategoryDetail.getId().equals(-1L)) {
+ nonNullProductCategoryDetail.setDeleted(true);
+ return nonNullProductCategoryDetail;
+ }
+ return productCategoryDetail;
+ }
+ )
+ .selectKey((key, value) -> value.getProductId())
+ .groupByKey()
+ .aggregate(
+ AggregationDTO::new,
+ (productId, productCategoryDetail, agg) -> {
+ agg.setJoinId(productId);
+ agg.getAggregationContents().removeIf(category -> productCategoryDetail.getCategoryId().equals(category.getId()));
+
+ if (!productCategoryDetail.isDeleted()) {
+ agg.add(new CategoryDTO(productCategoryDetail.getMetaData(), productCategoryDetail.getCategoryId(), productCategoryDetail.getCategoryName()));
+ }
+ return agg;
+ },
+ createMaterializedStore(PRODUCT_CATEGORY_AGGREGATION_MATERIALIZED_VIEW, Serdes.Long(), getAggregationDTOSerde(Long.class, CategoryDTO.class))
+ );
+ }
+
+ private KTable joinProductWithCategoryDetails(KTable productBrandTable, KTable> productCategoryAgg) {
+ return productBrandTable.leftJoin(
+ productCategoryAgg,
+ (productResult, agg) -> {
+ if (agg != null) {
+ productResult.setCategories(agg.getAggregationContents());
+ }
+ return productResult;
+ },
+ createMaterializedStore(PRODUCT_BRAND_CATEGORY_DETAIL_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductResultDTO.class))
+ );
+ }
+
+ private KTable createProductAttributeTable(StreamsBuilder streamsBuilder) {
+ return streamsBuilder.stream(
+ kafkaTopicConfig.productAttribute(),
+ Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(ProductAttributeDTO.class)))
+ .selectKey((key, value) -> key.getId())
+ .mapValues(this::extractModelFromMessage)
+ .toTable(createMaterializedStore(PRODUCT_ATTRIBUTE_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductAttributeDTO.class)));
+ }
+
+ private KTable createProductAttributeValueTable(StreamsBuilder streamsBuilder) {
+ return streamsBuilder.stream(
+ kafkaTopicConfig.productAttributeValue(),
+ Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(ProductAttributeValueDTO.class)))
+ .selectKey((key, value) -> key.getId())
+ .mapValues(this::extractModelFromMessage)
+ .toTable(createMaterializedStore(PRODUCT_ATTRIBUTE_VALUE_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductAttributeValueDTO.class)));
+ }
+
+ private KTable filterNonNullProductAttributeValue(KTable productAttributeValueTable) {
+ return productAttributeValueTable.toStream()
+ .filter((key, value) -> value != null)
+ .toTable(createMaterializedStore(PRODUCT_ATTRIBUTE_VALUE_NONNULL_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductAttributeValueDTO.class)));
+ }
+
+ private KTable enrichProductAttributeValueDetails(KTable productAttributeValueTable, KTable productAttributeTable) {
+ return productAttributeValueTable.leftJoin(
+ productAttributeTable,
+ ProductAttributeValueDTO::getProductAttributeId,
+ (productAttributeValue, productAttribute) -> {
+ productAttributeValue.setProductAttributeName(productAttribute.getName());
+ return productAttributeValue;
+ },
+ createMaterializedStore(PRODUCT_ATTRIBUTE_VALUE_WITH_NAME_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductAttributeValueDTO.class))
+ );
+ }
+
+ private KTable> aggregateProductAttributeDetails(KTable productAttributeValueDetailTable, KTable nonNullProductAttributeValueTable) {
+ return productAttributeValueDetailTable.toStream()
+ .mapValues(v -> v != null ? v : new ProductAttributeValueDTO(-1L))
+ .join(
+ nonNullProductAttributeValueTable,
+ (productAttributeValueDetail, nonNullProductAttributeValueDetail) -> {
+ if (productAttributeValueDetail.getId().equals(-1L)) {
+ nonNullProductAttributeValueDetail.setDeleted(true);
+ return nonNullProductAttributeValueDetail;
+ }
+ return productAttributeValueDetail;
+ }
+ )
+ .selectKey((key, value) -> value.getProductId())
+ .groupByKey()
+ .aggregate(
+ AggregationDTO::new,
+ (productId, productAttributeValueDetail, agg) -> {
+ agg.setJoinId(productId);
+ agg.getAggregationContents().removeIf(attribute -> productAttributeValueDetail.getProductAttributeId().equals(attribute.getId()));
+
+ if (!productAttributeValueDetail.isDeleted()) {
+ agg.add(new ProductAttributeDTO(
+ productAttributeValueDetail.getMetaData(),
+ productAttributeValueDetail.getProductAttributeId(),
+ productAttributeValueDetail.getProductAttributeName(),
+ productAttributeValueDetail.getValue()
+ ));
+ }
+ return agg;
+ },
+ createMaterializedStore(PRODUCT_ATTRIBUTE_AGGREGATION_MATERIALIZED_VIEW, Serdes.Long(), getAggregationDTOSerde(Long.class, ProductAttributeDTO.class))
+ );
+ }
+
+ private KTable joinProductWithAttributeDetails(KTable addingCategoryDetailTable, KTable> productAttributeAgg) {
+ return addingCategoryDetailTable.leftJoin(
+ productAttributeAgg,
+ ProductResultDTO::getId,
+ (productResult, agg) -> {
+ if (agg != null) {
+ productResult.setProductAttributes(agg.getAggregationContents());
+ }
+ return productResult;
+ },
+ createMaterializedStore(PRODUCT_BRAND_CATEGORY_ATTRIBUTE_DETAIL_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductResultDTO.class))
+ );
+ }
+
+ private void writeToSink(KTable resultTable) {
+ resultTable
+ .toStream()
+ .to(kafkaTopicConfig.productSink(), Produced.with(Serdes.Long(), getSerde(ProductResultDTO.class)));
+ }
+
+ private ProductResultDTO enrichWithProductAndBrandData(ProductDTO productRecord, BrandDTO brandRecord) {
+ ProductResultDTO productResultDTO = new ProductResultDTO();
+ productResultDTO.setBrand(brandRecord);
+ productResultDTO.setId(productRecord.getId());
+ productResultDTO.setName(productRecord.getName());
+ productResultDTO.setMetaDescription(productRecord.getMetaDescription());
+ productResultDTO.setShortDescription(productRecord.getShortDescription());
+ productResultDTO.setPrice(productRecord.getPrice());
+ productResultDTO.setMetaTitle(productRecord.getMetaTitle());
+ productResultDTO.setSpecification(productRecord.getSpecification());
+ productResultDTO.setBrandId(productRecord.getBrandId());
+ productResultDTO.setMetaData(productRecord.getMetaData());
+ productResultDTO.setPublished(productRecord.getPublished());
+ return productResultDTO;
+ }
+
+}
diff --git a/recommendation/src/main/resources/application.properties b/recommendation/src/main/resources/application.properties
index 9ed2599991..a2716093ca 100644
--- a/recommendation/src/main/resources/application.properties
+++ b/recommendation/src/main/resources/application.properties
@@ -52,3 +52,25 @@ yas.recommendation.embedding-based.search.topK=10
yas.recommendation.embedding-based.search.initDefaultData=false
yas.recommendation.embedding-based.search.similarityThreshold=0
+# Kafka Topic Config
+yas.kafka.topic.product=dbproduct.public.product
+yas.kafka.topic.brand=dbproduct.public.brand
+yas.kafka.topic.category=dbproduct.public.category
+yas.kafka.topic.productCategory=dbproduct.public.product_category
+yas.kafka.topic.productAttribute=dbproduct.public.product_attribute
+yas.kafka.topic.productAttributeValue=dbproduct.public.product_attribute_value
+yas.kafka.topic.productSink=product_sink
+yas.kafka.topic.default-partitions=1
+yas.kafka.topic.default-replicas=1
+
+#Kafka Stream Config
+yas.kafka.stream.application-id=yas-streams-application
+yas.kafka.stream.bootstrap-servers=kafka:9092
+yas.kafka.stream.default-key-serde-class=org.springframework.kafka.support.serializer.JsonSerde
+yas.kafka.stream.default-value-serde-class=org.springframework.kafka.support.serializer.JsonSerde
+yas.kafka.stream.trusted-packages=*
+yas.kafka.stream.commit-interval-ms=0
+
+
+
+
diff --git a/start-source-connectors.sh b/start-source-connectors.sh
index 8cc7fcafde..c8058cb10d 100644
--- a/start-source-connectors.sh
+++ b/start-source-connectors.sh
@@ -1,8 +1,7 @@
-
curl -i -X PUT -H "Content-Type:application/json" \
- http://localhost:8083/connectors/product-connector/config \
- -d @kafka/connects/debezium-product.json
+ http://localhost:8083/connectors/order-connector/config \
+ -d @kafka/connects/debezium-order.json
curl -i -X PUT -H "Content-Type:application/json" \
- http://localhost:8083/connectors/order-connector/config \
- -d @kafka/connects/debezium-order.json
\ No newline at end of file
+ http://localhost:8083/connectors/product-group-connector/config \
+ -d @kafka/connects/debezium-product-group.json
\ No newline at end of file