Skip to content

Commit

Permalink
#1149 - [Search][Recommendation] Introduce Kafka Stream to aggregate …
Browse files Browse the repository at this point in the history
…all product data.

-implement kafka stream application to capture all changes related to product
  • Loading branch information
Phuoc Nguyen committed Oct 16, 2024
1 parent c5106ab commit 222fa3c
Show file tree
Hide file tree
Showing 6 changed files with 7 additions and 60 deletions.
3 changes: 2 additions & 1 deletion kafka/connects/debezium-product-group.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"schema.include.list": "public",
"table.include.list": "public.product_attribute, public.product_attribute_value, public.product_category, public.category, public.brand",
"slot.name": "product_relation_slot"
"slot.name": "product_relation_slot",
"topic.creation.default.cleanup.policy": "compact"
}
3 changes: 2 additions & 1 deletion kafka/connects/debezium-product.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"schema.include.list": "public",
"table.include.list": "public.product",
"slot.name": "product_slot"
"slot.name": "product_slot",
"topic.creation.default.cleanup.policy": "compact"
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ protected void proccess(StreamsBuilder streamsBuilder) {
createMaterializedStore(PRODUCT_CATEGORY_AGGREGATION_MATERIALIZED_VIEW, Serdes.Long(), getAggregationDTOSerde(Long.class, CategoryDTO.class))
);


KTable<Long, ProductResultDTO> addingCategoryDetailTable = productBrandTable
.leftJoin(
productCategoryAgg,
ProductResultDTO::getId,
(productResult, agg) -> {
if (agg != null) {
productResult.setCategories(agg.getAggregationContents());
Expand Down
2 changes: 2 additions & 0 deletions recommendation/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ product.topic.name = dbproduct.public.product
spring.kafka.consumer.bootstrap-servers=kafka:9092
spring.kafka.consumer.group-id=recommendation

product.sink.topic.name=test

# Similarity Search Config
yas.recommendation.embedding-based.search.topK=10
yas.recommendation.embedding-based.search.initDefaultData=false
Expand Down

0 comments on commit 222fa3c

Please sign in to comment.