diff --git a/README.md b/README.md index b6156e2504..816c3aa510 100644 --- a/README.md +++ b/README.md @@ -6,13 +6,13 @@ [![LINT Badge](https://github.com/scale-vector/dlt/actions/workflows/lint.yml/badge.svg)](https://github.com/scale-vector/dlt/actions/workflows/lint.yml) [![TEST COMMON Badge](https://github.com/scale-vector/dlt/actions/workflows/test_common.yml/badge.svg)](https://github.com/scale-vector/dlt/actions/workflows/test_common.yml) [![TEST REDSHIFT Badge](https://github.com/scale-vector/dlt/actions/workflows/test_loader_redshift.yml/badge.svg)](https://github.com/scale-vector/dlt/actions/workflows/test_loader_redshift.yml) -[![TEST GCP Badge](https://github.com/scale-vector/dlt/actions/workflows/test_loader_gcp.yml/badge.svg)](https://github.com/scale-vector/dlt/actions/workflows/test_loader_gcp.yml) +[![TEST BIGQUERY Badge](https://github.com/scale-vector/dlt/actions/workflows/test_loader_bigquery.yml/badge.svg)](https://github.com/scale-vector/dlt/actions/workflows/test_loader_bigquery.yml)
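A minimal end-to-end sketch of DLT usage, based only on the `Pipeline` API touched elsewhere in this patch (the service-account path, dataset prefix and sample rows are placeholders, not part of the change):
```
from dlt.pipeline import Pipeline, GCPPipelineCredentials

# placeholder credentials file and dataset prefix
credentials = GCPPipelineCredentials.from_services_file("_secrets/service.json", "demo")

pipeline = Pipeline("quickstart")
pipeline.create_pipeline(credentials)  # a temporary working directory is created when none is given
pipeline.extract(iter([{"id": 1, "name": "alice"}]), table_name="users")
pipeline.normalize()
pipeline.load()
```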
# Data Load Tool (DLT) -Data Load Tool (DLT) enables simple, python-native data pipelining for data professionals. +Data Load Tool (DLT) enables simple, python-native data pipelining for data professionals. It is an open source, scalable data loading framework that does not require any help from DevOps. @@ -110,10 +110,10 @@ Advanced, commercial-grade use of DLT requires only some configuration. ## Supported data warehouses -Google BigQuery: +Google BigQuery: ```pip3 install python-dlt[gcp]``` -Amazon Redshift: +Amazon Redshift: ```pip install python-dlt[redshift]``` ## How to load very large sources? diff --git a/deploy/dlt/Dockerfile b/deploy/dlt/Dockerfile index ca96ac7a0f..5ddd4c03e5 100644 --- a/deploy/dlt/Dockerfile +++ b/deploy/dlt/Dockerfile @@ -6,7 +6,7 @@ LABEL org.label-schema.vendor="ScaleVector" \ org.label-schema.name="DLT" \ org.label-schema.description="DLT is an open-source python-native scalable data loading framework that does not require any devops efforts to run." -# prepare dirs to install autopoieses +# prepare dirs to install dlt RUN mkdir -p /tmp/pydlt WORKDIR /tmp/pydlt diff --git a/dlt/cli/dlt.py b/dlt/cli/dlt.py index 7d6ae82ea8..302814a464 100644 --- a/dlt/cli/dlt.py +++ b/dlt/cli/dlt.py @@ -9,6 +9,8 @@ from dlt.common.typing import DictStrAny from dlt.common.utils import str2bool +from dlt.pipeline import Pipeline, PostgresPipelineCredentials + def str2bool_a(v: str) -> bool: try: @@ -30,6 +32,10 @@ def main() -> None: schema.add_argument("file", help="Schema file name, in yaml or json format, will autodetect based on extension") schema.add_argument("--format", choices=["json", "yaml"], default="yaml", help="Display schema in this format") schema.add_argument("--remove-defaults", action="store_true", help="Does not show default hint values") + pipeline = subparsers.add_parser("pipeline", help="Operations on the pipelines") + pipeline.add_argument("name", help="Pipeline name") + pipeline.add_argument("workdir", help="Pipeline working directory") + pipeline.add_argument("operation", choices=["failed_loads"], default="failed_loads", help="Show failed loads for a pipeline") # TODO: consider using fire: https://github.com/google/python-fire args = parser.parse_args() @@ -57,6 +63,15 @@ def main() -> None: schema_str = s.to_pretty_yaml(remove_defaults=args.remove_defaults) print(schema_str) exit(0) + elif args.command == "pipeline": + p = Pipeline(args.name) + p.restore_pipeline(PostgresPipelineCredentials("dummy"), args.workdir) + completed_loads = p.list_completed_loads() + for load_id in completed_loads: + print(f"Checking failed jobs in {load_id}") + for job, failed_message in p.list_failed_jobs(load_id): + print(f"JOB: {job}\nMSG: {failed_message}") + exit(0) else: parser.print_help() exit(-1) diff --git a/dlt/common/__init__.py b/dlt/common/__init__.py index 207abc3cb7..6da3ee3a0e 100644 --- a/dlt/common/__init__.py +++ b/dlt/common/__init__.py @@ -1,6 +1,6 @@ -import functools +from .arithmetics import Decimal # noqa: F401 +from .wei import Wei from .pendulum import pendulum # noqa: F401 from .json import json # noqa: F401, I251 from .time import sleep # noqa: F401 -from .arithmetics import Decimal # noqa: F401 from dlt._version import common_version as __version__ diff --git a/dlt/common/arithmetics.py b/dlt/common/arithmetics.py index 09643e553a..5277acad4f 100644 --- a/dlt/common/arithmetics.py +++ b/dlt/common/arithmetics.py @@ -6,7 +6,6 @@ DEFAULT_NUMERIC_PRECISION = 38 DEFAULT_NUMERIC_SCALE = 9 - NUMERIC_DEFAULT_QUANTIZER = Decimal("1." 
+ "0" * DEFAULT_NUMERIC_SCALE) diff --git a/dlt/common/json.py b/dlt/common/json.py index 4e72d77056..668760ad3c 100644 --- a/dlt/common/json.py +++ b/dlt/common/json.py @@ -3,12 +3,13 @@ from datetime import date, datetime # noqa: I251 from functools import partial from typing import Any, Callable, Union -from uuid import UUID, uuid4 +from uuid import UUID from hexbytes import HexBytes import simplejson from simplejson.raw_json import RawJSON from dlt.common.arithmetics import Decimal +from dlt.common.wei import Wei # simplejson._toggle_speedups(False) @@ -44,6 +45,7 @@ def custom_encode(obj: Any) -> str: _UUIDT = u'\uF029' _HEXBYTES = u'\uF02A' _B64BYTES = u'\uF02B' +_WEI = u'\uF02C' DECODERS = [ lambda s: Decimal(s), @@ -51,13 +53,17 @@ def custom_encode(obj: Any) -> str: lambda s: pendulum.parse(s).date(), # type: ignore lambda s: UUID(s), lambda s: HexBytes(s), - lambda s: base64.b64decode(s) + lambda s: base64.b64decode(s), + lambda s: Wei(s) ] def custom_pua_encode(obj: Any) -> Union[RawJSON, str]: - if isinstance(obj, Decimal): - return _DECIMAL + str(obj.normalize()) + # wei is subclass of decimal and must be checked first + if isinstance(obj, Wei): + return _WEI + str(obj) + elif isinstance(obj, Decimal): + return _DECIMAL + str(obj) # this works both for standard datetime and pendulum elif isinstance(obj, datetime): r = obj.isoformat() @@ -78,7 +84,8 @@ def custom_pua_encode(obj: Any) -> Union[RawJSON, str]: def custom_pua_decode(obj: Any) -> Any: if isinstance(obj, str) and len(obj) > 1: c = ord(obj[0]) - 0xF026 - if c >= 0 and c <= 5: + # decode only the PUA space defined in DECODERS + if c >=0 and c <= 6: return DECODERS[c](obj[1:]) return obj diff --git a/dlt/common/logger.py b/dlt/common/logger.py index beb48a8c82..9de24cb809 100644 --- a/dlt/common/logger.py +++ b/dlt/common/logger.py @@ -3,11 +3,12 @@ import traceback import sentry_sdk from sentry_sdk.transport import HttpTransport +from sentry_sdk.integrations.logging import LoggingIntegration from logging import LogRecord, Logger -from typing import Any, Callable, Dict, Type +from typing import Any, Type, Protocol from dlt.common.json import json -from dlt.common.typing import DictStrAny, DictStrStr, StrStr +from dlt.common.typing import DictStrAny, StrStr from dlt.common.configuration import RunConfiguration from dlt.common.utils import filter_env_vars from dlt._version import common_version as __version__ @@ -59,8 +60,6 @@ def logToRoot(message: str, *args: Any, **kwargs: Any) -> None: class _MetricsFormatter(logging.Formatter): def format(self, record: LogRecord) -> str: # noqa: A003 s = super(_MetricsFormatter, self).format(record) - if record.exc_text: - s = s + '|' # dump metrics dictionary nicely if "metrics" in record.__dict__: s = s + ": " + json.dumps(record.__dict__["metrics"]) @@ -109,11 +108,21 @@ def _init_logging(logger_name: str, level: str, fmt: str, component: str, versio return logger -def __getattr__(name: str) -> Callable[..., Any]: +class LogMethod(Protocol): + def __call__(self, msg: str, *args: Any, **kwds: Any) -> None: + ... 
+ + +def __getattr__(name: str) -> LogMethod: # a catch all function for a module that forwards calls to unknown methods to LOGGER def wrapper(msg: str, *args: Any, **kwargs: Any) -> None: if LOGGER: - getattr(LOGGER, name)(msg, *args, **kwargs, stacklevel=2) + # skip stack frames when displaying log so the original logging frame is displayed + stacklevel = 2 + if name == "exception": + # exception has one more frame + stacklevel = 3 + getattr(LOGGER, name)(msg, *args, **kwargs, stacklevel=stacklevel) return wrapper @@ -130,6 +139,7 @@ def _extract_version_info(config: Type[RunConfiguration]) -> StrStr: def _extract_pod_info() -> StrStr: return filter_env_vars(["KUBE_NODE_NAME", "KUBE_POD_NAME", "KUBE_POD_NAMESPACE"]) + class _SentryHttpTransport(HttpTransport): timeout: int = 0 @@ -140,21 +150,34 @@ def _get_pool_options(self, *a: Any, **kw: Any) -> DictStrAny: return rv -def _init_sentry(config: Type[RunConfiguration], version: StrStr) -> None: - if config.SENTRY_DSN: - sys_ver = version["version"] - release = sys_ver + "_" + version.get("commit_sha", "") - _SentryHttpTransport.timeout = config.REQUEST_TIMEOUT[0] - # TODO: setup automatic sending of log messages by log level (ie. we send a lot dbt trash logs) - # https://docs.sentry.io/platforms/python/guides/logging/ - sentry_sdk.init(config.SENTRY_DSN, release=release, transport=_SentryHttpTransport) - # add version tags - for k, v in version.items(): - sentry_sdk.set_tag(k, v) - # add kubernetes tags - pod_tags = _extract_pod_info() - for k, v in pod_tags.items(): - sentry_sdk.set_tag(k, v) +def _get_sentry_log_level(C: Type[RunConfiguration]) -> LoggingIntegration: + log_level = logging._nameToLevel[C.LOG_LEVEL] + event_level = logging.WARNING if log_level <= logging.WARNING else log_level + return LoggingIntegration( + level=logging.INFO, # Capture info and above as breadcrumbs + event_level=event_level # Send errors as events + ) + + +def _init_sentry(C: Type[RunConfiguration], version: StrStr) -> None: + sys_ver = version["version"] + release = sys_ver + "_" + version.get("commit_sha", "") + _SentryHttpTransport.timeout = C.REQUEST_TIMEOUT[0] + # TODO: ignore certain loggers ie. 
dbt loggers + # https://docs.sentry.io/platforms/python/guides/logging/ + sentry_sdk.init( + C.SENTRY_DSN, + integrations=[_get_sentry_log_level(C)], + release=release, + transport=_SentryHttpTransport + ) + # add version tags + for k, v in version.items(): + sentry_sdk.set_tag(k, v) + # add kubernetes tags + pod_tags = _extract_pod_info() + for k, v in pod_tags.items(): + sentry_sdk.set_tag(k, v) def init_telemetry(config: Type[RunConfiguration]) -> None: @@ -182,24 +205,13 @@ def init_logging_from_config(C: Type[RunConfiguration]) -> None: C.LOG_FORMAT, C.PIPELINE_NAME, version) - _init_sentry(C, version) + if C.SENTRY_DSN: + _init_sentry(C, version) def is_json_logging(log_format: str) -> bool: return log_format == "JSON" -def process_internal_exception(msg: str, exc_info: Any = True) -> None: - # Passing default True value will cause implementation to use data provided by sys.exc_info - if LOGGER: - LOGGER.error(msg, exc_info=exc_info, stacklevel=2) - report_exception() - - -def report_exception() -> None: - if sentry_sdk.Hub.current: - sentry_sdk.capture_exception() - - def pretty_format_exception() -> str: return traceback.format_exc() diff --git a/dlt/common/normalizers/json/relational.py b/dlt/common/normalizers/json/relational.py index 50894a255a..5b1b7c99f0 100644 --- a/dlt/common/normalizers/json/relational.py +++ b/dlt/common/normalizers/json/relational.py @@ -1,10 +1,10 @@ from typing import Dict, Mapping, Optional, Sequence, Tuple, cast, TypedDict, Any +from dlt.common.typing import DictStrAny, DictStrStr, TDataItem, StrAny from dlt.common.schema import Schema from dlt.common.schema.typing import TColumnSchema, TColumnName, TSimpleRegex from dlt.common.schema.utils import column_name_validator from dlt.common.utils import uniq_id, digest128 -from dlt.common.typing import DictStrAny, DictStrStr, TDataItem, StrAny from dlt.common.normalizers.json import TNormalizedRowIterator from dlt.common.sources import DLT_METADATA_FIELD, TEventDLTMeta, get_table_name from dlt.common.validation import validate_dict @@ -65,7 +65,7 @@ def norm_row_dicts(dict_row: StrAny, __r_lvl: int, parent_name: Optional[str]) - corrected_k = schema.normalize_column_name(k) child_name = corrected_k if not parent_name else schema.normalize_make_path(parent_name, corrected_k) # for lists and dicts we must check if type is possibly complex - if isinstance(v, dict) or isinstance(v, list): + if isinstance(v, (dict, list)): if not _is_complex_type(schema, table, child_name, __r_lvl): if isinstance(v, dict): # flatten the dict more @@ -93,7 +93,6 @@ def _get_child_row_hash(parent_row_id: str, child_table: str, list_idx: int) -> def _add_linking(row: TEventRowChild, extend: DictStrAny, parent_row_id: str, list_idx: int) -> TEventRowChild: row["_dlt_parent_id"] = parent_row_id row["_dlt_list_idx"] = list_idx - row.update(extend) # type: ignore return row @@ -120,6 +119,10 @@ def _get_propagated_values(schema: Schema, table: str, row: TEventRow, is_top_le return extend +def _extend_row(extend: DictStrAny, row: TEventRow) -> None: + row.update(extend) # type: ignore + + # generate child tables only for lists def _normalize_list( schema: Schema, @@ -144,6 +147,7 @@ def _normalize_list( # list of simple types child_row_hash = _get_child_row_hash(parent_row_id, table, idx) e = _add_linking({"value": v, "_dlt_id": child_row_hash}, extend, parent_row_id, idx) + _extend_row(extend, e) yield (table, parent_table), e @@ -161,6 +165,8 @@ def _normalize_row( is_top_level = parent_table is None # flatten current row and extract 
all lists to recur into flattened_row, lists = _flatten(schema, table, dict_row, _r_lvl) + # always extend row + _extend_row(extend, flattened_row) # infer record hash or leave existing primary key if present row_id = flattened_row.get("_dlt_id", None) if not row_id: diff --git a/dlt/common/runners/pool_runner.py b/dlt/common/runners/pool_runner.py index 7b285b8164..8ec13da630 100644 --- a/dlt/common/runners/pool_runner.py +++ b/dlt/common/runners/pool_runner.py @@ -10,7 +10,7 @@ from dlt.common.runners.runnable import Runnable, TPool from dlt.common.time import sleep from dlt.common.telemetry import TRunHealth, TRunMetrics, get_logging_extras, get_metrics_from_prometheus -from dlt.common.logger import init_logging_from_config, init_telemetry, process_internal_exception +from dlt.common.logger import init_logging_from_config, init_telemetry from dlt.common.signals import register_signals from dlt.common.utils import str2bool from dlt.common.exceptions import SignalReceivedException, TimeRangeExhaustedException, UnsupportedProcessStartMethodException @@ -110,6 +110,7 @@ def run_pool(C: Type[PoolRunnerConfiguration], run_f: Union[Runnable[TPool], Cal try: HEALTH_PROPS_GAUGES["runs_count"].inc() # run pool logic + logger.debug("Running pool") with RUN_DURATION_SUMMARY.time(), RUN_DURATION_GAUGE.time(): if callable(run_f): run_metrics = run_f(cast(TPool, pool)) @@ -122,33 +123,16 @@ def run_pool(C: Type[PoolRunnerConfiguration], run_f: Union[Runnable[TPool], Cal # always exit raise else: - process_internal_exception("run") + logger.exception("run") # the run failed run_metrics = TRunMetrics(True, True, -1) # preserve exception + # TODO: convert it to callback global LAST_RUN_EXCEPTION LAST_RUN_EXCEPTION = exc + logger.debug(f"Pool ran with {run_metrics}") - # gather and emit metrics - if not run_metrics.was_idle: - HEALTH_PROPS_GAUGES["runs_not_idle_count"].inc() - if run_metrics.has_failed: - HEALTH_PROPS_GAUGES["runs_failed_count"].inc() - HEALTH_PROPS_GAUGES["runs_cs_failed_gauge"].inc() - HEALTH_PROPS_GAUGES["runs_cs_healthy_gauge"].set(0) - else: - HEALTH_PROPS_GAUGES["runs_healthy_count"].inc() - HEALTH_PROPS_GAUGES["runs_cs_healthy_gauge"].inc() - HEALTH_PROPS_GAUGES["runs_cs_failed_gauge"].set(0) - HEALTH_PROPS_GAUGES["runs_pending_items_gauge"].set(run_metrics.pending_items) - health_props = update_gauges() - logger.health("run health counters", extra={"metrics": health_props}) - logger.metrics("run metrics", extra=get_logging_extras([RUN_DURATION_GAUGE, RUN_DURATION_SUMMARY])) - - # preserve last run metrics - global LAST_RUN_METRICS - LAST_RUN_METRICS = run_metrics - + health_props = _update_metrics(run_metrics) # exit due to signal signals.raise_if_signalled() @@ -161,7 +145,7 @@ def run_pool(C: Type[PoolRunnerConfiguration], run_f: Union[Runnable[TPool], Cal # and was all the time idle or (was not idle but now pending is 0) print(RUN_ARGS) if RUN_ARGS.single_run and (health_props["runs_count"] >= RUN_ARGS.wait_runs and (health_props["runs_not_idle_count"] == 0 or run_metrics.pending_items == 0)): - logger.warning("Stopping runner due to single run override") + logger.info("Stopping runner due to single run override") return 0 if run_metrics.has_failed: @@ -191,3 +175,27 @@ def run_pool(C: Type[PoolRunnerConfiguration], run_f: Union[Runnable[TPool], Cal pool.close() pool.join() pool = None + + +def _update_metrics(run_metrics: TRunMetrics) -> TRunHealth: + # gather and emit metrics + if not run_metrics.was_idle: + HEALTH_PROPS_GAUGES["runs_not_idle_count"].inc() + if 
run_metrics.has_failed: + HEALTH_PROPS_GAUGES["runs_failed_count"].inc() + HEALTH_PROPS_GAUGES["runs_cs_failed_gauge"].inc() + HEALTH_PROPS_GAUGES["runs_cs_healthy_gauge"].set(0) + else: + HEALTH_PROPS_GAUGES["runs_healthy_count"].inc() + HEALTH_PROPS_GAUGES["runs_cs_healthy_gauge"].inc() + HEALTH_PROPS_GAUGES["runs_cs_failed_gauge"].set(0) + HEALTH_PROPS_GAUGES["runs_pending_items_gauge"].set(run_metrics.pending_items) + health_props = update_gauges() + logger.health("run health counters", extra={"metrics": health_props}) + logger.metrics("run metrics", extra=get_logging_extras([RUN_DURATION_GAUGE, RUN_DURATION_SUMMARY])) + + # preserve last run metrics + global LAST_RUN_METRICS + LAST_RUN_METRICS = run_metrics + + return health_props diff --git a/dlt/common/schema/detections.py b/dlt/common/schema/detections.py index 7ee7b0df8d..697251de22 100644 --- a/dlt/common/schema/detections.py +++ b/dlt/common/schema/detections.py @@ -3,7 +3,7 @@ from hexbytes import HexBytes -from dlt.common import pendulum, Decimal +from dlt.common import pendulum, Wei from dlt.common.schema.typing import TDataType @@ -21,7 +21,7 @@ def is_timestamp(t: Type[Any], v: Any) -> Optional[TDataType]: def is_iso_timestamp(t: Type[Any], v: Any) -> Optional[TDataType]: # only strings can be converted - if t is not str: + if not issubclass(t, str): return None if not v: return None @@ -37,7 +37,7 @@ def is_iso_timestamp(t: Type[Any], v: Any) -> Optional[TDataType]: def is_large_integer(t: Type[Any], v: Any) -> Optional[TDataType]: # only ints can be converted - if t is int: + if issubclass(t, int): # TODO: this is bigquery limit, we need to implement better wei type # if integer does not find in maximum wei then convert to string if v > 578960446186580977117854925043439539266: @@ -51,6 +51,13 @@ def is_large_integer(t: Type[Any], v: Any) -> Optional[TDataType]: def is_hexbytes_to_text(t: Type[Any], v: Any) -> Optional[TDataType]: # HexBytes should be converted to text - if t is HexBytes: + if issubclass(t, HexBytes): return "text" return None + + +def is_wei_to_double(t: Type[Any], v: Any) -> Optional[TDataType]: + # Wei should be converted to double, use this only for aggregate non-financial reporting + if issubclass(t, Wei): + return "double" + return None diff --git a/dlt/common/schema/exceptions.py b/dlt/common/schema/exceptions.py index b739be8644..57e43b81cd 100644 --- a/dlt/common/schema/exceptions.py +++ b/dlt/common/schema/exceptions.py @@ -15,12 +15,21 @@ def __init__(self, name: str, normalized_name: str) -> None: class CannotCoerceColumnException(SchemaException): - def __init__(self, table_name: str, column_name: str, from_type: TDataType, to_type: TDataType, value: Any) -> None: - super().__init__(f"Cannot coerce type in table {table_name} column {column_name} existing type {from_type} coerced type {to_type} value: {value}") + def __init__(self, table_name: str, column_name: str, from_type: TDataType, to_type: TDataType, coerced_value: Any) -> None: + self.table_name = table_name + self.column_name = column_name + self.from_type = from_type + self.to_type = to_type + self.coerced_value = coerced_value + super().__init__(f"Cannot coerce type in table {table_name} column {column_name} existing type {from_type} coerced type {to_type} value: {coerced_value}") -class TablePropertiesClashException(SchemaException): +class TablePropertiesConflictException(SchemaException): def __init__(self, table_name: str, prop_name: str, val1: str, val2: str): + self.table_name = table_name + self.prop_name = prop_name + 
self.val1 = val1 + self.val2 = val2 super().__init__(f"Cannot merge partial tables for {table_name} due to property {prop_name}: {val1} != {val2}") diff --git a/dlt/common/schema/schema.py b/dlt/common/schema/schema.py index a06991b696..db7d14f2a6 100644 --- a/dlt/common/schema/schema.py +++ b/dlt/common/schema/schema.py @@ -4,13 +4,15 @@ from typing import Dict, List, Mapping, Optional, Sequence, Tuple, Any, cast from dlt.common import json -from dlt.common.typing import DictStrAny, StrAny, REPattern +from dlt.common.typing import DictStrAny, StrAny, REPattern, SupportsVariant, VARIANT_FIELD_FORMAT from dlt.common.normalizers.names import TNormalizeBreakPath, TNormalizeMakePath, TNormalizeNameFunc from dlt.common.normalizers.json import TNormalizeJSONFunc -from dlt.common.schema.typing import TNormalizersConfig, TPartialTableSchema, TSchemaSettings, TSimpleRegex, TStoredSchema, TSchemaTables, TTableSchema, TTableSchemaColumns, TColumnSchema, TColumnProp, TDataType, THintType, TWriteDisposition +from dlt.common.schema.typing import (TNormalizersConfig, TPartialTableSchema, TSchemaSettings, TSimpleRegex, TStoredSchema, + TSchemaTables, TTableSchema, TTableSchemaColumns, TColumnSchema, TColumnProp, TDataType, + THintType, TWriteDisposition) from dlt.common.schema import utils from dlt.common.schema.exceptions import (CannotCoerceColumnException, CannotCoerceNullException, InvalidSchemaName, - ParentTableNotFoundException, SchemaCorruptedException, TablePropertiesClashException) + ParentTableNotFoundException, SchemaCorruptedException, TablePropertiesConflictException) from dlt.common.validation import validate_dict @@ -189,9 +191,14 @@ def update_schema(self, partial_table: TPartialTableSchema) -> None: else: # check if table properties can be merged if table.get("parent") != partial_table.get("parent"): - raise TablePropertiesClashException(table_name, "parent", table.get("parent"), partial_table.get("parent")) - if table.get("write_disposition") != partial_table.get("write_disposition"): - raise TablePropertiesClashException(table_name, "write_disposition", table.get("write_disposition"), partial_table.get("write_disposition")) + raise TablePropertiesConflictException(table_name, "parent", table.get("parent"), partial_table.get("parent")) + # check if partial table has write disposition set + partial_w_d = partial_table.get("write_disposition") + if partial_w_d: + # get write disposition recursively for existing table + existing_w_d = self.get_write_disposition(table_name) + if existing_w_d != partial_w_d: + raise TablePropertiesConflictException(table_name, "write_disposition", existing_w_d, partial_w_d) # add several columns to existing table table_columns = table["columns"] for column in partial_table["columns"].values(): @@ -200,7 +207,7 @@ def update_schema(self, partial_table: TPartialTableSchema) -> None: # we do not support changing existing columns if not utils.compare_columns(table_columns[column_name], column): # attempt to update to incompatible columns - raise CannotCoerceColumnException(table_name, column_name, table_columns[column_name]["data_type"], column["data_type"], None) + raise CannotCoerceColumnException(table_name, column_name, column["data_type"], table_columns[column_name]["data_type"], None) else: table_columns[column_name] = column @@ -324,10 +331,10 @@ def to_pretty_yaml(self, remove_defaults: bool = True) -> str: d = self.to_dict(remove_defaults=remove_defaults) return cast(str, yaml.dump(d, allow_unicode=True, default_flow_style=False, sort_keys=False)) - def 
_infer_column(self, k: str, v: Any) -> TColumnSchema: + def _infer_column(self, k: str, v: Any, data_type: TDataType = None) -> TColumnSchema: return TColumnSchema( name=k, - data_type=self._map_value_to_column_type(v, k), + data_type=data_type or self._infer_column_type(v, k), nullable=not self._infer_hint("not_null", v, k), partition=self._infer_hint("partition", v, k), cluster=self._infer_hint("cluster", v, k), @@ -343,41 +350,42 @@ def _coerce_null_value(self, table_columns: TTableSchemaColumns, table_name: str if not existing_column["nullable"]: raise CannotCoerceNullException(table_name, col_name) - def _coerce_non_null_value(self, table_columns: TTableSchemaColumns, table_name: str, col_name: str, v: Any) -> Tuple[str, TColumnSchema, Any]: + def _coerce_non_null_value(self, table_columns: TTableSchemaColumns, table_name: str, col_name: str, v: Any, final: bool = False) -> Tuple[str, TColumnSchema, Any]: new_column: TColumnSchema = None - variant_col_name = col_name + existing_column = table_columns.get(col_name) - if col_name in table_columns: - existing_column = table_columns[col_name] - # existing columns cannot be changed so we must update row - py_data_type = utils.py_type_to_sc_type(type(v)) - # first try to coerce existing value into destination type - try: - rv = utils.coerce_type(existing_column["data_type"], py_data_type, v) - except (ValueError, SyntaxError): - # if that does not work we must create variant extension to the table - variant_col_name = f"{col_name}_v_{py_data_type}" - # if variant exists check type, coercions are not required - if variant_col_name in table_columns: - if table_columns[variant_col_name]["data_type"] != py_data_type: - raise CannotCoerceColumnException(table_name, variant_col_name, table_columns[variant_col_name]["data_type"], py_data_type, v) - else: - # add new column - new_column = self._infer_column(variant_col_name, v) - # must have variant type, not preferred or coerced type - new_column["data_type"] = py_data_type - # coerce even if types are the same (complex case) - rv = utils.coerce_type(py_data_type, py_data_type, v) - else: - # infer new column - new_column = self._infer_column(col_name, v) - # and coerce type if inference changed the python type - py_type = utils.py_type_to_sc_type(type(v)) - rv = utils.coerce_type(new_column["data_type"], py_type, v) - - return variant_col_name, new_column, rv - - def _map_value_to_column_type(self, v: Any, k: str) -> TDataType: + # infer type or get it from existing table + col_type = existing_column.get("data_type") if existing_column else self._infer_column_type(v, col_name) + # get real python type + py_type = utils.py_type_to_sc_type(type(v)) + # and coerce type if inference changed the python type + try: + coerced_v = utils.coerce_type(col_type, py_type, v) + # print(f"co: {py_type} -> {col_type} {v}") + except (ValueError, SyntaxError): + if final: + # this is final call: we cannot generate any more auto-variants + raise CannotCoerceColumnException(table_name, col_name, py_type, table_columns[col_name]["data_type"], v) + # otherwise we must create variant extension to the table + # pass final=True so no more auto-variants can be created recursively + # TODO: generate callback so DLT user can decide what to do + variant_col_name = self.normalize_make_path(col_name, VARIANT_FIELD_FORMAT % py_type) + return self._coerce_non_null_value(table_columns, table_name, variant_col_name, v, final=True) + + # if coerced value is variant, then extract variant value + if isinstance(coerced_v, 
SupportsVariant): + coerced_v = coerced_v() + if isinstance(coerced_v, tuple): + # variant recovered so call recursively with variant column name and variant value + variant_col_name = self.normalize_make_path(col_name, VARIANT_FIELD_FORMAT % coerced_v[0]) + return self._coerce_non_null_value(table_columns, table_name, variant_col_name, coerced_v[1]) + + if not existing_column: + new_column = self._infer_column(col_name, v, data_type=col_type) + + return col_name, new_column, coerced_v + + def _infer_column_type(self, v: Any, col_name: str) -> TDataType: tv = type(v) # try to autodetect data type mapped_type = utils.autodetect_sc_type(self._normalizers_config.get("detections"), tv, v) @@ -385,7 +393,7 @@ def _map_value_to_column_type(self, v: Any, k: str) -> TDataType: if mapped_type is None: mapped_type = utils.py_type_to_sc_type(tv) # get preferred type based on column name - preferred_type = self.get_preferred_type(k) + preferred_type = self.get_preferred_type(col_name) # try to match python type to preferred if preferred_type: # try to coerce to destination type @@ -398,9 +406,9 @@ def _map_value_to_column_type(self, v: Any, k: str) -> TDataType: pass return mapped_type - def _infer_hint(self, hint_type: THintType, _: Any, k: str) -> bool: + def _infer_hint(self, hint_type: THintType, _: Any, col_name: str) -> bool: if hint_type in self._compiled_hints: - return any(h.search(k) for h in self._compiled_hints[hint_type]) + return any(h.search(col_name) for h in self._compiled_hints[hint_type]) else: return False diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index 996ba7253e..c587b864b6 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -6,8 +6,8 @@ TDataType = Literal["text", "double", "bool", "timestamp", "bigint", "binary", "complex", "decimal", "wei"] THintType = Literal["not_null", "partition", "cluster", "primary_key", "foreign_key", "sort", "unique"] TColumnProp = Literal["name", "data_type", "nullable", "partition", "cluster", "primary_key", "foreign_key", "sort", "unique"] -TWriteDisposition = Literal["skip", "append", "replace", "merge", "upsert"] -TTypeDetections = Literal["timestamp", "iso_timestamp", "large_integer", "hexbytes_to_text"] +TWriteDisposition = Literal["skip", "append", "replace", "merge"] +TTypeDetections = Literal["timestamp", "iso_timestamp", "large_integer", "hexbytes_to_text", "wei_to_double"] TTypeDetectionFunc = Callable[[Type[Any], Any], Optional[TDataType]] DATA_TYPES: Set[TDataType] = set(get_args(TDataType)) diff --git a/dlt/common/schema/utils.py b/dlt/common/schema/utils.py index d215383378..af97f26b65 100644 --- a/dlt/common/schema/utils.py +++ b/dlt/common/schema/utils.py @@ -6,9 +6,9 @@ import datetime # noqa: I251 from typing import Dict, List, Sequence, Tuple, Type, Any, cast -from dlt.common import pendulum, json, Decimal +from dlt.common import pendulum, json, Decimal, Wei from dlt.common.json import custom_encode as json_custom_encode -from dlt.common.arithmetics import ConversionSyntax +from dlt.common.arithmetics import InvalidOperation from dlt.common.exceptions import DictValidationException from dlt.common.normalizers.names import TNormalizeNameFunc from dlt.common.typing import DictStrAny, REPattern @@ -212,7 +212,6 @@ def upgrade_engine_version(schema_dict: DictStrAny, from_engine: int, to_engine: if idx > 0: parent = parent[:idx] if parent not in old_tables: - # print(f"candidate {parent} for {name} not found") continue else: parent = None @@ -279,16 +278,20 @@ def 
autodetect_sc_type(detection_fs: Sequence[TTypeDetections], t: Type[Any], v: def py_type_to_sc_type(t: Type[Any]) -> TDataType: - if t is float: + if issubclass(t, float): return "double" - elif t is int: - return "bigint" + # bool is subclass of int so must go first elif t is bool: return "bool" + elif issubclass(t, int): + return "bigint" elif issubclass(t, bytes): return "binary" - elif issubclass(t, dict) or issubclass(t, list): + elif issubclass(t, (dict, list)): return "complex" + # wei is subclass of decimal and must be checked first + elif issubclass(t, Wei): + return "wei" elif issubclass(t, Decimal): return "decimal" elif issubclass(t, datetime.datetime): @@ -326,10 +329,8 @@ def coerce_type(to_type: TDataType, from_type: TDataType, value: Any) -> Any: if from_type == "bigint": return value.to_bytes((value.bit_length() + 7) // 8, 'little') - if to_type in ["wei", "bigint"]: - if from_type == "bigint": - return value - if from_type in ["decimal", "double"]: + if to_type == "bigint": + if from_type in ["wei", "decimal", "double"]: if value % 1 != 0: # only integer decimals and floats can be coerced raise ValueError(value) @@ -351,21 +352,23 @@ def coerce_type(to_type: TDataType, from_type: TDataType, value: Any) -> Any: else: return float(trim_value) - if to_type == "decimal": - if from_type in ["bigint", "wei"]: - return value - if from_type == "double": - return Decimal(value) + # decimal and wei behave identically when converted from/to + if to_type in ["decimal", "wei"]: + # get target class + decimal_cls = Decimal if to_type == "decimal" else Wei + + if from_type in ["bigint", "wei", "decimal", "double"]: + return decimal_cls(value) if from_type == "text": trim_value = value.strip() if trim_value.startswith("0x"): - return int(trim_value[2:], 16) - elif "." not in trim_value and "e" not in trim_value: - return int(trim_value) + return decimal_cls(int(trim_value[2:], 16)) + # elif "." 
not in trim_value and "e" not in trim_value: + # return int(trim_value) else: try: - return Decimal(trim_value) - except ConversionSyntax: + return decimal_cls(trim_value) + except InvalidOperation: raise ValueError(trim_value) if to_type == "timestamp": @@ -392,7 +395,7 @@ def coerce_type(to_type: TDataType, from_type: TDataType, value: Any) -> Any: if to_type == "bool": if from_type == "text": return str2bool(value) - if from_type not in ["complex", "binary", "datetime"]: + if from_type not in ["complex", "binary", "timestamp"]: # all the numeric types will convert to bool on 0 - False, 1 - True return bool(value) @@ -464,6 +467,7 @@ def new_table(table_name: str, parent_name: str = None, write_disposition: TWrit } if parent_name: table["parent"] = parent_name + assert write_disposition is None else: # set write disposition only for root tables table["write_disposition"] = write_disposition or DEFAULT_WRITE_DISPOSITION diff --git a/dlt/common/signals.py b/dlt/common/signals.py index 7779f50fc8..2202bcb502 100644 --- a/dlt/common/signals.py +++ b/dlt/common/signals.py @@ -1,8 +1,11 @@ import signal from threading import Event -from typing import Any +from typing import Any, Callable, TYPE_CHECKING -from dlt.common import logger +if not TYPE_CHECKING: + from dlt.common import logger +else: + logger: Any = None from dlt.common.exceptions import SignalReceivedException _received_signal: int = 0 diff --git a/dlt/common/typing.py b/dlt/common/typing.py index f5c5b3dfa0..e22f2ae21a 100644 --- a/dlt/common/typing.py +++ b/dlt/common/typing.py @@ -1,6 +1,6 @@ from collections.abc import Mapping as C_Mapping, Sequence as C_Sequence from re import Pattern as _REPattern -from typing import Callable, Dict, Any, Literal, Mapping, NewType, Type, TypeVar, TypedDict, TYPE_CHECKING, Union, get_args, get_origin +from typing import Callable, Dict, Any, Literal, Mapping, NewType, Tuple, Type, TypeVar, Generic, Protocol, TYPE_CHECKING, Union, runtime_checkable, get_args, get_origin if TYPE_CHECKING: from _typeshed import StrOrBytesPath from typing import _TypedDict @@ -21,6 +21,22 @@ TDataItem = DictStrAny +TVariantBase = TypeVar("TVariantBase", covariant=True) +TVariantRV = Tuple[str, Any] +VARIANT_FIELD_FORMAT = "v_%s" + + +@runtime_checkable +class SupportsVariant(Protocol, Generic[TVariantBase]): + """Defines variant type protocol that should be recognized by normalizers + + Variant types behave like TVariantBase type (ie. Decimal) but also implement the protocol below that is used to extract the variant value from it. + See `Wei` type declaration which returns Decimal or str for values greater than supported by destination warehouse. + """ + def __call__(self) -> Union[TVariantBase, TVariantRV]: + ... 
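# A small sketch of the variant protocol in action, using Wei from this patch: calling the
# value either returns the base value itself or a (variant_name, value) tuple that the schema
# layer turns into a separate variant column named via VARIANT_FIELD_FORMAT (the exact column
# name depends on the configured path separator).
from dlt.common.wei import Wei

in_range = Wei(10**18)
out_of_range = Wei(2**256 - 1)           # exceeds the BigQuery limit hard-coded in Wei.__call__
assert in_range() is in_range            # within range: behaves like a plain Decimal
assert out_of_range() == ("str", str(out_of_range))  # out of range: recovered as a "str" variant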
+ + def is_optional_type(t: Type[Any]) -> bool: # todo: use typing get_args and get_origin in python 3.8 if hasattr(t, "__origin__"): diff --git a/dlt/common/wei.py b/dlt/common/wei.py index 83ed652a34..53babc23fc 100644 --- a/dlt/common/wei.py +++ b/dlt/common/wei.py @@ -1,12 +1,10 @@ -import functools -import hexbytes -from typing import Any +from typing import Union from dlt.common import Decimal -from dlt.common.arithmetics import Context, default_context, decimal -from dlt.common.typing import StrAny +from dlt.common.typing import TVariantRV, SupportsVariant +from dlt.common.arithmetics import default_context, decimal -# default scale of platform contracts +# default scale of EVM based blockchain WEI_SCALE = 18 # log(2^256) + 1 EVM_DECIMAL_PRECISION = 78 @@ -14,65 +12,29 @@ WEI_SCALE_POW = 10**18 -# class WeiDecimal(Decimal): -# ctx = default_context(decimal.getcontext().copy(), EVM_DECIMAL_PRECISION) -# def __new__(cls, *args: Any, **kwargs: Any) -> "Wei": -# c = default_context(decimal.getcontext().copy(), EVM_DECIMAL_PRECISION) -# d = super(WeiDecimal, cls).__new__(cls, *args, **kwargs, context=c) -# d.c = c -# # d.c = default_context(decimal.getcontext().copy(), EVM_DECIMAL_PRECISION) -# return d - -# def __getattribute__(self, __name: str) -> Any: -# rv = super().__getattribute__(__name) -# if callable(rv) and not __name.startswith("_"): -# if "context" in rv.func_code.co_varnames: -# return functools.partialmethod(rv, context=self.c) -# return rv - -# def __repr__(self) -> str: -# return super().__repr__().replace("Decimal", "Wei") - - -class Wei(Decimal): +class Wei(Decimal,SupportsVariant[Decimal]): ctx = default_context(decimal.getcontext().copy(), EVM_DECIMAL_PRECISION) - # def __slots__hints__(self) -> None: - # self._scale: int = 0 - - # def __new__(cls, value: int, scale: int = 0) -> "Wei": - # self = super(Wei, cls).__new__(cls, value) - # self._scale = scale - # return self - - # def __init__(self, value: int, scale: int = 0) -> None: - # self._c = default_context(decimal.getcontext().copy(), EVM_DECIMAL_PRECISION) - # self.value = value - # self.scale = scale - - - # def to_decimal(): - # pass - - # def __get__(self, obj, type=None) -> object: - # print("GET!") - # if self.normalize(self.ctx) > 100: - # return "STR" - # else: - # return self - - def __new__(cls, value: int, scale: int = 0) -> "Wei": + @classmethod + def from_int256(cls, value: int, decimals: int = 0) -> "Wei": d: "Wei" = None - if scale == 0: - d = super(Wei, cls).__new__(cls, value) - else: - d = super(Wei, cls).__new__(cls, Decimal(value, context=cls.ctx) / 10**scale) + with decimal.localcontext(Wei.ctx): + if decimals == 0: + d = cls(value) + else: + d = cls(Decimal(value) / 10**decimals) return d - # def from_uint256(value: int, scale: int = 0): - # pass + def __call__(self) -> Union["Wei", TVariantRV]: + # TODO: this should look into DestinationCapabilitiesContext to get maximum Decimal value. 
+ # this is BigQuery BIGDECIMAL max + if self > 578960446186580977117854925043439539266 or self < -578960446186580977117854925043439539267: + return ("str", str(self)) + else: + return self + - # # def __repr__(self) -> str: - # # return f"{self.scale},{self.value}" + def __repr__(self) -> str: + return f"Wei('{str(self)}')" diff --git a/dlt/dbt_runner/runner.py b/dlt/dbt_runner/runner.py index 10fe280422..ecd48e2869 100644 --- a/dlt/dbt_runner/runner.py +++ b/dlt/dbt_runner/runner.py @@ -6,7 +6,7 @@ from dlt.common import logger from dlt.common.typing import DictStrAny, DictStrStr, StrAny -from dlt.common.logger import process_internal_exception, is_json_logging +from dlt.common.logger import is_json_logging from dlt.common.telemetry import get_logging_extras from dlt.common.file_storage import FileStorage from dlt.common.runners import TRunArgs, initialize_runner, run_pool @@ -193,7 +193,7 @@ def main(args: TRunArgs) -> int: try: configure(C, REGISTRY) except Exception: - process_internal_exception("init module") + logger.exception("init module") return -1 return run_pool(C, run) diff --git a/dlt/load/load.py b/dlt/load/load.py index 7283485b84..8ce8549540 100644 --- a/dlt/load/load.py +++ b/dlt/load/load.py @@ -5,7 +5,7 @@ from dlt.common import sleep, logger from dlt.common.runners import TRunArgs, TRunMetrics, initialize_runner, run_pool -from dlt.common.logger import process_internal_exception, pretty_format_exception +from dlt.common.logger import pretty_format_exception from dlt.common.exceptions import TerminalValueError from dlt.common.dataset_writers import TLoaderFileFormat from dlt.common.runners import Runnable, workermethod @@ -101,11 +101,11 @@ def w_spool_job(self: "Load", file_path: str, load_id: str, schema: Schema) -> O job = client.start_file_load(table, self.load_storage.storage._make_path(file_path)) except (LoadClientTerminalException, TerminalValueError): # if job irreversible cannot be started, mark it as failed - process_internal_exception(f"Terminal problem with spooling job {file_path}") + logger.exception(f"Terminal problem with spooling job {file_path}") job = JobClientBase.make_job_with_status(file_path, "failed", pretty_format_exception()) except (LoadClientTransientException, Exception): # return no job so file stays in new jobs (root) folder - process_internal_exception(f"Temporary problem with spooling job {file_path}") + logger.exception(f"Temporary problem with spooling job {file_path}") return None self.load_storage.start_job(load_id, job.file_name()) return job @@ -143,7 +143,7 @@ def retrieve_jobs(self, client: JobClientBase, load_id: str) -> Tuple[int, List[ logger.info(f"Will retrieve {file_path}") job = client.restore_file_load(file_path) except LoadClientTerminalException: - process_internal_exception(f"Job retrieval for {file_path} failed, job will be terminated") + logger.exception(f"Job retrieval for {file_path} failed, job will be terminated") job = JobClientBase.make_job_with_status(file_path, "failed", pretty_format_exception()) # proceed to appending job, do not reraise except (LoadClientTransientException, Exception): @@ -261,7 +261,7 @@ def main(args: TRunArgs) -> int: try: load = Load(C, REGISTRY) except Exception: - process_internal_exception("run") + logger.exception("init module") return -1 return run_pool(C, load) diff --git a/dlt/normalize/normalize.py b/dlt/normalize/normalize.py index b6c8f7ae82..72918fb64b 100644 --- a/dlt/normalize/normalize.py +++ b/dlt/normalize/normalize.py @@ -12,7 +12,6 @@ from dlt.common.telemetry import 
get_logging_extras from dlt.common.utils import uniq_id from dlt.common.typing import TDataItem -from dlt.common.logger import process_internal_exception from dlt.common.exceptions import PoolException from dlt.common.storages import SchemaStorage from dlt.common.schema import TSchemaUpdate, Schema @@ -110,7 +109,7 @@ def w_normalize_files(self: "Normalize", schema_name: str, load_id: str, events_ rows = normalized_data.setdefault(table_name, []) rows.append(row) except Exception: - process_internal_exception(f"Exception when processing file {events_file}") + logger.exception(f"Exception when processing file {events_file}") raise PoolException("normalize_files", events_file) # save rows and return schema changes to be gathered in parent process @@ -140,28 +139,32 @@ def map_single(self, schema_name: str, load_id: str, files: Sequence[str]) -> TM self_id: Any = id(self) return [Normalize.w_normalize_files(self_id, schema_name, load_id, chunk_files[0])], chunk_files - def update_schema(self, schema_name: str, schema_updates: List[TSchemaUpdate]) -> Schema: - # gather schema from all manifests, validate consistency and combine - schema = self.load_or_create_schema(schema_name) + def update_schema(self, schema: Schema, schema_updates: List[TSchemaUpdate]) -> int: + updates_count = 0 for schema_update in schema_updates: for table_name, table_updates in schema_update.items(): logger.debug(f"Updating schema for table {table_name} with {len(table_updates)} deltas") for partial_table in table_updates: + updates_count += 1 schema.update_schema(partial_table) - return schema + return updates_count def spool_files(self, schema_name: str, load_id: str, map_f: TMapFuncType, files: Sequence[str]) -> None: # process files in parallel or in single thread, depending on map_f schema_updates, chunk_files = map_f(schema_name, load_id, files) - schema = self.update_schema(schema_name, schema_updates) + schema = self.load_or_create_schema(schema_name) + # gather schema from all manifests, validate consistency and combine + updates_count = self.update_schema(schema, schema_updates) self.schema_version_gauge.labels(schema_name).set(schema.version) logger.metrics("Normalize metrics", extra=get_logging_extras([self.schema_version_gauge.labels(schema_name)])) logger.info(f"Saving schema {schema_name} with version {schema.version}, writing manifest files") - # schema is updated, save it to schema volume - self.schema_storage.save_schema(schema) - # save schema and schema updates to temp load folder + if updates_count > 0: + # schema is updated, save it to schema volume + self.schema_storage.save_schema(schema) + # save schema to temp load folder self.load_storage.save_temp_schema(schema, load_id) + # save schema updates even if empty self.load_storage.save_temp_schema_updates(load_id, schema_updates) # files must be renamed and deleted together so do not attempt that when process is about to be terminated signals.raise_if_signalled() @@ -225,7 +228,7 @@ def main(args: TRunArgs) -> int: try: n = Normalize(C, REGISTRY) except Exception: - process_internal_exception("init module") + logger.exception("init module") return -1 return run_pool(C, n) diff --git a/dlt/pipeline/__init__.py b/dlt/pipeline/__init__.py index 24f8a46322..e73357a354 100644 --- a/dlt/pipeline/__init__.py +++ b/dlt/pipeline/__init__.py @@ -1,2 +1,3 @@ from dlt.pipeline.pipeline import Pipeline # noqa: F401 from dlt.pipeline.typing import GCPPipelineCredentials, PostgresPipelineCredentials # noqa: F401 +from dlt.pipeline.exceptions import 
CannotRestorePipelineException # noqa: F401 diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index abb754fcb9..70d4319e45 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -9,11 +9,10 @@ from typing import Any, Iterator, List, Sequence, Tuple from prometheus_client import REGISTRY -from dlt.common import json +from dlt.common import json, sleep, signals, logger from dlt.common.runners import pool_runner as runner, TRunArgs, TRunMetrics -from dlt.common.configuration import RunConfiguration, make_configuration +from dlt.common.configuration import PoolRunnerConfiguration, make_configuration from dlt.common.file_storage import FileStorage -from dlt.common.logger import process_internal_exception from dlt.common.schema import Schema, normalize_schema_name from dlt.common.typing import DictStrAny, StrAny from dlt.common.utils import uniq_id, is_interactive @@ -25,7 +24,7 @@ from dlt.load.configuration import configuration as loader_configuration from dlt.normalize import Normalize from dlt.load import Load -from dlt.pipeline.exceptions import InvalidPipelineContextException, MissingDependencyException, NoPipelineException, PipelineStepFailed, CannotRestorePipelineException, SqlClientNotAvailable +from dlt.pipeline.exceptions import MissingDependencyException, NoPipelineException, PipelineStepFailed, CannotRestorePipelineException, SqlClientNotAvailable from dlt.pipeline.typing import PipelineCredentials @@ -34,6 +33,7 @@ def __init__(self, pipeline_name: str, log_level: str = "INFO") -> None: self.pipeline_name = pipeline_name self.default_schema_name: str = None self.root_path: str = None + self.export_schema_path: str = None self.root_storage: FileStorage = None self.credentials: PipelineCredentials = None self.extractor_storage: ExtractorStorageBase = None @@ -44,17 +44,19 @@ def __init__(self, pipeline_name: str, log_level: str = "INFO") -> None: self._loader_instance: Load = None # patch config and initialize pipeline - C = make_configuration(RunConfiguration, RunConfiguration, initial_values={ + self.C = make_configuration(PoolRunnerConfiguration, PoolRunnerConfiguration, initial_values={ "PIPELINE_NAME": pipeline_name, - "LOG_LEVEL": log_level + "LOG_LEVEL": log_level, + "POOL_TYPE": "None" }) - runner.initialize_runner(C, TRunArgs(True, 0)) + runner.initialize_runner(self.C, TRunArgs(True, 0)) - def create_pipeline(self, credentials: PipelineCredentials, working_dir: str = None, schema: Schema = None) -> None: + def create_pipeline(self, credentials: PipelineCredentials, working_dir: str = None, schema: Schema = None, export_schema_path: str = None) -> None: # initialize root storage if not working_dir: working_dir = tempfile.mkdtemp() self.root_storage = FileStorage(working_dir, makedirs=True) + self.export_schema_path = export_schema_path # check if directory contains restorable pipeline try: @@ -76,9 +78,7 @@ def create_pipeline(self, credentials: PipelineCredentials, working_dir: str = N # create new schema if no default supplied if schema is None: schema = Schema(normalize_schema_name(self.pipeline_name)) - # persist schema with the pipeline - self.set_default_schema(schema) - # initialize empty state + # initialize empty state, this must be last operation when creating pipeline so restore reads only fully created ones with self._managed_state(): self.state = { "default_schema_name": self.default_schema_name, @@ -88,12 +88,14 @@ def create_pipeline(self, credentials: PipelineCredentials, working_dir: str = N # TODO: must take schema prefix 
from resolved configuration "loader_schema_prefix": credentials.default_dataset } + # persist schema with the pipeline + self.set_default_schema(schema) - def restore_pipeline(self, credentials: PipelineCredentials, working_dir: str) -> None: + def restore_pipeline(self, credentials: PipelineCredentials, working_dir: str, export_schema_path: str = None) -> None: try: # do not create extractor dir - it must exist self.root_storage = FileStorage(working_dir, makedirs=False) - # restore state + # restore state, this must be a first operation when restoring pipeline try: self._restore_state() except FileNotFoundError: @@ -105,6 +107,7 @@ def restore_pipeline(self, credentials: PipelineCredentials, working_dir: str) - credentials.default_dataset = self.state["loader_schema_prefix"] self.root_path = self.root_storage.storage_path self.credentials = credentials + self.export_schema_path = export_schema_path self._load_modules() # schema must exist try: @@ -136,6 +139,8 @@ def extract(self, items: Iterator[TItem], schema_name: str = None, table_name: s all_items.append(item) elif isinstance(item, abc.Sequence): all_items.extend(item) + # react to CTRL-C and shutdowns from controllers + signals.raise_if_signalled() try: self._extract_iterator(default_table_name, all_items) @@ -158,6 +163,7 @@ def normalize(self, workers: int = 1, max_events_in_chunk: int = 100000) -> None def load(self, max_parallel_loads: int = 20) -> None: self._verify_loader_instance() self._loader_instance.CONFIG.WORKERS = max_parallel_loads + self._loader_instance.load_client_cls.CONFIG.DEFAULT_SCHEMA_NAME = self.default_schema_name # type: ignore runner.run_pool(self._loader_instance.CONFIG, self._loader_instance) if runner.LAST_RUN_METRICS.has_failed: raise PipelineStepFailed("load", self.last_run_exception, runner.LAST_RUN_METRICS) @@ -238,11 +244,15 @@ def sql_client(self, schema_name: str = None) -> SqlClientBase[Any]: else: raise SqlClientNotAvailable(self._loader_instance.CONFIG.CLIENT_TYPE) + def sleep(self, seconds: float = None) -> None: + sleep(seconds or self.C.RUN_SLEEP) + def _configure_normalize(self) -> None: # create normalize config normalize_initial = { "NORMALIZE_VOLUME_PATH": os.path.join(self.root_path, "normalize"), "SCHEMA_VOLUME_PATH": os.path.join(self.root_path, "schemas"), + "EXPORT_SCHEMA_PATH": os.path.abspath(self.export_schema_path) if self.export_schema_path else None, "LOADER_FILE_FORMAT": self._loader_instance.load_client_cls.capabilities()["preferred_loader_file_format"], "ADD_EVENT_JSON": False } @@ -254,6 +264,7 @@ def _configure_normalize(self) -> None: def _configure_load(self) -> None: # use credentials to populate loader client config, it includes also client type loader_client_initial = dtc_asdict(self.credentials) + loader_client_initial["DEFAULT_SCHEMA_NAME"] = self.default_schema_name # but client type must be passed to loader config loader_initial = {"CLIENT_TYPE": loader_client_initial["CLIENT_TYPE"]} loader_initial.update(self._configure_runner()) @@ -311,7 +322,7 @@ def _extract_iterator(self, default_table_name: str, items: Sequence[DictStrAny] runner.LAST_RUN_METRICS = TRunMetrics(was_idle=False, has_failed=False, pending_items=0) except Exception as ex: - process_internal_exception("extracting iterator failed") + logger.exception("extracting iterator failed") runner.LAST_RUN_METRICS = TRunMetrics(was_idle=False, has_failed=True, pending_items=0) runner.LAST_RUN_EXCEPTION = ex raise diff --git a/dlt/pipeline/typing.py b/dlt/pipeline/typing.py index 4d1d02c57d..bdd9b44c03 
100644 --- a/dlt/pipeline/typing.py +++ b/dlt/pipeline/typing.py @@ -1,11 +1,11 @@ -from dataclasses import dataclass -from typing import Literal +from typing import Literal, Type, Any +from dataclasses import dataclass, fields as dtc_fields from dlt.common import json from dlt.common.typing import StrAny, TSecretValue -TLoaderType = Literal["bigquery", "redshift"] +TLoaderType = Literal["bigquery", "redshift", "dummy"] TPipelineStage = Literal["extract", "normalize", "load"] # extractor generator yields functions that returns list of items of the type (table) when called @@ -30,9 +30,9 @@ def default_dataset(self, new_value: str) -> None: @dataclass class GCPPipelineCredentials(PipelineCredentials): - PROJECT_ID: str - DEFAULT_DATASET: str - CLIENT_EMAIL: str + PROJECT_ID: str = None + DEFAULT_DATASET: str = None + CLIENT_EMAIL: str = None PRIVATE_KEY: TSecretValue = None CRED_TYPE: str = "service_account" TOKEN_URI: str = "https://oauth2.googleapis.com/token" @@ -66,10 +66,10 @@ def default_credentials(cls, dataset_prefix: str, project_id: str = None) -> "GC @dataclass class PostgresPipelineCredentials(PipelineCredentials): - DBNAME: str - DEFAULT_DATASET: str - USER: str - HOST: str + DBNAME: str = None + DEFAULT_DATASET: str = None + USER: str = None + HOST: str = None PASSWORD: TSecretValue = None PORT: int = 5439 CONNECT_TIMEOUT: int = 15 @@ -84,10 +84,15 @@ def default_dataset(self, new_value: str) -> None: def credentials_from_dict(credentials: StrAny) -> PipelineCredentials: + + def ignore_unknown_props(typ_: Type[Any], props: StrAny) -> StrAny: + fields = {f.name: f for f in dtc_fields(typ_)} + return {k:v for k,v in props.items() if k in fields} + client_type = credentials.get("CLIENT_TYPE") if client_type == "bigquery": - return GCPPipelineCredentials(**credentials) + return GCPPipelineCredentials(**ignore_unknown_props(GCPPipelineCredentials, credentials)) elif client_type == "redshift": - return PostgresPipelineCredentials(**credentials) + return PostgresPipelineCredentials(**ignore_unknown_props(PostgresPipelineCredentials, credentials)) else: raise ValueError(f"CLIENT_TYPE: {client_type}") diff --git a/examples/ethereum.py b/examples/ethereum.py deleted file mode 100644 index 06915f0876..0000000000 --- a/examples/ethereum.py +++ /dev/null @@ -1,292 +0,0 @@ -from hexbytes import HexBytes -from dlt.common import Decimal -from dlt.common.time import sleep -from dlt.common.typing import DictStrAny -from dlt.pipeline import Pipeline, GCPPipelineCredentials -from dlt.common.arithmetics import numeric_default_context, numeric_default_quantize - -from examples.schemas.ethereum import discover_schema -from examples.sources.eth_source_utils import recode_tuples, flatten_batches, abi_to_selector, signature_to_abi -from examples.sources.ethereum import get_source - -# abi = signature_to_abi("event", "AxieggSpawned(uint256,uint256,uint256,uint256[],(uint256,uint256)[],(uint256,uint256))") -# # abi = signature_to_abi("event", "AxieggSpawned(uint256,uint256,uint256,uint256,(uint256,uint256),(uint256,uint256))") -# import pprint -# pprint.pprint(abi) -# assert abi_to_selector(abi) == HexBytes("0x280ad04291df39737839c19243929c95199e424e2518ea891b63deef3808bfe2") -# exit(-1) -# decoded = { -# "_tokenTypes": [ -# 1 -# ], -# "_tokenAddresses": [ -# "0x32950db2a7164aE833121501C797D79E7B79d74C" -# ], -# "_tokenNumbers": [ -# 11351913 -# ], -# "_startingPrices": [ -# 40000000000000000 -# ], -# "_endingPrices": [ -# 20000000000000000 -# ], -# "_exchangeTokens": [ -# 
"0xc99a6A985eD2Cac1ef41640596C5A5f9F4E19Ef5" -# ], -# "_durations": [ -# 432000 -# ], -# "_dlt_meta": { -# "table_name": "Marketplace_calls_createAuction" -# }, -# "_tx_blockNumber": 16039643, -# "_tx_blockTimestamp": 1659723680, -# "_tx_transactionHash": "\uf02a0x5714a265cf3456ad6bbacdf7a577f094f99b676bf19fde81153eea96fa224850", -# "_tx_transactionIndex": "0x0", -# "_tx_address": "0x213073989821f738A7BA3520C3D31a1F9aD31bBd", -# "_tx_status": "0x1" -# } - -# flatten_batches(decoded, -# { -# "constant": False, -# "inputs": [ -# { -# "internalType": "enum IExchange.TokenType[]", -# "name": "_tokenTypes", -# "type": "uint8[]" -# }, -# { -# "internalType": "address[]", -# "name": "_tokenAddresses", -# "type": "address[]" -# }, -# { -# "internalType": "uint256[]", -# "name": "_tokenNumbers", -# "type": "uint256[]" -# }, -# { -# "internalType": "uint256[]", -# "name": "_startingPrices", -# "type": "uint256[]" -# }, -# { -# "internalType": "uint256[]", -# "name": "_endingPrices", -# "type": "uint256[]" -# }, -# { -# "internalType": "contract IERC20[]", -# "name": "_exchangeTokens", -# "type": "address[]" -# }, -# { -# "internalType": "uint256[]", -# "name": "_durations", -# "type": "uint256[]" -# } -# ], -# "name": "createAuction", -# "outputs": [], -# "stateMutability": "nonpayable", -# "type": "function" -# }) - -# import pprint -# pprint.pprint(decoded) -# exit(0) - -# decoded = { -# "param_0": 11403430, -# "param_1": [ -# 14474011154664526034909069339965131741826270611619044490146571011551995691272, -# [ -# 1766959143701397048994461030926868181200402784820958467115499487534072066, -# 0, -# "hah" -# ] -# ], -# "_dlt_meta": { -# "table_name": "AXIE_logs_AxieEvolved" -# }, -# "_tx_blockNumber": 16039643, -# "_tx_blockTimestamp": 1659723680, -# "_tx_transactionHash": "\uf02a0x6302c20a4ef5426ee1da68fa3bdfb2c234a1f16fe441c51a35227ba4bd6746fa", -# "_tx_transactionIndex": "0x3", -# "_tx_address": "0x32950db2a7164aE833121501C797D79E7B79d74C", -# "_tx_status": "0x1", -# "_tx_logIndex": 15 -# } -# recode_tuples(decoded, -# { -# "name": "AxieEvolved", -# "type": "event", -# "inputs": [ -# { -# "name": "param_0", -# "type": "uint256", -# "indexed": True -# }, -# { -# "components": [ -# { -# "name": "param_1_1", -# "type": "uint256" -# }, -# { -# "name": "param_1_2", -# "type": "tuple", -# "components": [ -# { -# "name": "param_deep_i", -# "type": "uint256" -# }, -# { -# "name": "param_deep_2", -# "type": "uint256" -# }, -# { -# "name": "param_deep_3", -# "type": "string" -# } -# ] -# } -# ], -# "name": "param_1", -# "type": "tuple", -# "indexed": False -# } -# ], -# "outputs": [], -# "anonymous": False, -# "_dlt_meta": { -# "selector": "0xa006fbbbc9600fe3b3757442d103355696bba0d2b8f9201852984b64d72a0a0b", -# "block": 16039643 -# } -# } -# ) - -# import pprint -# pprint.pprint(decoded) -# exit(0) - -credentials = GCPPipelineCredentials.from_services_file("_secrets/project1234_service.json", "ronin_4") -# credentials = PostgresPipelineCredentials("redshift", "chat_analytics_rasa", "mainnet_6", "loader", "3.73.90.3") - -pipeline = Pipeline("ethereum") - -# print(pipeline.root_path) - -# get ethereum source which is python iterator, request 3 newest blocks with default lag. 
also pass the state (optional) -# so you can get newly produced blocks when the pipeline is run again -# i = get_source("https://mainnet.infura.io/v3/9aa3d95b3bc440fa88ea12eaa4456161", 2, state=pipeline.state) - -# from web3 import Web3 -# from web3._utils.abi import abi_to_signature -# from web3.contract import ContractFunction -# from examples.sources.eth_utils import signature_to_abi, decode_tx -# from pprint import pprint -# pprint(signature_to_abi("function", "AxieEvolved(uint256,(uint256,(uint256,string[]))[])")) -# w3 = Web3() -# pprint(decode_tx(w3, "0x70a88b27000000000000000000000000dd4ca6d2565aeabe0b469896141a23f9d3cb181100000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000011af00000000000000000000000000000000000000000000000000000000008467e7a03e101f2dec380e797fd683bae9f692d16aa07de900f720791539f474d8a4c8000000000000000000000000000000000000000000000000000001826f53c08000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002000000000000000000000000a8754b9fa15fc18bb59458815510e40a12cd2014000000000000000000000000c99a6a985ed2cac1ef41640596c5a5f9f4e19ef5")) - -# print(abi_to_signature({ -# "name": "f", -# "type": "function", -# "inputs": [ -# { -# "name": "s", -# "type": "tuple[]", -# "components": [ -# { -# "name": "a", -# "type": "uint256" -# }, -# { -# "name": "b", -# "type": "uint256[]" -# }, -# { -# "name": "c", -# "type": "tuple[]", -# "components": [ -# { -# "name": "x", -# "type": "uint256" -# }, -# { -# "name": "y", -# "type": "uint256" -# } -# ] -# } -# ] -# }, -# { -# "name": "t", -# "type": "tuple", -# "components": [ -# { -# "name": "x", -# "type": "uint256" -# }, -# { -# "name": "y", -# "type": "uint256" -# } -# ] -# }, -# { -# "name": "a", -# "type": "uint256" -# } -# ], -# "outputs": ["uint256"] -# })) - -# exit(0) - -# "https://mainnet.infura.io/v3/9aa3d95b3bc440fa88ea12eaa4456161" -rpc_url = "https://api.roninchain.com/rpc" -working_dir = "experiments/pipeline/ronin" -abi_dir="experiments/data/ronin/abi" -# pipeline.create_pipeline(credentials, schema=discover_schema(), working_dir=working_dir) -# i = get_source(rpc_url, 4, abi_dir=abi_dir, is_poa=True, supports_batching=False, state=pipeline.state) - -# you can transform the blocks by defining mapping function -def from_wei_to_eth(block: DictStrAny) -> DictStrAny: - # convert wei to eth providing with numeric precision at the target - with numeric_default_context(): - for tx in block["transactions"]: - v_wei = tx["value"] - if v_wei > 0: - # 9 scale vs 18 digits ether scale: a no go for any financial operations - # TODO: better support for decimal types - tx["eth_value"] = numeric_default_quantize(Decimal(v_wei) / 10**18) - # print(f"Produced nice decimal {tx['eth_value']}") - return block - - -# extract data from the source. 
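The `from_wei_to_eth` mapper above divides the raw wei amount by 10**18 and quantizes the result down to a 9-digit scale so it fits a warehouse NUMERIC column rather than the 18-digit wei scale. The same arithmetic can be illustrated with the standard-library `decimal` module; the 38-digit precision here is an assumption about the target numeric type, and dlt's `numeric_default_context` / `numeric_default_quantize` helpers used above presumably wrap comparable settings:

```
# Standard-library illustration of the wei -> ETH conversion above, assuming
# the target column is a NUMERIC with 38-digit precision and 9-digit scale.
from decimal import Decimal, localcontext


def wei_to_eth(v_wei: int) -> Decimal:
    with localcontext() as ctx:
        ctx.prec = 38  # assumed warehouse precision
        eth = Decimal(v_wei) / Decimal(10) ** 18
        # drop from the 18-digit wei scale to a 9-digit scale
        return eth.quantize(Decimal("1.000000000"))


print(wei_to_eth(40_000_000_000_000_000))  # 0.040000000
```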
operation is atomic and like all atomic operations in pipeline, it does not raise but returns -# execution status -# map(from_wei_to_eth, i) -# pipeline.extract(i, table_name="blocks") -# print(pipeline.state) - -# wait for ethereum network to produce some more blocks -# sleep(20) - -# restore the pipeline from the working directory (simulate continuation from the saved state) -pipeline.restore_pipeline(credentials, working_dir) -# while True: -# # obtain new iterator (the old one is expired), this time use deferred iterator to allow parallel block reading -# i = get_source(rpc_url, 8, abi_dir=abi_dir, is_poa=True, supports_batching=False, state=pipeline.state) -# pipeline.extract(i, table_name="blocks") -# sleep(10) -# print(pipeline.state) - -# this will unpack and load all extracted data -pipeline.normalize() -# pipeline.flush() diff --git a/examples/schemas/ethereum.py b/examples/schemas/ethereum.py deleted file mode 100644 index 11b33f4801..0000000000 --- a/examples/schemas/ethereum.py +++ /dev/null @@ -1,8 +0,0 @@ -from dlt.common.schema import Schema - -from dlt.pipeline import Pipeline - - -def discover_schema() -> Schema: - # all Ethereum compatible blockchains have the same schema so we just provide a nice yaml file - return Pipeline.load_schema_from_file("examples/schemas/ethereum_schema.yml") diff --git a/examples/schemas/ethereum_schema.json b/examples/schemas/ethereum_schema.json deleted file mode 100644 index e96e848be8..0000000000 --- a/examples/schemas/ethereum_schema.json +++ /dev/null @@ -1,424 +0,0 @@ -{ - "version": 8, - "engine_version": 3, - "name": "ethereum", - "tables": { - "_dlt_loads": { - "columns": { - "inserted_at": { - "data_type": "timestamp", - "nullable": false - }, - "load_id": { - "data_type": "text", - "nullable": false - }, - "status": { - "data_type": "bigint", - "nullable": false - } - }, - "write_disposition": "append" - }, - "_dlt_version": { - "columns": { - "engine_version": { - "data_type": "bigint", - "nullable": false - }, - "inserted_at": { - "data_type": "timestamp", - "nullable": false - }, - "version": { - "data_type": "bigint", - "nullable": false - } - }, - "write_disposition": "append" - }, - "blocks": { - "columns": { - "_dlt_load_id": { - "data_type": "text", - "nullable": false - }, - "_dlt_id": { - "unique": true, - "data_type": "text", - "nullable": false - }, - "number": { - "primary_key": true, - "data_type": "bigint", - "nullable": false - }, - "parent_hash": { - "data_type": "text", - "nullable": true - }, - "hash": { - "cluster": true, - "unique": true, - "data_type": "text", - "nullable": false - }, - "base_fee_per_gas": { - "data_type": "wei", - "nullable": false - }, - "difficulty": { - "data_type": "wei", - "nullable": false - }, - "extra_data": { - "data_type": "text", - "nullable": true - }, - "gas_limit": { - "data_type": "bigint", - "nullable": false - }, - "gas_used": { - "data_type": "bigint", - "nullable": false - }, - "logs_bloom": { - "data_type": "text", - "nullable": true - }, - "miner": { - "data_type": "text", - "nullable": true - }, - "mix_hash": { - "data_type": "text", - "nullable": true - }, - "nonce": { - "data_type": "text", - "nullable": true - }, - "receipts_root": { - "data_type": "text", - "nullable": true - }, - "sha3_uncles": { - "data_type": "text", - "nullable": true - }, - "size": { - "data_type": "bigint", - "nullable": true - }, - "state_root": { - "data_type": "text", - "nullable": false - }, - "timestamp": { - "unique": true, - "sort": true, - "data_type": "timestamp", - "nullable": false - 
}, - "total_difficulty": { - "data_type": "wei", - "nullable": true - }, - "transactions_root": { - "data_type": "text", - "nullable": false - } - }, - "write_disposition": "append" - }, - "blocks__transactions": { - "columns": { - "_dlt_id": { - "unique": true, - "data_type": "text", - "nullable": false - }, - "block_number": { - "primary_key": true, - "data_type": "bigint", - "nullable": false - }, - "transaction_index": { - "primary_key": true, - "data_type": "bigint", - "nullable": false - }, - "hash": { - "unique": true, - "data_type": "text", - "nullable": false - }, - "block_hash": { - "cluster": true, - "data_type": "text", - "nullable": false - }, - "block_timestamp": { - "sort": true, - "data_type": "timestamp", - "nullable": false - }, - "chain_id": { - "data_type": "text", - "nullable": true - }, - "from": { - "data_type": "text", - "nullable": true - }, - "gas": { - "data_type": "bigint", - "nullable": true - }, - "gas_price": { - "data_type": "bigint", - "nullable": true - }, - "input": { - "data_type": "text", - "nullable": true - }, - "max_fee_per_gas": { - "data_type": "wei", - "nullable": true - }, - "max_priority_fee_per_gas": { - "data_type": "wei", - "nullable": true - }, - "nonce": { - "data_type": "bigint", - "nullable": true - }, - "r": { - "data_type": "text", - "nullable": true - }, - "s": { - "data_type": "text", - "nullable": true - }, - "status": { - "data_type": "bigint", - "nullable": true - }, - "to": { - "data_type": "text", - "nullable": true - }, - "type": { - "data_type": "text", - "nullable": true - }, - "v": { - "data_type": "bigint", - "nullable": true - }, - "value": { - "data_type": "wei", - "nullable": false - }, - "eth_value": { - "data_type": "decimal", - "nullable": true - } - }, - "parent": "blocks" - }, - "blocks__transactions__logs": { - "columns": { - "_dlt_id": { - "unique": true, - "data_type": "text", - "nullable": false - }, - "address": { - "data_type": "text", - "nullable": false - }, - "block_timestamp": { - "sort": true, - "data_type": "timestamp", - "nullable": false - }, - "block_hash": { - "cluster": true, - "data_type": "text", - "nullable": false - }, - "block_number": { - "primary_key": true, - "data_type": "bigint", - "nullable": false - }, - "transaction_index": { - "primary_key": true, - "data_type": "bigint", - "nullable": false - }, - "log_index": { - "primary_key": true, - "data_type": "bigint", - "nullable": false - }, - "data": { - "data_type": "text", - "nullable": true - }, - "removed": { - "data_type": "bool", - "nullable": true - }, - "transaction_hash": { - "data_type": "text", - "nullable": false - } - }, - "parent": "blocks__transactions" - }, - "blocks__transactions__logs__topics": { - "columns": { - "_dlt_parent_id": { - "foreign_key": true, - "data_type": "text", - "nullable": false - }, - "_dlt_list_idx": { - "data_type": "bigint", - "nullable": false - }, - "_dlt_id": { - "unique": true, - "data_type": "text", - "nullable": false - }, - "_dlt_root_id": { - "data_type": "text", - "nullable": false - }, - "value": { - "data_type": "text", - "nullable": true - } - }, - "parent": "blocks__transactions__logs" - }, - "blocks__transactions__access_list": { - "columns": { - "_dlt_parent_id": { - "foreign_key": true, - "data_type": "text", - "nullable": false - }, - "_dlt_list_idx": { - "data_type": "bigint", - "nullable": false - }, - "_dlt_id": { - "unique": true, - "data_type": "text", - "nullable": false - }, - "_dlt_root_id": { - "data_type": "text", - "nullable": false - }, - "address": { - "data_type": "text", 
- "nullable": true - } - }, - "parent": "blocks__transactions" - }, - "blocks__transactions__access_list__storage_keys": { - "columns": { - "_dlt_parent_id": { - "foreign_key": true, - "data_type": "text", - "nullable": false - }, - "_dlt_list_idx": { - "data_type": "bigint", - "nullable": false - }, - "_dlt_id": { - "unique": true, - "data_type": "text", - "nullable": false - }, - "_dlt_root_id": { - "data_type": "text", - "nullable": false - }, - "value": { - "data_type": "text", - "nullable": true - } - }, - "parent": "blocks__transactions__access_list" - }, - "blocks__uncles": { - "columns": { - "_dlt_parent_id": { - "foreign_key": true, - "data_type": "text", - "nullable": false - }, - "_dlt_list_idx": { - "data_type": "bigint", - "nullable": false - }, - "_dlt_id": { - "unique": true, - "data_type": "text", - "nullable": false - }, - "_dlt_root_id": { - "data_type": "text", - "nullable": false - }, - "value": { - "data_type": "text", - "nullable": true - } - }, - "parent": "blocks" - } - }, - "settings": { - "default_hints": { - "foreign_key": [ - "re:^_dlt_parent_id$" - ], - "not_null": [ - "re:^_dlt_id$", - "re:^_dlt_root_id$", - "re:^_dlt_parent_id$", - "re:^_dlt_list_idx$" - ], - "unique": [ - "re:^_dlt_id$" - ] - }, - "preferred_types": {} - }, - "normalizers": { - "names": "dlt.common.normalizers.names.snake_case", - "json": { - "module": "dlt.common.normalizers.json.relational", - "config": { - "propagation": { - "root": { - "_dlt_id": "_dlt_root_id" - } - } - } - } - } -} diff --git a/examples/schemas/ethereum_schema.yml b/examples/schemas/ethereum_schema.yml deleted file mode 100644 index 9b6acc233e..0000000000 --- a/examples/schemas/ethereum_schema.yml +++ /dev/null @@ -1,322 +0,0 @@ -version: 8 -engine_version: 3 -name: ethereum -tables: - _dlt_loads: - columns: - inserted_at: - data_type: timestamp - nullable: false - load_id: - data_type: text - nullable: false - status: - data_type: bigint - nullable: false - write_disposition: append - _dlt_version: - columns: - engine_version: - data_type: bigint - nullable: false - inserted_at: - data_type: timestamp - nullable: false - version: - data_type: bigint - nullable: false - write_disposition: append - blocks: - columns: - _dlt_load_id: - data_type: text - nullable: false - _dlt_id: - unique: true - data_type: text - nullable: false - number: - primary_key: true - data_type: bigint - nullable: false - parent_hash: - data_type: text - nullable: true - hash: - cluster: true - unique: true - data_type: text - nullable: false - base_fee_per_gas: - data_type: wei - nullable: true - difficulty: - data_type: wei - nullable: false - extra_data: - data_type: text - nullable: true - gas_limit: - data_type: bigint - nullable: false - gas_used: - data_type: bigint - nullable: false - logs_bloom: - data_type: binary - nullable: true - miner: - data_type: text - nullable: true - mix_hash: - data_type: text - nullable: true - nonce: - data_type: text - nullable: true - receipts_root: - data_type: text - nullable: true - sha3_uncles: - data_type: text - nullable: true - size: - data_type: bigint - nullable: true - state_root: - data_type: text - nullable: false - timestamp: - unique: true - sort: true - data_type: timestamp - nullable: false - total_difficulty: - data_type: wei - nullable: true - transactions_root: - data_type: text - nullable: false - write_disposition: append - blocks__transactions: - columns: - _dlt_id: - unique: true - data_type: text - nullable: false - block_number: - primary_key: true - foreign_key: true - 
data_type: bigint - nullable: false - transaction_index: - primary_key: true - data_type: bigint - nullable: false - hash: - unique: true - data_type: text - nullable: false - block_hash: - cluster: true - data_type: text - nullable: false - block_timestamp: - sort: true - data_type: timestamp - nullable: false - chain_id: - data_type: text - nullable: true - from: - data_type: text - nullable: true - gas: - data_type: bigint - nullable: true - gas_price: - data_type: bigint - nullable: true - input: - data_type: text - nullable: true - max_fee_per_gas: - data_type: wei - nullable: true - max_priority_fee_per_gas: - data_type: wei - nullable: true - nonce: - data_type: bigint - nullable: true - r: - data_type: text - nullable: true - s: - data_type: text - nullable: true - status: - data_type: bigint - nullable: true - to: - data_type: text - nullable: true - type: - data_type: text - nullable: true - v: - data_type: bigint - nullable: true - value: - data_type: wei - nullable: false - eth_value: - data_type: decimal - nullable: true - parent: blocks - blocks__transactions__logs: - columns: - _dlt_id: - unique: true - data_type: text - nullable: false - address: - data_type: text - nullable: false - block_timestamp: - sort: true - data_type: timestamp - nullable: false - block_hash: - cluster: true - data_type: text - nullable: false - block_number: - primary_key: true - foreign_key: true - data_type: bigint - nullable: false - transaction_index: - primary_key: true - foreign_key: true - data_type: bigint - nullable: false - log_index: - primary_key: true - data_type: bigint - nullable: false - data: - data_type: text - nullable: true - removed: - data_type: bool - nullable: true - transaction_hash: - data_type: text - nullable: false - parent: blocks__transactions - blocks__transactions__logs__topics: - columns: - _dlt_parent_id: - foreign_key: true - data_type: text - nullable: false - _dlt_list_idx: - data_type: bigint - nullable: false - _dlt_id: - unique: true - data_type: text - nullable: false - _dlt_root_id: - data_type: text - nullable: false - value: - data_type: text - nullable: true - parent: blocks__transactions__logs - blocks__transactions__access_list: - columns: - _dlt_parent_id: - foreign_key: true - data_type: text - nullable: false - _dlt_list_idx: - data_type: bigint - nullable: false - _dlt_id: - unique: true - data_type: text - nullable: false - _dlt_root_id: - data_type: text - nullable: false - address: - data_type: text - nullable: true - parent: blocks__transactions - blocks__transactions__access_list__storage_keys: - columns: - _dlt_parent_id: - foreign_key: true - data_type: text - nullable: false - _dlt_list_idx: - data_type: bigint - nullable: false - _dlt_id: - unique: true - data_type: text - nullable: false - _dlt_root_id: - data_type: text - nullable: false - value: - data_type: text - nullable: true - parent: blocks__transactions__access_list - blocks__uncles: - columns: - _dlt_parent_id: - foreign_key: true - data_type: text - nullable: false - _dlt_list_idx: - data_type: bigint - nullable: false - _dlt_id: - unique: true - data_type: text - nullable: false - _dlt_root_id: - data_type: text - nullable: false - value: - data_type: text - nullable: true - parent: blocks -settings: - default_hints: - foreign_key: - - re:^_dlt_parent_id$ - not_null: - - re:^_dlt_id$ - - re:^_dlt_root_id$ - - re:^_dlt_parent_id$ - - re:^_dlt_list_idx$ - unique: - - re:^_dlt_id$ - preferred_types: {} -normalizers: - names: dlt.common.normalizers.names.snake_case - detections: 
- - timestamp - - large_integer - - hexbytes_to_text - json: - module: dlt.common.normalizers.json.relational - config: - propagation: - root: - _dlt_id: _dlt_root_id - diff --git a/examples/sources/eth_source_utils.py b/examples/sources/eth_source_utils.py deleted file mode 100644 index cf371844c7..0000000000 --- a/examples/sources/eth_source_utils.py +++ /dev/null @@ -1,308 +0,0 @@ -import os -import re -import itertools -from textwrap import indent -from hexbytes import HexBytes -import requests -from typing import Any, Dict, Iterable, Iterator, List, Sequence, Type, TypedDict, Tuple, cast - -from web3 import Web3 -from web3.types import ABI, ABIElement, ABIFunction, ABIEvent, ABIFunctionParams, ABIFunctionComponents, LogReceipt, EventData -from web3._utils.abi import get_abi_input_names, get_abi_input_types, map_abi_data, get_indexed_event_inputs, normalize_event_input_types -from web3._utils.normalizers import BASE_RETURN_NORMALIZERS -from web3._utils.events import get_event_data, get_event_abi_types_for_decoding -from eth_typing import HexStr -from eth_typing.evm import ChecksumAddress -from eth_abi.codec import ABIDecoder -from eth_abi.exceptions import DecodingError -from eth_utils.abi import function_abi_to_4byte_selector, event_abi_to_log_topic - -from dlt.common import json -from dlt.common.typing import DictStrAny, StrAny, StrStr - - -class TABIInfo(TypedDict): - name: str - abi_file: str - abi: ABI - unknown_selectors: DictStrAny - file_content: StrAny - selectors: Dict[HexBytes, ABIElement] - - -class EthSigItem(TypedDict): - name: str - filtered: bool - - -def abi_to_selector(abi: ABIElement) -> HexBytes: - if abi["type"] == "event": - return HexBytes(event_abi_to_log_topic(abi)) # type: ignore - elif abi["type"] == "function": - return HexBytes(function_abi_to_4byte_selector(abi)) # type: ignore - else: - raise ValueError(abi) - - -def load_abis(w3: Web3, abi_dir: str) -> Dict[ChecksumAddress, TABIInfo]: - contracts: Dict[ChecksumAddress, TABIInfo] = {} - if abi_dir: - for abi_file in os.scandir(abi_dir): - if not abi_file.is_file(): - continue - address = w3.toChecksumAddress(os.path.basename(abi_file).split(".")[0]) - with open(abi_file, mode="r", encoding="utf-8") as f: - abi: DictStrAny = json.load(f) - # print(f"adding contract {abi['name']} with address {address}") - contracts[address] = { - "name": abi['name'], - "abi": abi["abi"], - "abi_file": abi_file.name, - "unknown_selectors": abi.setdefault("unknown_selectors", {}), - "file_content": abi, - "selectors": {abi_to_selector(a):a for a in abi["abi"] if a["type"] in ["function", "event"]} - } - return contracts - - -def save_abis(abi_dir: str, abis: Iterable[TABIInfo]) -> None: - if abi_dir: - for abi in abis: - save_path = os.path.join(abi_dir, abi["abi_file"]) - # print(f"saving {save_path}") - with open(save_path, mode="w", encoding="utf-8") as f: - json.dump(abi["file_content"], f, indent=2) - - -def maybe_update_abi(abi_info: TABIInfo, selector: HexBytes, new_abi: ABIElement, in_block: int) -> None: - add_info = { - "selector": selector.hex(), - "block": in_block - } - if not new_abi: - abi_info["unknown_selectors"][selector.hex()] = add_info - print(f"could not be found or decoded: {add_info}") - else: - print(f"found and decoded {add_info} to {new_abi['name']}") - new_abi["_dlt_meta"] = add_info # type: ignore - abi_info["abi"].append(new_abi) # type: ignore - abi_info["selectors"][selector] = new_abi - - -def signature_to_abi(sig_type: str, sig: str) -> ABIElement: - # get name and pass remainder for 
tokenization - name, remainder = sig.split("(", maxsplit=1) - - # simple tokenizer that yields "(" ")" "," and tokens between them. empty tokens are ignored - def tokens() -> Iterator[str]: - start = 0 - pos = 0 - while pos < len(remainder): - char = remainder[pos] - if char in ["(", ")",","]: - if pos - start > 0: - yield remainder[start:pos].strip() - yield char - start = pos = pos + 1 - else: - # move to next "," and return token - pos += 1 - - tokens_gen = tokens() - - abi: ABIElement = { - "name": name, - "type": sig_type, # type: ignore - "inputs": [], - "outputs": [] - } - - def _to_components(inputs: List[ABIFunctionComponents], param: str) -> None: - typ_: str = None - input_def: ABIFunctionParams = {} - i = 0 - - while tok := next(tokens_gen): - if tok == "(": - # step into component parsing - # tok = tok[1:] - input_def["components"] = [] - _to_components(input_def["components"], f"{param}_{i}") # type: ignore - typ_ = "tuple" - elif tok == "[]": - # mod last typ_ to be array - assert typ_ is not None - typ_ += tok - elif tok in [")",","]: - # add current type - assert typ_ is not None - input_def.update({ - "name": f"{param}_{i}", - "type": typ_ - }) - inputs.append(input_def) - if tok == ")": - # up from component parsing - return - else: - # prepare for new type - input_def = {} - typ_ = None - i += 1 - else: - typ_ = tok - _to_components(abi["inputs"], "param") # type: ignore - - return abi - - -def decode_tx(codec: ABIDecoder, abi: ABIFunction, params: HexBytes) -> DictStrAny: - names = get_abi_input_names(abi) - types = get_abi_input_types(abi) - decoded = codec.decode_abi(types, params) - normalized = map_abi_data(BASE_RETURN_NORMALIZERS, types, decoded) - - return dict(zip(names, normalized)) - - -def decode_log(codec: ABIDecoder, abi: ABIEvent, log: LogReceipt) -> EventData: - """Decodes raw log data using provided ABI. 
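The `abi_to_selector` helper above reduces to a well-known piece of arithmetic: a function selector is the first four bytes of the keccak-256 hash of the canonical signature, while an event topic is the full 32-byte hash. A small illustration of that arithmetic using `eth_utils` directly, which this module already depends on (the two signatures below are the standard ERC-20 ones, used here only as examples):

```
# The selector arithmetic behind abi_to_selector above: 4-byte function
# selector vs. 32-byte event topic, both keccak-256 of the canonical signature.
# Assumes eth_utils is installed (it is imported by the module above).
from eth_utils import keccak

fn_selector = keccak(text="transfer(address,uint256)")[:4]
event_topic = keccak(text="Transfer(address,address,uint256)")

print(fn_selector.hex())  # a9059cbb
print(event_topic.hex())  # ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef
```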
In case of missing indexes it will figure out the right combination by trying out all possibilities - - Args: - codec (ABIDecoder): ABI decoder - abi (ABIEvent): ABI of the event - log (LogReceipt): raw log data - - Raises: - ValueError: DecodeError or ValueError if no right combination of indexes could be found - - Returns: - EventData: Decoded data - - Will also add/remove indexes in `abi` - """ - log_topics = log["topics"][1:] - log_topics_abi = get_indexed_event_inputs(abi) - log_topic_normalized_inputs = normalize_event_input_types(log_topics_abi) - log_topic_types = get_event_abi_types_for_decoding(log_topic_normalized_inputs) - - if len(log_topics) != len(log_topic_types): - # we have incorrect information on topic indexes in abi so we'll recursively try to discover the right combination - for indexed_inputs in itertools.combinations(abi["inputs"], len(log_topics)): - print(f"attempt recursive decoding for {log['topics'][0].hex()}") - for input_ in abi["inputs"]: - input_["indexed"] = False - for input_ in indexed_inputs: - input_["indexed"] = True - - try: - print(abi) - # codec detects the incorrect padding, for example it does not allow any other byte to be set for uint8, just the LSB - rv: EventData = get_event_data(codec, abi, log) - print("recovered indexes in abi: ") - print(abi) - return rv - except DecodingError: - pass - - raise ValueError("None of the indexed topic combinations decoded correctly") - - return cast(EventData, get_event_data(codec, abi, log)) - - -def fetch_sig(sig_type: str, selector: HexStr) -> Sequence[EthSigItem]: - r = requests.get(f"https://sig.eth.samczsun.com/api/v1/signatures?{sig_type}={selector}") - if r.status_code >= 300: - print(f"{selector} is not known at sig.eth.samczsun.com") - r.raise_for_status() - resp = r.json() - if not resp["ok"]: - print(f"{selector} is not OK at sig.eth.samczsun.com") - r.raise_for_status() - - return resp["result"][sig_type][selector] # type: ignore - - -def fetch_sig_and_decode_log(codec: ABIDecoder, log: LogReceipt) -> Tuple[str, EventData, ABIEvent]: - topic = log["topic"] - - for sig in fetch_sig("event", HexStr(topic.hex())): - sig_name: str = sig["name"] - abi = cast(ABIEvent, signature_to_abi("event", sig_name)) - assert abi_to_selector(abi) == topic - abi["anonymous"] = False - for input_ in abi["inputs"]: - input_.setdefault("indexed", False) - - try: - return sig_name, decode_log(codec, abi, log), abi - except DecodingError: - print("Could not be decoded") - continue - - # no known signatures or nothing could be decoded - return None, None, None - - -def fetch_sig_and_decode_tx(codec: ABIDecoder, tx_input: HexBytes) -> Tuple[str, str, DictStrAny, ABIFunction]: - selector, params = tx_input[:4], tx_input[4:] - - for sig in fetch_sig("function", HexStr(selector.hex())): - sig_name: str = sig["name"] - abi = cast(ABIFunction, signature_to_abi("function", sig_name)) - assert abi_to_selector(abi) == selector - try: - return sig_name, abi["name"], decode_tx(codec, abi, params), abi - except DecodingError as ex: - print(f"Could not be decoded {ex}") - continue - - # no known signatures or nothing could be decoded - return None, None, None, None - - -def prettify_decoded(decoded: DictStrAny, abi: ABIElement) -> DictStrAny: - # TODO: detect ERC20 and format decimals properly - recode_tuples(decoded, abi) - flatten_batches(decoded, abi) - return decoded - - -def flatten_batches(decoded: DictStrAny, abi: ABIElement) -> None: - batch = [] - for input_ in abi["inputs"]: - prev_len: int = None - decoded_val = 
decoded.get(input_["name"]) - if decoded_val is not None: - if isinstance(decoded_val, Sequence): - if prev_len is None or prev_len == len(decoded_val): - batch.append(input_["name"]) - prev_len = len(decoded_val) - else: - # list length not equal so this is not a batch - return - else: - # not all elements are lists so this is not a batch - return - - decoded["batch"] = [{n:decoded[n][i] for n in batch} for i in range(prev_len)] - for n in batch: - del decoded[n] - - -def recode_tuples(decoded: DictStrAny, abi: ABIElement) -> None: - # replaces lists with dicts representing named tuples - - def _replace_component(decoded_component: Sequence[Any], inputs: Sequence[ABIFunctionComponents]) -> Dict[str, Any]: - for idx, i_i in enumerate(zip(decoded_component, inputs)): - item, input_ = i_i - if input_["type"] == "tuple" and isinstance(item, Sequence): - recoded = _replace_component(item, input_["components"]) - decoded_component[idx] = recoded # type: ignore - return {input_["name"]:item for item, input_ in zip(decoded_component, inputs)} - - input_: ABIFunctionParams = None - for input_ in abi["inputs"]: # type: ignore - decoded_val = decoded.get(input_["name"]) - if input_["type"] == "tuple" and isinstance(decoded_val, Sequence): - decoded[input_["name"]] = _replace_component(decoded_val, input_["components"]) diff --git a/examples/sources/ethereum.py b/examples/sources/ethereum.py deleted file mode 100644 index 829bd65d42..0000000000 --- a/examples/sources/ethereum.py +++ /dev/null @@ -1,238 +0,0 @@ -from typing import Any, Callable, Dict, Iterator, List, Tuple, Type, TypedDict, Union, cast, Sequence -from hexbytes import HexBytes -import requests - -from dlt.common.typing import DictStrAny, StrAny -from dlt.common.sources import TDeferred, TItem, defer_iterator, with_retry, with_table_name - -from dlt.pipeline.exceptions import MissingDependencyException -from examples.sources.eth_source_utils import decode_log, decode_tx, fetch_sig_and_decode_log, fetch_sig_and_decode_tx, maybe_update_abi, prettify_decoded, save_abis - -try: - # import gracefully and produce nice exception that explains the user what to do - from web3 import Web3, HTTPProvider - from web3.middleware import geth_poa_middleware - from eth_typing.evm import ChecksumAddress - from web3._utils.method_formatters import get_result_formatters - from web3._utils.rpc_abi import RPC - from web3.types import LogReceipt, EventData, ABIEvent - - from examples.sources.eth_source_utils import load_abis, TABIInfo, ABIFunction -except ImportError: - raise MissingDependencyException("Ethereum Source", ["web3"], "Web3 is a all purpose python library to interact with Ethereum-compatible blockchains.") - -# this source gets full state of any Ethereum compatible blockchain. -# - can be easily scaled up to hundreds of parallel extractions (ie. via Airflow) -# - supports retrying if node fails -# - supports multi threaded extraction on single machine via get_deferred_source -# - supports pipeline state so it may be used to just get new blocks when they are available -# - the return value is iterator (or deferred iterator) so still mappings may be used (ie. 
to decode transactions and logs), see example - - -def get_source(node_url: str, no_blocks: int, last_block: int = None, abi_dir: str = None, lag: int = 2, is_poa: bool = False, supports_batching: bool = True, state: DictStrAny = None) -> Iterator[DictStrAny]: - return _get_source(False, node_url, no_blocks, last_block, abi_dir, lag, is_poa, supports_batching, state) # type: ignore - - -def get_deferred_source(node_url: str, no_blocks: int, last_block: int = None, abi_dir: str = None, lag: int = 2, is_poa: bool = False, supports_batching: bool = True, state: DictStrAny = None) -> Iterator[TDeferred[DictStrAny]]: - return _get_source(True, node_url, no_blocks, last_block, abi_dir, lag, is_poa, supports_batching, state) # type: ignore - - -def _get_source(is_deferred: bool, node_url: str, no_blocks: int, last_block: int, abi_dir: str, lag: int, is_poa: bool, supports_batching: bool, state: DictStrAny = None) -> Union[Iterator[TItem], Iterator[TDeferred[DictStrAny]]]: - - # this code is run only once - headers = { - "Content-Type": "application/json", - "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36" - } - w3 = Web3(Web3.HTTPProvider(node_url, request_kwargs={"headers": headers})) - if is_poa: - w3.middleware_onion.inject(geth_poa_middleware, layer=0) - - # load abis from abi_dir - contracts = load_abis(w3, abi_dir) - - # last block is not provided then take the highest block from the chain - if last_block is None: - # 14788082 - last_block = w3.eth.get_block_number() - lag - - # get default current block - current_block = last_block - no_blocks + 1 - - # get current block from the state if available - if state: - current_block = state.get("ethereum_current_block", current_block) - print(f"Will continue from {current_block} to {last_block}") - else: - print(f"Will start from {current_block} to {last_block}") - - assert current_block >= 0 - if current_block > last_block: - print("Nothing to do") - return - - # get chain id - chain_id = w3.eth.chain_id - - # do not get highest block (in fact we should have 3 block lag here) - while current_block <= last_block: - # get rid of AttributeDict (web3 didn't adopt TypedDict) - print(f"requesting block {current_block}") - - @defer_iterator - @with_retry() - def _get_block_deferred() -> DictStrAny: - return _get_block(w3, current_block, chain_id, supports_batching) - - @with_retry() - def _get_block_retry() -> DictStrAny: - return _get_block(w3, current_block, chain_id, supports_batching) - - # yield deferred items or actual item values - if is_deferred: - yield _get_block_deferred() - else: - block = _get_block_retry() - yield block - yield from _decode_block(w3, block, contracts) - current_block += 1 - - # update state, it will be read and stored with the pipeline instance ONLY when whole iterator finishes - # state participates in the same atomic operation as data - # this must happen after last yield - print(f"finalizing {state}") - save_abis(abi_dir, contracts.values()) - if state is not None: - state["ethereum_current_block"] = current_block - - -def _get_block(w3: Web3, current_block: int, chain_id: int, supports_batching: bool) -> DictStrAny: - print(f"producing block {current_block}") - headers = { - "Content-Type": "application/json", - "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36" - } - - # get block with all transaction - block = 
dict(w3.eth.get_block(current_block, full_transactions=True)) - # set explicit chain id - block["chain_id"] = chain_id - # get rid of AttributeDict (web3 didn't adopt TypedDict) - attr_txs = cast(Sequence[Any], block["transactions"]) - transactions: Sequence[DictStrAny] = [dict(tx) for tx in attr_txs] - # maybe_unknown_inputs: List[str] = [] - for tx in transactions: - if "accessList" in tx and len(tx["accessList"]) > 0: - tx["accessList"] = [dict(al) for al in tx["accessList"]] - # propagate sorting and clustering info (we could also do it in unpacker: TODO:!) - tx["blockTimestamp"] = block["timestamp"] - # overwrite chain_id which is not provided in all cases - tx["chainId"] = chain_id - - block["transactions"] = transactions - block["logsBloom"] = bytes(cast(HexBytes, block["logsBloom"])) # serialize as bytes - - receipts = [] - log_formatters: Callable[..., Any] = get_result_formatters(RPC.eth_getLogs, w3.eth) # type: ignore - provider = cast(HTTPProvider, w3.provider) - rpc_endpoint_url = provider.endpoint_uri - if supports_batching: - # get transaction receipts using batching. web3 does not support batching so we must - # call node directly and then convert hex numbers to ints - batch = [] - for idx, tx in enumerate(transactions): - batch.append({ - "jsonrpc": "2.0", - "method": "eth_getTransactionReceipt", - "params": [tx["hash"]], - "id": idx - }) - - r = requests.post(rpc_endpoint_url, json=batch, timeout=(20, 12), headers=headers) - r.raise_for_status() - receipts = r.json() - else: - for idx, tx in enumerate(transactions): - r = requests.post(rpc_endpoint_url, json={ - "jsonrpc": "2.0", - "method": "eth_getTransactionReceipt", - "params": [tx["hash"]], - "id": idx - }, timeout=(20, 12), headers=headers) - r.raise_for_status() - receipts.append(r.json()) - for tx_receipt, tx in zip(receipts, transactions): - if "result" not in tx_receipt: - raise ValueError(tx_receipt) - tx_receipt = tx_receipt["result"] - assert tx_receipt["transactionHash"] == tx["hash"].hex() - tx["transactionIndex"] = tx_receipt["transactionIndex"] - tx["status"] = tx_receipt["status"] - tx["logs"] = [dict(log) for log in log_formatters(tx_receipt["logs"])] - log: LogReceipt = None - for log in tx["logs"]: - log["topic"] = log["topics"][0] - # log["blockHash"] = block["hash"] - - return block - - -def _decode_block(w3: Web3, block: StrAny, contracts: Dict[ChecksumAddress, TABIInfo]) -> Iterator[StrAny]: - transactions: Sequence[Any] = block["transactions"] - for tx in transactions: - tx_info = { - "_tx_blockNumber": tx["blockNumber"], - "_tx_blockTimestamp": tx["blockTimestamp"], - "_tx_transactionHash": tx["hash"], - "_tx_transactionIndex": tx["transactionIndex"], - "_tx_address": tx["to"], - "_tx_status": tx["status"] - } - # decode transaction - if tx["to"] in contracts: - abi_info = contracts[tx["to"]] - tx_input = HexBytes(tx["input"]) - selector = tx_input[:4] - tx_abi = cast(ABIFunction, abi_info["selectors"].get(selector)) - tx_args: DictStrAny = None - fn_name: str = None - - if tx_abi: - tx_args = decode_tx(w3.codec, tx_abi, tx_input[4:]) - fn_name = tx_abi["name"] - else: - if abi_info["unknown_selectors"].get(selector.hex()) is None: - # try to decode with an api - sig, fn_name, tx_args, tx_abi = fetch_sig_and_decode_tx(w3.codec, tx_input) - maybe_update_abi(abi_info, selector, tx_abi, block["number"]) - - if tx_args: - tx_args = with_table_name(tx_args, abi_info["name"] + "_calls_" + fn_name) - # yield arguments with reference to transaction - tx_args.update(tx_info) - yield 
prettify_decoded(tx_args, tx_abi) - - # decode logs - log: LogReceipt - for log in tx["logs"]: - if log["address"] in contracts: - abi_info = contracts[log["address"]] - selector = log["topic"] - event_abi = cast(ABIEvent, abi_info["selectors"].get(selector)) - event_data: EventData = None - if event_abi: - event_data = decode_log(w3.codec, event_abi, log) - else: - if abi_info["unknown_selectors"].get(selector.hex()) is None: - # try to decode with an api - sig, event_data, event_abi = fetch_sig_and_decode_log(w3.codec, log) - maybe_update_abi(abi_info, selector, event_abi, block["number"]) - - if event_data: - ev_args = with_table_name(dict(event_data["args"]), abi_info["name"] + "_logs_" + event_data["event"]) - # yield arguments with reference to transaction and log - ev_args.update(tx_info) - ev_args.update({ - "_tx_logIndex": event_data["logIndex"] - }) - yield prettify_decoded(ev_args, event_abi) diff --git a/poetry.lock b/poetry.lock index 3ea26ef6b9..7093ffffdf 100644 --- a/poetry.lock +++ b/poetry.lock @@ -33,7 +33,7 @@ domdf-python-tools = ">=2.7.0" [[package]] name = "asttokens" -version = "2.0.5" +version = "2.0.8" description = "Annotate AST trees with source code positions" category = "dev" optional = false @@ -43,7 +43,7 @@ python-versions = "*" six = "*" [package.extras] -test = ["astroid", "pytest"] +test = ["pytest", "astroid (<=2.5.3)"] [[package]] name = "atomicwrites" @@ -99,14 +99,14 @@ yaml = ["pyyaml"] [[package]] name = "boto3" -version = "1.24.43" +version = "1.24.56" description = "The AWS SDK for Python" category = "main" optional = true python-versions = ">= 3.7" [package.dependencies] -botocore = ">=1.27.43,<1.28.0" +botocore = ">=1.27.56,<1.28.0" jmespath = ">=0.7.1,<2.0.0" s3transfer = ">=0.6.0,<0.7.0" @@ -115,7 +115,7 @@ crt = ["botocore[crt] (>=1.21.0,<2.0a0)"] [[package]] name = "botocore" -version = "1.27.43" +version = "1.27.56" description = "Low-level, data-driven core of boto 3." category = "main" optional = true @@ -127,7 +127,7 @@ python-dateutil = ">=2.1,<3.0.0" urllib3 = ">=1.25.4,<1.27" [package.extras] -crt = ["awscrt (==0.13.8)"] +crt = ["awscrt (==0.14.0)"] [[package]] name = "cachetools" @@ -158,7 +158,7 @@ pycparser = "*" [[package]] name = "charset-normalizer" -version = "2.1.0" +version = "2.1.1" description = "The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet." category = "main" optional = false @@ -306,20 +306,20 @@ termcolor = "*" [[package]] name = "flake8" -version = "3.9.2" +version = "5.0.4" description = "the modular source code checker: pep8 pyflakes and co" category = "dev" optional = false -python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,>=2.7" +python-versions = ">=3.6.1" [package.dependencies] -mccabe = ">=0.6.0,<0.7.0" -pycodestyle = ">=2.7.0,<2.8.0" -pyflakes = ">=2.3.0,<2.4.0" +mccabe = ">=0.7.0,<0.8.0" +pycodestyle = ">=2.9.0,<2.10.0" +pyflakes = ">=2.5.0,<2.6.0" [[package]] name = "flake8-bugbear" -version = "21.11.29" +version = "22.7.1" description = "A plugin for flake8 finding likely bugs and design problems in your program. Contains warnings that don't belong in pyflakes and pycodestyle." 
category = "dev" optional = false @@ -330,7 +330,7 @@ attrs = ">=19.2.0" flake8 = ">=3.0.0" [package.extras] -dev = ["coverage", "hypothesis", "hypothesmith (>=0.2)", "pre-commit"] +dev = ["pre-commit", "hypothesmith (>=0.2)", "hypothesis", "coverage"] [[package]] name = "flake8-builtins" @@ -437,7 +437,7 @@ grpc = ["grpcio (>=1.33.2,<2.0dev)", "grpcio-status (>=1.33.2,<2.0dev)"] [[package]] name = "google-auth" -version = "2.9.1" +version = "2.11.0" description = "Google Authentication Library" category = "main" optional = true @@ -450,10 +450,10 @@ rsa = {version = ">=3.1.4,<5", markers = "python_version >= \"3.6\""} six = ">=1.9.0" [package.extras] -aiohttp = ["requests (>=2.20.0,<3.0.0dev)", "aiohttp (>=3.6.2,<4.0.0dev)"] -enterprise_cert = ["cryptography (==36.0.2)", "pyopenssl (==22.0.0)"] -pyopenssl = ["pyopenssl (>=20.0.0)"] reauth = ["pyu2f (>=0.1.5)"] +pyopenssl = ["pyopenssl (>=20.0.0)"] +enterprise_cert = ["pyopenssl (==22.0.0)", "cryptography (==36.0.2)"] +aiohttp = ["aiohttp (>=3.6.2,<4.0.0dev)", "requests (>=2.20.0,<3.0.0dev)"] [[package]] name = "google-cloud-bigquery" @@ -486,7 +486,7 @@ tqdm = ["tqdm (>=4.7.4,<5.0.0dev)"] [[package]] name = "google-cloud-bigquery-storage" -version = "2.14.1" +version = "2.14.2" description = "BigQuery Storage API API client library" category = "main" optional = true @@ -494,14 +494,13 @@ python-versions = ">=3.7" [package.dependencies] google-api-core = {version = ">=1.32.0,<2.0.0 || >=2.8.0,<3.0.0dev", extras = ["grpc"]} -proto-plus = ">=1.18.0,<2.0.0dev" -protobuf = ">=3.19.0,<4.0.0dev" +proto-plus = ">=1.22.0,<2.0.0dev" +protobuf = ">=3.19.0,<5.0.0dev" [package.extras] -fastavro = ["fastavro (>=0.21.2)"] -pandas = ["pandas (>=0.21.1)"] pyarrow = ["pyarrow (>=0.15.0)"] -tests = ["freezegun"] +pandas = ["pandas (>=0.21.1)"] +fastavro = ["fastavro (>=0.21.2)"] [[package]] name = "google-cloud-core" @@ -587,17 +586,17 @@ protobuf = ">=3.6.0" [[package]] name = "hexbytes" -version = "0.2.2" +version = "0.2.3" description = "hexbytes: Python `bytes` subclass that decodes hex, with a readable console output" category = "main" optional = false python-versions = ">=3.6, <4" [package.extras] -dev = ["Sphinx (>=1.6.5,<2)", "bumpversion (>=0.5.3,<1)", "eth-utils (>=1.0.1,<2)", "flake8 (==3.7.9)", "hypothesis (>=3.44.24,<4)", "ipython", "isort (>=4.2.15,<5)", "mypy (==0.770)", "pydocstyle (>=5.0.0,<6)", "pytest-watch (>=4.1.0,<5)", "pytest-xdist", "pytest (==5.4.1)", "sphinx-rtd-theme (>=0.1.9,<1)", "towncrier (>=19.2.0,<20)", "tox (==3.14.6)", "twine", "wheel"] -doc = ["Sphinx (>=1.6.5,<2)", "sphinx-rtd-theme (>=0.1.9,<1)", "towncrier (>=19.2.0,<20)"] -lint = ["flake8 (==3.7.9)", "isort (>=4.2.15,<5)", "mypy (==0.770)", "pydocstyle (>=5.0.0,<6)"] -test = ["eth-utils (>=1.0.1,<2)", "hypothesis (>=3.44.24,<4)", "pytest-xdist", "pytest (==5.4.1)", "tox (==3.14.6)"] +test = ["eth-utils (>=1.0.1,<2)", "hypothesis (>=3.44.24,<=6.31.6)", "tox (>=3.25.1,<4)", "pytest-xdist", "pytest (>=7,<8)"] +lint = ["black (>=22,<23)", "pydocstyle (>=5.0.0,<6)", "mypy (==0.971)", "isort (>=4.2.15,<5)", "flake8 (==3.7.9)"] +doc = ["jinja2 (>=3.0.0,<3.1.0)", "towncrier (>=21,<22)", "sphinx-rtd-theme (>=0.1.9,<1)", "Sphinx (>=1.6.5,<2)"] +dev = ["jinja2 (>=3.0.0,<3.1.0)", "towncrier (>=21,<22)", "sphinx-rtd-theme (>=0.1.9,<1)", "Sphinx (>=1.6.5,<2)", "black (>=22,<23)", "pydocstyle (>=5.0.0,<6)", "mypy (==0.971)", "isort (>=4.2.15,<5)", "flake8 (==3.7.9)", "eth-utils (>=1.0.1,<2)", "hypothesis (>=3.44.24,<=6.31.6)", "tox (>=3.25.1,<4)", "pytest-xdist", "pytest (>=7,<8)", 
"ipython", "twine", "wheel", "pytest-watch (>=4.1.0,<5)", "bumpversion (>=0.5.3,<1)"] [[package]] name = "hologram" @@ -770,11 +769,11 @@ typing-extensions = "*" [[package]] name = "mccabe" -version = "0.6.1" +version = "0.7.0" description = "McCabe checker, plugin for flake8" category = "dev" optional = false -python-versions = "*" +python-versions = ">=3.6" [[package]] name = "minimal-snowplow-tracker" @@ -798,7 +797,7 @@ python-versions = "*" [[package]] name = "mypy" -version = "0.960" +version = "0.971" description = "Optional static typing for Python" category = "dev" optional = false @@ -810,9 +809,9 @@ tomli = {version = ">=1.1.0", markers = "python_version < \"3.11\""} typing-extensions = ">=3.10" [package.extras] -dmypy = ["psutil (>=4.0)"] -python2 = ["typed-ast (>=1.4.0,<2)"] reports = ["lxml"] +python2 = ["typed-ast (>=1.4.0,<2)"] +dmypy = ["psutil (>=4.0)"] [[package]] name = "mypy-extensions" @@ -851,7 +850,7 @@ test = ["pytest (>=7.1)", "pytest-cov (>=3.0)", "codecov (>=2.1)"] [[package]] name = "numpy" -version = "1.23.1" +version = "1.23.2" description = "NumPy is the fundamental package for array computing with Python." category = "main" optional = true @@ -881,7 +880,7 @@ future = "*" [[package]] name = "pbr" -version = "5.9.0" +version = "5.10.0" description = "Python Build Reasonableness" category = "dev" optional = false @@ -924,7 +923,7 @@ twisted = ["twisted"] [[package]] name = "proto-plus" -version = "1.20.6" +version = "1.22.0" description = "Beautiful, Pythonic protocol buffers." category = "main" optional = true @@ -992,11 +991,11 @@ pyasn1 = ">=0.4.6,<0.5.0" [[package]] name = "pycodestyle" -version = "2.7.0" +version = "2.9.1" description = "Python style guide checker" category = "dev" optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" +python-versions = ">=3.6" [[package]] name = "pycparser" @@ -1008,11 +1007,11 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" [[package]] name = "pyflakes" -version = "2.3.1" +version = "2.5.0" description = "passive checker of Python programs" category = "dev" optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" +python-versions = ">=3.6" [[package]] name = "pyparsing" @@ -1138,7 +1137,7 @@ python-versions = "*" [[package]] name = "pytz" -version = "2022.1" +version = "2022.2.1" description = "World timezone definitions, modern and historical" category = "main" optional = true @@ -1224,7 +1223,7 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" [[package]] name = "sentry-sdk" -version = "1.9.0" +version = "1.9.5" description = "Python client for Sentry (https://sentry.io)" category = "main" optional = false @@ -1232,7 +1231,10 @@ python-versions = "*" [package.dependencies] certifi = "*" -urllib3 = ">=1.10.0" +urllib3 = [ + {version = ">=1.26.9", markers = "python_version >= \"3.5\""}, + {version = ">=1.26.11", markers = "python_version >= \"3.6\""}, +] [package.extras] aiohttp = ["aiohttp (>=3.5)"] @@ -1331,7 +1333,7 @@ python-versions = ">=3.7" [[package]] name = "tomlkit" -version = "0.11.3" +version = "0.11.4" description = "Style preserving TOML library" category = "main" optional = false @@ -1371,7 +1373,7 @@ python-versions = "*" [[package]] name = "types-requests" -version = "2.28.7" +version = "2.28.9" description = "Typing stubs for requests" category = "dev" optional = false @@ -1390,7 +1392,7 @@ python-versions = "*" [[package]] name = "types-urllib3" -version = "1.26.20" +version = "1.26.23" description = "Typing stubs for 
urllib3" category = "dev" optional = false @@ -1406,7 +1408,7 @@ python-versions = "*" [[package]] name = "tzdata" -version = "2022.1" +version = "2022.2" description = "Provider of IANA time zone data" category = "main" optional = false @@ -1458,7 +1460,7 @@ redshift = ["psycopg2-binary"] [metadata] lock-version = "1.1" python-versions = "^3.8,<3.11" -content-hash = "11e680ca7b04ac6a81bbcdaa0de674c117636db96f46cda91f462bae30516749" +content-hash = "b6770943cb6b247e4a84f7ae9aafb450dae688dbfea7e8752a8f6c01831a9f35" [metadata.files] agate = [ @@ -1469,10 +1471,7 @@ astatine = [ {file = "astatine-0.3.2-py3-none-any.whl", hash = "sha256:63ce5c4c94b7fb5e21abf4ab20c0f2ac1f10237ec935dcf0041188b60fb2bdd1"}, {file = "astatine-0.3.2.tar.gz", hash = "sha256:5511b1d8ad36284410c4bfa45353492ad19a4698f61f226aee90d446aa2e91ae"}, ] -asttokens = [ - {file = "asttokens-2.0.5-py2.py3-none-any.whl", hash = "sha256:0844691e88552595a6f4a4281a9f7f79b8dd45ca4ccea82e5e05b4bbdb76705c"}, - {file = "asttokens-2.0.5.tar.gz", hash = "sha256:9a54c114f02c7a9480d56550932546a3f1fe71d8a02f1bc7ccd0ee3ee35cf4d5"}, -] +asttokens = [] atomicwrites = [] attrs = [] babel = [ @@ -1559,10 +1558,7 @@ cffi = [ {file = "cffi-1.15.1-cp39-cp39-win_amd64.whl", hash = "sha256:70df4e3b545a17496c9b3f41f5115e69a4f2e77e94e1d2a8e1070bc0c38c8a3c"}, {file = "cffi-1.15.1.tar.gz", hash = "sha256:d400bfb9a37b1351253cb402671cea7e89bdecc294e8016a707f6d1d8ac934f9"}, ] -charset-normalizer = [ - {file = "charset-normalizer-2.1.0.tar.gz", hash = "sha256:575e708016ff3a5e3681541cb9d79312c416835686d054a23accb873b254f413"}, - {file = "charset_normalizer-2.1.0-py3-none-any.whl", hash = "sha256:5189b6f22b01957427f35b6a08d9a0bc45b46d3788ef5a92e978433c7a35f8a5"}, -] +charset-normalizer = [] click = [ {file = "click-8.1.3-py3-none-any.whl", hash = "sha256:bb4d8133cb15a609f44e8213d9b391b0809795062913b383c62be0ee95b1db48"}, {file = "click-8.1.3.tar.gz", hash = "sha256:7682dc8afb30297001674575ea00d1814d808d6a36af415a82bd481d37ba7b8e"}, @@ -1614,14 +1610,8 @@ domdf-python-tools = [ {file = "domdf_python_tools-3.3.0.tar.gz", hash = "sha256:7c588cdfa3e8bfd9617efbb4fa42652b7010c0f158c81844c862fbda5d86d9cc"}, ] fire = [] -flake8 = [ - {file = "flake8-3.9.2-py2.py3-none-any.whl", hash = "sha256:bf8fd333346d844f616e8d47905ef3a3384edae6b4e9beb0c5101e25e3110907"}, - {file = "flake8-3.9.2.tar.gz", hash = "sha256:07528381786f2a6237b061f6e96610a4167b226cb926e2aa2b6b1d78057c576b"}, -] -flake8-bugbear = [ - {file = "flake8-bugbear-21.11.29.tar.gz", hash = "sha256:8b04cb2fafc6a78e1a9d873bd3988e4282f7959bb6b0d7c1ae648ec09b937a7b"}, - {file = "flake8_bugbear-21.11.29-py36.py37.py38-none-any.whl", hash = "sha256:179e41ddae5de5e3c20d1f61736feeb234e70958fbb56ab3c28a67739c8e9a82"}, -] +flake8 = [] +flake8-bugbear = [] flake8-builtins = [ {file = "flake8-builtins-1.5.3.tar.gz", hash = "sha256:09998853b2405e98e61d2ff3027c47033adbdc17f9fe44ca58443d876eb00f3b"}, {file = "flake8_builtins-1.5.3-py2.py3-none-any.whl", hash = "sha256:7706babee43879320376861897e5d1468e396a40b8918ed7bccf70e5f90b8687"}, @@ -1760,10 +1750,7 @@ grpcio-status = [ {file = "grpcio-status-1.43.0.tar.gz", hash = "sha256:21759006f36a7ffbff187d4191f4118c072d8aa9fa6823a11aad7842a3c6ccd0"}, {file = "grpcio_status-1.43.0-py3-none-any.whl", hash = "sha256:9036b24f5769adafdc3e91d9434c20e9ede0b30f50cc6bff105c0f414bb9e0e0"}, ] -hexbytes = [ - {file = "hexbytes-0.2.2-py3-none-any.whl", hash = "sha256:ef53c37ea9f316fff86fcb1df057b4c6ba454da348083e972031bbf7bc9c3acc"}, - {file = "hexbytes-0.2.2.tar.gz", hash = 
"sha256:a5881304d186e87578fb263a85317c808cf130e1d4b3d37d30142ab0f7898d03"}, -] +hexbytes = [] hologram = [ {file = "hologram-0.0.14-py3-none-any.whl", hash = "sha256:2911b59115bebd0504eb089532e494fa22ac704989afe41371c5361780433bfe"}, {file = "hologram-0.0.14.tar.gz", hash = "sha256:fd67bd069e4681e1d2a447df976c65060d7a90fee7f6b84d133fd9958db074ec"}, @@ -1898,10 +1885,7 @@ mashumaro = [ {file = "mashumaro-2.9-py3-none-any.whl", hash = "sha256:f616df410d82936b8bb2b4d32af570556685d77f49acf4228134b50230a69799"}, {file = "mashumaro-2.9.tar.gz", hash = "sha256:343b6e2d3e432e31973688c4c8821dcd6ef41fd33264b992afc4aecbfd155f18"}, ] -mccabe = [ - {file = "mccabe-0.6.1-py2.py3-none-any.whl", hash = "sha256:ab8a6258860da4b6677da4bd2fe5dc2c659cff31b3ee4f7f5d64e79735b80d42"}, - {file = "mccabe-0.6.1.tar.gz", hash = "sha256:dd8d182285a0fe56bace7f45b5e7d1a6ebcbf524e8f3bd87eb0f125271b8831f"}, -] +mccabe = [] minimal-snowplow-tracker = [ {file = "minimal-snowplow-tracker-0.0.2.tar.gz", hash = "sha256:acabf7572db0e7f5cbf6983d495eef54081f71be392330eb3aadb9ccb39daaa4"}, ] @@ -1959,31 +1943,7 @@ msgpack = [ {file = "msgpack-1.0.4-cp39-cp39-win_amd64.whl", hash = "sha256:4d5834a2a48965a349da1c5a79760d94a1a0172fbb5ab6b5b33cbf8447e109ce"}, {file = "msgpack-1.0.4.tar.gz", hash = "sha256:f5d869c18f030202eb412f08b28d2afeea553d6613aee89e200d7aca7ef01f5f"}, ] -mypy = [ - {file = "mypy-0.960-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:3a3e525cd76c2c4f90f1449fd034ba21fcca68050ff7c8397bb7dd25dd8b8248"}, - {file = "mypy-0.960-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:7a76dc4f91e92db119b1be293892df8379b08fd31795bb44e0ff84256d34c251"}, - {file = "mypy-0.960-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:ffdad80a92c100d1b0fe3d3cf1a4724136029a29afe8566404c0146747114382"}, - {file = "mypy-0.960-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:7d390248ec07fa344b9f365e6ed9d205bd0205e485c555bed37c4235c868e9d5"}, - {file = "mypy-0.960-cp310-cp310-win_amd64.whl", hash = "sha256:925aa84369a07846b7f3b8556ccade1f371aa554f2bd4fb31cb97a24b73b036e"}, - {file = "mypy-0.960-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:239d6b2242d6c7f5822163ee082ef7a28ee02e7ac86c35593ef923796826a385"}, - {file = "mypy-0.960-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:f1ba54d440d4feee49d8768ea952137316d454b15301c44403db3f2cb51af024"}, - {file = "mypy-0.960-cp36-cp36m-win_amd64.whl", hash = "sha256:cb7752b24528c118a7403ee955b6a578bfcf5879d5ee91790667c8ea511d2085"}, - {file = "mypy-0.960-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:826a2917c275e2ee05b7c7b736c1e6549a35b7ea5a198ca457f8c2ebea2cbecf"}, - {file = "mypy-0.960-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:3eabcbd2525f295da322dff8175258f3fc4c3eb53f6d1929644ef4d99b92e72d"}, - {file = "mypy-0.960-cp37-cp37m-win_amd64.whl", hash = "sha256:f47322796c412271f5aea48381a528a613f33e0a115452d03ae35d673e6064f8"}, - {file = "mypy-0.960-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:2c7f8bb9619290836a4e167e2ef1f2cf14d70e0bc36c04441e41487456561409"}, - {file = "mypy-0.960-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:fbfb873cf2b8d8c3c513367febde932e061a5f73f762896826ba06391d932b2a"}, - {file = "mypy-0.960-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:cc537885891382e08129d9862553b3d00d4be3eb15b8cae9e2466452f52b0117"}, - {file = 
"mypy-0.960-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:481f98c6b24383188c928f33dd2f0776690807e12e9989dd0419edd5c74aa53b"}, - {file = "mypy-0.960-cp38-cp38-win_amd64.whl", hash = "sha256:29dc94d9215c3eb80ac3c2ad29d0c22628accfb060348fd23d73abe3ace6c10d"}, - {file = "mypy-0.960-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:33d53a232bb79057f33332dbbb6393e68acbcb776d2f571ba4b1d50a2c8ba873"}, - {file = "mypy-0.960-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:8d645e9e7f7a5da3ec3bbcc314ebb9bb22c7ce39e70367830eb3c08d0140b9ce"}, - {file = "mypy-0.960-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:85cf2b14d32b61db24ade8ac9ae7691bdfc572a403e3cb8537da936e74713275"}, - {file = "mypy-0.960-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:a85a20b43fa69efc0b955eba1db435e2ffecb1ca695fe359768e0503b91ea89f"}, - {file = "mypy-0.960-cp39-cp39-win_amd64.whl", hash = "sha256:0ebfb3f414204b98c06791af37a3a96772203da60636e2897408517fcfeee7a8"}, - {file = "mypy-0.960-py3-none-any.whl", hash = "sha256:bfd4f6536bd384c27c392a8b8f790fd0ed5c0cf2f63fc2fed7bce56751d53026"}, - {file = "mypy-0.960.tar.gz", hash = "sha256:d4fccf04c1acf750babd74252e0f2db6bd2ac3aa8fe960797d9f3ef41cf2bfd4"}, -] +mypy = [] mypy-extensions = [ {file = "mypy_extensions-0.4.3-py2.py3-none-any.whl", hash = "sha256:090fedd75945a69ae91ce1303b5824f428daf5a028d2f6ab8a299250a846f15d"}, {file = "mypy_extensions-0.4.3.tar.gz", hash = "sha256:2d82818f5bb3e369420cb3c4060a7970edba416647068eb4c5343488a6c604a8"}, @@ -2002,10 +1962,7 @@ parsedatetime = [ {file = "parsedatetime-2.4-py2-none-any.whl", hash = "sha256:9ee3529454bf35c40a77115f5a596771e59e1aee8c53306f346c461b8e913094"}, {file = "parsedatetime-2.4.tar.gz", hash = "sha256:3d817c58fb9570d1eec1dd46fa9448cd644eeed4fb612684b02dfda3a79cb84b"}, ] -pbr = [ - {file = "pbr-5.9.0-py2.py3-none-any.whl", hash = "sha256:e547125940bcc052856ded43be8e101f63828c2d94239ffbe2b327ba3d5ccf0a"}, - {file = "pbr-5.9.0.tar.gz", hash = "sha256:e8dca2f4b43560edef58813969f52a56cef023146cbb8931626db80e6c1c4308"}, -] +pbr = [] pendulum = [ {file = "pendulum-2.1.2-cp27-cp27m-macosx_10_15_x86_64.whl", hash = "sha256:b6c352f4bd32dff1ea7066bd31ad0f71f8d8100b9ff709fb343f3b86cee43efe"}, {file = "pendulum-2.1.2-cp27-cp27m-win_amd64.whl", hash = "sha256:318f72f62e8e23cd6660dbafe1e346950281a9aed144b5c596b2ddabc1d19739"}, @@ -2037,10 +1994,7 @@ prometheus-client = [ {file = "prometheus_client-0.11.0-py2.py3-none-any.whl", hash = "sha256:b014bc76815eb1399da8ce5fc84b7717a3e63652b0c0f8804092c9363acab1b2"}, {file = "prometheus_client-0.11.0.tar.gz", hash = "sha256:3a8baade6cb80bcfe43297e33e7623f3118d660d41387593758e2fb1ea173a86"}, ] -proto-plus = [ - {file = "proto-plus-1.20.6.tar.gz", hash = "sha256:449b4537e83f4776bd69051c4d776db8ffe3f9d0641f1e87b06c116eb94c90e9"}, - {file = "proto_plus-1.20.6-py3-none-any.whl", hash = "sha256:c6c43c3fcfc360fdab46b47e2e9e805ff56e13169f9f2e45caf88b6b593215ab"}, -] +proto-plus = [] protobuf = [ {file = "protobuf-3.20.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:3cc797c9d15d7689ed507b165cd05913acb992d78b379f6014e013f9ecb20996"}, {file = "protobuf-3.20.1-cp310-cp310-manylinux2014_aarch64.whl", hash = "sha256:ff8d8fa42675249bb456f5db06c00de6c2f4c27a065955917b28c4f15978b9c3"}, @@ -2160,18 +2114,12 @@ pyasn1-modules = [ {file = "pyasn1_modules-0.2.8-py3.6.egg", hash = "sha256:cbac4bc38d117f2a49aeedec4407d23e8866ea4ac27ff2cf7fb3e5b570df19e0"}, {file = 
"pyasn1_modules-0.2.8-py3.7.egg", hash = "sha256:c29a5e5cc7a3f05926aff34e097e84f8589cd790ce0ed41b67aed6857b26aafd"}, ] -pycodestyle = [ - {file = "pycodestyle-2.7.0-py2.py3-none-any.whl", hash = "sha256:514f76d918fcc0b55c6680472f0a37970994e07bbb80725808c17089be302068"}, - {file = "pycodestyle-2.7.0.tar.gz", hash = "sha256:c389c1d06bf7904078ca03399a4816f974a1d590090fecea0c63ec26ebaf1cef"}, -] +pycodestyle = [] pycparser = [ {file = "pycparser-2.21-py2.py3-none-any.whl", hash = "sha256:8ee45429555515e1f6b185e78100aea234072576aa43ab53aefcae078162fca9"}, {file = "pycparser-2.21.tar.gz", hash = "sha256:e644fdec12f7872f86c58ff790da456218b10f863970249516d60a5eaca77206"}, ] -pyflakes = [ - {file = "pyflakes-2.3.1-py2.py3-none-any.whl", hash = "sha256:7893783d01b8a89811dd72d7dfd4d84ff098e5eed95cfa8905b22bbffe52efc3"}, - {file = "pyflakes-2.3.1.tar.gz", hash = "sha256:f5bc8ecabc05bb9d291eb5203d6810b49040f6ff446a756326104746cc00c1db"}, -] +pyflakes = [] pyparsing = [ {file = "pyparsing-3.0.9-py3-none-any.whl", hash = "sha256:5026bae9a10eeaefb61dab2f09052b9f4307d44aee4eda64b309723d8d206bbc"}, {file = "pyparsing-3.0.9.tar.gz", hash = "sha256:2b020ecf7d21b687f219b71ecad3631f644a47f01403fa1d1036b0c6416d70fb"}, @@ -2231,10 +2179,7 @@ pytimeparse = [ {file = "pytimeparse-1.1.8-py2.py3-none-any.whl", hash = "sha256:04b7be6cc8bd9f5647a6325444926c3ac34ee6bc7e69da4367ba282f076036bd"}, {file = "pytimeparse-1.1.8.tar.gz", hash = "sha256:e86136477be924d7e670646a98561957e8ca7308d44841e21f5ddea757556a0a"}, ] -pytz = [ - {file = "pytz-2022.1-py2.py3-none-any.whl", hash = "sha256:e68985985296d9a66a881eb3193b0906246245294a881e7c8afe623866ac6a5c"}, - {file = "pytz-2022.1.tar.gz", hash = "sha256:1e760e2fe6a8163bc0b3d9a19c4f84342afa0a2affebfaa84b01b978a02ecaa7"}, -] +pytz = [] pytzdata = [ {file = "pytzdata-2020.1-py2.py3-none-any.whl", hash = "sha256:e1e14750bcf95016381e4d472bad004eef710f2d6417240904070b3d6654485f"}, {file = "pytzdata-2020.1.tar.gz", hash = "sha256:3efa13b335a00a8de1d345ae41ec78dd11c9f8807f522d39850f2dd828681540"}, diff --git a/pyproject.toml b/pyproject.toml index e7ab850943..4fc8808654 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "python-dlt" -version = "0.1.0rc11" +version = "0.1.0rc12" description = "DLT is an open-source python-native scalable data loading framework that does not require any devops efforts to run." authors = ["ScaleVector