diff --git a/recommendation/src/main/java/com/yas/recommendation/configuration/KafkaStreamAppConfig.java b/recommendation/src/main/java/com/yas/recommendation/configuration/KafkaStreamAppConfig.java index 630b8711ec..6984226352 100644 --- a/recommendation/src/main/java/com/yas/recommendation/configuration/KafkaStreamAppConfig.java +++ b/recommendation/src/main/java/com/yas/recommendation/configuration/KafkaStreamAppConfig.java @@ -1,5 +1,13 @@ package com.yas.recommendation.configuration; +import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.COMMIT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG; + +import java.util.HashMap; +import java.util.Map; import org.apache.kafka.clients.admin.NewTopic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -11,11 +19,11 @@ import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.support.serializer.JsonDeserializer; -import java.util.HashMap; -import java.util.Map; - -import static org.apache.kafka.streams.StreamsConfig.*; - +/** + * KafkaStreamAppConfig is a configuration class that sets up Kafka Streams properties and topics + * for the application. It enables configuration properties for {@link KafkaStreamConfig} and + * {@link KafkaTopicConfig} and defines beans for Kafka Streams configuration and topic creation. + */ @Configuration @EnableConfigurationProperties({KafkaStreamConfig.class, KafkaTopicConfig.class}) public class KafkaStreamAppConfig { @@ -23,13 +31,23 @@ public class KafkaStreamAppConfig { private final KafkaStreamConfig kafkaStreamConfig; private final KafkaTopicConfig kafkaTopicConfig; + /** + * Constructs a KafkaStreamAppConfig instance with the specified Kafka Stream and Topic configurations. + * + * @param kafkaStreamConfig the configuration properties for Kafka Streams + * @param kafkaTopicConfig the configuration properties for Kafka topics + */ @Autowired public KafkaStreamAppConfig(KafkaStreamConfig kafkaStreamConfig, KafkaTopicConfig kafkaTopicConfig) { this.kafkaStreamConfig = kafkaStreamConfig; this.kafkaTopicConfig = kafkaTopicConfig; } - + /** + * Configures Kafka Streams properties for the application and registers them as a bean. + * + * @return a KafkaStreamsConfiguration object containing the Kafka Streams properties + */ @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) KafkaStreamsConfiguration kafkaStreamAppConfig() { Map props = new HashMap<>(); @@ -42,6 +60,12 @@ KafkaStreamsConfiguration kafkaStreamAppConfig() { return new KafkaStreamsConfiguration(props); } + /** + * Creates Kafka topics based on the configuration in {@link KafkaTopicConfig} and registers them + * as a bean to initialize topics on application startup. + * + * @return a KafkaAdmin.NewTopics object containing all the configured Kafka topics + */ @Bean public KafkaAdmin.NewTopics createTopics() { return new KafkaAdmin.NewTopics( @@ -61,6 +85,4 @@ private NewTopic createTopic(String topicName) { .replicas(kafkaTopicConfig.defaultReplicas()) .build(); } - - } diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/AggregationDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/AggregationDto.java similarity index 64% rename from recommendation/src/main/java/com/yas/recommendation/dto/AggregationDTO.java rename to recommendation/src/main/java/com/yas/recommendation/dto/AggregationDto.java index b2b287e1cb..99e346937d 100644 --- a/recommendation/src/main/java/com/yas/recommendation/dto/AggregationDTO.java +++ b/recommendation/src/main/java/com/yas/recommendation/dto/AggregationDto.java @@ -9,17 +9,24 @@ import lombok.Getter; import lombok.Setter; +/** + * AggregationDto is a generic container class for aggregating a set of items of type T. + * It includes a join ID for association with other data entities and supports adding + * individual items to the aggregation set. + * + * @param the type of elements to aggregate + */ @JsonInclude(JsonInclude.Include.NON_NULL) @JsonIgnoreProperties(ignoreUnknown = true) @Getter @Setter @AllArgsConstructor @EqualsAndHashCode -public class AggregationDTO { - private I joinId; +public class AggregationDto { + private Long joinId; private Set aggregationContents; - public AggregationDTO() { + public AggregationDto() { aggregationContents = new HashSet<>(); } diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/BaseDto.java b/recommendation/src/main/java/com/yas/recommendation/dto/BaseDto.java index a70a9f1020..d6a39382c9 100644 --- a/recommendation/src/main/java/com/yas/recommendation/dto/BaseDto.java +++ b/recommendation/src/main/java/com/yas/recommendation/dto/BaseDto.java @@ -2,8 +2,15 @@ import lombok.AllArgsConstructor; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +/** + * BaseDto is an abstract base class extending {@link BaseMetaDataDto} that represents + * entities with a unique identifier and name. It provides constructors for initializing + * metadata and basic entity properties. + */ +@EqualsAndHashCode(callSuper = true) @Data @AllArgsConstructor @NoArgsConstructor @@ -11,6 +18,14 @@ public abstract class BaseDto extends BaseMetaDataDto { protected Long id; protected String name; + /** + * Constructs a BaseDto instance with operation type, timestamp, ID, and name. + * + * @param op the operation type (e.g., CREATE, UPDATE, DELETE) + * @param ts the timestamp of the operation + * @param id the unique identifier for the entity + * @param name the name associated with the entity + */ public BaseDto(String op, Long ts, Long id, String name) { super(op, ts); this.id = id; diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/BaseMetaDataDto.java b/recommendation/src/main/java/com/yas/recommendation/dto/BaseMetaDataDto.java index d8980ae94c..92e53bf15d 100644 --- a/recommendation/src/main/java/com/yas/recommendation/dto/BaseMetaDataDto.java +++ b/recommendation/src/main/java/com/yas/recommendation/dto/BaseMetaDataDto.java @@ -4,6 +4,11 @@ import lombok.Data; import lombok.NoArgsConstructor; +/** + * BaseMetaDataDto serves as an abstract base class for Data Transfer Objects (DTOs) + * that include metadata fields for operation type and timestamp. This metadata helps + * track changes or events associated with the entity. + */ @Data @AllArgsConstructor @NoArgsConstructor diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/BrandDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/BrandDTO.java deleted file mode 100644 index 436af135d5..0000000000 --- a/recommendation/src/main/java/com/yas/recommendation/dto/BrandDTO.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.yas.recommendation.dto; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonInclude; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.Setter; - -@JsonInclude(JsonInclude.Include.NON_NULL) -@JsonIgnoreProperties(ignoreUnknown = true) -@Getter -@Setter -@AllArgsConstructor -public class BrandDTO extends BaseDto { -} diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/BrandDto.java b/recommendation/src/main/java/com/yas/recommendation/dto/BrandDto.java new file mode 100644 index 0000000000..7213157b3a --- /dev/null +++ b/recommendation/src/main/java/com/yas/recommendation/dto/BrandDto.java @@ -0,0 +1,21 @@ +package com.yas.recommendation.dto; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; + +/** + * BrandDto is a Data Transfer Object (DTO) that represents brand information. + * It extends {@link BaseDto} to inherit basic entity properties like ID and name, + * along with metadata fields. This class is annotated for JSON serialization, + * ensuring only non-null properties are included and unknown properties are ignored. + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +@Getter +@Setter +@AllArgsConstructor +public class BrandDto extends BaseDto { +} diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/CategoryDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/CategoryDTO.java deleted file mode 100644 index eda7547abf..0000000000 --- a/recommendation/src/main/java/com/yas/recommendation/dto/CategoryDTO.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.yas.recommendation.dto; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonInclude; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.Setter; - -@JsonInclude(JsonInclude.Include.NON_NULL) -@JsonIgnoreProperties(ignoreUnknown = true) -@Getter -@Setter -@AllArgsConstructor -public class CategoryDTO extends BaseDto { - public CategoryDTO(String op, Long ts, Long id, String name) { - super(op, ts, id, name); - } -} diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/CategoryDto.java b/recommendation/src/main/java/com/yas/recommendation/dto/CategoryDto.java new file mode 100644 index 0000000000..d432c84448 --- /dev/null +++ b/recommendation/src/main/java/com/yas/recommendation/dto/CategoryDto.java @@ -0,0 +1,25 @@ +package com.yas.recommendation.dto; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; + +/** + * CategoryDto is a Data Transfer Object (DTO) that represents category information. + * It extends {@link BaseDto} to inherit basic entity properties like ID and name, + * along with metadata fields for tracking changes. This class is annotated for JSON + * serialization, ensuring only non-null properties are included and unknown properties + * are ignored during deserialization. + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +@Getter +@Setter +@AllArgsConstructor +public class CategoryDto extends BaseDto { + public CategoryDto(String op, Long ts, Long id, String name) { + super(op, ts, id, name); + } +} diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/KeyDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/KeyDto.java similarity index 64% rename from recommendation/src/main/java/com/yas/recommendation/dto/KeyDTO.java rename to recommendation/src/main/java/com/yas/recommendation/dto/KeyDto.java index 9ad16edc0b..c9551a72ff 100644 --- a/recommendation/src/main/java/com/yas/recommendation/dto/KeyDTO.java +++ b/recommendation/src/main/java/com/yas/recommendation/dto/KeyDto.java @@ -7,12 +7,14 @@ import lombok.NoArgsConstructor; import lombok.Setter; -@JsonInclude(JsonInclude.Include.NON_NULL) -@JsonIgnoreProperties(ignoreUnknown = true) +/** + * KeyDto is a Data Transfer Object (DTO) representing the unique identifier (ID) of an entity. + * It is commonly used as the key in message processing contexts. + */ @Getter @Setter @AllArgsConstructor @NoArgsConstructor -public class KeyDTO { +public class KeyDto { private Long id; } diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/MessageDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/MessageDto.java similarity index 51% rename from recommendation/src/main/java/com/yas/recommendation/dto/MessageDTO.java rename to recommendation/src/main/java/com/yas/recommendation/dto/MessageDto.java index 4c5d5d06d2..a334594b1d 100644 --- a/recommendation/src/main/java/com/yas/recommendation/dto/MessageDTO.java +++ b/recommendation/src/main/java/com/yas/recommendation/dto/MessageDto.java @@ -7,13 +7,20 @@ import lombok.NoArgsConstructor; import lombok.Setter; +/** + * MessageDto is a generic Data Transfer Object (DTO) used to represent a change data capture (CDC) message. + * It extends {@link BaseDto} to include metadata and entity identification, and includes fields for + * both the "before" and "after" states of an entity, facilitating change tracking. + * + * @param the type of the entity being captured in the message + */ @JsonInclude(JsonInclude.Include.NON_NULL) @JsonIgnoreProperties(ignoreUnknown = true) @Getter @Setter @AllArgsConstructor @NoArgsConstructor -public class MessageDTO extends BaseDto { +public class MessageDto extends BaseDto { private T before; private T after; } diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeDto.java similarity index 50% rename from recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeDTO.java rename to recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeDto.java index e9c1e4fdc1..ccecd1c5cd 100644 --- a/recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeDTO.java +++ b/recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeDto.java @@ -7,16 +7,22 @@ import lombok.NoArgsConstructor; import lombok.Setter; +/** + * ProductAttributeDto is a Data Transfer Object (DTO) representing an attribute associated with a product. + * It extends {@link BaseDto} to inherit entity metadata and identification, and includes an additional + * field for the attribute value. This class is annotated for JSON serialization, ensuring only non-null + * properties are included and unknown properties are ignored during deserialization. + */ @JsonInclude(JsonInclude.Include.NON_NULL) @JsonIgnoreProperties(ignoreUnknown = true) @Getter @Setter @AllArgsConstructor @NoArgsConstructor -public class ProductAttributeDTO extends BaseDto { +public class ProductAttributeDto extends BaseDto { private String value; - public ProductAttributeDTO(String op, Long ts, Long id, String name, String value) { + public ProductAttributeDto(String op, Long ts, Long id, String name, String value) { super(op, ts, id, name); this.value = value; } diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeValueDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeValueDto.java similarity index 55% rename from recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeValueDTO.java rename to recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeValueDto.java index 6ebd371a07..eed9ba39c7 100644 --- a/recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeValueDTO.java +++ b/recommendation/src/main/java/com/yas/recommendation/dto/ProductAttributeValueDto.java @@ -8,13 +8,20 @@ import lombok.NoArgsConstructor; import lombok.Setter; +/** + * ProductAttributeValueDto is a Data Transfer Object (DTO) representing the value of a specific attribute + * associated with a product. It extends {@link BaseMetaDataDto} to include metadata fields such as + * operation type and timestamp, and it provides additional fields to store attribute-specific information. + * This class is annotated for JSON serialization, ensuring only non-null properties are included and + * unknown properties are ignored during deserialization. + */ @JsonInclude(JsonInclude.Include.NON_NULL) @JsonIgnoreProperties(ignoreUnknown = true) @Getter @Setter @AllArgsConstructor @NoArgsConstructor -public class ProductAttributeValueDTO extends BaseMetaDataDto { +public class ProductAttributeValueDto extends BaseMetaDataDto { private Long id; private String value; @JsonProperty("product_id") @@ -24,7 +31,7 @@ public class ProductAttributeValueDTO extends BaseMetaDataDto { private String productAttributeName; private boolean isDeleted; - public ProductAttributeValueDTO(Long id) { + public ProductAttributeValueDto(Long id) { this.id = id; } } diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/ProductCategoryDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/ProductCategoryDto.java similarity index 54% rename from recommendation/src/main/java/com/yas/recommendation/dto/ProductCategoryDTO.java rename to recommendation/src/main/java/com/yas/recommendation/dto/ProductCategoryDto.java index b41c6a0b42..ae98f54154 100644 --- a/recommendation/src/main/java/com/yas/recommendation/dto/ProductCategoryDTO.java +++ b/recommendation/src/main/java/com/yas/recommendation/dto/ProductCategoryDto.java @@ -8,13 +8,21 @@ import lombok.NoArgsConstructor; import lombok.Setter; +/** + * ProductCategoryDto is a Data Transfer Object (DTO) representing the association between + * a product and a category. It extends {@link BaseMetaDataDto} to include metadata fields + * such as operation type and timestamp, and it provides fields for category and product + * association details, including deletion status. + * This class is annotated for JSON serialization, ensuring only non-null properties are + * included and unknown properties are ignored during deserialization. + */ @JsonInclude(JsonInclude.Include.NON_NULL) @JsonIgnoreProperties(ignoreUnknown = true) @Getter @Setter @AllArgsConstructor @NoArgsConstructor -public class ProductCategoryDTO extends BaseMetaDataDto { +public class ProductCategoryDto extends BaseMetaDataDto { private Long id; @JsonProperty("category_id") @@ -27,7 +35,7 @@ public class ProductCategoryDTO extends BaseMetaDataDto { private boolean isDeleted; - public ProductCategoryDTO(Long id) { + public ProductCategoryDto(Long id) { this.id = id; } } diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/ProductDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/ProductDto.java similarity index 63% rename from recommendation/src/main/java/com/yas/recommendation/dto/ProductDTO.java rename to recommendation/src/main/java/com/yas/recommendation/dto/ProductDto.java index 35429f665a..3fe5562c12 100644 --- a/recommendation/src/main/java/com/yas/recommendation/dto/ProductDTO.java +++ b/recommendation/src/main/java/com/yas/recommendation/dto/ProductDto.java @@ -8,13 +8,21 @@ import lombok.NoArgsConstructor; import lombok.Setter; +/** + * ProductDto is a Data Transfer Object (DTO) representing detailed information about a product. + * It extends {@link BaseDto} to inherit entity properties like ID and name, along with metadata + * fields. This class includes additional product-specific fields such as brand, price, and + * publication status, making it useful for various product-related operations. + * The class is annotated for JSON serialization, ensuring only non-null properties are included + * and unknown properties are ignored during deserialization. + */ @JsonInclude(JsonInclude.Include.NON_NULL) @JsonIgnoreProperties(ignoreUnknown = true) @Getter @Setter @AllArgsConstructor @NoArgsConstructor -public class ProductDTO extends BaseDto { +public class ProductDto extends BaseDto { @JsonProperty("brand_id") private Long brandId; diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/ProductResultDTO.java b/recommendation/src/main/java/com/yas/recommendation/dto/ProductResultDTO.java deleted file mode 100644 index 279c528970..0000000000 --- a/recommendation/src/main/java/com/yas/recommendation/dto/ProductResultDTO.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.yas.recommendation.dto; - -import java.util.HashSet; -import java.util.Set; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonInclude; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.Setter; - -@JsonInclude(JsonInclude.Include.NON_NULL) -@JsonIgnoreProperties(ignoreUnknown = true) -@Getter -@Setter -@AllArgsConstructor -@NoArgsConstructor -public class ProductResultDTO extends ProductDTO { - private BrandDTO brand; - private Set categories = new HashSet<>(); - private Set productAttributes = new HashSet<>(); - -} diff --git a/recommendation/src/main/java/com/yas/recommendation/dto/ProductResultDto.java b/recommendation/src/main/java/com/yas/recommendation/dto/ProductResultDto.java new file mode 100644 index 0000000000..355b303468 --- /dev/null +++ b/recommendation/src/main/java/com/yas/recommendation/dto/ProductResultDto.java @@ -0,0 +1,31 @@ +package com.yas.recommendation.dto; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import java.util.HashSet; +import java.util.Set; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +/** + * ProductResultDto is a Data Transfer Object (DTO) representing an enriched product result. + * It extends {@link ProductDto} to include core product information, and adds fields for + * associated brand, categories, and product attributes, providing a comprehensive view + * of product details with related entities. + * This class is annotated for JSON serialization, ensuring only non-null properties are + * included and unknown properties are ignored during deserialization. + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +public class ProductResultDto extends ProductDto { + private BrandDto brand; + private Set categories = new HashSet<>(); + private Set productAttributes = new HashSet<>(); + +} diff --git a/recommendation/src/main/java/com/yas/recommendation/topology/AbstractTopology.java b/recommendation/src/main/java/com/yas/recommendation/topology/AbstractTopology.java index cb79ce5819..27696dce8d 100644 --- a/recommendation/src/main/java/com/yas/recommendation/topology/AbstractTopology.java +++ b/recommendation/src/main/java/com/yas/recommendation/topology/AbstractTopology.java @@ -1,13 +1,12 @@ package com.yas.recommendation.topology; -import java.lang.reflect.Type; - import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.type.TypeFactory; import com.yas.recommendation.configuration.KafkaTopicConfig; -import com.yas.recommendation.dto.AggregationDTO; +import com.yas.recommendation.dto.AggregationDto; import com.yas.recommendation.dto.BaseMetaDataDto; -import com.yas.recommendation.dto.MessageDTO; +import com.yas.recommendation.dto.MessageDto; +import java.lang.reflect.Type; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.StreamsBuilder; @@ -15,48 +14,104 @@ import org.apache.kafka.streams.state.KeyValueStore; import org.springframework.kafka.support.serializer.JsonSerde; +/** + * AbstractTopology provides a base class for creating Kafka Streams topologies. + * It contains common configurations and utility methods for creating and processing + * Kafka Streams materialized views, serde configurations, and message extraction. + */ public abstract class AbstractTopology { protected final KafkaTopicConfig kafkaTopicConfig; - public AbstractTopology(KafkaTopicConfig kafkaTopicConfig) { + /** + * Constructs an AbstractTopology instance with the specified Kafka topic configuration. + * + * @param kafkaTopicConfig Configuration for Kafka topics used in this topology. + */ + protected AbstractTopology(KafkaTopicConfig kafkaTopicConfig) { this.kafkaTopicConfig = kafkaTopicConfig; } + /** + * Abstract method to be implemented by subclasses, defining the processing logic + * of the Kafka Streams topology. + * + * @param streamsBuilder the builder used to define the Kafka Streams topology + */ protected abstract void process(StreamsBuilder streamsBuilder); + /** + * Creates a JSON serde for a specified class type. + * + * @param clazz the class type for which to create the serde + * @param the type of the class + * @return a serde for the specified class + */ protected Serde getSerde(Class clazz) { return new JsonSerde<>(clazz); } + /** + * Creates a JSON serde for a specified type reference. + * + * @param typeReference the TypeReference for which to create the serde + * @param the type of the reference + * @return a serde for the specified type reference + */ protected Serde getSerde(TypeReference typeReference) { return new JsonSerde<>(typeReference); } - protected Serde> getMessageDTOSerde(Class clazz) { - TypeReference> typeReference = new TypeReference<>() { + /** + * Creates a serde for messages wrapped in MessageDto, allowing for deserialization + * of messages with metadata. + * + * @param clazz the class type for the MessageDto content + * @param the type of the content within MessageDto + * @return a serde for MessageDto containing the specified content type + */ + protected Serde> getMessageDtoSerde(Class clazz) { + TypeReference> typeReference = new TypeReference<>() { @Override public Type getType() { return TypeFactory.defaultInstance() - .constructParametricType(MessageDTO.class, clazz); + .constructParametricType(MessageDto.class, clazz); } }; return getSerde(typeReference); } - protected Serde> getAggregationDTOSerde(Class clazzID, Class clazzT) { - TypeReference> typeReference = new TypeReference<>() { + /** + * Creates a serde for aggregation messages wrapped in AggregationDto, allowing + * for deserialization of aggregated messages with metadata. + * + * @param clazzT the class type for the AggregationDto content + * @param the type of the content within AggregationDto + * @return a serde for AggregationDto containing the specified content type + */ + protected Serde> getAggregationDtoSerde(Class clazzT) { + TypeReference> typeReference = new TypeReference<>() { @Override public Type getType() { return TypeFactory .defaultInstance() - .constructParametricType(AggregationDTO.class, clazzID, clazzT); + .constructParametricType(AggregationDto.class, clazzT); } }; return getSerde(typeReference); } - + /** + * Creates a materialized store configuration with specified key and value serdes + * and caching disabled. + * + * @param storeName the name of the store + * @param keySerde the serde for the key type + * @param valueSerde the serde for the value type + * @param the type of keys in the store + * @param the type of values in the store + * @return a Materialized configuration for the store + */ protected Materialized> createMaterializedStore( String storeName, Serde keySerde, Serde valueSerde) { @@ -66,7 +121,15 @@ protected Materialized> createMaterial .withCachingDisabled(); } - protected T extractModelFromMessage(MessageDTO message) { + /** + * Extracts the model from a message, preserving metadata such as operation type + * and timestamp. Returns null if the message or its content is null. + * + * @param message the message containing the model + * @param the type of the model within the message + * @return the extracted model with metadata, or null if the message is empty + */ + protected T extractModelFromMessage(MessageDto message) { if (message == null || message.getAfter() == null) { return null; } diff --git a/recommendation/src/main/java/com/yas/recommendation/topology/ProductTopology.java b/recommendation/src/main/java/com/yas/recommendation/topology/ProductTopology.java index 271282873c..1e536dfa80 100644 --- a/recommendation/src/main/java/com/yas/recommendation/topology/ProductTopology.java +++ b/recommendation/src/main/java/com/yas/recommendation/topology/ProductTopology.java @@ -1,15 +1,15 @@ package com.yas.recommendation.topology; import com.yas.recommendation.configuration.KafkaTopicConfig; -import com.yas.recommendation.dto.AggregationDTO; -import com.yas.recommendation.dto.BrandDTO; -import com.yas.recommendation.dto.CategoryDTO; -import com.yas.recommendation.dto.KeyDTO; -import com.yas.recommendation.dto.ProductAttributeDTO; -import com.yas.recommendation.dto.ProductAttributeValueDTO; -import com.yas.recommendation.dto.ProductCategoryDTO; -import com.yas.recommendation.dto.ProductDTO; -import com.yas.recommendation.dto.ProductResultDTO; +import com.yas.recommendation.dto.AggregationDto; +import com.yas.recommendation.dto.BrandDto; +import com.yas.recommendation.dto.CategoryDto; +import com.yas.recommendation.dto.KeyDto; +import com.yas.recommendation.dto.ProductAttributeDto; +import com.yas.recommendation.dto.ProductAttributeValueDto; +import com.yas.recommendation.dto.ProductCategoryDto; +import com.yas.recommendation.dto.ProductDto; +import com.yas.recommendation.dto.ProductResultDto; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; @@ -19,6 +19,13 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +/** + * ProductTopology defines a Kafka Streams topology that processes product-related data, + * joins it with brand, category, and attribute information, and aggregates details + * for enriched product output. This topology generates various materialized views for + * different stages of data enrichment and outputs the final processed product details + * to a Kafka sink topic. + */ @Component @Slf4j public class ProductTopology extends AbstractTopology { @@ -28,16 +35,16 @@ public class ProductTopology extends AbstractTopology { private static final String PRODUCT_BRAND_MATERIALIZED_VIEW = "product-brand-mv"; private static final String CATEGORY_MATERIALIZED_VIEW = "category-mv"; private static final String PRODUCT_CATEGORY_MATERIALIZED_VIEW = "product-category-mv"; - private static final String PRODUCT_CATEGORY_NON_NULL_MATERIALIZED_VIEW = "product-category-nonnull-mv"; - private static final String PRODUCT_CATEGORY_WITH_NAME_MATERIALIZED_VIEW = "product-category-with-name-mv"; - private static final String PRODUCT_CATEGORY_AGGREGATION_MATERIALIZED_VIEW = "product-category-agg-mv"; - private static final String PRODUCT_BRAND_CATEGORY_DETAIL_MATERIALIZED_VIEW = "product-brand-category-detail-mv"; - private static final String PRODUCT_ATTRIBUTE_MATERIALIZED_VIEW = "product-attribute-mv"; - private static final String PRODUCT_ATTRIBUTE_VALUE_NONNULL_MATERIALIZED_VIEW = "product-attribute-value-nonnull-mv"; - private static final String PRODUCT_ATTRIBUTE_VALUE_MATERIALIZED_VIEW = "product-attribute-value-mv"; - private static final String PRODUCT_ATTRIBUTE_VALUE_WITH_NAME_MATERIALIZED_VIEW = "product-attribute-value-with-name-mv"; - private static final String PRODUCT_ATTRIBUTE_AGGREGATION_MATERIALIZED_VIEW = "product-attribute-agg-mv"; - private static final String PRODUCT_BRAND_CATEGORY_ATTRIBUTE_DETAIL_MATERIALIZED_VIEW = "product-brand-category-attribute-detail-mv"; + private static final String PRODUCT_CATEGORY_NON_NULL_MATERIALIZED_VIEW = "prod-category-nn-mv"; + private static final String PRODUCT_CATEGORY_WITH_NAME_MATERIALIZED_VIEW = "prod-category-with-name-mv"; + private static final String PRODUCT_CATEGORY_AGGREGATION_MATERIALIZED_VIEW = "prod-category-agg-mv"; + private static final String PRODUCT_BRAND_CATEGORY_DETAIL_MATERIALIZED_VIEW = "prod-brand-category-detail-mv"; + private static final String PRODUCT_ATTRIBUTE_MATERIALIZED_VIEW = "prod-attr-mv"; + private static final String PRODUCT_ATTRIBUTE_VALUE_NONNULL_MATERIALIZED_VIEW = "prod-attr-value-nn-mv"; + private static final String PRODUCT_ATTRIBUTE_VALUE_MATERIALIZED_VIEW = "prod-attr-value-mv"; + private static final String PRODUCT_ATTRIBUTE_VALUE_WITH_NAME_MATERIALIZED_VIEW = "prod-attr-value-with-name-mv"; + private static final String PRODUCT_ATTRIBUTE_AGGREGATION_MATERIALIZED_VIEW = "prod-attr-agg-mv"; + private static final String PRODUCT_WITH_ALL_DETAIL_MATERIALIZED_VIEW = "product-with-all-detail-mv"; @Autowired public ProductTopology(KafkaTopicConfig kafkaTopicConfig) { @@ -47,97 +54,128 @@ public ProductTopology(KafkaTopicConfig kafkaTopicConfig) { @Override @Autowired protected void process(StreamsBuilder streamsBuilder) { - KTable productTable = createProductTable(streamsBuilder); - KTable brandTable = createBrandTable(streamsBuilder); - KTable productBrandTable = joinProductAndBrand(productTable, brandTable); + KTable productTable = createProductTable(streamsBuilder); + KTable brandTable = createBrandTable(streamsBuilder); + KTable productBrandTable = joinProductAndBrand(productTable, brandTable); - KTable categoryTable = createCategoryTable(streamsBuilder); - KTable productCategoryTable = createProductCategoryTable(streamsBuilder); - KTable nonNullProductCategoryTable = filterNonNullProductCategory(productCategoryTable); - KTable productCategoryDetailTable = enrichProductCategoryWithDetails(productCategoryTable, categoryTable); + KTable categoryTable = createCategoryTable(streamsBuilder); - KTable> productCategoryAgg = aggregateProductCategoryDetails(productCategoryDetailTable, nonNullProductCategoryTable); + KTable productCategoryTable = + createProductCategoryTable(streamsBuilder); - KTable addingCategoryDetailTable = joinProductWithCategoryDetails(productBrandTable, productCategoryAgg); + KTable nonNullProductCategoryTable = + filterNonNullProductCategory(productCategoryTable); - KTable productAttributeTable = createProductAttributeTable(streamsBuilder); - KTable productAttributeValueTable = createProductAttributeValueTable(streamsBuilder); - KTable nonNullProductAttributeValueTable = filterNonNullProductAttributeValue(productAttributeValueTable); - KTable productAttributeValueDetailTable = enrichProductAttributeValueDetails(productAttributeValueTable, productAttributeTable); + KTable productCategoryDetailTable = + enrichProductCategoryWithDetails(productCategoryTable, categoryTable); - KTable> productAttributeAgg = aggregateProductAttributeDetails(productAttributeValueDetailTable, nonNullProductAttributeValueTable); + KTable> productCategoryAgg = + aggregateProductCategoryDetails(productCategoryDetailTable, nonNullProductCategoryTable); - KTable resultTable = joinProductWithAttributeDetails(addingCategoryDetailTable, productAttributeAgg); + KTable addingCategoryDetailTable = + joinProductWithCategoryDetails(productBrandTable, productCategoryAgg); + + KTable productAttributeTable = createProductAttributeTable(streamsBuilder); + + KTable productAttributeValueTable = + createProductAttributeValueTable(streamsBuilder); + + KTable nonNullProductAttributeValueTable = + filterNonNullProductAttributeValue(productAttributeValueTable); + + KTable productAttributeValueDetailTable = + enrichProductAttributeValueDetails(productAttributeValueTable, productAttributeTable); + + KTable> productAttributeAgg = + aggregateProductAttributeDetails(productAttributeValueDetailTable, nonNullProductAttributeValueTable); + + KTable resultTable = + joinProductWithAttributeDetails(addingCategoryDetailTable, productAttributeAgg); writeToSink(resultTable); } - private KTable createProductTable(StreamsBuilder streamsBuilder) { + private KTable createProductTable(StreamsBuilder streamsBuilder) { return streamsBuilder.stream( kafkaTopicConfig.product(), - Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(ProductDTO.class))) + Consumed.with(getSerde(KeyDto.class), getMessageDtoSerde(ProductDto.class))) .selectKey((key, value) -> key.getId()) .mapValues(this::extractModelFromMessage) - .toTable(createMaterializedStore(PRODUCT_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductDTO.class))); + .toTable(createMaterializedStore(PRODUCT_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductDto.class))); } - private KTable createBrandTable(StreamsBuilder streamsBuilder) { + private KTable createBrandTable(StreamsBuilder streamsBuilder) { return streamsBuilder.stream( kafkaTopicConfig.brand(), - Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(BrandDTO.class))) + Consumed.with(getSerde(KeyDto.class), getMessageDtoSerde(BrandDto.class))) .selectKey((key, value) -> key.getId()) .mapValues(this::extractModelFromMessage) - .toTable(createMaterializedStore(BRAND_MATERIALIZED_VIEW, Serdes.Long(), getSerde(BrandDTO.class))); + .toTable(createMaterializedStore(BRAND_MATERIALIZED_VIEW, Serdes.Long(), getSerde(BrandDto.class))); } - private KTable joinProductAndBrand(KTable productTable, KTable brandTable) { + private KTable joinProductAndBrand( + KTable productTable, KTable brandTable) { return productTable.leftJoin( brandTable, - ProductDTO::getBrandId, + ProductDto::getBrandId, this::enrichWithProductAndBrandData, - createMaterializedStore(PRODUCT_BRAND_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductResultDTO.class)) - ); + createMaterializedStore( + PRODUCT_BRAND_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductResultDto.class))); } - private KTable createCategoryTable(StreamsBuilder streamsBuilder) { + private KTable createCategoryTable(StreamsBuilder streamsBuilder) { return streamsBuilder.stream( kafkaTopicConfig.category(), - Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(CategoryDTO.class))) + Consumed.with(getSerde(KeyDto.class), getMessageDtoSerde(CategoryDto.class))) .selectKey((key, value) -> key.getId()) .mapValues(this::extractModelFromMessage) - .toTable(createMaterializedStore(CATEGORY_MATERIALIZED_VIEW, Serdes.Long(), getSerde(CategoryDTO.class))); + .toTable( + createMaterializedStore( + CATEGORY_MATERIALIZED_VIEW, Serdes.Long(), getSerde(CategoryDto.class))); } - private KTable createProductCategoryTable(StreamsBuilder streamsBuilder) { + private KTable createProductCategoryTable(StreamsBuilder streamsBuilder) { return streamsBuilder.stream( kafkaTopicConfig.productCategory(), - Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(ProductCategoryDTO.class))) + Consumed.with(getSerde(KeyDto.class), getMessageDtoSerde(ProductCategoryDto.class))) .selectKey((key, value) -> key.getId()) .mapValues(this::extractModelFromMessage) - .toTable(createMaterializedStore(PRODUCT_CATEGORY_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductCategoryDTO.class))); + .toTable( + createMaterializedStore( + PRODUCT_CATEGORY_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductCategoryDto.class))); } - private KTable filterNonNullProductCategory(KTable productCategoryTable) { + private KTable filterNonNullProductCategory( + KTable productCategoryTable) { return productCategoryTable.toStream() .filter((key, value) -> value != null) - .toTable(createMaterializedStore(PRODUCT_CATEGORY_NON_NULL_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductCategoryDTO.class))); + .toTable( + createMaterializedStore( + PRODUCT_CATEGORY_NON_NULL_MATERIALIZED_VIEW, + Serdes.Long(), + getSerde(ProductCategoryDto.class))); } - private KTable enrichProductCategoryWithDetails(KTable productCategoryTable, KTable categoryTable) { + private KTable enrichProductCategoryWithDetails( + KTable productCategoryTable, KTable categoryTable) { return productCategoryTable.leftJoin( categoryTable, - ProductCategoryDTO::getCategoryId, + ProductCategoryDto::getCategoryId, (productCategory, category) -> { productCategory.setCategoryName(category.getName()); return productCategory; }, - createMaterializedStore(PRODUCT_CATEGORY_WITH_NAME_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductCategoryDTO.class)) - ); + createMaterializedStore( + PRODUCT_CATEGORY_WITH_NAME_MATERIALIZED_VIEW, + Serdes.Long(), + getSerde(ProductCategoryDto.class))); } - private KTable> aggregateProductCategoryDetails(KTable productCategoryDetailTable, KTable nonNullProductCategoryTable) { + private KTable> aggregateProductCategoryDetails( + KTable productCategoryDetailTable, + KTable nonNullProductCategoryTable) { return productCategoryDetailTable.toStream() - .mapValues(v -> v != null ? v : new ProductCategoryDTO(-1L)) + .mapValues(v -> v != null ? v : new ProductCategoryDto(-1L)) .join( nonNullProductCategoryTable, (productCategoryDetail, nonNullProductCategoryDetail) -> { @@ -146,18 +184,19 @@ private KTable> aggregateProductCategory return nonNullProductCategoryDetail; } return productCategoryDetail; - } - ) + }) .selectKey((key, value) -> value.getProductId()) .groupByKey() .aggregate( - AggregationDTO::new, + AggregationDto::new, (productId, productCategoryDetail, agg) -> { agg.setJoinId(productId); - agg.getAggregationContents().removeIf(category -> productCategoryDetail.getCategoryId().equals(category.getId())); + agg.getAggregationContents() + .removeIf(category -> + productCategoryDetail.getCategoryId().equals(category.getId())); if (!productCategoryDetail.isDeleted()) { - agg.add(new CategoryDTO( + agg.add(new CategoryDto( productCategoryDetail.getOp(), productCategoryDetail.getTs(), productCategoryDetail.getCategoryId(), @@ -165,11 +204,15 @@ private KTable> aggregateProductCategory } return agg; }, - createMaterializedStore(PRODUCT_CATEGORY_AGGREGATION_MATERIALIZED_VIEW, Serdes.Long(), getAggregationDTOSerde(Long.class, CategoryDTO.class)) - ); + createMaterializedStore( + PRODUCT_CATEGORY_AGGREGATION_MATERIALIZED_VIEW, + Serdes.Long(), + getAggregationDtoSerde(CategoryDto.class))); } - private KTable joinProductWithCategoryDetails(KTable productBrandTable, KTable> productCategoryAgg) { + private KTable joinProductWithCategoryDetails( + KTable productBrandTable, + KTable> productCategoryAgg) { return productBrandTable.leftJoin( productCategoryAgg, (productResult, agg) -> { @@ -178,49 +221,65 @@ private KTable joinProductWithCategoryDetails(KTable createProductAttributeTable(StreamsBuilder streamsBuilder) { + private KTable createProductAttributeTable(StreamsBuilder streamsBuilder) { return streamsBuilder.stream( kafkaTopicConfig.productAttribute(), - Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(ProductAttributeDTO.class))) + Consumed.with(getSerde(KeyDto.class), getMessageDtoSerde(ProductAttributeDto.class))) .selectKey((key, value) -> key.getId()) .mapValues(this::extractModelFromMessage) - .toTable(createMaterializedStore(PRODUCT_ATTRIBUTE_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductAttributeDTO.class))); + .toTable(createMaterializedStore( + PRODUCT_ATTRIBUTE_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductAttributeDto.class))); } - private KTable createProductAttributeValueTable(StreamsBuilder streamsBuilder) { + private KTable createProductAttributeValueTable(StreamsBuilder streamsBuilder) { return streamsBuilder.stream( kafkaTopicConfig.productAttributeValue(), - Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(ProductAttributeValueDTO.class))) + Consumed.with(getSerde(KeyDto.class), getMessageDtoSerde(ProductAttributeValueDto.class))) .selectKey((key, value) -> key.getId()) .mapValues(this::extractModelFromMessage) - .toTable(createMaterializedStore(PRODUCT_ATTRIBUTE_VALUE_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductAttributeValueDTO.class))); + .toTable(createMaterializedStore( + PRODUCT_ATTRIBUTE_VALUE_MATERIALIZED_VIEW, + Serdes.Long(), + getSerde(ProductAttributeValueDto.class))); } - private KTable filterNonNullProductAttributeValue(KTable productAttributeValueTable) { + private KTable filterNonNullProductAttributeValue( + KTable productAttributeValueTable) { return productAttributeValueTable.toStream() .filter((key, value) -> value != null) - .toTable(createMaterializedStore(PRODUCT_ATTRIBUTE_VALUE_NONNULL_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductAttributeValueDTO.class))); + .toTable(createMaterializedStore( + PRODUCT_ATTRIBUTE_VALUE_NONNULL_MATERIALIZED_VIEW, + Serdes.Long(), + getSerde(ProductAttributeValueDto.class))); } - private KTable enrichProductAttributeValueDetails(KTable productAttributeValueTable, KTable productAttributeTable) { + private KTable enrichProductAttributeValueDetails( + KTable productAttributeValueTable, + KTable productAttributeTable) { return productAttributeValueTable.leftJoin( productAttributeTable, - ProductAttributeValueDTO::getProductAttributeId, + ProductAttributeValueDto::getProductAttributeId, (productAttributeValue, productAttribute) -> { productAttributeValue.setProductAttributeName(productAttribute.getName()); return productAttributeValue; }, - createMaterializedStore(PRODUCT_ATTRIBUTE_VALUE_WITH_NAME_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductAttributeValueDTO.class)) - ); + createMaterializedStore( + PRODUCT_ATTRIBUTE_VALUE_WITH_NAME_MATERIALIZED_VIEW, + Serdes.Long(), + getSerde(ProductAttributeValueDto.class))); } - private KTable> aggregateProductAttributeDetails(KTable productAttributeValueDetailTable, KTable nonNullProductAttributeValueTable) { + private KTable> aggregateProductAttributeDetails( + KTable productAttributeValueDetailTable, + KTable nonNullProductAttributeValueTable) { return productAttributeValueDetailTable.toStream() - .mapValues(v -> v != null ? v : new ProductAttributeValueDTO(-1L)) + .mapValues(v -> v != null ? v : new ProductAttributeValueDto(-1L)) .join( nonNullProductAttributeValueTable, (productAttributeValueDetail, nonNullProductAttributeValueDetail) -> { @@ -229,66 +288,72 @@ private KTable> aggregateProduct return nonNullProductAttributeValueDetail; } return productAttributeValueDetail; - } - ) + }) .selectKey((key, value) -> value.getProductId()) .groupByKey() .aggregate( - AggregationDTO::new, + AggregationDto::new, (productId, productAttributeValueDetail, agg) -> { agg.setJoinId(productId); - agg.getAggregationContents().removeIf(attribute -> productAttributeValueDetail.getProductAttributeId().equals(attribute.getId())); + agg.getAggregationContents() + .removeIf(attribute -> productAttributeValueDetail.getProductAttributeId() + .equals(attribute.getId())); if (!productAttributeValueDetail.isDeleted()) { - agg.add(new ProductAttributeDTO( + agg.add(new ProductAttributeDto( productAttributeValueDetail.getOp(), productAttributeValueDetail.getTs(), productAttributeValueDetail.getProductAttributeId(), productAttributeValueDetail.getProductAttributeName(), - productAttributeValueDetail.getValue() - )); + productAttributeValueDetail.getValue())); } return agg; }, - createMaterializedStore(PRODUCT_ATTRIBUTE_AGGREGATION_MATERIALIZED_VIEW, Serdes.Long(), getAggregationDTOSerde(Long.class, ProductAttributeDTO.class)) - ); + createMaterializedStore( + PRODUCT_ATTRIBUTE_AGGREGATION_MATERIALIZED_VIEW, + Serdes.Long(), + getAggregationDtoSerde(ProductAttributeDto.class))); } - private KTable joinProductWithAttributeDetails(KTable addingCategoryDetailTable, KTable> productAttributeAgg) { + private KTable joinProductWithAttributeDetails( + KTable addingCategoryDetailTable, + KTable> productAttributeAgg) { return addingCategoryDetailTable.leftJoin( productAttributeAgg, - ProductResultDTO::getId, + ProductResultDto::getId, (productResult, agg) -> { if (agg != null) { productResult.setProductAttributes(agg.getAggregationContents()); } return productResult; }, - createMaterializedStore(PRODUCT_BRAND_CATEGORY_ATTRIBUTE_DETAIL_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductResultDTO.class)) - ); + createMaterializedStore( + PRODUCT_WITH_ALL_DETAIL_MATERIALIZED_VIEW, + Serdes.Long(), + getSerde(ProductResultDto.class))); } - private void writeToSink(KTable resultTable) { + private void writeToSink(KTable resultTable) { resultTable .toStream() - .to(kafkaTopicConfig.productSink(), Produced.with(Serdes.Long(), getSerde(ProductResultDTO.class))); + .to(kafkaTopicConfig.productSink(), Produced.with(Serdes.Long(), getSerde(ProductResultDto.class))); } - private ProductResultDTO enrichWithProductAndBrandData(ProductDTO productRecord, BrandDTO brandRecord) { - ProductResultDTO productResultDTO = new ProductResultDTO(); - productResultDTO.setBrand(brandRecord); - productResultDTO.setId(productRecord.getId()); - productResultDTO.setName(productRecord.getName()); - productResultDTO.setMetaDescription(productRecord.getMetaDescription()); - productResultDTO.setShortDescription(productRecord.getShortDescription()); - productResultDTO.setPrice(productRecord.getPrice()); - productResultDTO.setMetaTitle(productRecord.getMetaTitle()); - productResultDTO.setSpecification(productRecord.getSpecification()); - productResultDTO.setBrandId(productRecord.getBrandId()); - productResultDTO.setOp(productRecord.getOp()); - productResultDTO.setTs(productRecord.getTs()); - productResultDTO.setPublished(productRecord.getPublished()); - return productResultDTO; + private ProductResultDto enrichWithProductAndBrandData(ProductDto productRecord, BrandDto brandRecord) { + ProductResultDto productResultDto = new ProductResultDto(); + productResultDto.setBrand(brandRecord); + productResultDto.setId(productRecord.getId()); + productResultDto.setName(productRecord.getName()); + productResultDto.setMetaDescription(productRecord.getMetaDescription()); + productResultDto.setShortDescription(productRecord.getShortDescription()); + productResultDto.setPrice(productRecord.getPrice()); + productResultDto.setMetaTitle(productRecord.getMetaTitle()); + productResultDto.setSpecification(productRecord.getSpecification()); + productResultDto.setBrandId(productRecord.getBrandId()); + productResultDto.setOp(productRecord.getOp()); + productResultDto.setTs(productRecord.getTs()); + productResultDto.setPublished(productRecord.getPublished()); + return productResultDto; } }