Merge pull request #57 from scale-vector/rfix/wei-type-variant-support
* adds support for deterministic variant types, which surface as different types depending on the content
* adds support for the `Wei` variant type, which is stored as `Decimal` within the destination's decimal range and as `text` outside of it (see the sketch below)
* adds an option to automatically export schemas in `Pipeline` v1
* for the default schema, the schema name is not part of the dataset name
rudolfix authored Aug 24, 2022
2 parents 175a831 + 3fc3de7 commit a79744e
Showing 42 changed files with 899 additions and 2,101 deletions.
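
A rough illustration of the `Wei` variant behavior described above (not the dlt implementation): values that fit the destination's default DECIMAL(38, 9) range stay numeric, while larger values fall back to text. The precision and scale defaults come from `dlt/common/arithmetics.py`; `coerce_wei` and the exact boundary check are assumptions made for this sketch.

```python
from decimal import Decimal

# defaults taken from dlt/common/arithmetics.py
DEFAULT_NUMERIC_PRECISION = 38
DEFAULT_NUMERIC_SCALE = 9
# largest magnitude that still fits a DECIMAL(38, 9) column (assumed boundary)
MAX_DECIMAL = Decimal(10) ** (DEFAULT_NUMERIC_PRECISION - DEFAULT_NUMERIC_SCALE)

def coerce_wei(value: Decimal) -> tuple:
    """Hypothetical helper: pick the column variant for a Wei amount."""
    if abs(value) < MAX_DECIMAL:
        return "decimal", value        # fits the destination decimal range
    return "text", str(value)          # too large -> fall back to a text variant

print(coerce_wei(Decimal("1000000000000000000")))  # 1 ETH in wei -> ('decimal', ...)
print(coerce_wei(Decimal("1e60")))                  # beyond DECIMAL(38, 9) -> ('text', ...)
```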
8 changes: 4 additions & 4 deletions README.md
@@ -6,13 +6,13 @@
[![LINT Badge](https://github.com/scale-vector/dlt/actions/workflows/lint.yml/badge.svg)](https://github.com/scale-vector/dlt/actions/workflows/lint.yml)
[![TEST COMMON Badge](https://github.com/scale-vector/dlt/actions/workflows/test_common.yml/badge.svg)](https://github.com/scale-vector/dlt/actions/workflows/test_common.yml)
[![TEST REDSHIFT Badge](https://github.com/scale-vector/dlt/actions/workflows/test_loader_redshift.yml/badge.svg)](https://github.com/scale-vector/dlt/actions/workflows/test_loader_redshift.yml)
[![TEST GCP Badge](https://github.com/scale-vector/dlt/actions/workflows/test_loader_gcp.yml/badge.svg)](https://github.com/scale-vector/dlt/actions/workflows/test_loader_gcp.yml)
[![TEST BIGQUERY Badge](https://github.com/scale-vector/dlt/actions/workflows/test_loader_bigquery.yml/badge.svg)](https://github.com/scale-vector/dlt/actions/workflows/test_loader_bigquery.yml)

</p>

# Data Load Tool (DLT)

Data Load Tool (DLT) enables simple, python-native data pipelining for data professionals.
Data Load Tool (DLT) enables simple, python-native data pipelining for data professionals.

It is an open source, scalable data loading framework that does not require any help from DevOps.

@@ -110,10 +110,10 @@ Advanced, commercial-grade use of DLT requires only some configuration.

## Supported data warehouses

Google BigQuery:
Google BigQuery:
```pip3 install python-dlt[gcp]```

Amazon Redshift:
Amazon Redshift:
```pip install python-dlt[redshift]```

## How to load very large sources?
2 changes: 1 addition & 1 deletion deploy/dlt/Dockerfile
@@ -6,7 +6,7 @@ LABEL org.label-schema.vendor="ScaleVector" \
org.label-schema.name="DLT" \
org.label-schema.description="DLT is an open-source python-native scalable data loading framework that does not require any devops efforts to run."

# prepare dirs to install autopoieses
# prepare dirs to install dlt
RUN mkdir -p /tmp/pydlt

WORKDIR /tmp/pydlt
15 changes: 15 additions & 0 deletions dlt/cli/dlt.py
@@ -9,6 +9,8 @@
from dlt.common.typing import DictStrAny
from dlt.common.utils import str2bool

from dlt.pipeline import Pipeline, PostgresPipelineCredentials


def str2bool_a(v: str) -> bool:
try:
@@ -30,6 +32,10 @@ def main() -> None:
schema.add_argument("file", help="Schema file name, in yaml or json format, will autodetect based on extension")
schema.add_argument("--format", choices=["json", "yaml"], default="yaml", help="Display schema in this format")
schema.add_argument("--remove-defaults", action="store_true", help="Does not show default hint values")
pipeline = subparsers.add_parser("pipeline", help="Operations on the pipelines")
pipeline.add_argument("name", help="Pipeline name")
pipeline.add_argument("workdir", help="Pipeline working directory")
pipeline.add_argument("operation", choices=["failed_loads"], default="failed_loads", help="Show failed loads for a pipeline")

# TODO: consider using fire: https://github.com/google/python-fire
args = parser.parse_args()
@@ -57,6 +63,15 @@ def main() -> None:
schema_str = s.to_pretty_yaml(remove_defaults=args.remove_defaults)
print(schema_str)
exit(0)
elif args.command == "pipeline":
p = Pipeline(args.name)
p.restore_pipeline(PostgresPipelineCredentials("dummy"), args.workdir)
completed_loads = p.list_completed_loads()
for load_id in completed_loads:
print(f"Checking failed jobs in {load_id}")
for job, failed_message in p.list_failed_jobs(load_id):
print(f"JOB: {job}\nMSG: {failed_message}")
exit(0)
else:
parser.print_help()
exit(-1)
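
For context, the new `pipeline ... failed_loads` command added above can also be reproduced directly with the `Pipeline` API, using only the calls visible in this diff; the pipeline name and working directory below are placeholders.

```python
from dlt.pipeline import Pipeline, PostgresPipelineCredentials

# placeholders: use your own pipeline name and working directory
p = Pipeline("my_pipeline")
p.restore_pipeline(PostgresPipelineCredentials("dummy"), "/data/pipelines/my_pipeline")
for load_id in p.list_completed_loads():
    print(f"Checking failed jobs in {load_id}")
    for job, failed_message in p.list_failed_jobs(load_id):
        print(f"JOB: {job}\nMSG: {failed_message}")
```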
4 changes: 2 additions & 2 deletions dlt/common/__init__.py
@@ -1,6 +1,6 @@
import functools
from .arithmetics import Decimal # noqa: F401
from .wei import Wei
from .pendulum import pendulum # noqa: F401
from .json import json # noqa: F401, I251
from .time import sleep # noqa: F401
from .arithmetics import Decimal # noqa: F401
from dlt._version import common_version as __version__
1 change: 0 additions & 1 deletion dlt/common/arithmetics.py
@@ -6,7 +6,6 @@

DEFAULT_NUMERIC_PRECISION = 38
DEFAULT_NUMERIC_SCALE = 9

NUMERIC_DEFAULT_QUANTIZER = Decimal("1." + "0" * DEFAULT_NUMERIC_SCALE)


17 changes: 12 additions & 5 deletions dlt/common/json.py
@@ -3,12 +3,13 @@
from datetime import date, datetime # noqa: I251
from functools import partial
from typing import Any, Callable, Union
from uuid import UUID, uuid4
from uuid import UUID
from hexbytes import HexBytes
import simplejson
from simplejson.raw_json import RawJSON

from dlt.common.arithmetics import Decimal
from dlt.common.wei import Wei

# simplejson._toggle_speedups(False)

@@ -44,20 +45,25 @@ def custom_encode(obj: Any) -> str:
_UUIDT = u'\uF029'
_HEXBYTES = u'\uF02A'
_B64BYTES = u'\uF02B'
_WEI = u'\uF02C'

DECODERS = [
lambda s: Decimal(s),
lambda s: pendulum.parse(s),
lambda s: pendulum.parse(s).date(), # type: ignore
lambda s: UUID(s),
lambda s: HexBytes(s),
lambda s: base64.b64decode(s)
lambda s: base64.b64decode(s),
lambda s: Wei(s)
]


def custom_pua_encode(obj: Any) -> Union[RawJSON, str]:
if isinstance(obj, Decimal):
return _DECIMAL + str(obj.normalize())
# wei is subclass of decimal and must be checked first
if isinstance(obj, Wei):
return _WEI + str(obj)
elif isinstance(obj, Decimal):
return _DECIMAL + str(obj)
# this works both for standard datetime and pendulum
elif isinstance(obj, datetime):
r = obj.isoformat()
@@ -78,7 +84,8 @@ def custom_pua_encode(obj: Any) -> Union[RawJSON, str]:
def custom_pua_decode(obj: Any) -> Any:
if isinstance(obj, str) and len(obj) > 1:
c = ord(obj[0]) - 0xF026
if c >= 0 and c <= 5:
# decode only the PUA space defined in DECODERS
if c >=0 and c <= 6:
return DECODERS[c](obj[1:])
return obj
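
A minimal round-trip sketch of the tagging scheme used by `custom_pua_encode`/`custom_pua_decode`: a single private-use-area character marks the original type, and its offset from `0xF026` selects the decoder. The helpers below are illustrative only and cover just the `Decimal` tag, not the full `DECODERS` list.

```python
from decimal import Decimal

_PUA_BASE = 0xF026
_DECIMAL = chr(_PUA_BASE)          # index 0 in DECODERS

def encode_decimal(value: Decimal) -> str:
    # prefix the text form with the type tag, as custom_pua_encode does
    return _DECIMAL + str(value)

def decode(obj: str):
    # mirror of custom_pua_decode, restricted to the Decimal decoder
    decoders = [Decimal]
    if obj and 0 <= ord(obj[0]) - _PUA_BASE < len(decoders):
        return decoders[ord(obj[0]) - _PUA_BASE](obj[1:])
    return obj

assert decode(encode_decimal(Decimal("1.5"))) == Decimal("1.5")
```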

80 changes: 46 additions & 34 deletions dlt/common/logger.py
@@ -3,11 +3,12 @@
import traceback
import sentry_sdk
from sentry_sdk.transport import HttpTransport
from sentry_sdk.integrations.logging import LoggingIntegration
from logging import LogRecord, Logger
from typing import Any, Callable, Dict, Type
from typing import Any, Type, Protocol

from dlt.common.json import json
from dlt.common.typing import DictStrAny, DictStrStr, StrStr
from dlt.common.typing import DictStrAny, StrStr
from dlt.common.configuration import RunConfiguration
from dlt.common.utils import filter_env_vars
from dlt._version import common_version as __version__
@@ -59,8 +60,6 @@ def logToRoot(message: str, *args: Any, **kwargs: Any) -> None:
class _MetricsFormatter(logging.Formatter):
def format(self, record: LogRecord) -> str: # noqa: A003
s = super(_MetricsFormatter, self).format(record)
if record.exc_text:
s = s + '|'
# dump metrics dictionary nicely
if "metrics" in record.__dict__:
s = s + ": " + json.dumps(record.__dict__["metrics"])
@@ -109,11 +108,21 @@ def _init_logging(logger_name: str, level: str, fmt: str, component: str, versio
return logger


def __getattr__(name: str) -> Callable[..., Any]:
class LogMethod(Protocol):
def __call__(self, msg: str, *args: Any, **kwds: Any) -> None:
...


def __getattr__(name: str) -> LogMethod:
# a catch all function for a module that forwards calls to unknown methods to LOGGER
def wrapper(msg: str, *args: Any, **kwargs: Any) -> None:
if LOGGER:
getattr(LOGGER, name)(msg, *args, **kwargs, stacklevel=2)
# skip stack frames when displaying log so the original logging frame is displayed
stacklevel = 2
if name == "exception":
# exception has one more frame
stacklevel = 3
getattr(LOGGER, name)(msg, *args, **kwargs, stacklevel=stacklevel)
return wrapper
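
The module-level `__getattr__` above means the module itself can be used like a logger: unknown attribute lookups are forwarded to `LOGGER` with an adjusted `stacklevel` so the caller's frame shows up in the log record. A usage sketch, assuming logging was already initialized via `init_logging_from_config` (calls are no-ops otherwise):

```python
from dlt.common import logger

logger.info("pipeline started")          # forwarded to LOGGER.info(...)
try:
    raise ValueError("boom")
except ValueError:
    logger.exception("job failed")       # uses the extra stack frame handled above
```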


@@ -130,6 +139,7 @@ def _extract_version_info(config: Type[RunConfiguration]) -> StrStr:
def _extract_pod_info() -> StrStr:
return filter_env_vars(["KUBE_NODE_NAME", "KUBE_POD_NAME", "KUBE_POD_NAMESPACE"])


class _SentryHttpTransport(HttpTransport):

timeout: int = 0
@@ -140,21 +150,34 @@ def _get_pool_options(self, *a: Any, **kw: Any) -> DictStrAny:
return rv


def _init_sentry(config: Type[RunConfiguration], version: StrStr) -> None:
if config.SENTRY_DSN:
sys_ver = version["version"]
release = sys_ver + "_" + version.get("commit_sha", "")
_SentryHttpTransport.timeout = config.REQUEST_TIMEOUT[0]
# TODO: setup automatic sending of log messages by log level (ie. we send a lot dbt trash logs)
# https://docs.sentry.io/platforms/python/guides/logging/
sentry_sdk.init(config.SENTRY_DSN, release=release, transport=_SentryHttpTransport)
# add version tags
for k, v in version.items():
sentry_sdk.set_tag(k, v)
# add kubernetes tags
pod_tags = _extract_pod_info()
for k, v in pod_tags.items():
sentry_sdk.set_tag(k, v)
def _get_sentry_log_level(C: Type[RunConfiguration]) -> LoggingIntegration:
log_level = logging._nameToLevel[C.LOG_LEVEL]
event_level = logging.WARNING if log_level <= logging.WARNING else log_level
return LoggingIntegration(
level=logging.INFO, # Capture info and above as breadcrumbs
event_level=event_level # Send errors as events
)


def _init_sentry(C: Type[RunConfiguration], version: StrStr) -> None:
sys_ver = version["version"]
release = sys_ver + "_" + version.get("commit_sha", "")
_SentryHttpTransport.timeout = C.REQUEST_TIMEOUT[0]
# TODO: ignore certain loggers ie. dbt loggers
# https://docs.sentry.io/platforms/python/guides/logging/
sentry_sdk.init(
C.SENTRY_DSN,
integrations=[_get_sentry_log_level(C)],
release=release,
transport=_SentryHttpTransport
)
# add version tags
for k, v in version.items():
sentry_sdk.set_tag(k, v)
# add kubernetes tags
pod_tags = _extract_pod_info()
for k, v in pod_tags.items():
sentry_sdk.set_tag(k, v)


def init_telemetry(config: Type[RunConfiguration]) -> None:
@@ -182,24 +205,13 @@ def init_logging_from_config(C: Type[RunConfiguration]) -> None:
C.LOG_FORMAT,
C.PIPELINE_NAME,
version)
_init_sentry(C, version)
if C.SENTRY_DSN:
_init_sentry(C, version)


def is_json_logging(log_format: str) -> bool:
return log_format == "JSON"


def process_internal_exception(msg: str, exc_info: Any = True) -> None:
# Passing default True value will cause implementation to use data provided by sys.exc_info
if LOGGER:
LOGGER.error(msg, exc_info=exc_info, stacklevel=2)
report_exception()


def report_exception() -> None:
if sentry_sdk.Hub.current:
sentry_sdk.capture_exception()


def pretty_format_exception() -> str:
return traceback.format_exc()
12 changes: 9 additions & 3 deletions dlt/common/normalizers/json/relational.py
@@ -1,10 +1,10 @@
from typing import Dict, Mapping, Optional, Sequence, Tuple, cast, TypedDict, Any

from dlt.common.typing import DictStrAny, DictStrStr, TDataItem, StrAny
from dlt.common.schema import Schema
from dlt.common.schema.typing import TColumnSchema, TColumnName, TSimpleRegex
from dlt.common.schema.utils import column_name_validator
from dlt.common.utils import uniq_id, digest128
from dlt.common.typing import DictStrAny, DictStrStr, TDataItem, StrAny
from dlt.common.normalizers.json import TNormalizedRowIterator
from dlt.common.sources import DLT_METADATA_FIELD, TEventDLTMeta, get_table_name
from dlt.common.validation import validate_dict
@@ -65,7 +65,7 @@ def norm_row_dicts(dict_row: StrAny, __r_lvl: int, parent_name: Optional[str]) -
corrected_k = schema.normalize_column_name(k)
child_name = corrected_k if not parent_name else schema.normalize_make_path(parent_name, corrected_k)
# for lists and dicts we must check if type is possibly complex
if isinstance(v, dict) or isinstance(v, list):
if isinstance(v, (dict, list)):
if not _is_complex_type(schema, table, child_name, __r_lvl):
if isinstance(v, dict):
# flatten the dict more
@@ -93,7 +93,6 @@ def _get_child_row_hash(parent_row_id: str, child_table: str, list_idx: int) ->
def _add_linking(row: TEventRowChild, extend: DictStrAny, parent_row_id: str, list_idx: int) -> TEventRowChild:
row["_dlt_parent_id"] = parent_row_id
row["_dlt_list_idx"] = list_idx
row.update(extend) # type: ignore

return row

@@ -120,6 +119,10 @@ def _get_propagated_values(schema: Schema, table: str, row: TEventRow, is_top_le
return extend


def _extend_row(extend: DictStrAny, row: TEventRow) -> None:
row.update(extend) # type: ignore


# generate child tables only for lists
def _normalize_list(
schema: Schema,
@@ -144,6 +147,7 @@ def _normalize_list(
# list of simple types
child_row_hash = _get_child_row_hash(parent_row_id, table, idx)
e = _add_linking({"value": v, "_dlt_id": child_row_hash}, extend, parent_row_id, idx)
_extend_row(extend, e)
yield (table, parent_table), e


@@ -161,6 +165,8 @@ def _normalize_row(
is_top_level = parent_table is None
# flatten current row and extract all lists to recur into
flattened_row, lists = _flatten(schema, table, dict_row, _r_lvl)
# always extend row
_extend_row(extend, flattened_row)
# infer record hash or leave existing primary key if present
row_id = flattened_row.get("_dlt_id", None)
if not row_id:
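
To make the normalizer changes easier to follow, here is an assumed, simplified picture of what `_normalize_row`/`_normalize_list` produce for a nested document: nested dicts are flattened into path-joined column names, lists become child-table rows linked through `_dlt_parent_id` and `_dlt_list_idx`, and the propagated `extend` values are now merged into every row via `_extend_row`. The table names and the `__` path separator below are illustrative, not taken from this diff.

```python
doc = {"id": 1, "address": {"city": "Berlin"}, "tags": ["a", "b"]}

# conceptually yields rows along these lines (hashes shortened):
# ("event", None)           -> {"id": 1, "address__city": "Berlin", "_dlt_id": "p1"}
# ("event__tags", "event")  -> {"value": "a", "_dlt_parent_id": "p1", "_dlt_list_idx": 0, "_dlt_id": "c0"}
# ("event__tags", "event")  -> {"value": "b", "_dlt_parent_id": "p1", "_dlt_list_idx": 1, "_dlt_id": "c1"}
```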
