Skip to content

Commit

Permalink
test sample airbyte connection
Browse files Browse the repository at this point in the history
  • Loading branch information
kelkawi-a committed Jun 13, 2024
1 parent e94b94f commit 4c0fdff
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 13 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/integration_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
- name: Setup operator environment
uses: charmed-kubernetes/actions-operator@main
with:
juju-channel: 3.1/stable
juju-channel: 3.4/stable
provider: microk8s
microk8s-addons: "ingress storage dns rbac registry"
channel: 1.25-strict/stable
Expand All @@ -24,5 +24,5 @@ jobs:
uses: canonical/charm-logdump-action@main
if: failure()
with:
app: airbyte-ui-k8s
app: airbyte-k8s
model: testing
5 changes: 3 additions & 2 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from charm_helpers import create_env
from literals import (
AIRBYTE_API_PORT,
BUCKET_CONFIGS,
CONNECTOR_BUILDER_SERVER_API_PORT,
CONTAINERS,
Expand Down Expand Up @@ -282,8 +283,7 @@ def _update(self, event):
self.unit.status = BlockedStatus(f"failed to create buckets: {str(e)}")
return

env = create_env(self.model.name, self.app.name, self.config, self._state)
self.model.unit.set_ports(INTERNAL_API_PORT, CONNECTOR_BUILDER_SERVER_API_PORT)
self.model.unit.set_ports(AIRBYTE_API_PORT, INTERNAL_API_PORT, CONNECTOR_BUILDER_SERVER_API_PORT)

for container_name in list(CONTAINERS.keys()):
container = self.unit.get_container(container_name)
Expand All @@ -303,6 +303,7 @@ def _update(self, event):
permissions=0o755,
)

env = create_env(self.model.name, self.app.name, container_name, self.config, self._state)
pebble_layer = get_pebble_layer(container_name, env)
container.add_layer(container_name, pebble_layer, combine=True)
container.replan()
Expand Down
7 changes: 6 additions & 1 deletion src/charm_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
from structured_config import StorageType


def create_env(model_name, app_name, config, state):
def create_env(model_name, app_name, container_name, config, state):
"""Create set of environment variables for application.
Args:
model_name: Name of the juju model.
app_name: Name of the application.
container_name: Name of Airbyte container.
config: Charm config.
state: Charm state.
Expand Down Expand Up @@ -70,6 +71,10 @@ def create_env(model_name, app_name, config, state):
"AIRBYTE_URL": config["webapp-url"],
}

# https://github.com/airbytehq/airbyte/issues/29506#issuecomment-1775148609
if container_name == "airbyte-api-server":
env.update({"INTERNAL_API_HOST": f"http://{app_name}:{INTERNAL_API_PORT}"})

if config["storage-type"].value == StorageType.minio and state.minio:
minio_endpoint = construct_svc_endpoint(
state.minio["service"],
Expand Down
13 changes: 10 additions & 3 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
get_airbyte_charm_resources,
perform_airbyte_integrations,
perform_temporal_integrations,
run_sample_workflow,
)
from pytest_operator.plugin import OpsTest

Expand All @@ -24,6 +25,8 @@
@pytest_asyncio.fixture(name="deploy", scope="module")
async def deploy(ops_test: OpsTest):
"""Test the app is up and running."""
await ops_test.model.set_config({"update-status-hook-interval": "1m"})

charm = await ops_test.build_charm(".")
resources = get_airbyte_charm_resources()

Expand All @@ -32,16 +35,19 @@ async def deploy(ops_test: OpsTest):
ops_test.model.deploy(
APP_NAME_TEMPORAL_SERVER,
channel="edge",
config={"num-history-shards": 1},
config={"num-history-shards": 512},
),
ops_test.model.deploy(APP_NAME_TEMPORAL_ADMIN, channel="edge"),
ops_test.model.deploy("postgresql-k8s", channel="14/stable", trust=True),
ops_test.model.deploy("postgresql-k8s", channel="14/edge", trust=True),
ops_test.model.deploy("minio", channel="edge"),
)

async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(
apps=["postgresql-k8s", "minio"], status="active", raise_on_blocked=False, timeout=1200
apps=["postgresql-k8s", "minio"],
status="active",
raise_on_blocked=False,
timeout=1200,
)
await ops_test.model.wait_for_idle(
apps=[APP_NAME_TEMPORAL_SERVER, APP_NAME_TEMPORAL_ADMIN],
Expand All @@ -52,5 +58,6 @@ async def deploy(ops_test: OpsTest):

await perform_temporal_integrations(ops_test)
await create_default_namespace(ops_test)
await run_sample_workflow(ops_test)

await perform_airbyte_integrations(ops_test)
172 changes: 169 additions & 3 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

"""Temporal charm integration test helpers."""
"""Charm integration test helpers."""

import logging
import time
from pathlib import Path

import requests
import yaml
from pytest_operator.plugin import OpsTest
from temporal_client.activities import say_hello
Expand All @@ -22,6 +24,9 @@
APP_NAME_TEMPORAL_ADMIN = "temporal-admin-k8s"
APP_NAME_TEMPORAL_UI = "temporal-ui-k8s"

GET_HEADERS = {"accept": "application/json"}
POST_HEADERS = {"accept": "application/json", "content-type": "application/json"}


def get_airbyte_charm_resources():
return {
Expand Down Expand Up @@ -117,7 +122,7 @@ async def perform_temporal_integrations(ops_test: OpsTest):
await ops_test.model.integrate(f"{APP_NAME_TEMPORAL_SERVER}:visibility", "postgresql-k8s:database")
await ops_test.model.integrate(f"{APP_NAME_TEMPORAL_SERVER}:admin", f"{APP_NAME_TEMPORAL_ADMIN}:admin")
await ops_test.model.wait_for_idle(
apps=[APP_NAME_TEMPORAL_SERVER], status="active", raise_on_blocked=False, timeout=180
apps=[APP_NAME_TEMPORAL_SERVER, "postgresql-k8s"], status="active", raise_on_blocked=False, timeout=180
)

assert ops_test.model.applications[APP_NAME_TEMPORAL_SERVER].units[0].workload_status == "active"
Expand All @@ -132,7 +137,168 @@ async def perform_airbyte_integrations(ops_test: OpsTest):
await ops_test.model.integrate(APP_NAME_AIRBYTE_SERVER, "postgresql-k8s")
await ops_test.model.integrate(APP_NAME_AIRBYTE_SERVER, "minio")
await ops_test.model.wait_for_idle(
apps=[APP_NAME_AIRBYTE_SERVER], status="active", raise_on_blocked=False, timeout=600
apps=[APP_NAME_AIRBYTE_SERVER, "postgresql-k8s", "minio"],
status="active",
raise_on_blocked=False,
wait_for_active=True,
idle_period=60,
timeout=600,
)

assert ops_test.model.applications[APP_NAME_AIRBYTE_SERVER].units[0].workload_status == "active"


def get_airbyte_workspace_id(api_url):
    """Look up the ID of the first workspace listed by the Airbyte API.

    Args:
        api_url: Airbyte API base URL.

    Returns:
        The ``workspaceId`` of the first entry in the response's ``data`` list.
    """
    endpoint = f"{api_url}/v1/workspaces?includeDeleted=false&limit=20&offset=0"
    logger.info("fetching Airbyte workspace ID")
    reply = requests.get(endpoint, headers=GET_HEADERS, timeout=300)

    assert reply.status_code == 200
    workspaces = reply.json().get("data")
    return workspaces[0]["workspaceId"]


def create_airbyte_source(api_url, workspace_id):
    """Register the sample PokeAPI source in the given workspace.

    Args:
        api_url: Airbyte API base URL.
        workspace_id: default workspace ID.

    Returns:
        The ``sourceId`` field of the API response.
    """
    endpoint = f"{api_url}/v1/sources"
    source_config = {"sourceType": "pokeapi", "pokemon_name": "pikachu"}
    body = {
        "configuration": source_config,
        "name": "API Test",
        "workspaceId": workspace_id,
    }

    logger.info("creating Airbyte source")
    reply = requests.post(endpoint, json=body, headers=POST_HEADERS, timeout=300)

    assert reply.status_code == 200
    created = reply.json()
    return created.get("sourceId")


def create_airbyte_destination(api_url, model_name, workspace_id, db_password):
    """Register the sample Postgres destination in the given workspace.

    The destination targets the ``postgresql-k8s-primary`` service of the
    given model, using the ``operator`` user and the ``airbyte-k8s_db``
    database.

    Args:
        api_url: Airbyte API base URL.
        model_name: name of the juju model.
        workspace_id: default workspace ID.
        db_password: database password.

    Returns:
        The ``destinationId`` field of the API response.
    """
    endpoint = f"{api_url}/v1/destinations"
    db_host = f"postgresql-k8s-primary.{model_name}.svc.cluster.local"
    destination_config = {
        "destinationType": "postgres",
        "port": 5432,
        "schema": "pokeapi",
        "ssl_mode": {"mode": "disable"},
        "tunnel_method": {"tunnel_method": "NO_TUNNEL"},
        "host": db_host,
        "database": "airbyte-k8s_db",
        "username": "operator",
        "password": db_password,
    }
    body = {
        "configuration": destination_config,
        "workspaceId": workspace_id,
        "name": "Postgres",
    }

    logger.info("creating Airbyte destination")
    reply = requests.post(endpoint, json=body, headers=POST_HEADERS, timeout=300)

    assert reply.status_code == 200
    created = reply.json()
    return created.get("destinationId")


def create_airbyte_connection(api_url, source_id, destination_id):
    """Link a source to a destination with a manually scheduled connection.

    Args:
        api_url: Airbyte API base URL.
        source_id: Airbyte source ID.
        destination_id: Airbyte destination ID.

    Returns:
        The ``connectionId`` field of the API response.
    """
    endpoint = f"{api_url}/v1/connections"
    body = {
        "schedule": {"scheduleType": "manual"},
        "dataResidency": "auto",
        "namespaceDefinition": "destination",
        "namespaceFormat": None,
        "nonBreakingSchemaUpdatesBehavior": "ignore",
        "sourceId": source_id,
        "destinationId": destination_id,
    }

    logger.info("creating Airbyte connection")
    reply = requests.post(endpoint, json=body, headers=POST_HEADERS, timeout=300)

    assert reply.status_code == 200
    created = reply.json()
    return created.get("connectionId")


def trigger_airbyte_connection(api_url, connection_id):
    """Start a sync job for the given connection.

    Args:
        api_url: Airbyte API base URL.
        connection_id: Airbyte connection ID.

    Returns:
        The ``jobId`` field of the API response.
    """
    endpoint = f"{api_url}/v1/jobs"
    body = {"jobType": "sync", "connectionId": connection_id}

    logger.info("triggering Airbyte connection")
    reply = requests.post(endpoint, json=body, headers=POST_HEADERS, timeout=300)

    assert reply.status_code == 200
    job = reply.json()
    return job.get("jobId")


def check_airbyte_job_status(api_url, job_id):
    """Fetch the current status of a sync job.

    The HTTP status code is not checked; the decoded response body is logged
    as-is.

    Args:
        api_url: Airbyte API base URL.
        job_id: Sync job ID.

    Returns:
        The ``status`` field of the API response.
    """
    endpoint = f"{api_url}/v1/jobs/{job_id}"
    logger.info("fetching Airbyte job status")
    reply = requests.get(endpoint, headers=GET_HEADERS, timeout=120)

    job = reply.json()
    logger.info(job)

    return job.get("status")


def cancel_airbyte_job(api_url, job_id):
    """Request cancellation of a sync job.

    The HTTP status code is not checked; the decoded response body is logged
    as-is.

    Args:
        api_url: Airbyte API base URL.
        job_id: Sync job ID.

    Returns:
        The ``status`` field of the API response.
    """
    endpoint = f"{api_url}/v1/jobs/{job_id}"
    logger.info("cancelling Airbyte job")
    reply = requests.delete(endpoint, headers=GET_HEADERS, timeout=120)

    job = reply.json()
    logger.info(job)

    return job.get("status")


async def get_db_password(ops_test):
    """Get PostgreSQL DB admin password.

    Runs the ``get-password`` action on the first ``postgresql-k8s`` unit,
    retrying up to 10 times with a 2 second pause between attempts.

    Args:
        ops_test: PyTest object.

    Returns:
        The ``password`` value reported by the action, or ``None`` if no
        attempt returned one.
    """
    # Imported locally so this helper does not depend on a module-level import.
    import asyncio

    max_attempts = 10
    postgresql_unit = ops_test.model.applications["postgresql-k8s"].units[0]
    for attempt in range(max_attempts):
        action = await postgresql_unit.run_action("get-password")
        result = await action.wait()
        logger.info("attempt %s -> action result %s %s", attempt, result.status, result.results)
        if "password" in result.results:
            return result.results["password"]
        # Yield to the event loop while waiting instead of blocking it with
        # time.sleep; there is nothing to wait for after the final attempt.
        if attempt < max_attempts - 1:
            await asyncio.sleep(2)
    return None
62 changes: 60 additions & 2 deletions tests/integration/test_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,23 @@
# See LICENSE file for licensing details.

import logging
import time

import pytest
import requests
from conftest import deploy # noqa: F401, pylint: disable=W0611
from helpers import APP_NAME_AIRBYTE_SERVER, get_unit_url
from helpers import (
APP_NAME_AIRBYTE_SERVER,
cancel_airbyte_job,
check_airbyte_job_status,
create_airbyte_connection,
create_airbyte_destination,
create_airbyte_source,
get_airbyte_workspace_id,
get_db_password,
get_unit_url,
trigger_airbyte_connection,
)
from pytest_operator.plugin import OpsTest

logger = logging.getLogger(__name__)
Expand All @@ -23,6 +35,52 @@ async def test_deployment(self, ops_test: OpsTest):
logger.info("curling app address: %s", url)

response = requests.get(f"{url}/api/v1/health", timeout=300)
print(response.json())

assert response.status_code == 200
assert response.json().get("available")

async def test_sync_job(self, ops_test: OpsTest):
    """Create a sample source, destination and connection, then run a sync job.

    Up to 3 sync jobs are triggered. Each job is polled up to 15 times at
    20 second intervals; a job that failed or has not succeeded after polling
    is cancelled before the next one is triggered. The test passes as soon as
    one job reports ``succeeded``.
    """
    # Imported locally so this test does not depend on a module-level import.
    import asyncio

    # Resolve the Airbyte API endpoint and look up the default workspace
    api_url = await get_unit_url(ops_test, application=APP_NAME_AIRBYTE_SERVER, unit=0, port=8006)
    logger.info("curling app address: %s", api_url)
    workspace_id = get_airbyte_workspace_id(api_url)
    db_password = await get_db_password(ops_test)
    assert db_password

    # Create Source
    source_id = create_airbyte_source(api_url, workspace_id)

    # Create destination
    destination_id = create_airbyte_destination(api_url, ops_test.model.name, workspace_id, db_password)

    # Create connection
    connection_id = create_airbyte_connection(api_url, source_id, destination_id)

    # Trigger sync job
    job_successful = False
    for i in range(3):
        logger.info("attempt %s to trigger new job", i + 1)
        job_id = trigger_airbyte_connection(api_url, connection_id)

        # Wait until job is successful
        job_successful = False
        for j in range(15):
            logger.info("job %s attempt %s: getting job status", i + 1, j + 1)
            status = check_airbyte_job_status(api_url, job_id)

            if status == "failed":
                break

            if status == "succeeded":
                logger.info("job %s attempt %s: job successful!", i + 1, j + 1)
                job_successful = True
                break

            logger.info("job %s attempt %s: job still running, retrying in 20 seconds", i + 1, j + 1)
            # Non-blocking wait: time.sleep here would stall the event loop
            # for the whole polling interval.
            await asyncio.sleep(20)

        if job_successful:
            break

        # The job failed or never finished within the polling window;
        # cancel it before triggering a new one.
        cancel_airbyte_job(api_url, job_id)

    assert job_successful
Loading

0 comments on commit 4c0fdff

Please sign in to comment.