Skip to content

Commit

Permalink
Merge pull request #56 from scale-vector/rfix/streamlit-pandas-helpers
Browse files Browse the repository at this point in the history
streamlit and pandas helpers
* allows setting `project_id` via config file/env when default GCP credentials are used. `project_id` if present overrides the one in default credentials
* experimental Streamlit helpers to: backup and restore pipeline state into `secrets.toml` and to write explore streamlit page to explore the schema
* experimental Pandas helper to read data from a destination into Panda frames using credentials already present in pipeline
* bumps to version `0.1.0rc11`
  • Loading branch information
rudolfix authored Aug 14, 2022
2 parents 5a49c64 + 58a9c98 commit 175a831
Show file tree
Hide file tree
Showing 41 changed files with 1,260 additions and 176 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/test_loader_bigquery.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ on:
workflow_dispatch:

env:
PROJECT_ID: chat-analytics-rasa-ci
CLIENT_EMAIL: chat-analytics-loader@chat-analytics-rasa-ci.iam.gserviceaccount.com
PRIVATE_KEY: ${{ secrets.BQ_CRED_PRIVATE_KEY }}
TOKEN_URI: https://oauth2.googleapis.com/token
GCP__PROJECT_ID: chat-analytics-rasa-ci
GCP__CLIENT_EMAIL: chat-analytics-loader@chat-analytics-rasa-ci.iam.gserviceaccount.com
GCP__PRIVATE_KEY: ${{ secrets.BQ_CRED_PRIVATE_KEY }}
GCP__TOKEN_URI: https://oauth2.googleapis.com/token
DEFAULT_DATASET: workflowtest

jobs:
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/test_loader_redshift.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ on:
workflow_dispatch:

env:
HOST: 3.73.90.3
USER: loader
PASSWORD: ${{ secrets.PG_PASSWORD }}
DBNAME: chat_analytics_rasa_ci
PG__HOST: 3.73.90.3
PG__USER: loader
PG__PASSWORD: ${{ secrets.PG_PASSWORD }}
PG__DBNAME: chat_analytics_rasa_ci
DEFAULT_DATASET: workflowtest

jobs:
Expand Down
4 changes: 2 additions & 2 deletions dlt/common/configuration/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from .run_configuration import RunConfiguration, BaseConfiguration # noqa: F401
from .run_configuration import RunConfiguration, BaseConfiguration, CredentialsConfiguration # noqa: F401
from .normalize_volume_configuration import NormalizeVolumeConfiguration, ProductionNormalizeVolumeConfiguration # noqa: F401
from .load_volume_configuration import LoadVolumeConfiguration, ProductionLoadVolumeConfiguration # noqa: F401
from .schema_volume_configuration import SchemaVolumeConfiguration, ProductionSchemaVolumeConfiguration # noqa: F401
from .pool_runner_configuration import PoolRunnerConfiguration, TPoolType # noqa: F401
from .gcp_client_credentials import GcpClientCredentials # noqa: F401
from .postgres_credentials import PostgresCredentials # noqa: F401
from .utils import make_configuration, TSecretValue # noqa: F401
from .utils import make_configuration # noqa: F401

from .exceptions import ( # noqa: F401
ConfigEntryMissingException, ConfigEnvValueCannotBeCoercedException, ConfigIntegrityException, ConfigFileNotFoundException)
9 changes: 7 additions & 2 deletions dlt/common/configuration/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,14 @@ def __init__(self, msg: str) -> None:
class ConfigEntryMissingException(ConfigurationException):
"""thrown when not all required config elements are present"""

def __init__(self, missing_set: Iterable[str]) -> None:
def __init__(self, missing_set: Iterable[str], namespace: str = None) -> None:
self.missing_set = missing_set
super().__init__('Missing config keys: ' + str(missing_set))
self.namespace = namespace

msg = 'Missing config keys: ' + str(missing_set)
if namespace:
msg += ". Note that required namespace for that keys is " + namespace + " and namespace separator is two underscores"
super().__init__(msg)


class ConfigEnvValueCannotBeCoercedException(ConfigurationException):
Expand Down
14 changes: 8 additions & 6 deletions dlt/common/configuration/gcp_client_credentials.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from dlt.common.typing import StrAny, StrStr
from dlt.common.configuration import BaseConfiguration
from dlt.common.configuration.utils import TSecretValue
from dlt.common.typing import StrAny, TSecretValue
from dlt.common.configuration import CredentialsConfiguration


class GcpClientCredentials(BaseConfiguration):
class GcpClientCredentials(CredentialsConfiguration):

__namespace__: str = "GCP"

PROJECT_ID: str = None
BQ_CRED_TYPE: str = "service_account"
CRED_TYPE: str = "service_account"
PRIVATE_KEY: TSecretValue = None
TOKEN_URI: str = "https://oauth2.googleapis.com/token"
CLIENT_EMAIL: str = None
Expand All @@ -22,7 +24,7 @@ def check_integrity(cls) -> None:
@classmethod
def as_credentials(cls) -> StrAny:
return {
"type": cls.BQ_CRED_TYPE,
"type": cls.CRED_TYPE,
"project_id": cls.PROJECT_ID,
"private_key": cls.PRIVATE_KEY,
"token_uri": cls.TOKEN_URI,
Expand Down
10 changes: 6 additions & 4 deletions dlt/common/configuration/postgres_credentials.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from dlt.common.configuration import BaseConfiguration
from dlt.common.configuration.utils import TSecretValue
from dlt.common.typing import StrAny
from dlt.common.typing import StrAny, TSecretValue
from dlt.common.configuration import CredentialsConfiguration


class PostgresCredentials(BaseConfiguration):
class PostgresCredentials(CredentialsConfiguration):

__namespace__: str = "PG"

DBNAME: str = None
PASSWORD: TSecretValue = None
USER: str = None
Expand Down
Empty file.
40 changes: 40 additions & 0 deletions dlt/common/configuration/providers/environ.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from os import environ
from os.path import isdir
from typing import Any, Optional, Type

from dlt.common.typing import TSecretValue

SECRET_STORAGE_PATH: str = "/run/secrets/%s"


def get_key_name(key: str, namespace: str = None) -> str:
if namespace:
return namespace + "__" + key
else:
return key


def get_key(key: str, hint: Type[Any], namespace: str = None) -> Optional[str]:
# apply namespace to the key
key = get_key_name(key, namespace)
if hint is TSecretValue:
# try secret storage
try:
# must conform to RFC1123
secret_name = key.lower().replace("_", "-")
secret_path = SECRET_STORAGE_PATH % secret_name
# kubernetes stores secrets as files in a dir, docker compose plainly
if isdir(secret_path):
secret_path += "/" + secret_name
with open(secret_path, "r", encoding="utf-8") as f:
secret = f.read()
# add secret to environ so forks have access
# TODO: removing new lines is not always good. for password OK for PEMs not
# TODO: in regular secrets that is dealt with in particular configuration logic
environ[key] = secret.strip()
# do not strip returned secret
return secret
# includes FileNotFound
except OSError:
pass
return environ.get(key, None)
9 changes: 7 additions & 2 deletions dlt/common/configuration/run_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from os.path import isfile
from typing import Any, Optional, Tuple, IO

from dlt.common.typing import StrAny
from dlt.common.typing import StrAny, DictStrAny
from dlt.common.utils import encoding_for_mode
from dlt.common.configuration.exceptions import ConfigFileNotFoundException

Expand All @@ -14,9 +14,10 @@ class BaseConfiguration:

# will be set to true if not all config entries could be resolved
__is_partial__: bool = True
__namespace__: str = None

@classmethod
def as_dict(config, lowercase: bool = True) -> StrAny:
def as_dict(config, lowercase: bool = True) -> DictStrAny:
may_lower = lambda k: k.lower() if lowercase else k
return {may_lower(k):getattr(config, k) for k in dir(config) if not callable(getattr(config, k)) and not k.startswith("__")}

Expand All @@ -32,6 +33,10 @@ def apply_dict(config, values: StrAny, uppercase: bool = True, apply_non_spec: b
setattr(config, k, v)


class CredentialsConfiguration(BaseConfiguration):
pass


class RunConfiguration(BaseConfiguration):
PIPELINE_NAME: Optional[str] = None # the name of the component
SENTRY_DSN: Optional[str] = None # keep None to disable Sentry
Expand Down
56 changes: 19 additions & 37 deletions dlt/common/configuration/utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import sys
import semver
from os import environ
from os.path import isdir
from typing import Any, Dict, List, Mapping, Optional, Type, TypeVar, cast
from typing import Any, Dict, List, Mapping, Type, TypeVar, cast

from dlt.common.typing import StrAny, TSecretValue, is_optional_type, is_literal_type
from dlt.common.typing import StrAny, is_optional_type, is_literal_type
from dlt.common.configuration import BaseConfiguration
from dlt.common.configuration.providers import environ
from dlt.common.configuration.exceptions import (ConfigEntryMissingException,
ConfigEnvValueCannotBeCoercedException)
from dlt.common.utils import uniq_id
Expand All @@ -17,9 +16,9 @@
ALLOWED_TYPE_COERCIONS = [(float, int), (str, int), (str, float)]
IS_DEVELOPMENT_CONFIG_KEY: str = "IS_DEVELOPMENT_CONFIG"
CHECK_INTEGRITY_F: str = "check_integrity"
SECRET_STORAGE_PATH: str = "/run/secrets/%s"

TConfiguration = TypeVar("TConfiguration", bound=Type[BaseConfiguration])
# TODO: remove production configuration support
TProductionConfiguration = TypeVar("TProductionConfiguration", bound=Type[BaseConfiguration])


Expand Down Expand Up @@ -61,12 +60,18 @@ def is_direct_descendant(child: Type[Any], base: Type[Any]) -> bool:


def _is_development_config() -> bool:
is_dev_config = True

# get from environment
if IS_DEVELOPMENT_CONFIG_KEY in environ:
is_dev_config = _coerce_single_value(IS_DEVELOPMENT_CONFIG_KEY, environ[IS_DEVELOPMENT_CONFIG_KEY], bool)
return is_dev_config
is_dev_config: bool = None
try:
is_dev_config = _coerce_single_value(IS_DEVELOPMENT_CONFIG_KEY, environ.get_key(IS_DEVELOPMENT_CONFIG_KEY, bool), bool)
except ConfigEnvValueCannotBeCoercedException as coer_exc:
# pass for None: this key may not be present
if coer_exc.env_value is None:
pass
else:
# anything else that cannot corece must raise
raise
return True if is_dev_config is None else is_dev_config


def _add_module_version(config: TConfiguration) -> None:
Expand All @@ -80,44 +85,21 @@ def _add_module_version(config: TConfiguration) -> None:

def _apply_environ_to_config(config: TConfiguration, keys_in_config: Mapping[str, type]) -> None:
for key, hint in keys_in_config.items():
value = _get_key_value(key, hint)
value = environ.get_key(key, hint, config.__namespace__)
if value is not None:
value_from_environment_variable = _coerce_single_value(key, value, hint)
# set value
setattr(config, key, value_from_environment_variable)


def _get_key_value(key: str, hint: Type[Any]) -> Optional[str]:
if hint is TSecretValue:
# try secret storage
try:
# must conform to RFC1123
secret_name = key.lower().replace("_", "-")
secret_path = SECRET_STORAGE_PATH % secret_name
# kubernetes stores secrets as files in a dir, docker compose plainly
if isdir(secret_path):
secret_path += "/" + secret_name
with open(secret_path, "r", encoding="utf-8") as f:
secret = f.read()
# add secret to environ so forks have access
# TODO: removing new lines is not always good. for password OK for PEMs not
# TODO: in regular secrets that is dealt with in particular configuration logic
environ[key] = secret.strip()
# do not strip returned secret
return secret
# includes FileNotFound
except OSError:
pass
return environ.get(key, None)


def _is_config_bounded(config: TConfiguration, keys_in_config: Mapping[str, type]) -> None:
# TODO: here we assume all keys are taken from environ provider, that should change when we introduce more providers
_unbound_attrs = [
key for key in keys_in_config if getattr(config, key) is None and not is_optional_type(keys_in_config[key])
environ.get_key_name(key, config.__namespace__) for key in keys_in_config if getattr(config, key) is None and not is_optional_type(keys_in_config[key])
]

if len(_unbound_attrs) > 0:
raise ConfigEntryMissingException(_unbound_attrs)
raise ConfigEntryMissingException(_unbound_attrs, config.__namespace__)


def _check_configuration_integrity(config: TConfiguration) -> None:
Expand Down
9 changes: 8 additions & 1 deletion dlt/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import hashlib
from os import environ
import secrets
from typing import Any, Iterator, Optional, Sequence, TypeVar, Mapping, List, TypedDict, Union
from typing import Any, Dict, Iterator, Optional, Sequence, TypeVar, Mapping, List, TypedDict, Union

from dlt.common.typing import StrAny, DictStrAny, StrStr, TFun

Expand Down Expand Up @@ -126,6 +126,13 @@ def is_interactive() -> bool:
return not hasattr(main, '__file__')


def dict_remove_nones_in_place(d: Dict[Any, Any]) -> Dict[Any, Any]:
for k in list(d.keys()):
if d[k] is None:
del d[k]
return d


@contextmanager
def custom_environ(env: StrStr) -> Iterator[None]:
"""Temporarily set environment variables inside the context manager and
Expand Down
78 changes: 78 additions & 0 deletions dlt/common/wei.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import functools
import hexbytes
from typing import Any

from dlt.common import Decimal
from dlt.common.arithmetics import Context, default_context, decimal
from dlt.common.typing import StrAny

# default scale of platform contracts
WEI_SCALE = 18
# log(2^256) + 1
EVM_DECIMAL_PRECISION = 78
# value of one at wei scale
WEI_SCALE_POW = 10**18


# class WeiDecimal(Decimal):
# ctx = default_context(decimal.getcontext().copy(), EVM_DECIMAL_PRECISION)
# def __new__(cls, *args: Any, **kwargs: Any) -> "Wei":
# c = default_context(decimal.getcontext().copy(), EVM_DECIMAL_PRECISION)
# d = super(WeiDecimal, cls).__new__(cls, *args, **kwargs, context=c)
# d.c = c
# # d.c = default_context(decimal.getcontext().copy(), EVM_DECIMAL_PRECISION)
# return d

# def __getattribute__(self, __name: str) -> Any:
# rv = super().__getattribute__(__name)
# if callable(rv) and not __name.startswith("_"):
# if "context" in rv.func_code.co_varnames:
# return functools.partialmethod(rv, context=self.c)
# return rv

# def __repr__(self) -> str:
# return super().__repr__().replace("Decimal", "Wei")


class Wei(Decimal):

ctx = default_context(decimal.getcontext().copy(), EVM_DECIMAL_PRECISION)

# def __slots__hints__(self) -> None:
# self._scale: int = 0

# def __new__(cls, value: int, scale: int = 0) -> "Wei":
# self = super(Wei, cls).__new__(cls, value)
# self._scale = scale
# return self

# def __init__(self, value: int, scale: int = 0) -> None:
# self._c = default_context(decimal.getcontext().copy(), EVM_DECIMAL_PRECISION)
# self.value = value
# self.scale = scale


# def to_decimal():
# pass

# def __get__(self, obj, type=None) -> object:
# print("GET!")
# if self.normalize(self.ctx) > 100:
# return "STR"
# else:
# return self

def __new__(cls, value: int, scale: int = 0) -> "Wei":
d: "Wei" = None
if scale == 0:
d = super(Wei, cls).__new__(cls, value)
else:
d = super(Wei, cls).__new__(cls, Decimal(value, context=cls.ctx) / 10**scale)

return d

# def from_uint256(value: int, scale: int = 0):
# pass

# # def __repr__(self) -> str:
# # return f"{self.scale},{self.value}"
Loading

0 comments on commit 175a831

Please sign in to comment.