A simple example of an event-driven flow with Spring Cloud Stream Kafka
- java.version: 11
- spring-cloud.version: 2020.0.5
- spring-boot.version: 2.5.13
Please visit Spring Cloud Stream Kafka (Part 3) for Project documentation
The docker-compose file contains a single Kafka broker and ZooKeeper. Run the following command:
docker-compose up -d
or
- Control Center UI
Runs a Confluent Control Center that exposes a UI at http://localhost:9021/.
docker-compose -f ./kafka-cluster.yml -f ./control-center-ui.yml up
To stop the brokers and the Control Center UI run the following command:
docker-compose -f ./kafka-cluster.yml -f ./control-center-ui.yml down
I assume you already have Docker set up on your machine.
Run the following command to create the jar file in the target directory:
mvn clean package
Then run the generated jar file from the target folder (make sure you are in that directory when you run the jar file, or give the full path):
java -jar scs-100-0.0.1-SNAPSHOT.jar
The application starts listening on port 8080.
To scale the application horizontally, start another instance on a different port by adding --server.port=8081 after the jar name (placed before -jar, the JVM would reject it as an unrecognized option):
java -jar scs-100-0.0.1-SNAPSHOT.jar --server.port=8081
or
mvn spring-boot:run -Dspring-boot.run.arguments=--server.port=8081
When you run multiple instances of the same application on a single machine, the port must be unique for each instance. At this point you should already see the information about your topics.
You should now be able to place an order with the following curl command:
# assuming your app is listening on 8080
ORDER_UUID=$(curl --silent -H 'Content-Type: application/json' -d "{\"itemName\":\"book\"}" http://localhost:8080/order | jq -r '.orderUuid') && for i in `seq 1 15`; do sleep 1; echo $(curl --silent "http://localhost:8080/order/status/"$ORDER_UUID); done;
Note: make sure jq is installed.
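The one-liner above can be hard to read, so here is the same flow split into small functions. This is only a sketch: the function names and the BASE_URL variable are made up for illustration, while the endpoints, payload, and the orderUuid field are taken from the command above.

```shell
# Base URL of the application (or of the load balancer in front of it).
BASE_URL="${BASE_URL:-http://localhost:8080}"

# Build the JSON body for an order. The item name is inserted as-is,
# so it must not contain quotes or backslashes.
order_payload() {
  printf '{"itemName":"%s"}' "$1"
}

# Place an order and print the orderUuid field of the response (needs jq).
place_order() {
  curl --silent -H 'Content-Type: application/json' \
    -d "$(order_payload "$1")" "$BASE_URL/order" | jq -r '.orderUuid'
}

# Print the order status once per second, 15 times by default.
poll_status() {
  uuid="$1"
  attempts="${2:-15}"
  for i in $(seq 1 "$attempts"); do
    sleep 1
    curl --silent "$BASE_URL/order/status/$uuid"
    echo
  done
}

# Usage, once the application is running:
#   ORDER_UUID=$(place_order book) && poll_status "$ORDER_UUID"
```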
Now let’s run several instances of the same application at the same time to simulate application redundancy. Before that, make sure the current application is not running.
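One way to confirm that nothing is listening on port 8080 any more is a quick connection check. The helper below is a hypothetical sketch, not part of the project; it relies on bash's /dev/tcp pseudo-device, so it needs bash rather than plain sh.

```shell
# Succeeds (exit status 0) if something accepts TCP connections
# on the given local port, fails otherwise.
port_in_use() {
  # The subshell opens the connection on file descriptor 3 and closes it on exit.
  (exec 3<>"/dev/tcp/127.0.0.1/$1") 2>/dev/null
}

if port_in_use 8080; then
  echo "Port 8080 is still in use; stop the running application first."
else
  echo "Port 8080 is free."
fi
```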
The project code comes with Nginx as a load balancer, already configured to accept incoming traffic on port 8080 and distribute it across ports 8081 and 8082.
So first, start it with its own docker-compose file from the root of this project, “scs-100-2”:
docker-compose -f nginx/docker-compose.yml up -d
Since port 8080 is now occupied by Nginx, run the ordering application in two separate terminals as follows:
Terminal 1:
java -jar target/scs-100-2-0.0.1-SNAPSHOT.jar --server.port=8081
or
mvn spring-boot:run -Dspring-boot.run.arguments=--server.port=8081
And on Terminal 2:
java -jar target/scs-100-2-0.0.1-SNAPSHOT.jar --server.port=8082
or
mvn spring-boot:run -Dspring-boot.run.arguments=--server.port=8082
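If you would rather not keep two terminals open, both instances can be started in the background from a single shell. The sketch below is an assumption-laden helper, not part of the project: the function names, the DRY_RUN switch, and the app-PORT.log file names are invented for illustration. It passes --server.port after the jar name, where Spring Boot reads it as an application argument.

```shell
# Jar produced by "mvn clean package"; override JAR if yours differs.
JAR="${JAR:-target/scs-100-2-0.0.1-SNAPSHOT.jar}"

# Print the command that starts one instance on the given port.
instance_cmd() {
  printf 'java -jar %s --server.port=%s' "$JAR" "$1"
}

# Start one instance per port in the background.
# With DRY_RUN=1 the commands are only printed, not executed.
start_instances() {
  for port in "$@"; do
    if [ "${DRY_RUN:-0}" = "1" ]; then
      instance_cmd "$port"
      echo
    else
      java -jar "$JAR" --server.port="$port" > "app-$port.log" 2>&1 &
      echo "started instance on port $port (pid $!)"
    fi
  done
}

# Usage:
#   start_instances 8081 8082
# Stop the instances later with "kill" and the printed pids.
```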
Then run the curl command again (the same as the earlier one):
ORDER_UUID=$(curl --silent -H 'Content-Type: application/json' -d "{\"itemName\":\"book\"}" http://localhost:8080/order | jq -r '.orderUuid') && for i in `seq 1 15`; do sleep 1; echo $(curl --silent "http://localhost:8080/order/status/"$ORDER_UUID); done;
- Spring Cloud Function
- Cloud Events and Spring - part 1 (Oleg Zhurakousky)
- Cloud Events and Spring - part 2 (Oleg Zhurakousky)
- Spring Cloud Stream - demystified and simplified
- Spring Cloud Stream - functional and reactive (Oleg Zhurakousky)
Stream Processing with Spring Cloud Stream and Apache Kafka Streams. Part 6 - State Stores and Interactive Queries
- Part 1 - Programming Model
- Part 2 - Programming Model Continued
- Part 4 - Error Handling
- Part 5 - Application Customizations
- Part 6 - State Stores and Interactive Queries
Kafka Streams lets you interactively query the data in the state store in real time as live stream processing is going on. The binder provides abstractions around this feature to make it easier to work with interactive queries. InteractiveQueryService is a basic API that the binder provides to work with state store querying. You can usually inject this as a bean into your application and then invoke various API methods from it. Here is an example:
@Autowired
private InteractiveQueryService interactiveQueryService;
...
// Look up the state store by name; "my-store" must match the name the store was materialized with.
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
        interactiveQueryService.getQueryableStore("my-store", QueryableStoreTypes.keyValueStore());
Then you can invoke retrieval methods on the store and iterate through the results. Which methods are available depends on your use case and the type of state store you are using. Please refer to the Kafka Streams documentation on interactive queries for the available iteration methods.