-
Notifications
You must be signed in to change notification settings - Fork 139
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
test: adding e2e test for func subscribe command
- Loading branch information
1 parent
32f6d8d
commit 9b9ac87
Showing
2 changed files
with
228 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,219 @@ | ||
//go:build e2e && linux | ||
|
||
package e2e | ||
|
||
import ( | ||
"os" | ||
"path/filepath" | ||
"strings" | ||
"testing" | ||
"time" | ||
|
||
"gotest.tools/v3/assert" | ||
"k8s.io/apimachinery/pkg/util/rand" | ||
"knative.dev/func/test/oncluster" | ||
"knative.dev/func/test/testhttp" | ||
|
||
common "knative.dev/func/test/common" | ||
) | ||
|
||
type FuncSubscribeTestType struct { | ||
T *testing.T | ||
TestBrokerName string | ||
TestBrokerUrl string | ||
FuncProducerUrl string | ||
SubscribeToEventType string | ||
} | ||
|
||
func (f *FuncSubscribeTestType) newKnCli() *common.TestExecCmd { | ||
knCli := &common.TestExecCmd{ | ||
Binary: "kn", | ||
ShouldFailOnError: true, | ||
ShouldDumpCmdLine: true, | ||
ShouldDumpOnSuccess: true, | ||
T: f.T, | ||
} | ||
return knCli | ||
} | ||
|
||
func (f *FuncSubscribeTestType) newKubectlCli() *common.TestExecCmd { | ||
kubectl := &common.TestExecCmd{ | ||
Binary: "kubectl", | ||
ShouldDumpCmdLine: true, | ||
ShouldDumpOnSuccess: false, | ||
T: f.T, | ||
} | ||
return kubectl | ||
} | ||
|
||
func (f *FuncSubscribeTestType) setupBroker() { | ||
f.TestBrokerName = "broker-" + rand.String(5) | ||
|
||
knCli := f.newKnCli() | ||
knCli.Exec("broker", "create", f.TestBrokerName, "--class", "MTChannelBasedBroker") | ||
cmd := knCli.Exec("broker", "describe", f.TestBrokerName, "-o", "url") | ||
|
||
f.TestBrokerUrl = cmd.Out | ||
f.TestBrokerUrl = strings.TrimRight(f.TestBrokerUrl, "\n") | ||
assert.Assert(f.T, strings.HasPrefix(f.TestBrokerUrl, "http")) | ||
|
||
f.T.Cleanup(func() { | ||
f.newKnCli().Exec("broker", "delete", f.TestBrokerName) | ||
}) | ||
} | ||
|
||
// setupProducerFunc creates and deploy a knative function that produces events | ||
// It will take 'type' and 'message' from query string to build and send an event to a TARGET_SINK (env var) | ||
// Example: https://func-producer.default.127.0.0.1.sslip.io?type=HelloEvent&message=HELLO+EVENT+1 | ||
func (f *FuncSubscribeTestType) setupProducerFunc() { | ||
|
||
var funcProducerName = "func-producer" | ||
var funcProducerPath = filepath.Join(f.T.TempDir(), funcProducerName) | ||
|
||
knFunc := common.NewKnFuncShellCli(f.T) | ||
knFunc.Exec("create", "--language", "node", "--template", "http", funcProducerPath) | ||
knFunc.SourceDir = funcProducerPath | ||
|
||
indexJsContent := ` | ||
const { httpTransport, emitterFor, CloudEvent } = require("cloudevents"); | ||
const handle = async (context, body) => { | ||
const ce = new CloudEvent({ | ||
source: "test.source", | ||
type: context.query.type, | ||
data: { message: context.query.message } | ||
}); | ||
console.log(ce); | ||
console.log(process.env.TARGET_SINK); | ||
context.log.warn(ce); | ||
const emit = emitterFor(httpTransport(process.env.TARGET_SINK)); | ||
emit(ce); | ||
} | ||
module.exports = { handle }; | ||
` | ||
err := os.WriteFile(filepath.Join(funcProducerPath, "index.js"), []byte(indexJsContent), 0644) | ||
oncluster.AssertNoError(f.T, err) | ||
|
||
knFunc.Exec("config", "env", "add", "--name", "TARGET_SINK", "--value", f.TestBrokerUrl, "-p", funcProducerPath) | ||
knFunc.Exec("deploy", "-r", common.GetRegistry(), "-p", funcProducerPath) | ||
f.FuncProducerUrl = knFunc.Exec("describe", "-o", "url", "-p", funcProducerPath).Out | ||
f.FuncProducerUrl = strings.TrimRight(f.FuncProducerUrl, "\n") | ||
|
||
f.T.Cleanup(func() { | ||
knFunc.Exec("delete", funcProducerName) | ||
}) | ||
} | ||
|
||
// setupConsumerFunc creates and deploy the function that subscribe to events of type HelloEvent | ||
func (f *FuncSubscribeTestType) setupConsumerFunc() { | ||
var funcConsumerName = "func-consumer" | ||
var funcConsumerPath = filepath.Join(f.T.TempDir(), funcConsumerName) | ||
|
||
knFunc := common.NewKnFuncShellCli(f.T) | ||
knFunc.Exec("create", "--language", "node", "--template", "cloudevents", funcConsumerPath) | ||
knFunc.SourceDir = funcConsumerPath | ||
|
||
indexJsContent := ` | ||
const { CloudEvent } = require('cloudevents'); | ||
const handle = async (context, event) => { | ||
context.log.warn(event); | ||
console.log(event); | ||
return new CloudEvent({ | ||
source: 'consumer.processor', | ||
type: 'consumer.processed' | ||
}) | ||
}; | ||
module.exports = { handle }; | ||
` | ||
err := os.WriteFile(filepath.Join(funcConsumerPath, "index.js"), []byte(indexJsContent), 0644) | ||
oncluster.AssertNoError(f.T, err) | ||
|
||
knFunc.Exec("subscribe", "--filter", "type="+f.SubscribeToEventType, "--source", f.TestBrokerName) | ||
knFunc.Exec("deploy", "-r", common.GetRegistry(), "-p", funcConsumerPath) | ||
|
||
f.T.Cleanup(func() { | ||
knFunc.Exec("delete", funcConsumerName) | ||
}) | ||
} | ||
|
||
// TestFunctionSubscribeEvents tests the func integration with Kn Events by subscribing to events | ||
// In other words, it tests `func subscribe` command | ||
// To accomplish that the test steps consists in: | ||
// - Deploy a function that produces events and emits to the broker | ||
// - Deploy a function that subscribes to a specific Event Type (HelloEvent) | ||
// - Make the producer func to send events of the expected (HelloEvent) and unexpected (DiscardEvent) CE Type | ||
// - Assert the consumer function only receives the event it has subscribed to | ||
func TestFunctionSubscribeEvents(t *testing.T) { | ||
|
||
funcSubTest := &FuncSubscribeTestType{T: t, SubscribeToEventType: "HelloEvent"} | ||
|
||
// ---------------------------------- | ||
// 1. Setup test Broker | ||
// ---------------------------------- | ||
funcSubTest.setupBroker() | ||
|
||
// ---------------------------------- | ||
// 2. Deploy test functions | ||
// ----------------------------------- | ||
deploymentChan := make(chan string) | ||
|
||
go func() { | ||
funcSubTest.setupProducerFunc() // "kn function" that emits test events | ||
deploymentChan <- "producer" | ||
}() | ||
go func() { | ||
funcSubTest.setupConsumerFunc() // "kn function" that subscribe to events | ||
deploymentChan <- "consumer" | ||
}() | ||
<-deploymentChan | ||
<-deploymentChan | ||
|
||
// ---------------------------------- | ||
// 3. Test | ||
// ON WHEN a new event of a specific type is received by the broker | ||
// ASSERT THAT the func-consumer receives the event it has subscribed to | ||
// ---------------------------------- | ||
|
||
// Watch the logs of func-consumer and inspects for received Events | ||
|
||
var gotEventA, gotEventB, c bool | ||
var podReached, podNotFound bool | ||
var doCheck = true | ||
|
||
waitChan := make(chan bool) | ||
go func() { | ||
kubectl := funcSubTest.newKubectlCli() | ||
for i := 0; doCheck; i++ { | ||
result := kubectl.Exec("logs", "-l", "function.knative.dev/name=func-consumer", "-c", "user-container") | ||
|
||
podNotFound = strings.Contains(result.Out, "No resources found") | ||
podReached = podReached || !podNotFound | ||
gotEventA = gotEventA || strings.Contains(result.Out, "EVENT_A_CATCH_ME") | ||
gotEventB = gotEventB || strings.Contains(result.Out, "EVENT_B_DISCARD_ME") | ||
doCheck = !(i > 20 || (podReached && podNotFound)) // check until function pod is Terminated | ||
if doCheck { | ||
if gotEventA && !c { | ||
c = true | ||
t.Log("Expected EVENT_A received. Watching for non-EVENT_B until function pod is Terminated") | ||
} | ||
kubectl.ShouldDumpCmdLine = false | ||
time.Sleep(6 * time.Second) // 1.5 minutes max wait. | ||
} | ||
} | ||
waitChan <- true | ||
}() | ||
time.Sleep(2 * time.Second) | ||
|
||
// Invoke Producer func to force Event A to be emitted. The event should be received by func | ||
testhttp.TestGet(t, funcSubTest.FuncProducerUrl+"?type="+funcSubTest.SubscribeToEventType+"&message=EVENT_A_CATCH_ME") | ||
|
||
// Invoke Producer func to force Event B to be emitted. The event should NOT be received by func | ||
testhttp.TestGet(t, funcSubTest.FuncProducerUrl+"?type=DiscardEvent&message=EVENT_B_DISCARD_ME") | ||
|
||
<-waitChan | ||
|
||
assert.Assert(t, gotEventA, "Event A was not received by the consumer function") | ||
assert.Assert(t, !gotEventB, "Event B was received but it should not be") | ||
|
||
} |