sql_database: uses full reflection by default (#525)
* skips NULL columns in arrow tables inferred from the data

* uses full reflection level by default
rudolfix authored Jul 8, 2024
1 parent 14c0711 commit 2a26ee1
Showing 7 changed files with 43 additions and 93 deletions.
72 changes: 0 additions & 72 deletions sources/sql_database/README.md
@@ -203,75 +203,3 @@ No issues found. Postgres is the only backend where we observed 2x speedup with
## Learn more
💡 To explore additional customizations for this pipeline, we recommend referring to the official documentation for the dlt SQL Database verified source. It provides comprehensive information and guidance on how to further customize and tailor the pipeline to suit your specific needs. You can find the documentation in [Setup Guide: SQL Database.](https://dlthub.com/docs/dlt-ecosystem/verified-sources/sql_database)
## Extended configuration
You can configure most of the arguments to `sql_database` and `sql_table` via toml files and environment variables. This is particularly useful with `sql_table`,
because you can maintain a separate configuration for each table (below we show **secrets.toml** and **config.toml**; you are free to combine them into one):
```toml
[sources.sql_database]
credentials="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server"
```
```toml
[sources.sql_database.chat_message]
backend="pandas"
chunk_size=1000
[sources.sql_database.chat_message.incremental]
cursor_path="updated_at"
```
The example above sets up **backend** and **chunk_size** for a table named **chat_message**. It also enables incremental loading on a column named **updated_at**.
The table resource is instantiated as follows:
```python
table = sql_table(table="chat_message", schema="data")
```
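For example, a run of the configured resource could look like the sketch below; the pipeline name and the `duckdb` destination are illustrative assumptions, and the credentials, backend, chunk size and incremental cursor are expected to come from the toml files shown above:
```python
import dlt
from sql_database import sql_table

# backend, chunk_size and the incremental cursor for "chat_message"
# are picked up from the toml files shown above
table = sql_table(table="chat_message", schema="data")

# "chat_pipeline" and the duckdb destination are assumed for illustration
pipeline = dlt.pipeline(pipeline_name="chat_pipeline", destination="duckdb")
info = pipeline.run(table)
print(info)
```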
Similarly, you can configure the `sql_database` source:
```toml
[sources.sql_database]
credentials="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server"
```
```toml
[sources.sql_database]
schema="data"
backend="pandas"
chunk_size=1000

[sources.sql_database.chat_message.incremental]
cursor_path="updated_at"
```
Note that we can configure incremental loading per table, even if it is part of a dlt source. The source below will extract data using the **pandas** backend
with a **chunk_size** of 1000. The **chat_message** table will load data incrementally using the **updated_at** column. All other tables will load fully.
```python
database = sql_database()
```
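Roughly the same setup can be expressed directly in code; this is a sketch under the assumption that the `chat_message` resource is reachable as an attribute of the source and that `apply_hints` accepts an `incremental` argument in your dlt version:
```python
import dlt
from sql_database import sql_database

# equivalent of the toml configuration above, expressed in code
database = sql_database(schema="data", backend="pandas", chunk_size=1000)
database.chat_message.apply_hints(
    incremental=dlt.sources.incremental("updated_at")
)
```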
You can configure all the arguments this way (except the adapter callback function). [Standard dlt rules apply](https://dlthub.com/docs/general-usage/credentials/configuration#configure-dlt-sources-and-resources). You can use environment variables [by translating the names properly](https://dlthub.com/docs/general-usage/credentials/config_providers#toml-vs-environment-variables), e.g.:
```sh
SOURCES__SQL_DATABASE__CREDENTIALS="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server"
SOURCES__SQL_DATABASE__BACKEND=pandas
SOURCES__SQL_DATABASE__CHUNK_SIZE=1000
SOURCES__SQL_DATABASE__CHAT_MESSAGE__INCREMENTAL__CURSOR_PATH=updated_at
```
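The same variables can also be set from Python before the pipeline runs, for example in a notebook; a sketch that mirrors the toml configuration above (the credentials entry is omitted here):
```python
import os

# mirror the toml configuration through environment variables
os.environ["SOURCES__SQL_DATABASE__BACKEND"] = "pandas"
os.environ["SOURCES__SQL_DATABASE__CHUNK_SIZE"] = "1000"
os.environ["SOURCES__SQL_DATABASE__CHAT_MESSAGE__INCREMENTAL__CURSOR_PATH"] = "updated_at"
```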
### Configuring incremental loading
The `dlt.sources.incremental` class is a [config spec](https://dlthub.com/docs/general-usage/credentials/config_specs) and can be configured like any other spec. Here's an example that sets all possible options:
```toml
[sources.sql_database.chat_message.incremental]
cursor_path="updated_at"
initial_value=2024-05-27T07:32:00Z
end_value=2024-05-28T07:32:00Z
row_order="asc"
allow_external_schedulers=false
```
Please note that we specify the datetimes in **toml** as the initial and end values. For environment variables, only strings are currently supported.
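The same options can also be passed explicitly when instantiating the resource; a sketch, assuming `pendulum` (a dlt dependency) is used for the datetime values:
```python
import dlt
import pendulum
from sql_database import sql_table

# explicit equivalent of the incremental config spec above
table = sql_table(
    table="chat_message",
    schema="data",
    incremental=dlt.sources.incremental(
        "updated_at",
        initial_value=pendulum.datetime(2024, 5, 27, 7, 32, 0),
        end_value=pendulum.datetime(2024, 5, 28, 7, 32, 0),
        row_order="asc",
        allow_external_schedulers=False,
    ),
)
```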
### Use SqlAlchemy Engine as credentials
You can pass a **SqlAlchemy** `Engine` instance instead of credentials:
```python
from sqlalchemy import create_engine
engine = create_engine("mysql+pymysql://[email protected]:4497/Rfam")
table = sql_table(engine, table="chat_message", schema="data")
```
The engine is used by `dlt` to open database connections and can work across multiple threads, so it is compatible with the `parallelize` setting of dlt sources and resources.
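For example, a sketch that combines the engine with parallel extraction (the `.parallelize()` call is assumed to be available on the resource, as it is for other dlt resources):
```python
from sqlalchemy import create_engine
from sql_database import sql_table

engine = create_engine("mysql+pymysql://[email protected]:4497/Rfam")

# the same engine instance is reused by dlt across worker threads
table = sql_table(engine, table="chat_message", schema="data").parallelize()
```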
12 changes: 6 additions & 6 deletions sources/sql_database/__init__.py
@@ -36,7 +36,7 @@ def sql_database(
chunk_size: int = 50000,
backend: TableBackend = "sqlalchemy",
detect_precision_hints: Optional[bool] = False,
reflection_level: Optional[ReflectionLevel] = "minimal",
reflection_level: Optional[ReflectionLevel] = "full",
defer_table_reflect: Optional[bool] = None,
table_adapter_callback: Callable[[Table], None] = None,
backend_kwargs: Dict[str, Any] = None,
@@ -60,8 +60,8 @@
detect_precision_hints (bool): Deprecated. Use `reflection_level`. Set column precision and scale hints for supported data types in the target schema based on the columns in the source tables.
This is disabled by default.
reflection_level: (ReflectionLevel): Specifies how much information should be reflected from the source database schema.
"minimal": Only table names, nullability and primary keys are reflected. Data types are inferred from the data. This is the default option.
"full": Data types will be reflected on top of "minimal". `dlt` will coerce the data into reflected types if necessary.
"minimal": Only table names, nullability and primary keys are reflected. Data types are inferred from the data.
"full": Data types will be reflected on top of "minimal". `dlt` will coerce the data into reflected types if necessary. This is the default option.
"full_with_precision": Sets precision and scale on supported data types (ie. decimal, text, binary). Creates big and regular integer types.
defer_table_reflect (bool): Will connect and reflect table schema only when yielding data. Requires table_names to be explicitly passed.
Enable this option when running on Airflow. Available on dlt 0.4.4 and later.
@@ -134,7 +134,7 @@ def sql_table(
chunk_size: int = 50000,
backend: TableBackend = "sqlalchemy",
detect_precision_hints: Optional[bool] = None,
reflection_level: Optional[ReflectionLevel] = "minimal",
reflection_level: Optional[ReflectionLevel] = "full",
defer_table_reflect: Optional[bool] = None,
table_adapter_callback: Callable[[Table], None] = None,
backend_kwargs: Dict[str, Any] = None,
@@ -156,8 +156,8 @@
"sqlalchemy" is the default and does not require additional dependencies, "pyarrow" creates stable destination schemas with correct data types,
"connectorx" is typically the fastest but ignores the "chunk_size" so you must deal with large tables yourself.
reflection_level: (ReflectionLevel): Specifies how much information should be reflected from the source database schema.
"minimal": Only table names, nullability and primary keys are reflected. Data types are inferred from the data. This is the default option.
"full": Data types will be reflected on top of "minimal". `dlt` will coerce the data into reflected types if necessary.
"minimal": Only table names, nullability and primary keys are reflected. Data types are inferred from the data.
"full": Data types will be reflected on top of "minimal". `dlt` will coerce the data into reflected types if necessary. This is the default option.
"full_with_precision": Sets precision and scale on supported data types (ie. decimal, text, binary). Creates big and regular integer types.
detect_precision_hints (bool): Deprecated. Use `reflection_level`. Set column precision and scale hints for supported data types in the target schema based on the columns in the source tables.
This is disabled by default.
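With this commit, both `sql_database()` and `sql_table()` reflect data types from the database schema by default. A minimal sketch of opting back into the previous behavior (credentials are assumed to come from configuration):
```python
from sql_database import sql_database

# restore the pre-commit behavior: infer data types from the data
# rather than from the reflected database schema
source = sql_database(reflection_level="minimal")
```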
13 changes: 9 additions & 4 deletions sources/sql_database/arrow_helpers.py
@@ -93,14 +93,18 @@ def row_tuples_to_arrow(
for key in list(columnar_unknown_types):
arrow_col: Optional[pa.Array] = None
try:
arrow_col = columnar_known_types[key] = pa.array(
columnar_unknown_types[key]
)
arrow_col = pa.array(columnar_unknown_types[key])
if pa.types.is_null(arrow_col.type):
logger.warning(
f"Column {key} contains only NULL values and data type could not be inferred. This column is removed from the arrow table"
)
continue

except pa.ArrowInvalid as e:
# Try coercing types not supported by arrow to a json friendly format
# E.g. dataclasses -> dict, UUID -> str
try:
arrow_col = columnar_known_types[key] = pa.array(
arrow_col = pa.array(
map_nested_in_place(
custom_encode, list(columnar_unknown_types[key])
)
@@ -113,6 +117,7 @@
f"Column {key} contains a data type which is not supported by pyarrow. This column will be ignored. Error: {e}"
)
if arrow_col is not None:
columnar_known_types[key] = arrow_col
new_schema_fields.append(
pa.field(
key,
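The new skip logic relies on pyarrow reporting the `null` type when every value in a column is `None`; a small standalone sketch of the same check:
```python
import pyarrow as pa

# a column with only NULLs gets no concrete inferred type
arrow_col = pa.array([None, None, None])
assert pa.types.is_null(arrow_col.type)  # such a column is skipped from the arrow table
```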
2 changes: 1 addition & 1 deletion sources/sql_database/helpers.py
@@ -282,4 +282,4 @@ class SqlTableResourceConfiguration(BaseConfiguration):
backend: TableBackend = "sqlalchemy"
detect_precision_hints: Optional[bool] = None
defer_table_reflect: Optional[bool] = False
reflection_level: Optional[ReflectionLevel] = "minimal"
reflection_level: Optional[ReflectionLevel] = "full"
7 changes: 4 additions & 3 deletions tests/sql_database/sql_source.py
@@ -186,11 +186,12 @@ def _make_precision_table(table_name: str, nullable: bool) -> Table:
SELECT
cm.id,
cm.content,
cm.created_at,
cm.updated_at,
cm.created_at as _created_at,
cm.updated_at as _updated_at,
au.email as user_email,
au.display_name as user_display_name,
cc.name as channel_name
cc.name as channel_name,
CAST(NULL as TIMESTAMP) as _null_ts
FROM {self.schema}.chat_message cm
JOIN {self.schema}.app_user au ON cm.user_id = au.id
JOIN {self.schema}.chat_channel cc ON cm.channel_id = cc.id
4 changes: 2 additions & 2 deletions tests/sql_database/test_arrow_helpers.py
@@ -1,7 +1,5 @@
from datetime import datetime, timezone, date # noqa: I251
from uuid import uuid4
from dataclasses import dataclass
from sqlalchemy.dialects.postgresql import Range

import pytest
import pyarrow as pa
@@ -13,6 +11,8 @@
def test_row_tuples_to_arrow_unknown_types(all_unknown: bool) -> None:
"""Test inferring data types with pyarrow"""

from sqlalchemy.dialects.postgresql import Range

# Applies to NUMRANGE, DATERANGE, etc sql types. Sqlalchemy returns a Range dataclass
IntRange = Range[int]

26 changes: 21 additions & 5 deletions tests/sql_database/test_sql_database_source.py
@@ -171,6 +171,7 @@ def test_load_sql_schema_loads_all_tables(
credentials=sql_source_db.credentials,
schema=sql_source_db.schema,
backend=backend,
reflection_level="minimal",
type_adapter_callback=default_test_callback(destination_name, backend),
)

@@ -204,6 +205,7 @@ def test_load_sql_schema_loads_all_tables_parallel(
credentials=sql_source_db.credentials,
schema=sql_source_db.schema,
backend=backend,
reflection_level="minimal",
type_adapter_callback=default_test_callback(destination_name, backend),
).parallelize()

@@ -235,6 +237,7 @@ def test_load_sql_table_names(
credentials=sql_source_db.credentials,
schema=sql_source_db.schema,
table_names=tables,
reflection_level="minimal",
backend=backend,
)
)
@@ -263,6 +266,7 @@ def make_source():
credentials=sql_source_db.credentials,
schema=sql_source_db.schema,
table_names=tables,
reflection_level="minimal",
backend=backend,
)

@@ -304,6 +308,7 @@ def test_load_mysql_data_load(destination_name: str, backend: TableBackend) -> None:
credentials="mysql+pymysql://[email protected]:4497/Rfam",
table="family",
backend=backend,
reflection_level="minimal",
backend_kwargs=backend_kwargs,
# table_adapter_callback=_double_as_decimal_adapter,
)
@@ -318,6 +323,7 @@ def test_load_mysql_data_load(destination_name: str, backend: TableBackend) -> None:
credentials="mysql+pymysql://[email protected]:4497/Rfam",
table="family",
backend=backend,
reflection_level="minimal",
# we also try to remove dialect automatically
backend_kwargs={},
# table_adapter_callback=_double_as_decimal_adapter,
@@ -341,6 +347,7 @@ def sql_table_source() -> List[DltResource]:
credentials=sql_source_db.credentials,
schema=sql_source_db.schema,
table="chat_message",
reflection_level="minimal",
backend=backend,
)
]
@@ -365,6 +372,7 @@ def sql_table_source() -> List[DltResource]:
schema=sql_source_db.schema,
table="chat_message",
incremental=dlt.sources.incremental("updated_at"),
reflection_level="minimal",
backend=backend,
)
]
@@ -395,6 +403,7 @@ def sql_table_source() -> List[DltResource]:
"updated_at",
sql_source_db.table_infos["chat_message"]["created_at"].start_value,
),
reflection_level="minimal",
backend=backend,
)
]
@@ -941,17 +950,24 @@ def test_sql_table_from_view(
credentials=sql_source_db.credentials,
table="chat_message_view",
schema=sql_source_db.schema,
backend=backend,
# use minimal level so we infer types from DATA
reflection_level="minimal",
incremental=dlt.sources.incremental("_created_at")
)

pipeline = make_pipeline("duckdb")
pipeline.run(table)
info = pipeline.run(table)
assert_load_info(info)

assert_row_counts(pipeline, sql_source_db, ["chat_message_view"])
assert "content" in pipeline.default_schema.tables["chat_message_view"]["columns"]
assert (
"content"
in load_tables_to_dicts(pipeline, "chat_message_view")["chat_message_view"][0]
)
assert "_created_at" in pipeline.default_schema.tables["chat_message_view"]["columns"]
db_data = load_tables_to_dicts(pipeline, "chat_message_view")["chat_message_view"]
assert "content" in db_data[0]
assert "_created_at" in db_data[0]
# make sure that all NULLs is not present
assert "_null_ts" in pipeline.default_schema.tables["chat_message_view"]["columns"]


@pytest.mark.parametrize("backend", ["sqlalchemy", "pyarrow", "pandas", "connectorx"])
