diff --git a/.github/workflows/integration_test.yaml b/.github/workflows/integration_test.yaml new file mode 100644 index 0000000..ba40eec --- /dev/null +++ b/.github/workflows/integration_test.yaml @@ -0,0 +1,16 @@ +name: Integration tests + +on: + pull_request: + +jobs: + integration-tests: + uses: canonical/operator-workflows/.github/workflows/integration_test.yaml@main + secrets: inherit + with: + channel: 1.28-strict/stable + modules: '["test_charm.py"]' + juju-channel: 3.1/stable + self-hosted-runner: true + self-hosted-runner-label: "xlarge" + microk8s-addons: "dns ingress rbac storage metallb:10.15.119.2-10.15.119.4 registry" diff --git a/src/charm.py b/src/charm.py index e6c9c02..1f19868 100755 --- a/src/charm.py +++ b/src/charm.py @@ -17,6 +17,7 @@ from charm_helpers import create_env from literals import ( + AIRBYTE_API_PORT, BUCKET_CONFIGS, CONNECTOR_BUILDER_SERVER_API_PORT, CONTAINERS, @@ -282,8 +283,7 @@ def _update(self, event): self.unit.status = BlockedStatus(f"failed to create buckets: {str(e)}") return - env = create_env(self.model.name, self.app.name, self.config, self._state) - self.model.unit.set_ports(INTERNAL_API_PORT, CONNECTOR_BUILDER_SERVER_API_PORT) + self.model.unit.set_ports(AIRBYTE_API_PORT, INTERNAL_API_PORT, CONNECTOR_BUILDER_SERVER_API_PORT) for container_name in list(CONTAINERS.keys()): container = self.unit.get_container(container_name) @@ -303,6 +303,7 @@ def _update(self, event): permissions=0o755, ) + env = create_env(self.model.name, self.app.name, container_name, self.config, self._state) pebble_layer = get_pebble_layer(container_name, env) container.add_layer(container_name, pebble_layer, combine=True) container.replan() diff --git a/src/charm_helpers.py b/src/charm_helpers.py index a4bb6d4..f18a3da 100644 --- a/src/charm_helpers.py +++ b/src/charm_helpers.py @@ -15,12 +15,13 @@ from structured_config import StorageType -def create_env(model_name, app_name, config, state): +def create_env(model_name, app_name, 
container_name, config, state): """Create set of environment variables for application. Args: model_name: Name of the juju model. app_name: Name of the application. + container_name: Name of Airbyte container. config: Charm config. state: Charm state. @@ -70,6 +71,10 @@ create_env(model_name, app_name, config, state): "AIRBYTE_URL": config["webapp-url"], } + # https://github.com/airbytehq/airbyte/issues/29506#issuecomment-1775148609 + if container_name == "airbyte-api-server": + env.update({"INTERNAL_API_HOST": f"http://{app_name}:{INTERNAL_API_PORT}"}) + if config["storage-type"].value == StorageType.minio and state.minio: minio_endpoint = construct_svc_endpoint( state.minio["service"], diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..289a524 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,4 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Tests module.""" diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..0998abd --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,16 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Fixtures for airbyte-k8s charm tests.""" + +import pytest + + +def pytest_addoption(parser: pytest.Parser): + """Parse additional pytest options. + + Args: + parser: pytest command line parser. + """ + # The prebuilt charm file. + parser.addoption("--charm-file", action="append", default=[]) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py new file mode 100644 index 0000000..44d7d2d --- /dev/null +++ b/tests/integration/conftest.py @@ -0,0 +1,61 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. 
+ +"""Charm integration test config.""" + +import asyncio +import logging + +import pytest_asyncio +from helpers import ( + APP_NAME_AIRBYTE_SERVER, + APP_NAME_TEMPORAL_ADMIN, + APP_NAME_TEMPORAL_SERVER, + create_default_namespace, + get_airbyte_charm_resources, + perform_airbyte_integrations, + perform_temporal_integrations, + run_sample_workflow, +) +from pytest_operator.plugin import OpsTest + +logger = logging.getLogger(__name__) + + +@pytest_asyncio.fixture(name="deploy", scope="module") +async def deploy(ops_test: OpsTest): + """Test the app is up and running.""" + await ops_test.model.set_config({"update-status-hook-interval": "1m"}) + + charm = await ops_test.build_charm(".") + resources = get_airbyte_charm_resources() + + await ops_test.model.deploy(charm, resources=resources, application_name=APP_NAME_AIRBYTE_SERVER, trust=True) + await ops_test.model.deploy( + APP_NAME_TEMPORAL_SERVER, + channel="edge", + config={"num-history-shards": 4}, + ) + await ops_test.model.deploy(APP_NAME_TEMPORAL_ADMIN, channel="edge") + await ops_test.model.deploy("postgresql-k8s", channel="14/edge", trust=True) + await ops_test.model.deploy("minio", channel="edge") + + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle( + apps=["postgresql-k8s", "minio"], + status="active", + raise_on_blocked=False, + timeout=1200, + ) + await ops_test.model.wait_for_idle( + apps=[APP_NAME_TEMPORAL_SERVER, APP_NAME_TEMPORAL_ADMIN], + status="blocked", + raise_on_blocked=False, + timeout=600, + ) + + await perform_temporal_integrations(ops_test) + await create_default_namespace(ops_test) + await run_sample_workflow(ops_test) + + await perform_airbyte_integrations(ops_test) diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py new file mode 100644 index 0000000..6ad9eac --- /dev/null +++ b/tests/integration/helpers.py @@ -0,0 +1,356 @@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. 
+ +"""Charm integration test helpers.""" + +import logging +import time +from pathlib import Path + +import requests +import yaml +from pytest_operator.plugin import OpsTest +from temporal_client.activities import say_hello +from temporal_client.workflows import SayHello +from temporalio.client import Client +from temporalio.worker import Worker + +logger = logging.getLogger(__name__) + +METADATA = yaml.safe_load(Path("./charmcraft.yaml").read_text()) +APP_NAME_AIRBYTE_SERVER = METADATA["name"] +APP_NAME_TEMPORAL_SERVER = "temporal-k8s" +APP_NAME_TEMPORAL_ADMIN = "temporal-admin-k8s" +APP_NAME_TEMPORAL_UI = "temporal-ui-k8s" + +GET_HEADERS = {"accept": "application/json"} +POST_HEADERS = {"accept": "application/json", "content-type": "application/json"} + + +def get_airbyte_charm_resources(): + return { + "airbyte-api-server": METADATA["resources"]["airbyte-api-server"]["upstream-source"], + "airbyte-bootloader": METADATA["resources"]["airbyte-bootloader"]["upstream-source"], + "airbyte-connector-builder-server": METADATA["resources"]["airbyte-connector-builder-server"][ + "upstream-source" + ], + "airbyte-cron": METADATA["resources"]["airbyte-cron"]["upstream-source"], + "airbyte-pod-sweeper": METADATA["resources"]["airbyte-pod-sweeper"]["upstream-source"], + "airbyte-server": METADATA["resources"]["airbyte-server"]["upstream-source"], + "airbyte-workers": METADATA["resources"]["airbyte-workers"]["upstream-source"], + } + + +async def run_sample_workflow(ops_test: OpsTest): + """Connect a client and runs a basic Temporal workflow. + + Args: + ops_test: PyTest object. 
+ """ + url = await get_application_url(ops_test, application=APP_NAME_TEMPORAL_SERVER, port=7233) + logger.info("running workflow on app address: %s", url) + + client = await Client.connect(url) + + # Run a worker for the workflow + async with Worker(client, task_queue="my-task-queue", workflows=[SayHello], activities=[say_hello]): + name = "Jean-luc" + result = await client.execute_workflow(SayHello.run, name, id="my-workflow-id", task_queue="my-task-queue") + logger.info(f"result: {result}") + assert result == f"Hello, {name}!" + + +async def create_default_namespace(ops_test: OpsTest): + """Create default namespace on Temporal server using tctl. + + Args: + ops_test: PyTest object. + """ + # Register default namespace from admin charm. + action = ( + await ops_test.model.applications[APP_NAME_TEMPORAL_ADMIN] + .units[0] + .run_action("tctl", args="--ns default namespace register -rd 3") + ) + result = (await action.wait()).results + logger.info(f"tctl result: {result}") + assert "result" in result and result["result"] == "command succeeded" + + +async def get_application_url(ops_test: OpsTest, application, port): + """Return application URL from the model. + + Args: + ops_test: PyTest object. + application: Name of the application. + port: Port number of the URL. + + Returns: + Application URL of the form {address}:{port} + """ + status = await ops_test.model.get_status() # noqa: F821 + address = status["applications"][application].public_address + return f"{address}:{port}" + + +async def get_unit_url(ops_test: OpsTest, application, unit, port, protocol="http"): + """Return unit URL from the model. + + Args: + ops_test: PyTest object. + application: Name of the application. + unit: Number of the unit. + port: Port number of the URL. + protocol: Transfer protocol (default: http). 
+ + Returns: + Unit URL of the form {protocol}://{address}:{port} + """ + status = await ops_test.model.get_status() # noqa: F821 + address = status["applications"][application]["units"][f"{application}/{unit}"]["address"] + return f"{protocol}://{address}:{port}" + + +async def perform_temporal_integrations(ops_test: OpsTest): + """Integrate Temporal charm with postgresql, admin and ui charms. + + Args: + ops_test: PyTest object. + """ + await ops_test.model.integrate(f"{APP_NAME_TEMPORAL_SERVER}:db", "postgresql-k8s:database") + await ops_test.model.integrate(f"{APP_NAME_TEMPORAL_SERVER}:visibility", "postgresql-k8s:database") + await ops_test.model.integrate(f"{APP_NAME_TEMPORAL_SERVER}:admin", f"{APP_NAME_TEMPORAL_ADMIN}:admin") + await ops_test.model.wait_for_idle( + apps=[APP_NAME_TEMPORAL_SERVER, "postgresql-k8s"], status="active", raise_on_blocked=False, timeout=180 + ) + + assert ops_test.model.applications[APP_NAME_TEMPORAL_SERVER].units[0].workload_status == "active" + + +async def perform_airbyte_integrations(ops_test: OpsTest): + """Perform Airbyte charm integrations. + + Args: + ops_test: PyTest object. + """ + await ops_test.model.integrate(APP_NAME_AIRBYTE_SERVER, "postgresql-k8s") + await ops_test.model.integrate(APP_NAME_AIRBYTE_SERVER, "minio") + await ops_test.model.wait_for_idle( + apps=[APP_NAME_AIRBYTE_SERVER, "postgresql-k8s", "minio"], + status="active", + raise_on_blocked=False, + wait_for_active=True, + idle_period=60, + timeout=600, + ) + + assert ops_test.model.applications[APP_NAME_AIRBYTE_SERVER].units[0].workload_status == "active" + + +def get_airbyte_workspace_id(api_url): + """Get Airbyte default workspace ID. + + Args: + api_url: Airbyte API base URL. 
+ """ + url = f"{api_url}/v1/workspaces?includeDeleted=false&limit=20&offset=0" + logger.info("fetching Airbyte workspace ID") + response = requests.get(url, headers=GET_HEADERS, timeout=300) + + assert response.status_code == 200 + return response.json().get("data")[0]["workspaceId"] + + +def create_airbyte_source(api_url, workspace_id): + """Create Airbyte sample source. + + Args: + api_url: Airbyte API base URL. + workspace_id: default workspace ID. + """ + url = f"{api_url}/v1/sources" + payload = { + "configuration": {"sourceType": "pokeapi", "pokemon_name": "pikachu"}, + "name": "API Test", + "workspaceId": workspace_id, + } + + logger.info("creating Airbyte source") + response = requests.post(url, json=payload, headers=POST_HEADERS, timeout=300) + + assert response.status_code == 200 + return response.json().get("sourceId") + + +def create_airbyte_destination(api_url, model_name, workspace_id, db_password): + """Create Airbyte sample destination. + + Args: + api_url: Airbyte API base URL. + model_name: name of the juju model. + workspace_id: default workspace ID. + db_password: database password. + """ + url = f"{api_url}/v1/destinations" + payload = { + "configuration": { + "destinationType": "postgres", + "port": 5432, + "schema": "pokeapi", + "ssl_mode": {"mode": "disable"}, + "tunnel_method": {"tunnel_method": "NO_TUNNEL"}, + "host": f"postgresql-k8s-primary.{model_name}.svc.cluster.local", + "database": "airbyte-k8s_db", + "username": "operator", + "password": db_password, + }, + "workspaceId": workspace_id, + "name": "Postgres", + } + + logger.info("creating Airbyte destination") + response = requests.post(url, json=payload, headers=POST_HEADERS, timeout=300) + + assert response.status_code == 200 + return response.json().get("destinationId") + + +def create_airbyte_connection(api_url, source_id, destination_id): + """Create Airbyte connection. + + Args: + api_url: Airbyte API base URL. + source_id: Airbyte source ID. 
+ destination_id: Airbyte destination ID. + """ + url = f"{api_url}/v1/connections" + payload = { + "schedule": {"scheduleType": "manual"}, + "dataResidency": "auto", + "namespaceDefinition": "destination", + "namespaceFormat": None, + "nonBreakingSchemaUpdatesBehavior": "ignore", + "sourceId": source_id, + "destinationId": destination_id, + } + + logger.info("creating Airbyte connection") + response = requests.post(url, json=payload, headers=POST_HEADERS, timeout=300) + + assert response.status_code == 200 + return response.json().get("connectionId") + + +def trigger_airbyte_connection(api_url, connection_id): + """Trigger Airbyte connection. + + Args: + api_url: Airbyte API base URL. + connection_id: Airbyte connection ID. + """ + url = f"{api_url}/v1/jobs" + payload = {"jobType": "sync", "connectionId": connection_id} + logger.info("triggering Airbyte connection") + response = requests.post(url, json=payload, headers=POST_HEADERS, timeout=300) + + assert response.status_code == 200 + return response.json().get("jobId") + + +def check_airbyte_job_status(api_url, job_id): + """Get Airbyte sync job status. + + Args: + api_url: Airbyte API base URL. + job_id: Sync job ID. + """ + url = f"{api_url}/v1/jobs/{job_id}" + logger.info("fetching Airbyte job status") + response = requests.get(url, headers=GET_HEADERS, timeout=120) + logger.info(response.json()) + + return response.json().get("status") + + +def cancel_airbyte_job(api_url, job_id): + """Cancel Airbyte sync job. + + Args: + api_url: Airbyte API base URL. + job_id: Sync job ID. + """ + url = f"{api_url}/v1/jobs/{job_id}" + logger.info("cancelling Airbyte job") + response = requests.delete(url, headers=GET_HEADERS, timeout=120) + logger.info(response.json()) + + return response.json().get("status") + + +async def get_db_password(ops_test): + """Get PostgreSQL DB admin password. + + Args: + ops_test: PyTest object. 
+ """ + postgresql_unit = ops_test.model.applications["postgresql-k8s"].units[0] + for i in range(10): + action = await postgresql_unit.run_action("get-password") + result = await action.wait() + logger.info(f"attempt {i} -> action result {result.status} {result.results}") + if "password" in result.results: + return result.results["password"] + time.sleep(2) + + +async def run_test_sync_job(ops_test): + """Run test Airbyte connection. + + Args: + ops_test: PyTest object. + """ + # Create connection + api_url = await get_unit_url(ops_test, application=APP_NAME_AIRBYTE_SERVER, unit=0, port=8006) + logger.info("curling app address: %s", api_url) + workspace_id = get_airbyte_workspace_id(api_url) + db_password = await get_db_password(ops_test) + assert db_password + + # Create Source + source_id = create_airbyte_source(api_url, workspace_id) + + # Create destination + destination_id = create_airbyte_destination(api_url, ops_test.model.name, workspace_id, db_password) + + # Create connection + connection_id = create_airbyte_connection(api_url, source_id, destination_id) + + # Trigger sync job + for i in range(2): + logger.info(f"attempt {i+1} to trigger new job") + job_id = trigger_airbyte_connection(api_url, connection_id) + + # Wait until job is successful + job_successful = False + for j in range(15): + logger.info(f"job {i+1} attempt {j+1}: getting job status") + status = check_airbyte_job_status(api_url, job_id) + + if status == "failed": + break + + if status == "succeeded": + logger.info(f"job {i+1} attempt {j+1}: job successful!") + job_successful = True + break + + logger.info(f"job {i+1} attempt {j+1}: job still running, retrying in 20 seconds") + time.sleep(20) + + if job_successful: + break + + cancel_airbyte_job(api_url, job_id) + + assert job_successful diff --git a/tests/integration/temporal_client/activities.py b/tests/integration/temporal_client/activities.py new file mode 100644 index 0000000..2e8a8d6 --- /dev/null +++ 
b/tests/integration/temporal_client/activities.py @@ -0,0 +1,20 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + + +"""Temporal client activity.""" + +from temporalio import activity + + +@activity.defn +async def say_hello(name: str) -> str: + """Temporal activity. + + Args: + name: used to run the dynamic activity. + + Returns: + String in the form "Hello, {name}!" + """ + return f"Hello, {name}!" diff --git a/tests/integration/temporal_client/workflows.py b/tests/integration/temporal_client/workflows.py new file mode 100644 index 0000000..058682f --- /dev/null +++ b/tests/integration/temporal_client/workflows.py @@ -0,0 +1,32 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + + +"""Temporal client sample workflow.""" + +import asyncio +from datetime import timedelta +from typing import List + +from temporalio import workflow + +# Import our activity, passing it through the sandbox +with workflow.unsafe.imports_passed_through(): + from .activities import say_hello + + +@workflow.defn +class SayHello: + """Temporal workflow class.""" + + @workflow.run + async def run(self, name: str) -> str: + """Workflow execution method. + + Args: + name: used to run the dynamic activity. + + Returns: + Workflow execution result. + """ + return await workflow.execute_activity(say_hello, name, schedule_to_close_timeout=timedelta(seconds=5)) diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index 4b73607..0abc93e 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -1,32 +1,32 @@ +#!/usr/bin/env python3 # Copyright 2024 Canonical Ltd. # See LICENSE file for licensing details. 
-import asyncio import logging -from pathlib import Path +import time import pytest -import yaml +import requests +from conftest import deploy # noqa: F401, pylint: disable=W0611 +from helpers import APP_NAME_AIRBYTE_SERVER, get_unit_url, run_test_sync_job from pytest_operator.plugin import OpsTest logger = logging.getLogger(__name__) -METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) -APP_NAME = METADATA["name"] - @pytest.mark.abort_on_fail -async def test_build_and_deploy(ops_test: OpsTest): - """Build the charm-under-test and deploy it together with related charms. - - Assert on the unit status before any relations/configurations take place. - """ - # Build and deploy charm from local source folder - charm = await ops_test.build_charm(".") - resources = {"httpbin-image": METADATA["resources"]["httpbin-image"]["upstream-source"]} - - # Deploy the charm and wait for active/idle status - await asyncio.gather( - ops_test.model.deploy(charm, resources=resources, application_name=APP_NAME), - ops_test.model.wait_for_idle(apps=[APP_NAME], status="active", raise_on_blocked=True, timeout=1000), - ) +@pytest.mark.usefixtures("deploy") +class TestDeployment: + """Integration tests for charm.""" + + async def test_deployment(self, ops_test: OpsTest): + url = await get_unit_url(ops_test, application=APP_NAME_AIRBYTE_SERVER, unit=0, port=8001) + logger.info("curling app address: %s", url) + + response = requests.get(f"{url}/api/v1/health", timeout=300) + + assert response.status_code == 200 + assert response.json().get("available") + + async def test_sync_job(self, ops_test: OpsTest): + await run_test_sync_job(ops_test) diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 7368e18..ca13a03 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -375,6 +375,9 @@ def create_plan(container_name, storage_type): }, } + if container_name == "airbyte-api-server": + 
want_plan["services"][container_name]["environment"].update({"INTERNAL_API_HOST": f"http://airbyte-k8s:8001"}) + if storage_type == StorageType.minio: want_plan["services"][container_name]["environment"].update( { diff --git a/tox.ini b/tox.ini index 8141ee6..9854c36 100644 --- a/tox.ini +++ b/tox.ini @@ -112,4 +112,4 @@ commands = --tb native \ --log-cli-level=INFO \ {posargs} \ - {[vars]tests_path}/integration/new_test_charm.py + {[vars]tests_path}/integration/test_charm.py