Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data loading: Refactor to use more OO. Add examples/cloud_import.py. #80

Merged
merged 4 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
- InfluxDB: Add adapter for `influxio`
- MongoDB: Add `migr8` program from previous repository
- MongoDB: Improve UX by using `ctk load table mongodb://...`
- load table: Refactor to use more OO
- Add `examples/cloud_import.py`


## 2023/11/06 v0.0.2
Expand Down
Empty file added cratedb_toolkit/api/__init__.py
Empty file.
38 changes: 38 additions & 0 deletions cratedb_toolkit/api/guide.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
class GuidingTexts:
"""
TODO: Add more richness / guidance to the text output.
"""

def __init__(self, admin_url: str = None, table_name: str = None):
self.admin_url = admin_url
self.table_name = table_name

def success(self):
return f"""
Excellent, that worked well.

Now, you may want to inquire your data. To do that, use either CrateDB Admin UI,
or connect on your terminal using `crash`, `ctk shell`, or `psql`.

The CrateDB Admin UI for your cluster is available at [1]. To easily inspect a
few samples of your imported data, or to check the cardinality of your database
table, run [2] or [3]. If you want to export your data again, see [4].

[1] {self.admin_url}
[2] ctk shell --command 'SELECT * FROM {self.table_name} LIMIT 10;'
[3] ctk shell --command 'SELECT COUNT(*) FROM {self.table_name};'
[4] https://community.cratedb.com/t/cratedb-cloud-news-simple-data-export/1556
""" # noqa: S608

def error(self):
return """
That went south.

If you can share your import source, we will love to hear from you on our community
forum [1]. Otherwise, please send us an email [2] about the flaw you've discovered.
To learn more about the data import feature, see [3].

[1] https://community.cratedb.com/
[2] [email protected]
[3] https://community.cratedb.com/t/importing-data-to-cratedb-cloud-clusters/1467
"""
125 changes: 125 additions & 0 deletions cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import abc
import dataclasses
import json
import logging
import typing as t
from abc import abstractmethod

from cratedb_toolkit.api.guide import GuidingTexts
from cratedb_toolkit.cluster.util import get_cluster_info
from cratedb_toolkit.exception import CroudException, OperationFailed
from cratedb_toolkit.io.croud import CloudIo
from cratedb_toolkit.model import ClusterInformation, DatabaseAddress, InputOutputResource, TableAddress

logger = logging.getLogger(__name__)


class ClusterBase(abc.ABC):
@abstractmethod
def load_table(self, resource: InputOutputResource, target: TableAddress):
raise NotImplementedError("Child class needs to implement this method")


@dataclasses.dataclass
class ManagedCluster(ClusterBase):
"""
Wrap a managed CrateDB database cluster on CrateDB Cloud.
"""

cloud_id: str
address: t.Optional[DatabaseAddress] = None
info: t.Optional[ClusterInformation] = None

def __post_init__(self):
logger.info(f"Connecting to CrateDB Cloud Cluster: {self.cloud_id}")

def load_table(self, resource: InputOutputResource, target: t.Optional[TableAddress] = None):
"""
Load data into a database table on CrateDB Cloud.

Synopsis
--------
export CRATEDB_CLOUD_CLUSTER_ID=95998958-4d96-46eb-a77a-a894e7dde128
ctk load table https://github.com/crate/cratedb-datasets/raw/main/cloud-tutorials/data_weather.csv.gz

https://console.cratedb.cloud
"""

target = target or TableAddress()

try:
cluster_info = get_cluster_info(cluster_id=self.cloud_id)
logger.info(
f"Cluster information: name={cluster_info.cloud.get('name')}, url={cluster_info.cloud.get('url')}"
)
cio = CloudIo(cluster_id=self.cloud_id)
except CroudException as ex:
msg = f"Connecting to cluster resource failed: {self.cloud_id}. Reason: {ex}"
if "Resource not found" in str(ex):
logger.error(msg)
return None, False
logger.exception(msg)
raise OperationFailed(msg) from ex

try:
job_info, success = cio.load_resource(resource=resource, target=target)
logger.info("Job information:\n%s", json.dumps(job_info, indent=2))
# TODO: Explicitly report about `failed_records`, etc.
texts = GuidingTexts(
admin_url=cluster_info.cloud["url"],
table_name=job_info["destination"]["table"],
)
if success:
logger.info("Data loading was successful: %s", texts.success())
return job_info, success
else:
# TODO: Add "reason" to exception message.
logger.error(f"Data loading failed: {texts.error()}")
raise OperationFailed("Data loading failed")

# When exiting so, it is expected that error logging has taken place appropriately.
except CroudException as ex:
msg = "Data loading failed: Unknown error"
logger.exception(msg)
raise OperationFailed(msg) from ex


@dataclasses.dataclass
class StandaloneCluster(ClusterBase):
"""
Wrap a standalone CrateDB database cluster.
"""

address: DatabaseAddress
info: t.Optional[ClusterInformation] = None

def load_table(self, resource: InputOutputResource, target: TableAddress):
"""
Load data into a database table on a standalone CrateDB Server.

Synopsis
--------
export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo

ctk load table influxdb2://example:token@localhost:8086/testdrive/demo
ctk load table mongodb://localhost:27017/testdrive/demo
"""
source_url = resource.url
target_url = self.address.dburi
if source_url.startswith("influxdb"):
from cratedb_toolkit.io.influxdb import influxdb_copy

source_url = source_url.replace("influxdb2://", "http://")
if not influxdb_copy(source_url, target_url, progress=True):
msg = "Data loading failed"
logger.error(msg)
raise OperationFailed(msg)
elif source_url.startswith("mongodb"):
from cratedb_toolkit.io.mongodb.api import mongodb_copy

if not mongodb_copy(source_url, target_url, progress=True):
msg = "Data loading failed"
logger.error(msg)
raise OperationFailed(msg)
else:
raise NotImplementedError("Importing resource not implemented yet")
2 changes: 1 addition & 1 deletion cratedb_toolkit/cluster/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
from click_aliases import ClickAliasedGroup

from cratedb_toolkit.cluster.util import get_cluster_info
from cratedb_toolkit.exception import CroudException
from cratedb_toolkit.util import jd
from cratedb_toolkit.util.cli import boot_click, make_command
from cratedb_toolkit.util.croud import CroudException


@click.group(cls=ClickAliasedGroup) # type: ignore[arg-type]
Expand Down
8 changes: 8 additions & 0 deletions cratedb_toolkit/exception.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,10 @@
class TableNotFound(Exception):
pass


class OperationFailed(Exception):
pass


class CroudException(Exception):
pass
23 changes: 18 additions & 5 deletions cratedb_toolkit/io/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,18 @@
A one-stop command `ctk load table` to load data into CrateDB database tables.


## Installation

Latest release.
```shell
pip install --upgrade 'cratedb-toolkit[all]'
```

Development version.
```shell
pip install --upgrade 'cratedb-toolkit[all] @ git+https://github.com/crate-workbench/cratedb-toolkit.git'
```

## General Notes

By default, the table name will be derived from the name of the input resource.
Expand Down Expand Up @@ -48,16 +60,17 @@ export CRATEDB_PASSWORD='3$MJ5fALP8bNOYCYBMLOrzd&'
```

### Usage
Load data.
Load data into database table.
```shell
ctk load table https://github.com/crate/cratedb-datasets/raw/main/cloud-tutorials/data_weather.csv.gz
ctk load table https://github.com/crate/cratedb-datasets/raw/main/cloud-tutorials/data_marketing.json.gz
ctk load table https://github.com/daq-tools/skeem/raw/main/tests/testdata/basic.parquet
ctk load table 'https://github.com/crate/cratedb-datasets/raw/main/cloud-tutorials/data_weather.csv.gz'
ctk load table 'https://github.com/crate/cratedb-datasets/raw/main/cloud-tutorials/data_marketing.json.gz'
ctk load table 'https://github.com/crate/cratedb-datasets/raw/main/timeseries/yc.2019.07-tiny.parquet.gz'
```

Inquire data.
Query and aggregate data using SQL.
```shell
ctk shell --command="SELECT * FROM data_weather LIMIT 10;"
ctk shell --command="SELECT * FROM data_weather LIMIT 10;" --format=csv
ctk shell --command="SELECT * FROM data_weather LIMIT 10;" --format=json
```

Expand Down
Loading