From 2a26ee1cd414cb80e8a66a8ad2941dc61a397df0 Mon Sep 17 00:00:00 2001
From: rudolfix
Date: Mon, 8 Jul 2024 22:54:55 +0200
Subject: [PATCH] sql_database: uses full reflection by default (#525)

* skips NULL columns in arrow tables inferred from the data
* uses full reflection level by default
---
 sources/sql_database/README.md                | 72 ------
 sources/sql_database/__init__.py              | 12 ++--
 sources/sql_database/arrow_helpers.py         | 13 ++--
 sources/sql_database/helpers.py               |  2 +-
 tests/sql_database/sql_source.py              |  7 +-
 tests/sql_database/test_arrow_helpers.py      |  4 +-
 .../sql_database/test_sql_database_source.py  | 26 +++++--
 7 files changed, 43 insertions(+), 93 deletions(-)

diff --git a/sources/sql_database/README.md b/sources/sql_database/README.md
index c06ca906f..dfa4b5e16 100644
--- a/sources/sql_database/README.md
+++ b/sources/sql_database/README.md
@@ -203,75 +203,3 @@ No issues found. Postgres is the only backend where we observed 2x speedup with
 
 ## Learn more 💡
 To explore additional customizations for this pipeline, we recommend referring to the official DLT SQL Database verified documentation. It provides comprehensive information and guidance on how to further customize and tailor the pipeline to suit your specific needs. You can find the DLT SQL Database documentation in [Setup Guide: SQL Database.](https://dlthub.com/docs/dlt-ecosystem/verified-sources/sql_database)
-
-## Extended configuration
-You are able to configure most of the arguments to `sql_database` and `sql_table` via toml files and environment variables. This is particularly useful with `sql_table`
-because you can maintain a separate configuration for each table (below we show **secrets.toml** and **config.toml**, you are free to combine them into one.):
-```toml
-[sources.sql_database]
-credentials="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server"
-```
-
-```toml
-[sources.sql_database.chat_message]
-backend="pandas"
-chunk_size=1000
-
-[sources.sql_database.chat_message.incremental]
-cursor_path="updated_at"
-```
-Example above will setup **backend** and **chunk_size** for a table with name **chat_message**. It will also enable incremental loading on a column named **updated_at**.
-Table resource is instantiated as follows:
-```python
-table = sql_table(table="chat_message", schema="data")
-```
-
-Similarly, you can configure `sql_database` source.
-```toml
-[sources.sql_database]
-credentials="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server"
-
-[sources.sql_database]
-schema="data"
-backend="pandas"
-chunk_size=1000
-
-[sources.sql_database.chat_message.incremental]
-cursor_path="updated_at"
-```
-Note that we are able to configure incremental loading per table, even if it is a part of a dlt source. Source below will extract data using **pandas** backend
-with **chunk_size** 1000. **chat_message** table will load data incrementally using **updated_at** column. All other tables will load fully.
-```python
-database = sql_database()
-```
-
-You can configure all the arguments this way (except adapter callback function). [Standard dlt rules apply](https://dlthub.com/docs/general-usage/credentials/configuration#configure-dlt-sources-and-resources). You can use environment variables [by translating the names properly](https://dlthub.com/docs/general-usage/credentials/config_providers#toml-vs-environment-variables) ie.
-```sh
-SOURCES__SQL_DATABASE__CREDENTIALS="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server"
-SOURCES__SQL_DATABASE__BACKEND=pandas
-SOURCES__SQL_DATABASE__CHUNK_SIZE=1000
-SOURCES__SQL_DATABASE__CHAT_MESSAGE__INCREMENTAL__CURSOR_PATH=updated_at
-```
-
-### Configuring incremental loading
-`dlt.sources.incremental` class is a [config spec](https://dlthub.com/docs/general-usage/credentials/config_specs) and can be configured like any other spec, here's an example that sets all possible options:
-```toml
-[sources.sql_database.chat_message.incremental]
-cursor_path="updated_at"
-initial_value=2024-05-27T07:32:00Z
-end_value=2024-05-28T07:32:00Z
-row_order="asc"
-allow_external_schedulers=false
-```
-Please note that we specify date times in **toml** as initial and end value. For env variables only strings are currently supported.
-
-
-### Use SqlAlchemy Engine as credentials
-You are able to pass an instance of **SqlAlchemy** `Engine` instance instead of credentials:
-```python
-from sqlalchemy import create_engine
-
-engine = create_engine("mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam")
-table = sql_table(engine, table="chat_message", schema="data")
-```
-Engine is used by `dlt` to open database connections and can work across multiple threads so is compatible with `parallelize` setting of dlt sources and resources.
diff --git a/sources/sql_database/__init__.py b/sources/sql_database/__init__.py
index 4e0d7c5f7..8d28e3b0e 100644
--- a/sources/sql_database/__init__.py
+++ b/sources/sql_database/__init__.py
@@ -36,7 +36,7 @@ def sql_database(
     chunk_size: int = 50000,
     backend: TableBackend = "sqlalchemy",
     detect_precision_hints: Optional[bool] = False,
-    reflection_level: Optional[ReflectionLevel] = "minimal",
+    reflection_level: Optional[ReflectionLevel] = "full",
     defer_table_reflect: Optional[bool] = None,
     table_adapter_callback: Callable[[Table], None] = None,
     backend_kwargs: Dict[str, Any] = None,
@@ -60,8 +60,8 @@ def sql_database(
         detect_precision_hints (bool): Deprecated. Use `reflection_level`. Set column precision and scale hints for supported data types in the target schema based on the columns in the source tables.
             This is disabled by default.
         reflection_level: (ReflectionLevel): Specifies how much information should be reflected from the source database schema.
-            "minimal": Only table names, nullability and primary keys are reflected. Data types are inferred from the data. This is the default option.
-            "full": Data types will be reflected on top of "minimal". `dlt` will coerce the data into reflected types if necessary.
+            "minimal": Only table names, nullability and primary keys are reflected. Data types are inferred from the data.
+            "full": Data types will be reflected on top of "minimal". `dlt` will coerce the data into reflected types if necessary. This is the default option.
             "full_with_precision": Sets precision and scale on supported data types (ie. decimal, text, binary). Creates big and regular integer types.
         defer_table_reflect (bool): Will connect and reflect table schema only when yielding data. Requires table_names to be explicitly passed.
             Enable this option when running on Airflow. Available on dlt 0.4.4 and later.
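For context on the default switch documented in the hunk above, a brief usage sketch (illustrative only, not part of the patch). It assumes the public Rfam MySQL connection string and the duckdb destination already used in the tests further down, the `from sql_database import ...` import style of this repo's example pipelines, and a made-up pipeline name:

```python
import dlt
from sql_database import sql_database, sql_table

# With this change, omitting reflection_level is equivalent to "full":
# column data types come from the reflected database schema and dlt
# coerces the extracted rows into them.
source = sql_database(
    credentials="mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    table_names=["family"],
)

# The previous default remains available explicitly; types are then
# inferred from the data instead of the reflected schema.
family = sql_table(
    credentials="mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    table="family",
    reflection_level="minimal",
)

pipeline = dlt.pipeline(pipeline_name="rfam_reflection_demo", destination="duckdb")
print(pipeline.run(source))
```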
@@ -134,7 +134,7 @@ def sql_table(
     chunk_size: int = 50000,
     backend: TableBackend = "sqlalchemy",
     detect_precision_hints: Optional[bool] = None,
-    reflection_level: Optional[ReflectionLevel] = "minimal",
+    reflection_level: Optional[ReflectionLevel] = "full",
     defer_table_reflect: Optional[bool] = None,
     table_adapter_callback: Callable[[Table], None] = None,
     backend_kwargs: Dict[str, Any] = None,
@@ -156,8 +156,8 @@ def sql_table(
         "sqlalchemy" is the default and does not require additional dependencies, "pyarrow" creates stable destination schemas with correct data types,
         "connectorx" is typically the fastest but ignores the "chunk_size" so you must deal with large tables yourself.
         reflection_level: (ReflectionLevel): Specifies how much information should be reflected from the source database schema.
-            "minimal": Only table names, nullability and primary keys are reflected. Data types are inferred from the data. This is the default option.
-            "full": Data types will be reflected on top of "minimal". `dlt` will coerce the data into reflected types if necessary.
+            "minimal": Only table names, nullability and primary keys are reflected. Data types are inferred from the data.
+            "full": Data types will be reflected on top of "minimal". `dlt` will coerce the data into reflected types if necessary. This is the default option.
             "full_with_precision": Sets precision and scale on supported data types (ie. decimal, text, binary). Creates big and regular integer types.
         detect_precision_hints (bool): Deprecated. Use `reflection_level`. Set column precision and scale hints for supported data types in the target schema based on the columns in the source tables.
             This is disabled by default.
diff --git a/sources/sql_database/arrow_helpers.py b/sources/sql_database/arrow_helpers.py
index 79e69ab4c..674ce5eff 100644
--- a/sources/sql_database/arrow_helpers.py
+++ b/sources/sql_database/arrow_helpers.py
@@ -93,14 +93,18 @@ def row_tuples_to_arrow(
     for key in list(columnar_unknown_types):
         arrow_col: Optional[pa.Array] = None
         try:
-            arrow_col = columnar_known_types[key] = pa.array(
-                columnar_unknown_types[key]
-            )
+            arrow_col = pa.array(columnar_unknown_types[key])
+            if pa.types.is_null(arrow_col.type):
+                logger.warning(
+                    f"Column {key} contains only NULL values and data type could not be inferred. This column is removed from the arrow table"
+                )
+                continue
+
         except pa.ArrowInvalid as e:
             # Try coercing types not supported by arrow to a json friendly format
             # E.g. dataclasses -> dict, UUID -> str
             try:
-                arrow_col = columnar_known_types[key] = pa.array(
+                arrow_col = pa.array(
                     map_nested_in_place(
                         custom_encode, list(columnar_unknown_types[key])
                     )
@@ -113,6 +117,7 @@
                     f"Column {key} contains a data type which is not supported by pyarrow. This column will be ignored. Error: {e}"
                 )
         if arrow_col is not None:
+            columnar_known_types[key] = arrow_col
             new_schema_fields.append(
                 pa.field(
                     key,
diff --git a/sources/sql_database/helpers.py b/sources/sql_database/helpers.py
index b38d973c7..11d0e1634 100644
--- a/sources/sql_database/helpers.py
+++ b/sources/sql_database/helpers.py
@@ -282,4 +282,4 @@ class SqlTableResourceConfiguration(BaseConfiguration):
     backend: TableBackend = "sqlalchemy"
     detect_precision_hints: Optional[bool] = None
     defer_table_reflect: Optional[bool] = False
-    reflection_level: Optional[ReflectionLevel] = "minimal"
+    reflection_level: Optional[ReflectionLevel] = "full"
diff --git a/tests/sql_database/sql_source.py b/tests/sql_database/sql_source.py
index e3b628e3d..80227ff2d 100644
--- a/tests/sql_database/sql_source.py
+++ b/tests/sql_database/sql_source.py
@@ -186,11 +186,12 @@ def _make_precision_table(table_name: str, nullable: bool) -> Table:
             SELECT
                 cm.id,
                 cm.content,
-                cm.created_at,
-                cm.updated_at,
+                cm.created_at as _created_at,
+                cm.updated_at as _updated_at,
                 au.email as user_email,
                 au.display_name as user_display_name,
-                cc.name as channel_name
+                cc.name as channel_name,
+                CAST(NULL as TIMESTAMP) as _null_ts
             FROM {self.schema}.chat_message cm
             JOIN {self.schema}.app_user au ON cm.user_id = au.id
             JOIN {self.schema}.chat_channel cc ON cm.channel_id = cc.id
diff --git a/tests/sql_database/test_arrow_helpers.py b/tests/sql_database/test_arrow_helpers.py
index 5e61c5e7d..98326acca 100644
--- a/tests/sql_database/test_arrow_helpers.py
+++ b/tests/sql_database/test_arrow_helpers.py
@@ -1,7 +1,5 @@
 from datetime import datetime, timezone, date  # noqa: I251
 from uuid import uuid4
-from dataclasses import dataclass
-from sqlalchemy.dialects.postgresql import Range
 
 import pytest
 import pyarrow as pa
@@ -13,6 +11,8 @@ def test_row_tuples_to_arrow_unknown_types(all_unknown: bool) -> None:
     """Test inferring data types with pyarrow"""
 
+    from sqlalchemy.dialects.postgresql import Range
+
     # Applies to NUMRANGE, DATERANGE, etc sql types. Sqlalchemy returns a Range dataclass
     IntRange = Range[int]
diff --git a/tests/sql_database/test_sql_database_source.py b/tests/sql_database/test_sql_database_source.py
index ea6354a37..63743a5c7 100644
--- a/tests/sql_database/test_sql_database_source.py
+++ b/tests/sql_database/test_sql_database_source.py
@@ -171,6 +171,7 @@ def test_load_sql_schema_loads_all_tables(
         credentials=sql_source_db.credentials,
         schema=sql_source_db.schema,
         backend=backend,
+        reflection_level="minimal",
         type_adapter_callback=default_test_callback(destination_name, backend),
     )
@@ -204,6 +205,7 @@ def test_load_sql_schema_loads_all_tables_parallel(
         credentials=sql_source_db.credentials,
         schema=sql_source_db.schema,
         backend=backend,
+        reflection_level="minimal",
         type_adapter_callback=default_test_callback(destination_name, backend),
     ).parallelize()
@@ -235,6 +237,7 @@ def test_load_sql_table_names(
             credentials=sql_source_db.credentials,
             schema=sql_source_db.schema,
             table_names=tables,
+            reflection_level="minimal",
             backend=backend,
         )
     )
@@ -263,6 +266,7 @@ def make_source():
             credentials=sql_source_db.credentials,
             schema=sql_source_db.schema,
             table_names=tables,
+            reflection_level="minimal",
             backend=backend,
         )
@@ -304,6 +308,7 @@ def test_load_mysql_data_load(destination_name: str, backend: TableBackend) -> N
         credentials="mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
         table="family",
         backend=backend,
+        reflection_level="minimal",
         backend_kwargs=backend_kwargs,
         # table_adapter_callback=_double_as_decimal_adapter,
     )
@@ -318,6 +323,7 @@ def test_load_mysql_data_load(destination_name: str, backend: TableBackend) -> N
         credentials="mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
         table="family",
         backend=backend,
+        reflection_level="minimal",
         # we also try to remove dialect automatically
         backend_kwargs={},
         # table_adapter_callback=_double_as_decimal_adapter,
     )
@@ -341,6 +347,7 @@ def sql_table_source() -> List[DltResource]:
             credentials=sql_source_db.credentials,
             schema=sql_source_db.schema,
             table="chat_message",
+            reflection_level="minimal",
             backend=backend,
         )
     ]
@@ -365,6 +372,7 @@ def sql_table_source() -> List[DltResource]:
             schema=sql_source_db.schema,
             table="chat_message",
             incremental=dlt.sources.incremental("updated_at"),
+            reflection_level="minimal",
             backend=backend,
         )
     ]
@@ -395,6 +403,7 @@ def sql_table_source() -> List[DltResource]:
                 "updated_at",
                 sql_source_db.table_infos["chat_message"]["created_at"].start_value,
             ),
+            reflection_level="minimal",
             backend=backend,
         )
     ]
@@ -941,17 +950,24 @@ def test_sql_table_from_view(
         credentials=sql_source_db.credentials,
         table="chat_message_view",
         schema=sql_source_db.schema,
+        backend=backend,
+        # use minimal level so we infer types from DATA
+        reflection_level="minimal",
+        incremental=dlt.sources.incremental("_created_at")
     )
 
     pipeline = make_pipeline("duckdb")
-    pipeline.run(table)
+    info = pipeline.run(table)
+    assert_load_info(info)
 
     assert_row_counts(pipeline, sql_source_db, ["chat_message_view"])
     assert "content" in pipeline.default_schema.tables["chat_message_view"]["columns"]
-    assert (
-        "content"
-        in load_tables_to_dicts(pipeline, "chat_message_view")["chat_message_view"][0]
-    )
+    assert "_created_at" in pipeline.default_schema.tables["chat_message_view"]["columns"]
+    db_data = load_tables_to_dicts(pipeline, "chat_message_view")["chat_message_view"]
+    assert "content" in db_data[0]
+    assert "_created_at" in db_data[0]
+    # make sure that all NULLs is not present
+    assert "_null_ts" in pipeline.default_schema.tables["chat_message_view"]["columns"]
 
 
 @pytest.mark.parametrize("backend", ["sqlalchemy", "pyarrow", "pandas", "connectorx"])
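As background for the NULL-column handling added in `arrow_helpers.py`: pyarrow assigns the `null` type to a column that contains no concrete values, which is what the new `pa.types.is_null` check detects before dropping the column when types have to be inferred from data. A minimal sketch of that behavior (illustrative, not part of the patch):

```python
import pyarrow as pa

# A column with only NULLs cannot be typed from the data alone:
# pyarrow infers the special "null" type for it.
all_null = pa.array([None, None, None])
assert pa.types.is_null(all_null.type)

# As soon as one concrete value is present, a real type is inferred
# and the column is kept in the resulting arrow table.
mixed = pa.array([None, "2024-05-27T07:32:00Z"])
assert not pa.types.is_null(mixed.type)
```

With `reflection_level="full"` now the default, an all-NULL column such as `_null_ts` in the test view would normally receive its type from the reflected schema instead, so this data-inference path mainly matters when `reflection_level="minimal"` is requested, as in the adjusted tests above.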