diff --git a/.github/workflows/integration_test.yaml b/.github/workflows/integration_test.yaml index 0f2cf59..ba40eec 100644 --- a/.github/workflows/integration_test.yaml +++ b/.github/workflows/integration_test.yaml @@ -4,25 +4,13 @@ on: pull_request: jobs: - integration-test-microk8s: - name: Integration tests (microk8s) - runs-on: ubuntu-latest - steps: - - name: Checkout - uses: actions/checkout@v3 - - name: Setup operator environment - uses: charmed-kubernetes/actions-operator@main - with: - juju-channel: 3.1/stable - provider: microk8s - microk8s-addons: "ingress storage dns rbac registry" - channel: 1.25-strict/stable - - name: Run integration tests - # set a predictable model name so it can be consumed by charm-logdump-action - run: tox -e integration -- --model testing - - name: Dump logs - uses: canonical/charm-logdump-action@main - if: failure() - with: - app: airbyte-ui-k8s - model: testing + integration-tests: + uses: canonical/operator-workflows/.github/workflows/integration_test.yaml@main + secrets: inherit + with: + channel: 1.28-strict/stable + modules: '["test_charm.py"]' + juju-channel: 3.1/stable + self-hosted-runner: true + self-hosted-runner-label: "xlarge" + microk8s-addons: "dns ingress rbac storage metallb:10.15.119.2-10.15.119.4 registry" diff --git a/src/charm.py b/src/charm.py index e6c9c02..1f19868 100755 --- a/src/charm.py +++ b/src/charm.py @@ -17,6 +17,7 @@ from charm_helpers import create_env from literals import ( + AIRBYTE_API_PORT, BUCKET_CONFIGS, CONNECTOR_BUILDER_SERVER_API_PORT, CONTAINERS, @@ -282,8 +283,7 @@ def _update(self, event): self.unit.status = BlockedStatus(f"failed to create buckets: {str(e)}") return - env = create_env(self.model.name, self.app.name, self.config, self._state) - self.model.unit.set_ports(INTERNAL_API_PORT, CONNECTOR_BUILDER_SERVER_API_PORT) + self.model.unit.set_ports(AIRBYTE_API_PORT, INTERNAL_API_PORT, CONNECTOR_BUILDER_SERVER_API_PORT) for container_name in list(CONTAINERS.keys()): container = 
self.unit.get_container(container_name) @@ -303,6 +303,7 @@ def _update(self, event): permissions=0o755, ) + env = create_env(self.model.name, self.app.name, container_name, self.config, self._state) pebble_layer = get_pebble_layer(container_name, env) container.add_layer(container_name, pebble_layer, combine=True) container.replan() diff --git a/src/charm_helpers.py b/src/charm_helpers.py index a4bb6d4..f18a3da 100644 --- a/src/charm_helpers.py +++ b/src/charm_helpers.py @@ -15,12 +15,13 @@ from structured_config import StorageType -def create_env(model_name, app_name, config, state): +def create_env(model_name, app_name, container_name, config, state): """Create set of environment variables for application. Args: model_name: Name of the juju model. app_name: Name of the application. + container_name: Name of Airbyte container. config: Charm config. state: Charm state. @@ -70,6 +71,10 @@ def create_env(model_name, app_name, config, state): "AIRBYTE_URL": config["webapp-url"], } + # https://github.com/airbytehq/airbyte/issues/29506#issuecomment-1775148609 + if container_name == "airbyte-api-server": + env.update({"INTERNAL_API_HOST": f"http://{app_name}:{INTERNAL_API_PORT}"}) + if config["storage-type"].value == StorageType.minio and state.minio: minio_endpoint = construct_svc_endpoint( state.minio["service"], diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..289a524 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,4 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Tests module.""" diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..0998abd --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,16 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Fixtures for airbyte-k8s charm tests.""" + +import pytest + + +def pytest_addoption(parser: pytest.Parser): + """Parse additional pytest options. 
+ + Args: + parser: pytest command line parser. + """ + # The prebuilt charm file. + parser.addoption("--charm-file", action="append", default=[]) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index ca12f76..44d7d2d 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -15,6 +15,7 @@ get_airbyte_charm_resources, perform_airbyte_integrations, perform_temporal_integrations, + run_sample_workflow, ) from pytest_operator.plugin import OpsTest @@ -24,24 +25,27 @@ @pytest_asyncio.fixture(name="deploy", scope="module") async def deploy(ops_test: OpsTest): """Test the app is up and running.""" + await ops_test.model.set_config({"update-status-hook-interval": "1m"}) + charm = await ops_test.build_charm(".") resources = get_airbyte_charm_resources() - asyncio.gather( - ops_test.model.deploy(charm, resources=resources, application_name=APP_NAME_AIRBYTE_SERVER, trust=True), - ops_test.model.deploy( - APP_NAME_TEMPORAL_SERVER, - channel="edge", - config={"num-history-shards": 1}, - ), - ops_test.model.deploy(APP_NAME_TEMPORAL_ADMIN, channel="edge"), - ops_test.model.deploy("postgresql-k8s", channel="14/stable", trust=True), - ops_test.model.deploy("minio", channel="edge"), + await ops_test.model.deploy(charm, resources=resources, application_name=APP_NAME_AIRBYTE_SERVER, trust=True) + await ops_test.model.deploy( + APP_NAME_TEMPORAL_SERVER, + channel="edge", + config={"num-history-shards": 4}, ) + await ops_test.model.deploy(APP_NAME_TEMPORAL_ADMIN, channel="edge") + await ops_test.model.deploy("postgresql-k8s", channel="14/edge", trust=True) + await ops_test.model.deploy("minio", channel="edge") async with ops_test.fast_forward(): await ops_test.model.wait_for_idle( - apps=["postgresql-k8s", "minio"], status="active", raise_on_blocked=False, timeout=1200 + apps=["postgresql-k8s", "minio"], + status="active", + raise_on_blocked=False, + timeout=1200, ) await ops_test.model.wait_for_idle( apps=[APP_NAME_TEMPORAL_SERVER, 
APP_NAME_TEMPORAL_ADMIN], @@ -52,5 +56,6 @@ async def deploy(ops_test: OpsTest): await perform_temporal_integrations(ops_test) await create_default_namespace(ops_test) + await run_sample_workflow(ops_test) await perform_airbyte_integrations(ops_test) diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index b0ed688..94f9e7f 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -2,11 +2,13 @@ # Copyright 2024 Canonical Ltd. # See LICENSE file for licensing details. -"""Temporal charm integration test helpers.""" +"""Charm integration test helpers.""" import logging +import time from pathlib import Path +import requests import yaml from pytest_operator.plugin import OpsTest from temporal_client.activities import say_hello @@ -22,6 +24,9 @@ APP_NAME_TEMPORAL_ADMIN = "temporal-admin-k8s" APP_NAME_TEMPORAL_UI = "temporal-ui-k8s" +GET_HEADERS = {"accept": "application/json"} +POST_HEADERS = {"accept": "application/json", "content-type": "application/json"} + def get_airbyte_charm_resources(): return { @@ -117,7 +122,7 @@ async def perform_temporal_integrations(ops_test: OpsTest): await ops_test.model.integrate(f"{APP_NAME_TEMPORAL_SERVER}:visibility", "postgresql-k8s:database") await ops_test.model.integrate(f"{APP_NAME_TEMPORAL_SERVER}:admin", f"{APP_NAME_TEMPORAL_ADMIN}:admin") await ops_test.model.wait_for_idle( - apps=[APP_NAME_TEMPORAL_SERVER], status="active", raise_on_blocked=False, timeout=180 + apps=[APP_NAME_TEMPORAL_SERVER, "postgresql-k8s"], status="active", raise_on_blocked=False, timeout=180 ) assert ops_test.model.applications[APP_NAME_TEMPORAL_SERVER].units[0].workload_status == "active" @@ -132,7 +137,168 @@ async def perform_airbyte_integrations(ops_test: OpsTest): await ops_test.model.integrate(APP_NAME_AIRBYTE_SERVER, "postgresql-k8s") await ops_test.model.integrate(APP_NAME_AIRBYTE_SERVER, "minio") await ops_test.model.wait_for_idle( - apps=[APP_NAME_AIRBYTE_SERVER], status="active", 
raise_on_blocked=False, timeout=600 + apps=[APP_NAME_AIRBYTE_SERVER, "postgresql-k8s", "minio"], + status="active", + raise_on_blocked=False, + wait_for_active=True, + idle_period=60, + timeout=600, ) assert ops_test.model.applications[APP_NAME_AIRBYTE_SERVER].units[0].workload_status == "active" + + +def get_airbyte_workspace_id(api_url): + """Get Airbyte default workspace ID. + + Args: + api_url: Airbyte API base URL. + """ + url = f"{api_url}/v1/workspaces?includeDeleted=false&limit=20&offset=0" + logger.info("fetching Airbyte workspace ID") + response = requests.get(url, headers=GET_HEADERS, timeout=300) + + assert response.status_code == 200 + return response.json().get("data")[0]["workspaceId"] + + +def create_airbyte_source(api_url, workspace_id): + """Create Airbyte sample source. + + Args: + api_url: Airbyte API base URL. + workspace_id: default workspace ID. + """ + url = f"{api_url}/v1/sources" + payload = { + "configuration": {"sourceType": "pokeapi", "pokemon_name": "pikachu"}, + "name": "API Test", + "workspaceId": workspace_id, + } + + logger.info("creating Airbyte source") + response = requests.post(url, json=payload, headers=POST_HEADERS, timeout=300) + + assert response.status_code == 200 + return response.json().get("sourceId") + + +def create_airbyte_destination(api_url, model_name, workspace_id, db_password): + """Create Airbyte sample destination. + + Args: + api_url: Airbyte API base URL. + model_name: name of the juju model. + workspace_id: default workspace ID. + db_password: database password. 
+ """ + url = f"{api_url}/v1/destinations" + payload = { + "configuration": { + "destinationType": "postgres", + "port": 5432, + "schema": "pokeapi", + "ssl_mode": {"mode": "disable"}, + "tunnel_method": {"tunnel_method": "NO_TUNNEL"}, + "host": f"postgresql-k8s-primary.{model_name}.svc.cluster.local", + "database": "airbyte-k8s_db", + "username": "operator", + "password": db_password, + }, + "workspaceId": workspace_id, + "name": "Postgres", + } + + logger.info("creating Airbyte destination") + response = requests.post(url, json=payload, headers=POST_HEADERS, timeout=300) + + assert response.status_code == 200 + return response.json().get("destinationId") + + +def create_airbyte_connection(api_url, source_id, destination_id): + """Create Airbyte connection. + + Args: + api_url: Airbyte API base URL. + source_id: Airbyte source ID. + destination_id: Airbyte destination ID. + """ + url = f"{api_url}/v1/connections" + payload = { + "schedule": {"scheduleType": "manual"}, + "dataResidency": "auto", + "namespaceDefinition": "destination", + "namespaceFormat": None, + "nonBreakingSchemaUpdatesBehavior": "ignore", + "sourceId": source_id, + "destinationId": destination_id, + } + + logger.info("creating Airbyte connection") + response = requests.post(url, json=payload, headers=POST_HEADERS, timeout=300) + + assert response.status_code == 200 + return response.json().get("connectionId") + + +def trigger_airbyte_connection(api_url, connection_id): + """Trigger Airbyte connection. + + Args: + api_url: Airbyte API base URL. + connection_id: Airbyte connection ID. + """ + url = f"{api_url}/v1/jobs" + payload = {"jobType": "sync", "connectionId": connection_id} + logger.info("triggering Airbyte connection") + response = requests.post(url, json=payload, headers=POST_HEADERS, timeout=300) + + assert response.status_code == 200 + return response.json().get("jobId") + + +def check_airbyte_job_status(api_url, job_id): + """Get Airbyte sync job status. 
+ + Args: + api_url: Airbyte API base URL. + job_id: Sync job ID. + """ + url = f"{api_url}/v1/jobs/{job_id}" + logger.info("fetching Airbyte job status") + response = requests.get(url, headers=GET_HEADERS, timeout=120) + logger.info(response.json()) + + return response.json().get("status") + + +def cancel_airbyte_job(api_url, job_id): + """Cancel Airbyte sync job. + + Args: + api_url: Airbyte API base URL. + job_id: Sync job ID. + """ + url = f"{api_url}/v1/jobs/{job_id}" + logger.info("cancelling Airbyte job") + response = requests.delete(url, headers=GET_HEADERS, timeout=120) + logger.info(response.json()) + + return response.json().get("status") + + +async def get_db_password(ops_test): + """Get PostgreSQL DB admin password. + + Args: + ops_test: PyTest object. + """ + postgresql_unit = ops_test.model.applications["postgresql-k8s"].units[0] + for i in range(10): + action = await postgresql_unit.run_action("get-password") + result = await action.wait() + logger.info(f"attempt {i} -> action result {result.status} {result.results}") + if "password" in result.results: + return result.results["password"] + time.sleep(2) diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index 78be12d..18052e0 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -3,11 +3,23 @@ # See LICENSE file for licensing details. 
import logging +import time import pytest import requests from conftest import deploy # noqa: F401, pylint: disable=W0611 -from helpers import APP_NAME_AIRBYTE_SERVER, get_unit_url +from helpers import ( + APP_NAME_AIRBYTE_SERVER, + cancel_airbyte_job, + check_airbyte_job_status, + create_airbyte_connection, + create_airbyte_destination, + create_airbyte_source, + get_airbyte_workspace_id, + get_db_password, + get_unit_url, + trigger_airbyte_connection, +) from pytest_operator.plugin import OpsTest logger = logging.getLogger(__name__) @@ -23,6 +35,52 @@ async def test_deployment(self, ops_test: OpsTest): logger.info("curling app address: %s", url) response = requests.get(f"{url}/api/v1/health", timeout=300) - print(response.json()) + assert response.status_code == 200 assert response.json().get("available") + + async def test_sync_job(self, ops_test: OpsTest): + # Create connection + api_url = await get_unit_url(ops_test, application=APP_NAME_AIRBYTE_SERVER, unit=0, port=8006) + logger.info("curling app address: %s", api_url) + workspace_id = get_airbyte_workspace_id(api_url) + db_password = await get_db_password(ops_test) + assert db_password + + # Create Source + source_id = create_airbyte_source(api_url, workspace_id) + + # Create destination + destination_id = create_airbyte_destination(api_url, ops_test.model.name, workspace_id, db_password) + + # Create connection + connection_id = create_airbyte_connection(api_url, source_id, destination_id) + + # Trigger sync job + for i in range(2): + logger.info(f"attempt {i+1} to trigger new job") + job_id = trigger_airbyte_connection(api_url, connection_id) + + # Wait until job is successful + job_successful = False + for j in range(15): + logger.info(f"job {i+1} attempt {j+1}: getting job status") + status = check_airbyte_job_status(api_url, job_id) + + if status == "failed": + break + + if status == "succeeded": + logger.info(f"job {i+1} attempt {j+1}: job successful!") + job_successful = True + break + + 
logger.info(f"job {i+1} attempt {j+1}: job still running, retrying in 20 seconds") + time.sleep(20) + + if job_successful: + break + + cancel_airbyte_job(api_url, job_id) + + assert job_successful diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 7368e18..ca13a03 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -375,6 +375,9 @@ def create_plan(container_name, storage_type): }, } + if container_name == "airbyte-api-server": + want_plan["services"][container_name]["environment"].update({"INTERNAL_API_HOST": "http://airbyte-k8s:8001"}) + + if storage_type == StorageType.minio: + want_plan["services"][container_name]["environment"].update( + {