We can fan out delivery from an event source to multiple endpoints using Channels and Subscriptions. In this case, the Channel implementation ensures that messages are delivered to the requested destinations, and it should buffer the events if the destination service is unavailable.
Several channel implementations are available, such as Apache Kafka, GCPPubSub, and InMemoryChannel. In this example we will create an InMemoryChannel with the following YAML. This is a best-effort channel intended for development use.
apiVersion: messaging.knative.dev/v1alpha1
kind: InMemoryChannel
metadata:
  name: imc-msgtxr
Let us create the channel.
$ oc create -f eventing/in-memory-channel.yaml
inmemorychannel.messaging.knative.dev/imc-msgtxr created
Verify
$ oc get inmemorychannel
NAME         READY   REASON   URL                                                      AGE
imc-msgtxr   True             http://imc-msgtxr-kn-channel.kn-demo.svc.cluster.local   24s
Note the URL from the output as we will use it while adding the event source in the next step.
Now we will add an event source again. Unlike the previous direct-delivery example, the event source in this case uses the channel above as its sink.
kn source ping create msgtxr-pingsource \
--schedule="* * * * *" \
--data="This message is from PingSource" \
--sink=http://imc-msgtxr-kn-channel.kn-demo.svc.cluster.local
- We are using PingSource as the event source in this example as well.
- Note that the sink here points to the channel we created above. Look at the URL.
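The same source can also be written declaratively. The manifest below is a sketch and not part of the original lab. It assumes the cluster serves the sources.knative.dev/v1alpha2 PingSource API, where the payload field is called jsonData (other API versions name this field differently). It also refers to the channel by object reference rather than by URL, so the address does not have to be copied by hand.

```yaml
# Sketch only: assumes the sources.knative.dev/v1alpha2 API is available on the cluster.
apiVersion: sources.knative.dev/v1alpha2
kind: PingSource
metadata:
  name: msgtxr-pingsource
spec:
  # Fire once every minute, as in the kn command above.
  schedule: "* * * * *"
  jsonData: "This message is from PingSource"
  sink:
    # Reference the channel object instead of its URL.
    ref:
      apiVersion: messaging.knative.dev/v1alpha1
      kind: InMemoryChannel
      name: imc-msgtxr
```

If you use a manifest like this, create it with oc create -f in place of the kn command above, not in addition to it, since both define a source with the same name.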
Verify
$ kn source ping list
NAME                SCHEDULE    SINK   AGE   CONDITIONS   READY   REASON
msgtxr-pingsource   * * * * *          13m   6 OK / 6     True
At this point events are being generated, but no data should be flowing into the application yet. Run:
$ oc get po | grep Running
pingsource-msgtxr-pingsour-e574f4cc-1d71-4d6f-b1b9-47f18c6kl9pq 1/1 Running 0 41s
You should see the event source pod running. It is pushing events to its sink, which is the InMemoryChannel. However, since nothing has subscribed to the channel yet, the events are not delivered anywhere.
Event subscription allows an event destination to subscribe to a channel. In our case we will configure our service as the destination.
apiVersion: messaging.knative.dev/v1alpha1
kind: Subscription
metadata:
  name: msgtxr-subscriber
spec:
  channel:
    apiVersion: messaging.knative.dev/v1alpha1
    kind: InMemoryChannel
    name: imc-msgtxr
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: msgtxr-sl
Let us create the subscription
$ oc create -f eventing/event-subscription.yaml
subscription.messaging.knative.dev/msgtxr-subscriber created
Verify
$ oc get subscription.messaging.knative.dev
NAME                READY   REASON   AGE
msgtxr-subscriber   True             42s
Wait a minute or so and you should see the serverless pod come up:
$ oc get po | grep Running
msgtxr-sl-fzpfp-1-deployment-d55996b47-chxhh 2/2 Running 0 3m3s
pingsource-msgtxr-pingsour-e574f4cc-1d71-4d6f-b1b9-47f18c6kl9pq 1/1 Running 0 10m
Check the serverless pod logs for events received from the event source via the channel:
__  ____  __  _____   ___  __ ____  ______
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2020-05-14 04:56:02,354 INFO [io.quarkus] (main) getting-started 1.0-SNAPSHOT (powered by Quarkus 1.3.2.Final) started in 0.013s. Listening on: http://0.0.0.0:8080
2020-05-14 04:56:02,354 INFO [io.quarkus] (main) Profile prod activated.
2020-05-14 04:56:02,354 INFO [io.quarkus] (main) Installed features: [cdi, resteasy]
04:56:03.350 IN {"body":"This message is from PingSource"} OUT {"body":"This message is from PingSource"}
04:57:00.011 IN {"body":"This message is from PingSource"} OUT {"body":"This message is from PingSource"}
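The lab attaches a single subscriber, but the fan-out described at the start comes from adding more Subscriptions to the same channel, so that each subscriber receives its own copy of every event. As a sketch, a second Subscription could look like the following. The Subscription name msgtxr-subscriber-2 and the service msgtxr-sl-2 are hypothetical and stand in for any second Knative service deployed in the same namespace.

```yaml
# Hypothetical second subscriber on the same channel.
# The names ending in -2 are assumptions, not resources created in this lab.
apiVersion: messaging.knative.dev/v1alpha1
kind: Subscription
metadata:
  name: msgtxr-subscriber-2
spec:
  # Same channel as the first Subscription.
  channel:
    apiVersion: messaging.knative.dev/v1alpha1
    kind: InMemoryChannel
    name: imc-msgtxr
  # A different destination service.
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: msgtxr-sl-2
```

With both Subscriptions ready, each ping event should show up in the logs of both services.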
In this lab, we learnt how to add a Channel that receives events from an event source, and then added our Knative service as a destination that subscribes to the Channel.
Let us delete the event source.
$ kn source ping delete msgtxr-pingsource
Ping source 'msgtxr-pingsource' deleted in namespace 'kn-demo'.
You should see the event source pod go away immediately. If you wait a minute, our serverless service also scales down to zero.
Delete the channel
$ oc delete -f eventing/in-memory-channel.yaml
inmemorychannel.messaging.knative.dev "imc-msgtxr" deleted
Delete the subscription
$ oc delete -f eventing/event-subscription.yaml