Skip to content

Commit

Permalink
Merge pull request #53 from scale-vector/rfix/pipeline-api-v2
Browse files Browse the repository at this point in the history
pre api v2 dlt engine update
  • Loading branch information
rudolfix authored Aug 9, 2022
2 parents 94d3d79 + 76396f0 commit 5a49c64
Show file tree
Hide file tree
Showing 180 changed files with 5,383 additions and 2,465 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test_common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ jobs:
run: poetry install --no-interaction

- run: |
LOG_LEVEL=ERROR poetry run pytest tests --ignore=tests/loaders --ignore=tests/dbt_runner
LOG_LEVEL=ERROR poetry run pytest tests --ignore=tests/load --ignore=tests/dbt_runner
if: runner.os != 'Windows'
name: Run tests Linux/MAC
- run: |
poetry run pytest tests --ignore=tests/loaders --ignore=tests/dbt_runner -m "not forked"
poetry run pytest tests --ignore=tests/load --ignore=tests/dbt_runner -m "not forked"
if: runner.os == 'Windows'
name: Run tests Windows
shell: cmd
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ on:
workflow_dispatch:

env:
NAME: workflow
PROJECT_ID: chat-analytics-rasa-ci
BQ_CRED_CLIENT_EMAIL: chat-analytics-loader@chat-analytics-rasa-ci.iam.gserviceaccount.com
BQ_CRED_PRIVATE_KEY: ${{ secrets.BQ_CRED_PRIVATE_KEY }}
BQ_CRED_TOKEN_URI: https://oauth2.googleapis.com/token
DATASET: workflowtest
CLIENT_EMAIL: chat-analytics-loader@chat-analytics-rasa-ci.iam.gserviceaccount.com
PRIVATE_KEY: ${{ secrets.BQ_CRED_PRIVATE_KEY }}
TOKEN_URI: https://oauth2.googleapis.com/token
DEFAULT_DATASET: workflowtest

jobs:

Expand Down Expand Up @@ -61,17 +60,17 @@ jobs:

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --no-interaction --no-root -E gcp
run: poetry install --no-interaction --no-root -E bigquery

- name: Install self
run: poetry install --no-interaction

- run: |
LOG_LEVEL=ERROR poetry run pytest tests --ignore=tests/common --ignore=tests/unpacker --ignore=tests/loaders/redshift --ignore=tests/dbt_runner -k '(not redshift_client)'
LOG_LEVEL=ERROR poetry run pytest tests --ignore=tests/common --ignore=tests/normalize --ignore=tests/load/redshift --ignore=tests/dbt_runner -k '(not redshift_client)'
if: runner.os != 'Windows'
name: Run tests Linux/MAC
- run: |
poetry run pytest tests --ignore=tests/common --ignore=tests/unpacker --ignore=tests/loaders/redshift --ignore=tests/dbt_runner -m "not forked" -k "(not redshift_client)"
poetry run pytest tests --ignore=tests/common --ignore=tests/normalize --ignore=tests/load/redshift --ignore=tests/dbt_runner -m "not forked" -k "(not redshift_client)"
if: runner.os == 'Windows'
name: Run tests Windows
shell: cmd
15 changes: 7 additions & 8 deletions .github/workflows/test_loader_redshift.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ on:
workflow_dispatch:

env:
NAME: workflow
PG_HOST: 3.73.90.3
PG_USER: loader
PG_PASSWORD: ${{ secrets.PG_PASSWORD }}
PG_DATABASE_NAME: chat_analytics_rasa_ci
PG_SCHEMA_PREFIX: workflowtest
HOST: 3.73.90.3
USER: loader
PASSWORD: ${{ secrets.PG_PASSWORD }}
DBNAME: chat_analytics_rasa_ci
DEFAULT_DATASET: workflowtest

jobs:

Expand Down Expand Up @@ -67,11 +66,11 @@ jobs:
run: poetry install --no-interaction

- run: |
LOG_LEVEL=ERROR poetry run pytest tests --ignore=tests/common --ignore=tests/unpacker --ignore=tests/loaders/gcp --ignore=tests/dbt_runner -k '(not bigquery_client)'
LOG_LEVEL=ERROR poetry run pytest tests --ignore=tests/common --ignore=tests/normalize --ignore=tests/load/bigquery --ignore=tests/dbt_runner -k '(not bigquery_client)'
if: runner.os != 'Windows'
name: Run tests Linux/MAC
- run: |
poetry run pytest tests --ignore=tests/common --ignore=tests/unpacker --ignore=tests/loaders/gcp --ignore=tests/dbt_runner -m "not forked" -k "(not bigquery_client)"
poetry run pytest tests --ignore=tests/common --ignore=tests/normalize --ignore=tests/load/bigquery --ignore=tests/dbt_runner -m "not forked" -k "(not bigquery_client)"
if: runner.os == 'Windows'
name: Run tests Windows
shell: cmd
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ _test_storage
**/_test_storage
_secrets/
_backup_storage/
experiments
experiments/*
!experiments/
!experiments/pipeline/
!experiments/pipeline/*
secrets.toml

# Byte-compiled / optimized / DLL files
**/__pycache__/
Expand Down
2 changes: 1 addition & 1 deletion dlt/_version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
common_version = "0.1.0"
loader_version = "0.1.0"
unpacker_version = "0.1.0"
normalize_version = "0.1.0"
14 changes: 7 additions & 7 deletions dlt/cli/dlt.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ def str2bool_a(v: str) -> bool:
def main() -> None:
parser = argparse.ArgumentParser(description="Runs various DLT modules", formatter_class=argparse.ArgumentDefaultsHelpFormatter)
subparsers = parser.add_subparsers(dest="command")
unpack = subparsers.add_parser("unpack", help="Runs unpacker")
add_pool_cli_arguments(unpack)
normalize = subparsers.add_parser("normalize", help="Runs normalize")
add_pool_cli_arguments(normalize)
load = subparsers.add_parser("load", help="Runs loader")
add_pool_cli_arguments(load)
dbt = subparsers.add_parser("dbt", help="Executes dbt package")
Expand All @@ -35,11 +35,11 @@ def main() -> None:
args = parser.parse_args()
run_f: Callable[[TRunArgs], None] = None

if args.command == "unpack":
from dlt.unpacker.unpacker import run_main as unpacker_run
run_f = unpacker_run
if args.command == "normalize":
from dlt.normalize.normalize import run_main as normalize_run
run_f = normalize_run
elif args.command == "load":
from dlt.loaders.loader import run_main as loader_run
from dlt.load.load import run_main as loader_run
run_f = loader_run
elif args.command == "dbt":
from dlt.dbt_runner.runner import run_main as dbt_run
Expand All @@ -54,7 +54,7 @@ def main() -> None:
if args.format == "json":
schema_str = json.dumps(s.to_dict(remove_defaults=args.remove_defaults), indent=2)
else:
schema_str = s.as_yaml(remove_defaults=args.remove_defaults)
schema_str = s.to_pretty_yaml(remove_defaults=args.remove_defaults)
print(schema_str)
exit(0)
else:
Expand Down
33 changes: 19 additions & 14 deletions dlt/common/arithmetics.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,39 @@
import decimal # noqa: I251
from contextlib import contextmanager
from typing import Iterator
from decimal import ROUND_HALF_UP, Decimal, DefaultContext, DivisionByZero, InvalidOperation, localcontext, Context, ConversionSyntax # noqa: I251
from decimal import ROUND_HALF_UP, Decimal, Inexact, DivisionByZero, DefaultContext, InvalidOperation, localcontext, Context, Subnormal, ConversionSyntax # noqa: I251


DEFAULT_NUMERIC_PRECISION = 38
DEFAULT_NUMERIC_SCALE = 9

NUMERIC_DEFAULT_QUANTIZER = Decimal("1." + "0" * DEFAULT_NUMERIC_SCALE)

DefaultContext.rounding = ROUND_HALF_UP
# use small caps for exponent
DefaultContext.capitals = 0
# use 128 bit precision which is default in most databases
DefaultContext.prec = DEFAULT_NUMERIC_PRECISION
# prevent NaN to be returned
DefaultContext.traps[InvalidOperation] = True
# prevent Inf to be returned
DefaultContext.traps[DivisionByZero] = True
decimal.setcontext(DefaultContext)

def default_context(c: Context, precision: int) -> Context:
    """Configure decimal context *c* for strict, database-style arithmetic.

    Mutates *c* in place and returns it: half-up rounding, the requested
    precision, and traps so that NaN/Inf/silent rounding raise instead of
    producing surprising values.
    """
    # round .5 away from zero
    c.rounding = ROUND_HALF_UP
    # trap every condition that would otherwise return NaN (InvalidOperation),
    # Inf (DivisionByZero), a silently rounded result (Inexact) or a
    # subnormal value (Subnormal)
    for trapped_signal in (InvalidOperation, DivisionByZero, Inexact, Subnormal):
        c.traps[trapped_signal] = True
    # callers typically pass DEFAULT_NUMERIC_PRECISION (128-bit style,
    # the default in most databases)
    c.prec = precision
    return c


@contextmanager
def numeric_default_context(precision: int = DEFAULT_NUMERIC_PRECISION) -> Iterator[Context]:
    """Enter a local decimal context configured via default_context.

    The context is active only within the `with` body; the previous
    thread-local context is restored on exit by localcontext().
    """
    with localcontext() as ctx:
        yield default_context(ctx, precision)


def numeric_default_quantize(v: Decimal) -> Decimal:
if v == 0:
return v
return v.quantize(NUMERIC_DEFAULT_QUANTIZER)
c = decimal.getcontext().copy()
c.traps[Inexact] = False
return v.quantize(NUMERIC_DEFAULT_QUANTIZER, context=c)
12 changes: 6 additions & 6 deletions dlt/common/configuration/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from .basic_configuration import BasicConfiguration # noqa: F401
from .unpacking_volume_configuration import UnpackingVolumeConfiguration, ProductionUnpackingVolumeConfiguration # noqa: F401
from .loading_volume_configuration import LoadingVolumeConfiguration, ProductionLoadingVolumeConfiguration # noqa: F401
from .run_configuration import RunConfiguration, BaseConfiguration # noqa: F401
from .normalize_volume_configuration import NormalizeVolumeConfiguration, ProductionNormalizeVolumeConfiguration # noqa: F401
from .load_volume_configuration import LoadVolumeConfiguration, ProductionLoadVolumeConfiguration # noqa: F401
from .schema_volume_configuration import SchemaVolumeConfiguration, ProductionSchemaVolumeConfiguration # noqa: F401
from .pool_runner_configuration import PoolRunnerConfiguration, TPoolType # noqa: F401
from .gcp_client_configuration import GcpClientConfiguration, GcpClientProductionConfiguration # noqa: F401
from .postgres_configuration import PostgresConfiguration, PostgresProductionConfiguration # noqa: F401
from .utils import make_configuration, TSecretValue, open_configuration_file # noqa: F401
from .gcp_client_credentials import GcpClientCredentials # noqa: F401
from .postgres_credentials import PostgresCredentials # noqa: F401
from .utils import make_configuration, TSecretValue # noqa: F401

from .exceptions import ( # noqa: F401
ConfigEntryMissingException, ConfigEnvValueCannotBeCoercedException, ConfigIntegrityException, ConfigFileNotFoundException)
26 changes: 0 additions & 26 deletions dlt/common/configuration/basic_configuration.py

This file was deleted.

34 changes: 0 additions & 34 deletions dlt/common/configuration/gcp_client_configuration.py

This file was deleted.

30 changes: 30 additions & 0 deletions dlt/common/configuration/gcp_client_credentials.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from dlt.common.typing import StrAny, StrStr
from dlt.common.configuration import BaseConfiguration
from dlt.common.configuration.utils import TSecretValue


class GcpClientCredentials(BaseConfiguration):
    """Service-account credentials for the GCP/BigQuery client.

    Fields are class-level settings; ``None`` defaults appear to mark
    required values — presumably populated by the configuration utilities
    (TODO confirm against make_configuration).
    """

    PROJECT_ID: str = None  # GCP project id (required)
    BQ_CRED_TYPE: str = "service_account"  # credential record "type" field
    PRIVATE_KEY: TSecretValue = None  # PEM private key; secret (required)
    TOKEN_URI: str = "https://oauth2.googleapis.com/token"  # OAuth2 token endpoint
    CLIENT_EMAIL: str = None  # service-account e-mail (required)

    HTTP_TIMEOUT: float = 15.0  # HTTP timeout in seconds
    RETRY_DEADLINE: float = 600  # retry deadline in seconds

    @classmethod
    def check_integrity(cls) -> None:
        # normalize the loaded key in place on the class itself
        if cls.PRIVATE_KEY and cls.PRIVATE_KEY[-1] != "\n":
            # must end with new line, otherwise won't be parsed by Crypto
            cls.PRIVATE_KEY = TSecretValue(cls.PRIVATE_KEY + "\n")

    @classmethod
    def as_credentials(cls) -> StrAny:
        # dict shaped like a service-account credential record
        return {
            "type": cls.BQ_CRED_TYPE,
            "project_id": cls.PROJECT_ID,
            "private_key": cls.PRIVATE_KEY,
            "token_uri": cls.TOKEN_URI,
            "client_email": cls.CLIENT_EMAIL
        }
11 changes: 11 additions & 0 deletions dlt/common/configuration/load_volume_configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import os

from dlt.common.configuration.run_configuration import BaseConfiguration


class LoadVolumeConfiguration(BaseConfiguration):
    """Settings for the on-disk volume holding files awaiting load."""
    LOAD_VOLUME_PATH: str = os.path.join("_storage", "load")  # path to volume where files to be loaded to analytical storage are stored
    DELETE_COMPLETED_JOBS: bool = False  # if set to true the folder with completed jobs will be deleted

class ProductionLoadVolumeConfiguration(LoadVolumeConfiguration):
    """Production variant: no default path, so the value must be provided."""
    LOAD_VOLUME_PATH: str = None
6 changes: 0 additions & 6 deletions dlt/common/configuration/loading_volume_configuration.py

This file was deleted.

11 changes: 11 additions & 0 deletions dlt/common/configuration/normalize_volume_configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import os

from dlt.common.configuration import BaseConfiguration


class NormalizeVolumeConfiguration(BaseConfiguration):
    """Settings for the on-disk volume used by the normalize stage."""
    NORMALIZE_VOLUME_PATH: str = os.path.join("_storage", "normalize")  # path to volume where normalized loader files will be stored


class ProductionNormalizeVolumeConfiguration(NormalizeVolumeConfiguration):
    """Production variant: no default path, so the value must be provided."""
    NORMALIZE_VOLUME_PATH: str = None
6 changes: 3 additions & 3 deletions dlt/common/configuration/pool_runner_configuration.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from typing import Literal, Optional
from dlt.common.configuration import BasicConfiguration
from dlt.common.configuration import RunConfiguration

TPoolType = Literal["process", "thread", "none"]

class PoolRunnerConfiguration(BasicConfiguration):
MAX_PARALLELISM: Optional[int] = None # how many threads/processes in the pool
class PoolRunnerConfiguration(RunConfiguration):
WORKERS: Optional[int] = None # how many threads/processes in the pool
EXIT_ON_EXCEPTION: bool = False # should exit on exception
STOP_AFTER_RUNS: int = 10000 # will stop runner with exit code -2 after so many runs, that prevents memory fragmentation
POOL_TYPE: TPoolType = None # type of pool to run, must be set in derived configs
Expand Down
25 changes: 0 additions & 25 deletions dlt/common/configuration/postgres_configuration.py

This file was deleted.

22 changes: 22 additions & 0 deletions dlt/common/configuration/postgres_credentials.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from dlt.common.configuration import BaseConfiguration
from dlt.common.configuration.utils import TSecretValue
from dlt.common.typing import StrAny


class PostgresCredentials(BaseConfiguration):
    """Connection settings for a Postgres-protocol database.

    ``None`` defaults appear to mark required values — presumably populated
    by the configuration utilities (TODO confirm against make_configuration).
    """

    DBNAME: str = None  # database name; lower-cased in check_integrity
    PASSWORD: TSecretValue = None  # secret; surrounding whitespace stripped
    USER: str = None  # database user (required)
    HOST: str = None  # database host (required)
    PORT: int = 5439  # NOTE(review): 5439 is Redshift's default, not Postgres's 5432 — looks intentional given the Redshift tests in this PR
    CONNECT_TIMEOUT: int = 15  # connection timeout in seconds

    @classmethod
    def check_integrity(cls) -> None:
        # normalize loaded values in place on the class itself
        cls.DBNAME = cls.DBNAME.lower()
        # cls.DEFAULT_DATASET = cls.DEFAULT_DATASET.lower()
        cls.PASSWORD = TSecretValue(cls.PASSWORD.strip())

    @classmethod
    def as_credentials(cls) -> StrAny:
        # expose all settings as a plain dict for the driver
        return cls.as_dict()
Loading

0 comments on commit 5a49c64

Please sign in to comment.