Skip to content

Commit

Permalink
#1149 - [Search][Recommendation] Introduce Kafka Stream to aggregate …
Browse files Browse the repository at this point in the history
…all product data.

-temp commt
  • Loading branch information
Phuoc Nguyen committed Oct 24, 2024
1 parent a04559c commit 851242a
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public AbstractTopology(String sourceTopic, String sinkTopic) {
this.sinkTopic = sinkTopic;
}

protected abstract void proccess(StreamsBuilder streamsBuilder);
protected abstract void process(StreamsBuilder streamsBuilder);

protected <S> Serde<S> getSerde(Class<S> clazz) {
return new JsonSerde<>(clazz);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,64 +39,98 @@ public ProductTopology(

@Autowired
@Override
protected void proccess(StreamsBuilder streamsBuilder) {
protected void process(StreamsBuilder streamsBuilder) {
KTable<Long, ProductDTO> productTable = createProductTable(streamsBuilder);
KTable<Long, BrandDTO> brandTable = createBrandTable(streamsBuilder);
KTable<Long, ProductResultDTO> productBrandTable = joinProductAndBrand(productTable, brandTable);

KTable<Long, ProductDTO> productTable = streamsBuilder.stream(
KTable<Long, CategoryDTO> categoryTable = createCategoryTable(streamsBuilder);
KTable<Long, ProductCategoryDTO> productCategoryTable = createProductCategoryTable(streamsBuilder);
KTable<Long, ProductCategoryDTO> nonNullProductCategoryTable = filterNonNullProductCategory(productCategoryTable);
KTable<Long, ProductCategoryDTO> productCategoryDetailTable = enrichProductCategoryWithDetails(productCategoryTable, categoryTable);

KTable<Long, AggregationDTO<Long, CategoryDTO>> productCategoryAgg = aggregateProductCategoryDetails(productCategoryDetailTable, nonNullProductCategoryTable);
displayProductCategoryAggregation(productCategoryAgg);

KTable<Long, ProductResultDTO> addingCategoryDetailTable = joinProductWithCategoryDetails(productBrandTable, productCategoryAgg);

KTable<Long, ProductAttributeDTO> productAttributeTable = createProductAttributeTable(streamsBuilder);
KTable<Long, ProductAttributeValueDTO> productAttributeValueTable = createProductAttributeValueTable(streamsBuilder);
KTable<Long, ProductAttributeValueDTO> nonNullProductAttributeValueTable = filterNonNullProductAttributeValue(productAttributeValueTable);
KTable<Long, ProductAttributeValueDTO> productAttributeValueDetailTable = enrichProductAttributeValueDetails(productAttributeValueTable, productAttributeTable);

KTable<Long, AggregationDTO<Long, ProductAttributeDTO>> productAttributeAgg = aggregateProductAttributeDetails(productAttributeValueDetailTable, nonNullProductAttributeValueTable);

KTable<Long, ProductResultDTO> resultTable = joinProductWithAttributeDetails(addingCategoryDetailTable, productAttributeAgg);

writeToSink(resultTable);
}

private KTable<Long, ProductDTO> createProductTable(StreamsBuilder streamsBuilder) {
return streamsBuilder.stream(
sourceTopic,
Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(ProductDTO.class)))
.selectKey((key, value) -> key.getId())
.mapValues(this::extractModelFromMessage)
.toTable(createMaterializedStore(PRODUCT_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductDTO.class)));
}

KTable<Long, BrandDTO> brandTable = streamsBuilder.stream(
private KTable<Long, BrandDTO> createBrandTable(StreamsBuilder streamsBuilder) {
return streamsBuilder.stream(
"dbproduct.public.brand",
Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(BrandDTO.class))
)
Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(BrandDTO.class)))
.selectKey((key, value) -> key.getId())
.mapValues(this::extractModelFromMessage)
.toTable(createMaterializedStore(BRAND_MATERIALIZED_VIEW, Serdes.Long(), getSerde(BrandDTO.class)));
}

KTable<Long, ProductResultDTO> productBrandTable = productTable.leftJoin(
private KTable<Long, ProductResultDTO> joinProductAndBrand(KTable<Long, ProductDTO> productTable, KTable<Long, BrandDTO> brandTable) {
return productTable.leftJoin(
brandTable,
ProductDTO::getBrandId,
this::enrichWithProductAndBrandData,
createMaterializedStore(PRODUCT_BRAND_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductResultDTO.class))
);
}

KTable<Long, CategoryDTO> categoryTable = streamsBuilder.stream(
private KTable<Long, CategoryDTO> createCategoryTable(StreamsBuilder streamsBuilder) {
return streamsBuilder.stream(
"dbproduct.public.category",
Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(CategoryDTO.class))
)
Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(CategoryDTO.class)))
.selectKey((key, value) -> key.getId())
.mapValues(this::extractModelFromMessage)
.toTable(createMaterializedStore(CATEGORY_MATERIALIZED_VIEW, Serdes.Long(), getSerde(CategoryDTO.class)));
}


KTable<Long, ProductCategoryDTO> productCategoryTable = streamsBuilder.stream(
private KTable<Long, ProductCategoryDTO> createProductCategoryTable(StreamsBuilder streamsBuilder) {
return streamsBuilder.stream(
"dbproduct.public.product_category",
Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(ProductCategoryDTO.class))
)
Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(ProductCategoryDTO.class)))
.selectKey((key, value) -> key.getId())
.mapValues(this::extractModelFromMessage)
.toTable(createMaterializedStore(PRODUCT_CATEGORY_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductCategoryDTO.class)));
}

KTable<Long, ProductCategoryDTO> nonNullProductCategoryTable = productCategoryTable.toStream()
.filter((key, value) -> value != null).toTable(createMaterializedStore(PRODUCT_CATEGORY_NON_NULL_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductCategoryDTO.class)));

private KTable<Long, ProductCategoryDTO> filterNonNullProductCategory(KTable<Long, ProductCategoryDTO> productCategoryTable) {
return productCategoryTable.toStream()
.filter((key, value) -> value != null)
.toTable(createMaterializedStore(PRODUCT_CATEGORY_NON_NULL_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductCategoryDTO.class)));
}

KTable<Long, ProductCategoryDTO> productCategoryDetailTable = productCategoryTable
.leftJoin(
categoryTable,
ProductCategoryDTO::getCategoryId,
(productCategory, category) -> {
productCategory.setCategoryName(category.getName());
return productCategory;
},
createMaterializedStore(PRODUCT_CATEGORY_WITH_NAME_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductCategoryDTO.class))
);
private KTable<Long, ProductCategoryDTO> enrichProductCategoryWithDetails(KTable<Long, ProductCategoryDTO> productCategoryTable, KTable<Long, CategoryDTO> categoryTable) {
return productCategoryTable.leftJoin(
categoryTable,
ProductCategoryDTO::getCategoryId,
(productCategory, category) -> {
productCategory.setCategoryName(category.getName());
return productCategory;
},
createMaterializedStore(PRODUCT_CATEGORY_WITH_NAME_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductCategoryDTO.class))
);
}

KTable<Long, AggregationDTO<Long, CategoryDTO>> productCategoryAgg = productCategoryDetailTable
.toStream()
private KTable<Long, AggregationDTO<Long, CategoryDTO>> aggregateProductCategoryDetails(KTable<Long, ProductCategoryDTO> productCategoryDetailTable, KTable<Long, ProductCategoryDTO> nonNullProductCategoryTable) {
return productCategoryDetailTable.toStream()
.mapValues(v -> v != null ? v : new ProductCategoryDTO(-1L))
.join(
nonNullProductCategoryTable,
Expand All @@ -108,25 +142,14 @@ protected void proccess(StreamsBuilder streamsBuilder) {
return productCategoryDetail;
}
)
.peek((key, value) -> {
if (value.isDeleted()) {
System.out.println("Lay thong tin cua product category bi xoa:");
System.out.println("Key: " + key);
System.out.println("Product ID: " + value.getProductId());
System.out.println("CategoryId: " + value.getCategoryId());
}

})
.selectKey((key, value) -> value.getProductId())
.groupByKey()
.aggregate(
AggregationDTO::new,
(productId, productCategoryDetail, agg) -> {
agg.setJoinId(productId);
//agg.getAggregationContents().removeIf(category -> productCategoryDetail.getCategoryId().equals(category.getId()));
if (productCategoryDetail.isDeleted()) {
agg.getAggregationContents().removeIf(category -> productCategoryDetail.getCategoryId().equals(category.getId()));
System.out.println("Delete category id : " + productCategoryDetail.getCategoryId() + " of product Id " + productCategoryDetail.getProductId());
} else {
agg.getAggregationContents().removeIf(category -> productCategoryDetail.getCategoryId().equals(category.getId()));
agg.add(new CategoryDTO(productCategoryDetail.getMetaData(), productCategoryDetail.getCategoryId(), productCategoryDetail.getCategoryName()));
Expand All @@ -135,73 +158,69 @@ protected void proccess(StreamsBuilder streamsBuilder) {
},
createMaterializedStore(PRODUCT_CATEGORY_AGGREGATION_MATERIALIZED_VIEW, Serdes.Long(), getAggregationDTOSerde(Long.class, CategoryDTO.class))
);
}

private void displayProductCategoryAggregation(KTable<Long, AggregationDTO<Long, CategoryDTO>> productCategoryAgg) {
productCategoryAgg.toStream().peek((key, value) -> {
System.out.println("productCategoryAgg---------------");
System.out.println("key: " + key);
if (value != null) {
System.out.println("value : " + value.toString());
}
});
}

KTable<Long, ProductResultDTO> addingCategoryDetailTable = productBrandTable
.leftJoin(
productCategoryAgg,
(productResult, agg) -> {
if (agg != null) {
productResult.setCategories(agg.getAggregationContents());
}
return productResult;
},
createMaterializedStore(PRODUCT_BRAND_CATEGORY_DETAIL_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductResultDTO.class))
);


addingCategoryDetailTable.toStream().peek((key, value) -> {

System.out.println("addingCategoryDetailTable---------------");
System.out.println("key: " + key);
if (value != null) {
System.out.println("value : " + value.toString());
}
});
private KTable<Long, ProductResultDTO> joinProductWithCategoryDetails(KTable<Long, ProductResultDTO> productBrandTable, KTable<Long, AggregationDTO<Long, CategoryDTO>> productCategoryAgg) {
return productBrandTable.leftJoin(
productCategoryAgg,
(productResult, agg) -> {
if (agg != null) {
productResult.setCategories(agg.getAggregationContents());
}
return productResult;
},
createMaterializedStore(PRODUCT_BRAND_CATEGORY_DETAIL_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductResultDTO.class))
);
}

KTable<Long, ProductAttributeDTO> productAttributeTable = streamsBuilder.stream(
private KTable<Long, ProductAttributeDTO> createProductAttributeTable(StreamsBuilder streamsBuilder) {
return streamsBuilder.stream(
"dbproduct.public.product_attribute",
Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(ProductAttributeDTO.class))
)
Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(ProductAttributeDTO.class)))
.selectKey((key, value) -> key.getId())
.mapValues(this::extractModelFromMessage)
.toTable(createMaterializedStore(PRODUCT_ATTRIBUTE_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductAttributeDTO.class)));
}


KTable<Long, ProductAttributeValueDTO> productAttributeValueTable = streamsBuilder.stream(
private KTable<Long, ProductAttributeValueDTO> createProductAttributeValueTable(StreamsBuilder streamsBuilder) {
return streamsBuilder.stream(
"dbproduct.public.product_attribute_value",
Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(ProductAttributeValueDTO.class))
)
Consumed.with(getSerde(KeyDTO.class), getMessageDTOSerde(ProductAttributeValueDTO.class)))
.selectKey((key, value) -> key.getId())
.mapValues(this::extractModelFromMessage)
.toTable(createMaterializedStore(PRODUCT_ATTRIBUTE_VALUE_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductAttributeValueDTO.class)));
}


KTable<Long, ProductAttributeValueDTO> nonNullProductAttributeValueTable = productAttributeValueTable.toStream()
private KTable<Long, ProductAttributeValueDTO> filterNonNullProductAttributeValue(KTable<Long, ProductAttributeValueDTO> productAttributeValueTable) {
return productAttributeValueTable.toStream()
.filter((key, value) -> value != null)
.toTable(createMaterializedStore(PRODUCT_ATTRIBUTE_VALUE_NONNULL_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductAttributeValueDTO.class)));
}

KTable<Long, ProductAttributeValueDTO> productAttributeValueDetailTable = productAttributeValueTable
.leftJoin(
productAttributeTable,
ProductAttributeValueDTO::getProductAttributeId,
(productAttributeValue, productAttribute) -> {
productAttributeValue.setProductAttributeName(productAttribute.getName());
return productAttributeValue;
},
createMaterializedStore(PRODUCT_ATTRIBUTE_VALUE_WITH_NAME_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductAttributeValueDTO.class))
);

private KTable<Long, ProductAttributeValueDTO> enrichProductAttributeValueDetails(KTable<Long, ProductAttributeValueDTO> productAttributeValueTable, KTable<Long, ProductAttributeDTO> productAttributeTable) {
return productAttributeValueTable.leftJoin(
productAttributeTable,
ProductAttributeValueDTO::getProductAttributeId,
(productAttributeValue, productAttribute) -> {
productAttributeValue.setProductAttributeName(productAttribute.getName());
return productAttributeValue;
},
createMaterializedStore(PRODUCT_ATTRIBUTE_VALUE_WITH_NAME_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductAttributeValueDTO.class))
);
}

KTable<Long, AggregationDTO<Long, ProductAttributeDTO>> productAttributeAgg = productAttributeValueDetailTable
.toStream()
private KTable<Long, AggregationDTO<Long, ProductAttributeDTO>> aggregateProductAttributeDetails(KTable<Long, ProductAttributeValueDTO> productAttributeValueDetailTable, KTable<Long, ProductAttributeValueDTO> nonNullProductAttributeValueTable) {
return productAttributeValueDetailTable.toStream()
.mapValues(v -> v != null ? v : new ProductAttributeValueDTO(-1L))
.join(
nonNullProductAttributeValueTable,
Expand Down Expand Up @@ -229,30 +248,31 @@ protected void proccess(StreamsBuilder streamsBuilder) {
},
createMaterializedStore(PRODUCT_ATTRIBUTE_AGGREGATION_MATERIALIZED_VIEW, Serdes.Long(), getAggregationDTOSerde(Long.class, ProductAttributeDTO.class))
);
}

KTable<Long, ProductResultDTO> resultTable = addingCategoryDetailTable
.leftJoin(
productAttributeAgg,
ProductResultDTO::getId,
(productResult, agg) -> {
if (agg != null) {
productResult.setProductAttributes(agg.getAggregationContents());
}
return productResult;
},
createMaterializedStore(PRODUCT_BRAND_CATEGORY_ATTRIBUTE_DETAIL_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductResultDTO.class))
);
private KTable<Long, ProductResultDTO> joinProductWithAttributeDetails(KTable<Long, ProductResultDTO> addingCategoryDetailTable, KTable<Long, AggregationDTO<Long, ProductAttributeDTO>> productAttributeAgg) {
return addingCategoryDetailTable.leftJoin(
productAttributeAgg,
ProductResultDTO::getId,
(productResult, agg) -> {
if (agg != null) {
productResult.setProductAttributes(agg.getAggregationContents());
}
return productResult;
},
createMaterializedStore(PRODUCT_BRAND_CATEGORY_ATTRIBUTE_DETAIL_MATERIALIZED_VIEW, Serdes.Long(), getSerde(ProductResultDTO.class))
);
}

private void writeToSink(KTable<Long, ProductResultDTO> resultTable) {
resultTable.toStream()
.peek((key, value) -> {

System.out.println("write to sink topic: ");
System.out.println("Key : " + key);
if(value != null) {
if (value != null) {
System.out.println("Value: " + value.toString());
System.out.println("spec: " + value.getSpecification());
}

})
.to("sinkTopic", Produced.with(Serdes.Long(), getSerde(ProductResultDTO.class)));
}
Expand Down

0 comments on commit 851242a

Please sign in to comment.