Commit f98e6d4

#1149 - [Search][Recommendation] Introduce Kafka Stream to aggregate all product data.

- Implement a Kafka Streams application to capture all changes related to product data.
Phuoc Nguyen committed Oct 29, 2024
1 parent 8b4c8f9 commit f98e6d4
Showing 23 changed files with 1,037 additions and 6 deletions.
17 changes: 17 additions & 0 deletions kafka/connects/debezium-product-group.json
@@ -0,0 +1,17 @@
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"topic.prefix": "dbproduct",
"database.user": "admin",
"database.dbname": "product",
"database.hostname": "postgres",
"database.password": "admin",
"database.port": "5432",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"schema.include.list": "public",
"table.include.list": "public.product_attribute, public.product_attribute_value, public.product_category, public.category, public.brand, public.product",
"include.toasts": "true",
"slot.name": "product_relation_slot"
}
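This new connector is presumably registered with the Kafka Connect worker the same way the existing debezium-product.json is; that step is not shown in this diff. As a hedged illustration only, the Connect REST API accepts PUT /connectors/{name}/config with the configuration map as the request body, so the file above could be pushed as sketched below. The connector name debezium-product-group and the localhost:8083 worker address are assumptions, not values taken from the commit.

// Illustrative sketch only; connector name, Connect address, and file path are assumptions.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;

public class RegisterProductGroupConnector {

    public static void main(String[] args) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors/debezium-product-group/config"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofFile(Path.of("kafka/connects/debezium-product-group.json")))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // 201 means the connector was created, 200 means an existing one was reconfigured.
        System.out.println(response.statusCode() + " " + response.body());
    }
}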
3 changes: 2 additions & 1 deletion kafka/connects/debezium-product.json
@@ -12,5 +12,6 @@
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"schema.include.list": "public",
"table.include.list": "public.product",
"slot.name": "product_slot"
"slot.name": "product_slot",
"topic.creation.default.cleanup.policy": "compact"
}
4 changes: 4 additions & 0 deletions recommendation/pom.xml
@@ -55,6 +55,10 @@
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>

<!-- Spring AI Dependencies -->
<dependency>
@@ -4,8 +4,10 @@
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.kafka.annotation.EnableKafkaStreams;

@SpringBootApplication
@EnableKafkaStreams
@EnableConfigurationProperties(EmbeddingSearchConfiguration.class)
public class RecommendationApplication {

@@ -0,0 +1,88 @@
package com.yas.recommendation.configuration;

import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.COMMIT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.support.serializer.JsonDeserializer;

/**
* KafkaStreamAppConfig is a configuration class that sets up Kafka Streams properties and topics
* for the application. It enables configuration properties for {@link KafkaStreamConfig} and
* {@link KafkaTopicConfig} and defines beans for Kafka Streams configuration and topic creation.
*/
@Configuration
@EnableConfigurationProperties({KafkaStreamConfig.class, KafkaTopicConfig.class})
public class KafkaStreamAppConfig {

private final KafkaStreamConfig kafkaStreamConfig;
private final KafkaTopicConfig kafkaTopicConfig;

/**
* Constructs a KafkaStreamAppConfig instance with the specified Kafka Stream and Topic configurations.
*
* @param kafkaStreamConfig the configuration properties for Kafka Streams
* @param kafkaTopicConfig the configuration properties for Kafka topics
*/
@Autowired
public KafkaStreamAppConfig(KafkaStreamConfig kafkaStreamConfig, KafkaTopicConfig kafkaTopicConfig) {
this.kafkaStreamConfig = kafkaStreamConfig;
this.kafkaTopicConfig = kafkaTopicConfig;
}

/**
* Configures Kafka Streams properties for the application and registers them as a bean.
*
* @return a KafkaStreamsConfiguration object containing the Kafka Streams properties
*/
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
KafkaStreamsConfiguration kafkaStreamAppConfig() {
Map<String, Object> props = new HashMap<>();
props.put(APPLICATION_ID_CONFIG, kafkaStreamConfig.applicationId());
props.put(BOOTSTRAP_SERVERS_CONFIG, kafkaStreamConfig.bootstrapServers());
props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, kafkaStreamConfig.defaultKeySerdeClass());
props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, kafkaStreamConfig.defaultValueSerdeClass());
props.put(JsonDeserializer.TRUSTED_PACKAGES, kafkaStreamConfig.trustedPackages());
props.put(COMMIT_INTERVAL_MS_CONFIG, kafkaStreamConfig.commitIntervalMs());
return new KafkaStreamsConfiguration(props);
}

/**
* Creates Kafka topics based on the configuration in {@link KafkaTopicConfig} and registers them
* as a bean to initialize topics on application startup.
*
* @return a KafkaAdmin.NewTopics object containing all the configured Kafka topics
*/
@Bean
public KafkaAdmin.NewTopics createTopics() {
return new KafkaAdmin.NewTopics(
createTopic(kafkaTopicConfig.product()),
createTopic(kafkaTopicConfig.brand()),
createTopic(kafkaTopicConfig.category()),
createTopic(kafkaTopicConfig.productCategory()),
createTopic(kafkaTopicConfig.productAttribute()),
createTopic(kafkaTopicConfig.productAttributeValue()),
createTopic(kafkaTopicConfig.productSink())
);
}

private NewTopic createTopic(String topicName) {
return TopicBuilder.name(topicName)
.partitions(kafkaTopicConfig.defaultPartitions())
.replicas(kafkaTopicConfig.defaultReplicas())
.build();
}
}
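The stream topology itself lives in other files of this commit that are not shown here. As a rough, hypothetical sketch of how the pieces above fit together: with @EnableKafkaStreams and the DEFAULT_STREAMS_CONFIG_BEAN_NAME bean in place, Spring exposes a StreamsBuilder that can read the Debezium topics declared in KafkaTopicConfig. The class below is illustrative only (its name, package, serde choices, and the toTable materialization are assumptions, not the commit's actual implementation); it keeps the latest after-image per brand key.

// Illustrative sketch only; not part of this commit. Package and class names are hypothetical.
package com.yas.recommendation.topology;

import com.fasterxml.jackson.core.type.TypeReference;
import com.yas.recommendation.configuration.KafkaTopicConfig;
import com.yas.recommendation.dto.BrandDto;
import com.yas.recommendation.dto.KeyDto;
import com.yas.recommendation.dto.MessageDto;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonSerde;

@Configuration
public class BrandTableSketch {

    @Bean
    public KTable<KeyDto, BrandDto> brandTable(StreamsBuilder streamsBuilder, KafkaTopicConfig topics) {
        Serde<KeyDto> keySerde = new JsonSerde<>(KeyDto.class);
        Serde<MessageDto<BrandDto>> messageSerde =
                new JsonSerde<>(new TypeReference<MessageDto<BrandDto>>() { });

        return streamsBuilder
                .stream(topics.brand(), Consumed.with(keySerde, messageSerde))
                // keep only the post-change image; delete/tombstone handling is omitted in this sketch
                .mapValues(MessageDto::getAfter)
                .toTable(Materialized.with(keySerde, new JsonSerde<>(BrandDto.class)));
    }
}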
@@ -0,0 +1,14 @@
package com.yas.recommendation.configuration;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "yas.kafka.stream")
public record KafkaStreamConfig(
String applicationId,
String bootstrapServers,
String defaultKeySerdeClass,
String defaultValueSerdeClass,
String trustedPackages,
int commitIntervalMs
) {
}
@@ -0,0 +1,17 @@
package com.yas.recommendation.configuration;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "yas.kafka.topic")
public record KafkaTopicConfig(
String product,
String brand,
String category,
String productCategory,
String productAttribute,
String productAttributeValue,
String productSink,
int defaultPartitions,
int defaultReplicas
) {
}
@@ -0,0 +1,37 @@
package com.yas.recommendation.dto;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import java.util.HashSet;
import java.util.Set;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;

/**
* AggregationDto is a generic container class for aggregating a set of items of type T.
* It includes a join ID for association with other data entities and supports adding
* individual items to the aggregation set.
*
* @param <T> the type of elements to aggregate
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
@AllArgsConstructor
@EqualsAndHashCode
@SuppressWarnings("java:S1068")
public class AggregationDto<T> {
private Long joinId;
private Set<T> aggregationContents;

public AggregationDto() {
aggregationContents = new HashSet<>();
}

public void add(T aggregationContent) {
aggregationContents.add(aggregationContent);
}
}
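Per the javadoc, AggregationDto is the natural accumulator for a Kafka Streams aggregation: one joinId plus the set of rows that belong to it. A minimal, hypothetical sketch of that usage follows, assuming a stream of ProductAttributeDto already re-keyed by the owning product id; the helper class, key type, and serdes are assumptions, not part of this commit.

// Illustrative sketch only; not part of this commit.
package com.yas.recommendation.topology;

import com.fasterxml.jackson.core.type.TypeReference;
import com.yas.recommendation.dto.AggregationDto;
import com.yas.recommendation.dto.ProductAttributeDto;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.springframework.kafka.support.serializer.JsonSerde;

final class AttributeAggregationSketch {

    static KTable<Long, AggregationDto<ProductAttributeDto>> aggregateByProduct(
            KStream<Long, ProductAttributeDto> attributesByProductId) {
        Serde<AggregationDto<ProductAttributeDto>> aggregationSerde =
                new JsonSerde<>(new TypeReference<AggregationDto<ProductAttributeDto>>() { });

        return attributesByProductId
                .groupByKey(Grouped.with(Serdes.Long(), new JsonSerde<>(ProductAttributeDto.class)))
                .aggregate(
                        AggregationDto::new,                  // fresh accumulator with an empty HashSet
                        (productId, attribute, aggregation) -> {
                            aggregation.setJoinId(productId); // remember which product the set belongs to
                            aggregation.add(attribute);
                            return aggregation;
                        },
                        Materialized.with(Serdes.Long(), aggregationSerde));
    }

    private AttributeAggregationSketch() {
    }
}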
@@ -0,0 +1,35 @@
package com.yas.recommendation.dto;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

/**
* BaseDto is an abstract base class extending {@link BaseMetaDataDto} that represents
* entities with a unique identifier and name. It provides constructors for initializing
* metadata and basic entity properties.
*/
@EqualsAndHashCode(callSuper = true)
@Data
@AllArgsConstructor
@NoArgsConstructor
@SuppressWarnings("java:S1068")
public abstract class BaseDto extends BaseMetaDataDto {
protected Long id;
protected String name;

/**
* Constructs a BaseDto instance with operation type, timestamp, ID, and name.
*
* @param op the operation type (e.g., CREATE, UPDATE, DELETE)
* @param ts the timestamp of the operation
* @param id the unique identifier for the entity
* @param name the name associated with the entity
*/
protected BaseDto(String op, Long ts, Long id, String name) {
super(op, ts);
this.id = id;
this.name = name;
}
}
@@ -0,0 +1,19 @@
package com.yas.recommendation.dto;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* BaseMetaDataDto serves as an abstract base class for Data Transfer Objects (DTOs)
* that include metadata fields for operation type and timestamp. This metadata helps
* track changes or events associated with the entity.
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@SuppressWarnings("java:S1068")
public abstract class BaseMetaDataDto {
protected String op;
protected Long ts = System.currentTimeMillis();
}
@@ -0,0 +1,21 @@
package com.yas.recommendation.dto;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

/**
* BrandDto is a Data Transfer Object (DTO) that represents brand information.
* It extends {@link BaseDto} to inherit basic entity properties like ID and name,
* along with metadata fields. This class is annotated for JSON serialization,
* ensuring only non-null properties are included and unknown properties are ignored.
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
@AllArgsConstructor
public class BrandDto extends BaseDto {
}
@@ -0,0 +1,25 @@
package com.yas.recommendation.dto;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

/**
* CategoryDto is a Data Transfer Object (DTO) that represents category information.
* It extends {@link BaseDto} to inherit basic entity properties like ID and name,
* along with metadata fields for tracking changes. This class is annotated for JSON
* serialization, ensuring only non-null properties are included and unknown properties
* are ignored during deserialization.
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
@AllArgsConstructor
public class CategoryDto extends BaseDto {
public CategoryDto(String op, Long ts, Long id, String name) {
super(op, ts, id, name);
}
}
@@ -0,0 +1,21 @@
package com.yas.recommendation.dto;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
* KeyDto is a Data Transfer Object (DTO) representing the unique identifier (ID) of an entity.
* It is commonly used as the key in message processing contexts.
*/
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@SuppressWarnings("java:S1068")
public class KeyDto {
private Long id;
}
@@ -0,0 +1,27 @@
package com.yas.recommendation.dto;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
* MessageDto is a generic Data Transfer Object (DTO) used to represent a change data capture (CDC) message.
* It extends {@link BaseMetaDataDto} to inherit the operation metadata, and adds fields for
* both the "before" and "after" states of an entity, facilitating change tracking.
*
* @param <T> the type of the entity being captured in the message
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@SuppressWarnings("java:S1068")
public class MessageDto<T> extends BaseMetaDataDto {
private T before;
private T after;
}
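For orientation, a Debezium change event carries the operation type plus the before and after row images, and the DTO above mirrors that shape. Below is a minimal, hypothetical sketch of binding such a payload to MessageDto with Jackson. The sample JSON follows this DTO's field names; a raw Debezium envelope names its timestamp ts_ms and carries extra source metadata, which the ignoreUnknown setting above would simply skip.

// Illustrative sketch only; not part of this commit.
package com.yas.recommendation.dto;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

final class MessageDtoSketch {

    public static void main(String[] args) throws Exception {
        String payload = """
                {
                  "op": "u",
                  "ts": 1730160000000,
                  "before": {"id": 1, "name": "Old brand name"},
                  "after":  {"id": 1, "name": "New brand name"}
                }
                """;

        ObjectMapper mapper = new ObjectMapper();
        MessageDto<BrandDto> message =
                mapper.readValue(payload, new TypeReference<MessageDto<BrandDto>>() { });

        // "u" is an update event; the after image is what downstream aggregation keeps.
        System.out.println(message.getOp() + " -> " + message.getAfter().getName());
    }
}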
@@ -0,0 +1,30 @@
package com.yas.recommendation.dto;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
* ProductAttributeDto is a Data Transfer Object (DTO) representing an attribute associated with a product.
* It extends {@link BaseDto} to inherit entity metadata and identification, and includes an additional
* field for the attribute value. This class is annotated for JSON serialization, ensuring only non-null
* properties are included and unknown properties are ignored during deserialization.
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@SuppressWarnings("java:S1068")
public class ProductAttributeDto extends BaseDto {
private String value;

public ProductAttributeDto(String op, Long ts, Long id, String name, String value) {
super(op, ts, id, name);
this.value = value;
}
}