Skip to content

Commit

Permalink
#1149 - [Search][Recommendation] Introduce Kafka Stream to aggregate …
Browse files Browse the repository at this point in the history
…all product data.

-implement kafka stream application to capture all changes related to product
  • Loading branch information
Phuoc Nguyen committed Oct 28, 2024
1 parent 6a0a802 commit d925ea2
Show file tree
Hide file tree
Showing 18 changed files with 439 additions and 204 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
package com.yas.recommendation.configuration;

import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.COMMIT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
Expand All @@ -11,25 +19,35 @@
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

import static org.apache.kafka.streams.StreamsConfig.*;

/**
* KafkaStreamAppConfig is a configuration class that sets up Kafka Streams properties and topics
* for the application. It enables configuration properties for {@link KafkaStreamConfig} and
* {@link KafkaTopicConfig} and defines beans for Kafka Streams configuration and topic creation.
*/
@Configuration
@EnableConfigurationProperties({KafkaStreamConfig.class, KafkaTopicConfig.class})
public class KafkaStreamAppConfig {

private final KafkaStreamConfig kafkaStreamConfig;
private final KafkaTopicConfig kafkaTopicConfig;

/**
* Constructs a KafkaStreamAppConfig instance with the specified Kafka Stream and Topic configurations.
*
* @param kafkaStreamConfig the configuration properties for Kafka Streams
* @param kafkaTopicConfig the configuration properties for Kafka topics
*/
@Autowired
public KafkaStreamAppConfig(KafkaStreamConfig kafkaStreamConfig, KafkaTopicConfig kafkaTopicConfig) {
this.kafkaStreamConfig = kafkaStreamConfig;
this.kafkaTopicConfig = kafkaTopicConfig;
}


/**
* Configures Kafka Streams properties for the application and registers them as a bean.
*
* @return a KafkaStreamsConfiguration object containing the Kafka Streams properties
*/
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
KafkaStreamsConfiguration kafkaStreamAppConfig() {
Map<String, Object> props = new HashMap<>();
Expand All @@ -42,6 +60,12 @@ KafkaStreamsConfiguration kafkaStreamAppConfig() {
return new KafkaStreamsConfiguration(props);
}

/**
* Creates Kafka topics based on the configuration in {@link KafkaTopicConfig} and registers them
* as a bean to initialize topics on application startup.
*
* @return a KafkaAdmin.NewTopics object containing all the configured Kafka topics
*/
@Bean
public KafkaAdmin.NewTopics createTopics() {
return new KafkaAdmin.NewTopics(
Expand All @@ -61,6 +85,4 @@ private NewTopic createTopic(String topicName) {
.replicas(kafkaTopicConfig.defaultReplicas())
.build();
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,24 @@
import lombok.Getter;
import lombok.Setter;

/**
* AggregationDto is a generic container class for aggregating a set of items of type T.
* It includes a join ID for association with other data entities and supports adding
* individual items to the aggregation set.
*
* @param <T> the type of elements to aggregate
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
@AllArgsConstructor
@EqualsAndHashCode
public class AggregationDTO<I, T> {
private I joinId;
public class AggregationDto<T> {
private Long joinId;
private Set<T> aggregationContents;

public AggregationDTO() {
public AggregationDto() {
aggregationContents = new HashSet<>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,30 @@

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

/**
* BaseDto is an abstract base class extending {@link BaseMetaDataDto} that represents
* entities with a unique identifier and name. It provides constructors for initializing
* metadata and basic entity properties.
*/
@EqualsAndHashCode(callSuper = true)
@Data
@AllArgsConstructor
@NoArgsConstructor
public abstract class BaseDto extends BaseMetaDataDto {
protected Long id;
protected String name;

/**
* Constructs a BaseDto instance with operation type, timestamp, ID, and name.
*
* @param op the operation type (e.g., CREATE, UPDATE, DELETE)
* @param ts the timestamp of the operation
* @param id the unique identifier for the entity
* @param name the name associated with the entity
*/
public BaseDto(String op, Long ts, Long id, String name) {
super(op, ts);
this.id = id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* BaseMetaDataDto serves as an abstract base class for Data Transfer Objects (DTOs)
* that include metadata fields for operation type and timestamp. This metadata helps
* track changes or events associated with the entity.
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.yas.recommendation.dto;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

/**
* BrandDto is a Data Transfer Object (DTO) that represents brand information.
* It extends {@link BaseDto} to inherit basic entity properties like ID and name,
* along with metadata fields. This class is annotated for JSON serialization,
* ensuring only non-null properties are included and unknown properties are ignored.
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
@AllArgsConstructor
public class BrandDto extends BaseDto {
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.yas.recommendation.dto;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

/**
* CategoryDto is a Data Transfer Object (DTO) that represents category information.
* It extends {@link BaseDto} to inherit basic entity properties like ID and name,
* along with metadata fields for tracking changes. This class is annotated for JSON
* serialization, ensuring only non-null properties are included and unknown properties
* are ignored during deserialization.
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
@AllArgsConstructor
public class CategoryDto extends BaseDto {
public CategoryDto(String op, Long ts, Long id, String name) {
super(op, ts, id, name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
import lombok.NoArgsConstructor;
import lombok.Setter;

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
/**
* KeyDto is a Data Transfer Object (DTO) representing the unique identifier (ID) of an entity.
* It is commonly used as the key in message processing contexts.
*/
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class KeyDTO {
public class KeyDto {
private Long id;
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,20 @@
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
* MessageDto is a generic Data Transfer Object (DTO) used to represent a change data capture (CDC) message.
* It extends {@link BaseDto} to include metadata and entity identification, and includes fields for
* both the "before" and "after" states of an entity, facilitating change tracking.
*
* @param <T> the type of the entity being captured in the message
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class MessageDTO<T> extends BaseDto {
public class MessageDto<T> extends BaseDto {
private T before;
private T after;
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,22 @@
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
* ProductAttributeDto is a Data Transfer Object (DTO) representing an attribute associated with a product.
* It extends {@link BaseDto} to inherit entity metadata and identification, and includes an additional
* field for the attribute value. This class is annotated for JSON serialization, ensuring only non-null
* properties are included and unknown properties are ignored during deserialization.
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class ProductAttributeDTO extends BaseDto {
public class ProductAttributeDto extends BaseDto {
private String value;

public ProductAttributeDTO(String op, Long ts, Long id, String name, String value) {
public ProductAttributeDto(String op, Long ts, Long id, String name, String value) {
super(op, ts, id, name);
this.value = value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,20 @@
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
* ProductAttributeValueDto is a Data Transfer Object (DTO) representing the value of a specific attribute
* associated with a product. It extends {@link BaseMetaDataDto} to include metadata fields such as
* operation type and timestamp, and it provides additional fields to store attribute-specific information.
* This class is annotated for JSON serialization, ensuring only non-null properties are included and
* unknown properties are ignored during deserialization.
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class ProductAttributeValueDTO extends BaseMetaDataDto {
public class ProductAttributeValueDto extends BaseMetaDataDto {
private Long id;
private String value;
@JsonProperty("product_id")
Expand All @@ -24,7 +31,7 @@ public class ProductAttributeValueDTO extends BaseMetaDataDto {
private String productAttributeName;
private boolean isDeleted;

public ProductAttributeValueDTO(Long id) {
public ProductAttributeValueDto(Long id) {
this.id = id;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,21 @@
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
* ProductCategoryDto is a Data Transfer Object (DTO) representing the association between
* a product and a category. It extends {@link BaseMetaDataDto} to include metadata fields
* such as operation type and timestamp, and it provides fields for category and product
* association details, including deletion status.
* This class is annotated for JSON serialization, ensuring only non-null properties are
* included and unknown properties are ignored during deserialization.
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class ProductCategoryDTO extends BaseMetaDataDto {
public class ProductCategoryDto extends BaseMetaDataDto {
private Long id;

@JsonProperty("category_id")
Expand All @@ -27,7 +35,7 @@ public class ProductCategoryDTO extends BaseMetaDataDto {

private boolean isDeleted;

public ProductCategoryDTO(Long id) {
public ProductCategoryDto(Long id) {
this.id = id;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,21 @@
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
* ProductDto is a Data Transfer Object (DTO) representing detailed information about a product.
* It extends {@link BaseDto} to inherit entity properties like ID and name, along with metadata
* fields. This class includes additional product-specific fields such as brand, price, and
* publication status, making it useful for various product-related operations.
* The class is annotated for JSON serialization, ensuring only non-null properties are included
* and unknown properties are ignored during deserialization.
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class ProductDTO extends BaseDto {
public class ProductDto extends BaseDto {

@JsonProperty("brand_id")
private Long brandId;
Expand Down
Loading

0 comments on commit d925ea2

Please sign in to comment.