Skip to content

Commit

Permalink
feat(aws lambda) add template generator with e2e tests (#1630)
Browse files Browse the repository at this point in the history
* feat(aws lambda) add template generator

* chore(aws lambda) add e2e test
  • Loading branch information
Oleksiivanov authored Jan 10, 2024
1 parent 8a0c444 commit 2dca5b7
Show file tree
Hide file tree
Showing 20 changed files with 884 additions and 231 deletions.
77 changes: 77 additions & 0 deletions connectors-e2e-test/connectors-e2e-test-aws/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.camunda.connector</groupId>
<artifactId>connector-parent</artifactId>
<relativePath>../../parent/pom.xml</relativePath>
<version>8.4.0-SNAPSHOT</version>
</parent>

<description>Tests</description>
<artifactId>connectors-e2e-aws</artifactId>
<packaging>jar</packaging>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>

<dependencies>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<version>${version.localstack}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bom</artifactId>
<version>${version.aws-java-sdk}</version>
<type>pom</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.camunda.connector</groupId>
<artifactId>connector-aws-lambda</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
<version>${version.aws-lambda-java-events}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-core</artifactId>
<version>${version.aws-lambda-java-core}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.camunda.connector</groupId>
<artifactId>connectors-e2e-test-base</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. Camunda licenses this file to you under the Apache License,
* Version 2.0; you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.camunda.connector.e2e;

import static io.camunda.zeebe.process.test.assertions.BpmnAssert.assertThat;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.lambda.AWSLambda;
import com.amazonaws.services.lambda.AWSLambdaClientBuilder;
import io.camunda.connector.aws.ObjectMapperSupplier;
import io.camunda.zeebe.model.bpmn.Bpmn;
import java.io.File;
import java.io.IOException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

public class AwsLambdaTest extends BaseAwsTest {

protected static final String ELEMENT_TEMPLATE_PATH =
"../../connectors/aws/aws-lambda/element-templates/aws-lambda-outbound-connector.json";
private static final String FUNCTION_NAME = "myLambdaFunction";
private static final String LAMBDA_FUNCTION_ZIP_FILE_PATH = "src/test/resources/function.zip";

private static AWSLambda lambdaClient;

/** Initializes the AWS Lambda client and sets up the Lambda function for testing. */
@BeforeAll
public static void initLambdaClient() throws IOException, InterruptedException {

lambdaClient =
AWSLambdaClientBuilder.standard()
.withCredentials(
new AWSStaticCredentialsProvider(
new BasicAWSCredentials(localstack.getAccessKey(), localstack.getSecretKey())))
.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(
localstack.getEndpoint().toString(), localstack.getRegion()))
.build();

AwsLambdaTestHelper.waitForLambdaClientInitialization(lambdaClient);
AwsLambdaTestHelper.initializeLambdaFunction(
lambdaClient, LAMBDA_FUNCTION_ZIP_FILE_PATH, FUNCTION_NAME);
AwsLambdaTestHelper.waitForLambdaFunctionToBeReady(lambdaClient, FUNCTION_NAME);
}

@AfterAll
public static void cleanUpLambdaClient() {
if (lambdaClient != null) {
lambdaClient.shutdown();
}
}

/**
* Tests the integration of AWS Lambda within a Camunda BPMN process. This test deploys a BPMN
* model with a Lambda service task and asserts the execution results.
*
* @throws Exception if any exception occurs during the test execution
*/
@Test
public void testLambdaFunction() throws Exception {

var model = Bpmn.createProcess().executable().startEvent().serviceTask("aws").endEvent().done();

var elementTemplate =
ElementTemplate.from(ELEMENT_TEMPLATE_PATH)
.property("authentication.type", "credentials")
.property("authentication.accessKey", localstack.getAccessKey())
.property("authentication.secretKey", localstack.getSecretKey())
.property("configuration.region", localstack.getRegion())
.property("awsFunction.operationType", "sync")
.property("awsFunction.functionName", FUNCTION_NAME)
.property("awsFunction.payload", "str")
.property("retryCount", "0")
.property("awsFunction.payload", "str")
.property("resultExpression", "={response: payload}")
.property("configuration.endpoint", localstack.getEndpoint().toString())
.writeTo(new File(tempDir, "template.json"));

var updatedModel =
new BpmnFile(model)
.writeToFile(new File(tempDir, "test.bpmn"))
.apply(elementTemplate, "aws", new File(tempDir, "result.bpmn"));

var bpmnTest =
ZeebeTest.with(zeebeClient)
.deploy(updatedModel)
.createInstance()
.waitForProcessCompletion();

Object expectedResult =
ObjectMapperSupplier.getMapperInstance()
.readValue(
"{\"statusCode\":200,\"body\":\"{\\\"message\\\": \\\"Hello from your Python Lambda function!\\\", \\\"receivedEvent\\\": \\\"str\\\"}\"}",
Object.class);
assertThat(bpmnTest.getProcessInstanceEvent()).hasVariableWithValue("response", expectedResult);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. Camunda licenses this file to you under the Apache License,
* Version 2.0; you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.camunda.connector.e2e;

import com.amazonaws.services.lambda.AWSLambda;
import com.amazonaws.services.lambda.model.CreateFunctionRequest;
import com.amazonaws.services.lambda.model.DeleteFunctionRequest;
import com.amazonaws.services.lambda.model.FunctionCode;
import com.amazonaws.services.lambda.model.GetFunctionRequest;
import com.amazonaws.services.lambda.model.GetFunctionResult;
import com.amazonaws.services.lambda.model.ResourceNotFoundException;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.localstack.LocalStackContainer;

/**
* Utility class providing helper methods for setting up and managing AWS Lambda and LocalStack in
* the context of end-to-end testing.
*/
public class AwsLambdaTestHelper {
private static final Logger LOGGER = LoggerFactory.getLogger(AwsLambdaTestHelper.class);

/**
* Waits for LocalStack to become operational. Checks periodically if LocalStack is healthy.
* Throws a runtime exception if LocalStack does not become healthy within the timeout period.
*
* @param localstack The LocalStack container instance.
* @throws InterruptedException if the waiting thread is interrupted.
*/
static void waitForLocalStackToBeHealthy(LocalStackContainer localstack)
throws InterruptedException {
long startTime = System.currentTimeMillis();
long maxDurationMillis = 60000; // 1 minute
boolean isHealthy = false;

while (System.currentTimeMillis() - startTime < maxDurationMillis) {
if (localstack.isCreated() && localstack.isHealthy() && localstack.isRunning()) {
isHealthy = true;
break;
}
Thread.sleep(500); // Wait for 1 second before checking again
}

if (!isHealthy) {
throw new RuntimeException(
"LocalStack did not become healthy within the expected time. "
+ "Status: created = "
+ localstack.isCreated()
+ ", healthy = "
+ localstack.isHealthy()
+ ", running = "
+ localstack.isRunning());
}
}

/**
* Waits for the AWS Lambda client to be initialized. This method periodically checks if the
* Lambda client is ready by attempting to list functions. If it doesn't initialize within the
* specified timeout, a runtime exception is thrown.
*
* @param lambdaClient The AWS Lambda client.
* @throws InterruptedException if the waiting thread is interrupted.
*/
static void waitForLambdaClientInitialization(final AWSLambda lambdaClient)
throws InterruptedException {
long startTime = System.currentTimeMillis();
long maxDurationMillis = 30000; // 30 seconds
while (System.currentTimeMillis() - startTime < maxDurationMillis) {
try {
lambdaClient.listFunctions(); // Check if Lambda client is ready
return; // If the request succeeds, exit the method
} catch (Exception e) {
// Log the exception or handle it as necessary
}
Thread.sleep(100); // Wait for 100 milliseconds before retrying
}
throw new RuntimeException("Lambda client did not initialize within the expected time.");
}

/**
* Initializes the Lambda function for testing. This includes checking if the function already
* exists and deleting it if so, then creating a new function using the provided ZIP file path.
*
* @param lambdaClient The AWS Lambda client.
* @param lambdaFunctionZipFilePath The file path to the ZIP file containing the Lambda function
* code.
* @param functionName The name of the Lambda function to be created.
* @throws IOException if an error occurs during file operations.
*/
static void initializeLambdaFunction(
final AWSLambda lambdaClient,
final String lambdaFunctionZipFilePath,
final String functionName)
throws IOException {
try {
lambdaClient.getFunction(new GetFunctionRequest().withFunctionName(functionName));
LOGGER.info("Function already exists, deleting: " + functionName);
lambdaClient.deleteFunction(new DeleteFunctionRequest().withFunctionName(functionName));
} catch (ResourceNotFoundException e) {
LOGGER.info("Function does not exist, no need to delete: " + functionName);
}

File zipFile = Paths.get(lambdaFunctionZipFilePath).toFile();
CreateFunctionRequest functionRequest =
new CreateFunctionRequest()
.withFunctionName("myLambdaFunction")
.withRuntime("python3.9")
.withRole("arn:aws:iam::000000000000:role/lambda-execute")
.withHandler("lambda_function.lambda_handler")
.withCode(
new FunctionCode()
.withZipFile(ByteBuffer.wrap(Files.readAllBytes(zipFile.toPath()))));

lambdaClient.createFunction(functionRequest);
}

/**
* Waits for the specified Lambda function to be ready for invocation. This method checks the
* function's state and waits until it becomes 'Active'. A runtime exception is thrown if the
* function is not ready within the number of attempts.
*
* @param lambdaClient The AWS Lambda client.
* @param functionName The name of the Lambda function to check.
* @throws InterruptedException if the waiting thread is interrupted.
*/
static void waitForLambdaFunctionToBeReady(
final AWSLambda lambdaClient, final String functionName) throws InterruptedException {
LOGGER.info("Waiting for Lambda function to be ready...");
int attempts = 30;

while (attempts > 0) {
try {
GetFunctionResult function =
lambdaClient.getFunction(new GetFunctionRequest().withFunctionName(functionName));
if (function.getConfiguration().getState().equals("Active")) {
LOGGER.info("Lambda function is ready for invocation.");
return;
}
} catch (ResourceNotFoundException e) {
LOGGER.info("Lambda function does not exist yet. Waiting...");
}
Thread.sleep(1000); // Wait for 1 second before retrying
attempts--;
}
throw new RuntimeException("Lambda function is not ready after waiting.");
}

/**
* Removes Lambda Docker containers that are not automatically cleaned up. This method is a
* workaround to ensure that all related Docker containers are stopped and removed.
*/
static void removeLambdaContainers() {
try {
// Assuming 'docker' command is available on the path
ProcessBuilder builder = new ProcessBuilder();

// List all containers using the specific Lambda image
builder.command(
"docker", "ps", "-a", "-q", "--filter", "ancestor=public.ecr.aws/lambda/python:3.9");
Process process = builder.start();
List<String> containerIds =
new BufferedReader(new InputStreamReader(process.getInputStream())).lines().toList();
process.waitFor();

// Stop and remove containers
for (String containerId : containerIds) {
LOGGER.info("Stopping and removing container: " + containerId);
// Stop container
new ProcessBuilder("docker", "stop", containerId).start().waitFor();
// Remove container
new ProcessBuilder("docker", "rm", containerId).start().waitFor();
}
} catch (IOException | InterruptedException e) {
LOGGER.error("Failed to stop and remove Lambda containers", e);
}
}
}
Loading

0 comments on commit 2dca5b7

Please sign in to comment.