From 2d9e4d86a2d274efdff6f0a9b1b026ae4c372a38 Mon Sep 17 00:00:00 2001 From: Phuoc Nguyen Date: Fri, 25 Oct 2024 09:02:02 +0700 Subject: [PATCH] #1149 - [Search][Recommendation] Introduce Kafka Stream to aggregate all product data. -implement kafka stream application to capture all changes related to product --- .../topology/ProductTopology.java | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/recommendation/src/main/java/com/yas/recommendation/topology/ProductTopology.java b/recommendation/src/main/java/com/yas/recommendation/topology/ProductTopology.java index 6da6d61a99..9dab6d6837 100644 --- a/recommendation/src/main/java/com/yas/recommendation/topology/ProductTopology.java +++ b/recommendation/src/main/java/com/yas/recommendation/topology/ProductTopology.java @@ -146,10 +146,9 @@ private KTable> aggregateProductCategory AggregationDTO::new, (productId, productCategoryDetail, agg) -> { agg.setJoinId(productId); - if (productCategoryDetail.isDeleted()) { - agg.getAggregationContents().removeIf(category -> productCategoryDetail.getCategoryId().equals(category.getId())); - } else { - agg.getAggregationContents().removeIf(category -> productCategoryDetail.getCategoryId().equals(category.getId())); + agg.getAggregationContents().removeIf(category -> productCategoryDetail.getCategoryId().equals(category.getId())); + + if (!productCategoryDetail.isDeleted()) { agg.add(new CategoryDTO(productCategoryDetail.getMetaData(), productCategoryDetail.getCategoryId(), productCategoryDetail.getCategoryName())); } return agg; @@ -226,11 +225,15 @@ private KTable> aggregateProduct AggregationDTO::new, (productId, productAttributeValueDetail, agg) -> { agg.setJoinId(productId); - if (productAttributeValueDetail.isDeleted()) { - agg.getAggregationContents().removeIf(attribute -> productAttributeValueDetail.getProductAttributeId().equals(attribute.getId())); - } else { - agg.getAggregationContents().removeIf(attribute -> productAttributeValueDetail.getProductAttributeId().equals(attribute.getId())); - agg.add(new ProductAttributeDTO(productAttributeValueDetail.getMetaData(), productAttributeValueDetail.getProductAttributeId(), productAttributeValueDetail.getProductAttributeName(), productAttributeValueDetail.getValue())); + agg.getAggregationContents().removeIf(attribute -> productAttributeValueDetail.getProductAttributeId().equals(attribute.getId())); + + if (!productAttributeValueDetail.isDeleted()) { + agg.add(new ProductAttributeDTO( + productAttributeValueDetail.getMetaData(), + productAttributeValueDetail.getProductAttributeId(), + productAttributeValueDetail.getProductAttributeName(), + productAttributeValueDetail.getValue() + )); } return agg; },