Skip to content

Commit

Permalink
#1149 - [Search][Recommendation] Introduce Kafka Stream to aggregate …
Browse files Browse the repository at this point in the history
…all product data.

-implement kafka stream application to capture all changes related to product
  • Loading branch information
Phuoc Nguyen committed Oct 25, 2024
1 parent bf23f5a commit 2d9e4d8
Showing 1 changed file with 12 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,9 @@ private KTable<Long, AggregationDTO<Long, CategoryDTO>> aggregateProductCategory
AggregationDTO::new,
(productId, productCategoryDetail, agg) -> {
agg.setJoinId(productId);
if (productCategoryDetail.isDeleted()) {
agg.getAggregationContents().removeIf(category -> productCategoryDetail.getCategoryId().equals(category.getId()));
} else {
agg.getAggregationContents().removeIf(category -> productCategoryDetail.getCategoryId().equals(category.getId()));
agg.getAggregationContents().removeIf(category -> productCategoryDetail.getCategoryId().equals(category.getId()));

if (!productCategoryDetail.isDeleted()) {
agg.add(new CategoryDTO(productCategoryDetail.getMetaData(), productCategoryDetail.getCategoryId(), productCategoryDetail.getCategoryName()));
}
return agg;
Expand Down Expand Up @@ -226,11 +225,15 @@ private KTable<Long, AggregationDTO<Long, ProductAttributeDTO>> aggregateProduct
AggregationDTO::new,
(productId, productAttributeValueDetail, agg) -> {
agg.setJoinId(productId);
if (productAttributeValueDetail.isDeleted()) {
agg.getAggregationContents().removeIf(attribute -> productAttributeValueDetail.getProductAttributeId().equals(attribute.getId()));
} else {
agg.getAggregationContents().removeIf(attribute -> productAttributeValueDetail.getProductAttributeId().equals(attribute.getId()));
agg.add(new ProductAttributeDTO(productAttributeValueDetail.getMetaData(), productAttributeValueDetail.getProductAttributeId(), productAttributeValueDetail.getProductAttributeName(), productAttributeValueDetail.getValue()));
agg.getAggregationContents().removeIf(attribute -> productAttributeValueDetail.getProductAttributeId().equals(attribute.getId()));

if (!productAttributeValueDetail.isDeleted()) {
agg.add(new ProductAttributeDTO(
productAttributeValueDetail.getMetaData(),
productAttributeValueDetail.getProductAttributeId(),
productAttributeValueDetail.getProductAttributeName(),
productAttributeValueDetail.getValue()
));
}
return agg;
},
Expand Down

0 comments on commit 2d9e4d8

Please sign in to comment.