Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: adding e2e test for func subscribe command #2110

Merged
merged 1 commit into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions hack/binaries.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ main() {
local dapr_version=v1.11.0
local helm_version=v3.12.0
local stern_version=1.25.0
local kn_version=v1.12.0

local em=$(tput bold)$(tput setaf 2)
local me=$(tput sgr0)
Expand All @@ -29,6 +30,7 @@ main() {
install_dapr
install_helm
install_stern
install_kn

echo "${em}DONE${me}"

Expand Down Expand Up @@ -81,4 +83,11 @@ install_stern() {
stern -v
}

install_kn() {
echo 'Installing kn...'
curl -sSLo kn https://github.com/knative/client/releases/download/knative-${kn_version}/kn-linux-amd64
chmod +x kn && sudo mv kn /usr/local/bin/
kn version
}

main "$@"
216 changes: 216 additions & 0 deletions test/e2e/scenario_subscribe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
//go:build e2e && linux

package e2e

import (
"os"
"path/filepath"
"strings"
"testing"
"time"

"gotest.tools/v3/assert"
"k8s.io/apimachinery/pkg/util/rand"
"knative.dev/func/test/oncluster"
"knative.dev/func/test/testhttp"

common "knative.dev/func/test/common"
)

type FuncSubscribeTestType struct {
T *testing.T
TestBrokerName string
TestBrokerUrl string
FuncProducerUrl string
SubscribeToEventType string
}

func (f *FuncSubscribeTestType) newKnCli() *common.TestExecCmd {
knCli := &common.TestExecCmd{
Binary: "kn",
ShouldFailOnError: true,
ShouldDumpCmdLine: true,
ShouldDumpOnSuccess: true,
T: f.T,
}
return knCli
}

func (f *FuncSubscribeTestType) newKubectlCli() *common.TestExecCmd {
kubectl := &common.TestExecCmd{
Binary: "kubectl",
ShouldDumpCmdLine: true,
ShouldDumpOnSuccess: false,
T: f.T,
}
return kubectl
}

func (f *FuncSubscribeTestType) setupBroker() {
f.TestBrokerName = "broker-" + rand.String(5)

knCli := f.newKnCli()
kubectl := f.newKubectlCli()
knCli.Exec("broker", "create", f.TestBrokerName, "--class", "MTChannelBasedBroker")
kubectl.Exec("wait", "broker/"+f.TestBrokerName, "--for=condition=TriggerChannelReady", "--timeout=15s")
cmd := knCli.Exec("broker", "describe", f.TestBrokerName, "-o", "url")

f.TestBrokerUrl = cmd.Out
f.TestBrokerUrl = strings.TrimRight(f.TestBrokerUrl, "\n")
assert.Assert(f.T, strings.HasPrefix(f.TestBrokerUrl, "http"))

f.T.Cleanup(func() {
f.newKnCli().Exec("broker", "delete", f.TestBrokerName)
})
}

// setupProducerFunc creates and deploy a knative function that produces events
// It will take 'type' and 'message' from query string to build and send an event to a TARGET_SINK (env var)
// Example: https://func-producer.default.127.0.0.1.sslip.io?type=HelloEvent&message=HELLO+EVENT+1
func (f *FuncSubscribeTestType) setupProducerFunc() {

var funcProducerName = "func-producer"
var funcProducerPath = filepath.Join(f.T.TempDir(), funcProducerName)

knFunc := common.NewKnFuncShellCli(f.T)
knFunc.Exec("create", "--language", "node", "--template", "http", funcProducerPath)
knFunc.SourceDir = funcProducerPath

indexJsContent := `
const { httpTransport, emitterFor, CloudEvent } = require("cloudevents");
const handle = async (context, body) => {
const ce = new CloudEvent({
source: "test.source",
type: context.query.type,
data: { message: context.query.message }
});
const emit = emitterFor(httpTransport(process.env.TARGET_SINK));
emit(ce);
}
module.exports = { handle };
`
err := os.WriteFile(filepath.Join(funcProducerPath, "index.js"), []byte(indexJsContent), 0644)
oncluster.AssertNoError(f.T, err)

knFunc.Exec("config", "env", "add", "--name", "TARGET_SINK", "--value", f.TestBrokerUrl, "-p", funcProducerPath)
knFunc.Exec("deploy", "-r", common.GetRegistry(), "-p", funcProducerPath)
f.FuncProducerUrl = knFunc.Exec("describe", "-o", "url", "-p", funcProducerPath).Out
f.FuncProducerUrl = strings.TrimRight(f.FuncProducerUrl, "\n")

f.T.Cleanup(func() {
knFunc.Exec("delete", funcProducerName)
})
}

// setupConsumerFunc creates and deploy the function that subscribe to events of type HelloEvent
func (f *FuncSubscribeTestType) setupConsumerFunc() {
var funcConsumerName = "func-consumer"
var funcConsumerPath = filepath.Join(f.T.TempDir(), funcConsumerName)

knFunc := common.NewKnFuncShellCli(f.T)
knFunc.Exec("create", "--language", "node", "--template", "cloudevents", funcConsumerPath)
knFunc.SourceDir = funcConsumerPath

indexJsContent := `
const { CloudEvent } = require('cloudevents');
const handle = async (context, event) => {
context.log.warn(event);
console.log(event);
return new CloudEvent({
source: 'consumer.processor',
type: 'consumer.processed'
})
};
module.exports = { handle };
`
err := os.WriteFile(filepath.Join(funcConsumerPath, "index.js"), []byte(indexJsContent), 0644)
oncluster.AssertNoError(f.T, err)

knFunc.Exec("subscribe", "--filter", "type="+f.SubscribeToEventType, "--source", f.TestBrokerName)
knFunc.Exec("deploy", "-r", common.GetRegistry(), "-p", funcConsumerPath)

f.T.Cleanup(func() {
knFunc.Exec("delete", funcConsumerName)
})
}

// TestFunctionSubscribeEvents tests the func integration with Kn Events by subscribing to events
// In other words, it tests `func subscribe` command
// To accomplish that the test steps consists in:
// - Deploy a function that produces events and emits to the broker
// - Deploy a function that subscribes to a specific Event Type (HelloEvent)
// - Make the producer func to send events of the expected (HelloEvent) and unexpected (DiscardEvent) CE Type
// - Assert the consumer function only receives the event it has subscribed to
func TestFunctionSubscribeEvents(t *testing.T) {

funcSubTest := &FuncSubscribeTestType{T: t, SubscribeToEventType: "HelloEvent"}

// ----------------------------------
// 1. Setup test Broker
// ----------------------------------
funcSubTest.setupBroker()

// ----------------------------------
// 2. Deploy test functions
// -----------------------------------
deploymentChan := make(chan string)

go func() {
funcSubTest.setupProducerFunc() // "kn function" that emits test events
deploymentChan <- "producer"
}()
go func() {
funcSubTest.setupConsumerFunc() // "kn function" that subscribe to events
deploymentChan <- "consumer"
}()
<-deploymentChan
<-deploymentChan

// ----------------------------------
// 3. Test
// ON WHEN a new event of a specific type is received by the broker
// ASSERT THAT the func-consumer receives the event it has subscribed to
// ----------------------------------

// Watch the logs of func-consumer and inspects for received Events

var gotEventA, gotEventB, c bool
var podReached, podNotFound bool
var doCheck = true

waitChan := make(chan bool)
go func() {
kubectl := funcSubTest.newKubectlCli()
for i := 0; doCheck; i++ {
result := kubectl.Exec("logs", "-l", "function.knative.dev/name=func-consumer", "-c", "user-container")

podNotFound = strings.Contains(result.Out, "No resources found")
podReached = podReached || !podNotFound
gotEventA = gotEventA || strings.Contains(result.Out, "EVENT_A_CATCH_ME")
gotEventB = gotEventB || strings.Contains(result.Out, "EVENT_B_DISCARD_ME")
doCheck = !(i > 20 || (podReached && podNotFound)) // check until function pod is Terminated
if doCheck {
if gotEventA && !c {
c = true
t.Log("Expected EVENT_A received. Watching for non-EVENT_B until function pod is Terminated")
}
kubectl.ShouldDumpCmdLine = false
time.Sleep(6 * time.Second) // 1.5 minutes max wait.
}
}
waitChan <- true
}()
time.Sleep(2 * time.Second)

// Invoke Producer func to force Event A to be emitted. The event should be received by func
testhttp.TestGet(t, funcSubTest.FuncProducerUrl+"?type="+funcSubTest.SubscribeToEventType+"&message=EVENT_A_CATCH_ME")

// Invoke Producer func to force Event B to be emitted. The event should NOT be received by func
testhttp.TestGet(t, funcSubTest.FuncProducerUrl+"?type=DiscardEvent&message=EVENT_B_DISCARD_ME")

<-waitChan

assert.Assert(t, gotEventA, "Event A was not received by the consumer function")
assert.Assert(t, !gotEventB, "Event B was received but it should not be")

}
Loading