feat: Adding pg_legacy_replication verified source using decoderbufs #589

Open

wants to merge 84 commits into base: master

Changes from 78 commits

84 commits
59e7557
fix: finally got pg_replication tests working as is
Sep 17, 2024
79220b7
feat: got decoderbufs to run and compile in docker
Sep 17, 2024
9de0835
chore: updated protobuf to latest compatible version
Sep 17, 2024
75a0f7f
chore: copying all files from pg_replication; format-lint is reformat…
Sep 18, 2024
73704af
wip: saving work
Sep 18, 2024
7d1b8e7
wip: saving work
Oct 1, 2024
ecbf98d
wip: saving work
Oct 2, 2024
3ed14da
wip: removed all references to publications
Oct 3, 2024
9fe0301
fix: applied suggested changes mentioned here https://github.com/dlt-…
Oct 8, 2024
197ba82
wip: saving work
Oct 8, 2024
c897ee0
wip: finally got snapshot to work
Oct 9, 2024
d303c04
chore: simply cleaning up
Oct 9, 2024
6566fe4
chore: need to find a better way to clean up the underlying engine
Oct 9, 2024
70d40a0
wip: handling begin/commit
Oct 11, 2024
f703431
wip: saving work
Oct 11, 2024
f001633
wip: saving work
Oct 11, 2024
c0df7c9
wip: saving work
Oct 14, 2024
aa464d5
wip: saving work
Oct 17, 2024
db09568
wip: making progress
Oct 17, 2024
c3c0518
wip: saving work
Oct 19, 2024
a5b1a87
refactor: some test refactoring
Oct 19, 2024
7fad621
wip: saving work
Oct 20, 2024
fbc65bc
wip: saving work
Oct 20, 2024
1299b60
wip: cleaning up + refactor
Oct 21, 2024
f44853b
wip: cleaning up + refactor
Oct 21, 2024
46200ca
wip: cleaning up + refactor
Oct 21, 2024
f0f0146
wip: slowly progressing
Oct 22, 2024
cd8d906
wip: all tests pass now to update docs and cleanup
Oct 22, 2024
02851f4
wip: still trying to get it work with all versions of dlt
Oct 22, 2024
beef6ea
wip
Oct 22, 2024
77242e8
wip: changing signature
Oct 23, 2024
f9cdf78
wip: finally got rid of those errors
Oct 23, 2024
327b44c
wip: correcting failing tests
Oct 23, 2024
a9a9bb7
wip: fixed working examples
Oct 23, 2024
37acc35
wip: more refactoring now docs... -_-
Oct 23, 2024
a90acee
wip: cleaning up
Oct 23, 2024
f927f13
wip: cleaning up
Oct 23, 2024
cc7ad61
wip: attempting to refactor to use dlt resources
Oct 24, 2024
f9c7694
wip: second test passing
Oct 24, 2024
927ae03
wip: all tests pass again now for refactoring
Oct 25, 2024
3f31752
wip: init_replication is now a dlt source
Oct 25, 2024
637a6e9
wip: more refactoring
Oct 25, 2024
8a8134b
wip: saving work until I can get hinting to work
Oct 25, 2024
ee3cb9c
wip: finally got something somewhat working
Oct 26, 2024
1727456
wip: done with coding now docs
Oct 27, 2024
81fdce8
fix: various performance improvements
Oct 28, 2024
8fbfc62
fix: minor corrections to handle old versions of postgres
Oct 28, 2024
fd4638b
fix: small type corrections for pg9.6
Oct 29, 2024
526eff3
fix: exposing table options for later arrow support
Oct 29, 2024
2f5ad15
wip: saving work for arrow
Oct 30, 2024
32063e2
wip: first test with arrow passing
Oct 30, 2024
28f463d
wip: almost done passing all tests
Oct 30, 2024
385e8a6
wip: some arrow tests are still not passing
Oct 30, 2024
a291b69
fix: done with pyarrow; too many issues with duckdb atm
Oct 31, 2024
ba23505
wip: some bug fixes
Oct 31, 2024
5993fb4
wip: small refactoring
Nov 3, 2024
6db693a
wip: duckdb needs patching, trying out new max_lsn
Nov 3, 2024
c53c9f9
wip: some refactoring of options to make certain features togglable
Nov 14, 2024
ba1c3fc
wip: lsn and deleted ts are optional
Nov 15, 2024
6b960df
feat: added optional transaction id
Nov 16, 2024
9fa9d98
feat: added optional commit timestamp
Nov 17, 2024
1947029
fix: never handled missing type and added text oid mapping
Nov 18, 2024
7a7ba30
fix: added some logging and bug fixes
Nov 20, 2024
a752581
chore: basic refactoring
Nov 27, 2024
4184ca9
fix: minor corrections
Dec 16, 2024
3c7232f
chore: reverting back to prev state
Dec 16, 2024
c8f1ad2
chore: rebasing 1.x branch onto my own
Dec 16, 2024
7024ce7
fix: corrected bug regarding column names
Dec 19, 2024
63b1de0
chore: minor fixes
Dec 19, 2024
e8b2a0c
chore: small perf fixes and aligning with more adt
Dec 20, 2024
4c33129
chore: refactoring and cleaning
Dec 20, 2024
0b7c151
chore: finished docstrings
Dec 22, 2024
ec72e36
bugfix: misuse of defaultdict
Dec 22, 2024
ecc6089
Finally done with docs
Dec 23, 2024
dd5a63b
fix: wasn't able to execute local tests without these settings
Dec 30, 2024
d377423
feat: added basic support for scalar array types
Jan 14, 2025
acdf446
chore: slight perf improvments for pg_arrays
Jan 15, 2025
a3dc99d
fix: it turns out pg_arrays are annoying found temp workaround
Jan 16, 2025
c9c5bcb
refactor: all sqlalchemy event code is done at engine configuration
Jan 22, 2025
d695afb
chore: bumped python to 3.9; small refactorings
Jan 22, 2025
8f45283
refactor: init_replication is now in pkg ns
Jan 22, 2025
41f8ded
fix: corrected bugs regarding inferring nullability wrong; refactored…
Jan 28, 2025
129b18a
fix: rolling back on managing conn lifecycle using context mgrs: it d…
Jan 29, 2025
9083611
fix: corrected regression with occasional datum_missinng values
Jan 30, 2025
163 changes: 146 additions & 17 deletions poetry.lock

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion pyproject.toml
@@ -45,6 +45,9 @@ pytest-mock = "^3.12.0"
twisted = "22.10.0"
pytest-forked = "^1.6.0"
pendulum = "^3.0.0"
types-protobuf = "^5.27.0.20240907"
pytest-cov = "^5.0.0"
mypy-protobuf = "^3.6.0"

[tool.poetry.group.sql_database.dependencies]
sqlalchemy = ">=1.4"
@@ -54,6 +57,11 @@ connectorx = ">=0.3.1"
[tool.poetry.group.pg_replication.dependencies]
psycopg2-binary = ">=2.9.9"

[tool.poetry.group.pg_legacy_replication.dependencies]
protobuf = ">=4.25"
psycopg2-binary = ">=2.9.9"
sqlalchemy = ">=1.4"

[tool.poetry.group.google_sheets.dependencies]
google-api-python-client = "^2.78.0"

@@ -116,4 +124,4 @@ requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.black]
include = '.*py$'
include = '.*py$'
6 changes: 5 additions & 1 deletion sources/.dlt/example.secrets.toml
@@ -16,7 +16,11 @@ location = "US"
### Sources
[sources]

# local postgres
helpers.credentials="postgresql://loader:loader@localhost:5432/dlt_data"
pg_legacy_replication.credentials="postgresql://loader:loader@localhost:5432/dlt_data"

## chess pipeline
# the section below defines secrets for "chess_dlt_config_example" source in chess/__init__.py
[sources.chess]
secret_str="secret string" # a string secret
secret_str="secret string" # a string secret
130 changes: 130 additions & 0 deletions sources/pg_legacy_replication/README.md
@@ -0,0 +1,130 @@
# Postgres legacy replication
[Postgres](https://www.postgresql.org/) is one of the most popular relational database management systems. This verified source uses Postgres' replication functionality to efficiently process changes
in tables (a process often referred to as _Change Data Capture_ or CDC). It uses [logical decoding](https://www.postgresql.org/docs/current/logicaldecoding.html) and the optional `decoderbufs`
[output plugin](https://github.com/debezium/postgres-decoderbufs), a shared library that must be built or enabled.

Contributor

what does it mean that decoderbufs is optional? if not present, we decode on the client?

Author

The heavy lifting is done by the decoderbufs extension, which must be added when using a managed Postgres such as Cloud SQL, or compiled and installed on a self-hosted Postgres installation.

Detailed instructions can be found here: https://debezium.io/documentation/reference/stable/postgres-plugins.html#logical-decoding-output-plugin-installation

FYI decoderbufs is the default logical replication plugin used by Debezium.

| Source | Description |
|---------------------|-------------------------------------------------|
| replication_source | Load published messages from a replication slot |

## Install decoderbufs

Instructions can be found [here](https://github.com/debezium/postgres-decoderbufs?tab=readme-ov-file#building).

Below is an example installation in a Docker image:
```Dockerfile
FROM postgres:14

# Install dependencies required to build decoderbufs
RUN apt-get update
RUN apt-get install -f -y \
software-properties-common \
build-essential \
pkg-config \
git

RUN apt-get install -f -y \
postgresql-server-dev-14 \
libprotobuf-c-dev && \
rm -rf /var/lib/apt/lists/*

ARG decoderbufs_version=v1.7.0.Final
RUN git clone https://github.com/debezium/postgres-decoderbufs -b $decoderbufs_version --single-branch && \
cd postgres-decoderbufs && \
make && make install && \
cd .. && \
rm -rf postgres-decoderbufs
```
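Note that installing the shared library is not enough on its own: the server also has to load it and run with logical decoding enabled. Below is a minimal sketch of the relevant `postgresql.conf` settings; the exact values, and how you set them, depend on your hosting environment, so treat these as assumptions to adapt:

```
# postgresql.conf (illustrative values, adjust to your environment)
shared_preload_libraries = 'decoderbufs'   # load the output plugin at server start
wal_level = logical                        # required for logical decoding
max_replication_slots = 4                  # at least one free slot for this source
max_wal_senders = 4                        # at least one sender for the replication connection
```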

## Initialize the pipeline

```bash
$ dlt init pg_legacy_replication duckdb
```

This uses `duckdb` as the destination, but you can choose any of the supported [destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/).

## Set up user

The Postgres user needs to have the `LOGIN` and `REPLICATION` attributes assigned:

```sql
CREATE ROLE replication_user WITH LOGIN REPLICATION;
```

It also needs read-only privileges on the database (connect to the database first to grant them):

```sql
\connect dlt_data
GRANT USAGE ON SCHEMA schema_name TO replication_user;
GRANT SELECT ON ALL TABLES IN SCHEMA schema_name TO replication_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA schema_name GRANT SELECT ON TABLES TO replication_user;
```
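Replication itself happens through a logical replication slot bound to the `decoderbufs` output plugin. If you want to create the slot manually, a sketch is shown below; the slot name `dlt_slot` is just an example, reuse whatever name you pass to the source:

```sql
-- Create a logical replication slot that uses the decoderbufs output plugin
SELECT * FROM pg_create_logical_replication_slot('dlt_slot', 'decoderbufs');
```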

## Add credentials
1. Open `.dlt/secrets.toml`.
2. Enter your Postgres credentials:

```toml
[sources.pg_legacy_replication]
credentials="postgresql://replication_user:<<password>>@localhost:5432/dlt_data"
```
3. Enter credentials for your chosen destination as per the [docs](https://dlthub.com/docs/dlt-ecosystem/destinations/).

## Run the pipeline

1. Install the necessary dependencies by running the following command:

```bash
pip install -r requirements.txt
```

1. Now the pipeline can be run by using the command:

```bash
python pg_legacy_replication_pipeline.py
```

1. To make sure that everything is loaded as expected, use the command:

```bash
dlt pipeline pg_replication_pipeline show
```
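The generated `pg_legacy_replication_pipeline.py` script contains full examples. As a rough, minimal sketch of how the pieces fit together (the slot, schema, and table names below are assumptions for illustration):

```python
import dlt

from pg_legacy_replication import init_replication, replication_source

# Assumed names for the example; adjust to your setup.
SLOT_NAME = "dlt_slot"
SCHEMA = "public"
TABLE_NAMES = ("tbl_x", "tbl_y")

pipeline = dlt.pipeline(
    pipeline_name="pg_legacy_replication_pipeline",
    destination="duckdb",
    dataset_name="replicated_data",
)

# Optional: take an initial snapshot of the tables before consuming the WAL.
snapshots = init_replication(
    slot_name=SLOT_NAME,
    schema=SCHEMA,
    table_names=TABLE_NAMES,
    take_snapshots=True,
)
pipeline.run(snapshots)

# Consume changes published to the replication slot and load them incrementally.
changes = replication_source(
    slot_name=SLOT_NAME,
    schema=SCHEMA,
    table_names=TABLE_NAMES,
)
pipeline.run(changes)
```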

# Differences between `pg_legacy_replication` and `pg_replication`

## Overview

`pg_legacy_replication` is a fork of the verified `pg_replication` source. The primary goal of this fork is to provide logical replication capabilities for Postgres instances running versions
earlier than 10, when the `pgoutput` plugin was not yet available. This fork draws inspiration from the original `pg_replication` source and the `decoderbufs` library,
which is actively maintained by Debezium.

## Key Differences from `pg_replication`

### Replication User Ownership Requirements
One of the limitations of native Postgres replication is that the replication user must **own** the tables in order to add them to a **publication**.
Additionally, once a table is added to a publication, it cannot be removed, requiring the creation of a new replication slot, which results in the loss of any state tracking.

### Limitations in `pg_replication`
The current pg_replication implementation has several limitations:
- It supports only a single initial snapshot of the data.
- It requires `CREATE` access to the source database in order to perform the initial snapshot.
- **Superuser** access is required to replicate entire Postgres schemas.
While the `pg_legacy_replication` source theoretically reads the entire WAL across all schemas, the current implementation using dlt transformers restricts this functionality.
In practice, this has not been a common use case.
- The implementation is opinionated in its approach to data transfer. Specifically, when updates or deletes are required, it defaults to a `merge` write disposition,
which replicates live data without tracking changes over time.

### Features of `pg_legacy_replication`

This fork of `pg_replication` addresses the aforementioned limitations and introduces the following improvements:
- Adheres to the dlt philosophy by treating the WAL as an upstream resource. This replication stream is then transformed into various dlt resources, with customizable options for write disposition,
file formats, type hints, etc., specified at the resource level rather than at the source level (see the sketch after this list).
- Supports an initial snapshot of all tables using the transaction slot isolation level. Additionally, ad-hoc snapshots can be performed using the serializable deferred isolation level,
similar to `pg_dump`.
- Emphasizes the use of `pyarrow` and parquet formats for efficient data storage and transfer. A dedicated backend has been implemented to support these formats.
- Replication messages are decoded using Protocol Buffers (protobufs) in C, rather than relying on native Python byte buffer parsing. This ensures greater efficiency and performance.
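For instance, per-table behavior is configured through `repl_options`, and hints such as write disposition are applied on the individual resources rather than on the source. A minimal sketch, building on the pipeline example above (table and column names are illustrative):

```python
changes = replication_source(
    slot_name="dlt_slot",
    schema="public",
    table_names=("tbl_x", "tbl_y"),
    repl_options={
        "tbl_x": {"backend": "pyarrow", "included_columns": {"id_x", "val_x"}},
        "tbl_y": {"include_lsn": False},
    },
)
# Hints are set per resource, not on the source.
changes.tbl_x.apply_hints(write_disposition="merge", primary_key="id_x")
changes.tbl_y.apply_hints(write_disposition="append")
```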

## Next steps
- Add support for the [wal2json](https://github.com/eulerto/wal2json) replication plugin. This is particularly important for environments such as **Amazon RDS**, which supports `wal2json`,
  as opposed to on-premises or Google Cloud SQL instances that support `decoderbufs`.
140 changes: 140 additions & 0 deletions sources/pg_legacy_replication/__init__.py
@@ -0,0 +1,140 @@
"""Replicates postgres tables in batch using logical decoding."""

from typing import Any, Callable, Dict, Iterable, Mapping, Optional, Sequence, Union

import dlt
from dlt.extract import DltResource
from dlt.extract.items import TDataItem
from dlt.sources.credentials import ConnectionStringCredentials
from collections import defaultdict

from .helpers import (
    BackendHandler,
    ItemGenerator,
    ReplicationOptions,
    advance_slot,
    cleanup_snapshot_resources,
    get_max_lsn,
    init_replication,
)


@dlt.source
def replication_source(
    slot_name: str,
    schema: str,
    table_names: Union[str, Sequence[str]],
    credentials: ConnectionStringCredentials = dlt.secrets.value,
    repl_options: Optional[Mapping[str, ReplicationOptions]] = None,
    target_batch_size: int = 1000,
    flush_slot: bool = True,
) -> Iterable[DltResource]:
"""
Defines a dlt source for replicating Postgres tables using logical replication.
This source reads from a replication slot and pipes the changes using transformers.

- Relies on a replication slot that publishes DML operations (i.e. `insert`, `update`, and `delete`).
- Maintains LSN of last consumed message in state to track progress.
- At start of the run, advances the slot upto last consumed message in previous run (for pg>10 only)
- Processes in batches to limit memory usage.

Args:
slot_name (str):
The name of the logical replication slot used to fetch WAL changes.
schema (str):
Name of the schema to replicate tables from.
table_names (Union[str, Sequence[str]]):
The name(s) of the tables to replicate. Can be a single table name or a list of table names.
credentials (ConnectionStringCredentials):
Database credentials for connecting to the Postgres instance.
repl_options (Optional[Mapping[str, ReplicationOptions]], optional):
A mapping of table names to `ReplicationOptions`, allowing for fine-grained control over
replication behavior for each table.

Each `ReplicationOptions` dictionary can include the following keys:
- `backend` (Optional[TableBackend]): Specifies the backend to use for table replication.
- `backend_kwargs` (Optional[Mapping[str, Any]]): Additional configuration options for the backend.
- `column_hints` (Optional[TTableSchemaColumns]): A dictionary of hints for column types or properties.
- `include_lsn` (Optional[bool]): Whether to include the LSN (Log Sequence Number)
in the replicated data. Defaults to `True`.
- `include_deleted_ts` (Optional[bool]): Whether to include a timestamp for deleted rows.
Defaults to `True`.
- `include_commit_ts` (Optional[bool]): Whether to include the commit timestamp of each change.
- `include_tx_id` (Optional[bool]): Whether to include the transaction ID of each change.
- `included_columns` (Optional[Set[str]]): A set of specific columns to include in the replication.
If not specified, all columns are included.
target_batch_size (int, optional):
The target size of each batch of replicated data items. Defaults to `1000`.
flush_slot (bool, optional):
If `True`, advances the replication slot to the last processed LSN
to prevent replaying already replicated changes. Defaults to `True`.

Yields:
Iterable[DltResource]:
A collection of `DltResource` objects, each corresponding to a table being replicated.

Notes:
- The `repl_options` parameter allows fine-tuning of replication behavior, such as column filtering
or write disposition configuration, per table.
- The replication process is incremental, ensuring only new changes are processed after the last commit LSN.
"""
    table_names = [table_names] if isinstance(table_names, str) else table_names or []
    repl_options = defaultdict(lambda: ReplicationOptions(), repl_options or {})

    @dlt.resource(name=lambda args: args["slot_name"], standalone=True)
    def replication_resource(slot_name: str) -> Iterable[TDataItem]:
        # start where we left off in previous run
        start_lsn = dlt.current.resource_state().get("last_commit_lsn", 0)
        if flush_slot:
            advance_slot(start_lsn, slot_name, credentials)

        # continue until last message in replication slot
        upto_lsn = get_max_lsn(credentials)
        if upto_lsn is None:
            return

        table_qnames = {f"{schema}.{table_name}" for table_name in table_names}

        # generate items in batches
        while True:
            gen = ItemGenerator(
                credentials=credentials,
                slot_name=slot_name,
                table_qnames=table_qnames,
                upto_lsn=upto_lsn,
                start_lsn=start_lsn,
                repl_options=repl_options,
                target_batch_size=target_batch_size,
            )
            yield from gen
            if gen.generated_all:
                dlt.current.resource_state()["last_commit_lsn"] = gen.last_commit_lsn
                break
            start_lsn = gen.last_commit_lsn

    wal_reader = replication_resource(slot_name)

    for table in table_names:
        yield dlt.transformer(
            _create_table_dispatch(table, repl_options=repl_options[table]),
            data_from=wal_reader,
            name=table,
        )


def _create_table_dispatch(
    table: str, repl_options: ReplicationOptions
) -> Callable[[TDataItem], Any]:
    """Creates a dispatch handler that processes data items based on a specified table and optional column hints."""
    handler = BackendHandler(table, repl_options)
    # FIXME Uhhh.. why do I have to do this?
    handler.__qualname__ = "BackendHandler.__call__"  # type: ignore[attr-defined]
Contributor

yeah! why? is dlt checking this somewhere?

Author

Yes. I have no idea why this is required; here is the output with the line commented out:

~/repos/github/neuromantik33/verified-sources $ poetry run pytest tests/pg_legacy_replication/test_pg_replication.py::test_core_functionality 
============================= test session starts ==============================
platform linux -- Python 3.11.11, pytest-7.4.2, pluggy-1.3.0 -- /home/drnick/repos/github/neuromantik33/verified-sources/.venv/bin/python
cachedir: .pytest_cache
rootdir: /home/drnick/repos/github/neuromantik33/verified-sources
configfile: pytest.ini
plugins: mock-3.12.0, requests-mock-1.11.0, cov-5.0.0, forked-1.6.0, anyio-4.0.0
collecting ... collected 2 items

tests/pg_legacy_replication/test_pg_replication.py::test_core_functionality[sqlalchemy-duckdb] FAILED [ 50%]
tests/pg_legacy_replication/test_pg_replication.py::test_core_functionality[pyarrow-duckdb] FAILED [100%]

=================================== FAILURES ===================================
__________________ test_core_functionality[sqlalchemy-duckdb] __________________

src_config = (<dlt.pipeline.pipeline.Pipeline object at 0x7f96cf51f3d0>, 'test_slot_a72aac18')
destination_name = 'duckdb', backend = 'sqlalchemy'

    @pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
    @pytest.mark.parametrize("backend", ["sqlalchemy", "pyarrow"])
    def test_core_functionality(
        src_config: Tuple[dlt.Pipeline, str], destination_name: str, backend: TableBackend
    ) -> None:
        @dlt.resource(write_disposition="merge", primary_key="id_x")
        def tbl_x(data):
            yield data
    
        @dlt.resource(write_disposition="merge", primary_key="id_y")
        def tbl_y(data):
            yield data
    
        src_pl, slot_name = src_config
    
        src_pl.run(
            [
                tbl_x({"id_x": 1, "val_x": "foo"}),
                tbl_y({"id_y": 1, "val_y": True}),
            ]
        )
        add_pk(src_pl.sql_client, "tbl_x", "id_x")
        add_pk(src_pl.sql_client, "tbl_y", "id_y")
    
        snapshots = init_replication(
            slot_name=slot_name,
            schema=src_pl.dataset_name,
            table_names=("tbl_x", "tbl_y"),
            take_snapshots=True,
            table_options={
                "tbl_x": {"backend": backend},
                "tbl_y": {"backend": backend},
            },
        )
    
>       changes = replication_source(
            slot_name=slot_name,
            schema=src_pl.dataset_name,
            table_names=("tbl_x", "tbl_y"),
            repl_options={
                "tbl_x": {"backend": backend},
                "tbl_y": {"backend": backend},
            },
        )

backend    = 'sqlalchemy'
destination_name = 'duckdb'
slot_name  = 'test_slot_a72aac18'
snapshots  = <dlt.extract.source.DltSource object at 0x7f96ceb9fd50>
src_config = (<dlt.pipeline.pipeline.Pipeline object at 0x7f96cf51f3d0>, 'test_slot_a72aac18')
src_pl     = <dlt.pipeline.pipeline.Pipeline object at 0x7f96cf51f3d0>
tbl_x      = <dlt.extract.resource.DltResource object at 0x7f96d8ba9d50>
tbl_y      = <dlt.extract.resource.DltResource object at 0x7f96da95bdd0>

tests/pg_legacy_replication/test_pg_replication.py:65: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.venv/lib/python3.11/site-packages/dlt/extract/decorators.py:195: in __call__
    source = self._deco_f(*args, **kwargs)
        args       = ()
        kwargs     = {'repl_options': {'tbl_x': {'backend': 'sqlalchemy'}, 'tbl_y': {'backend': 'sqlalchemy'}}, 'schema': 'src_pl_dataset_202501221235482995', 'slot_name': 'test_slot_a72aac18', 'table_names': ('tbl_x', 'tbl_y')}
        self       = <dlt.extract.decorators.DltSourceFactoryWrapper object at 0x7f96d7768d90>
.venv/lib/python3.11/site-packages/dlt/extract/decorators.py:280: in _wrap
    return _eval_rv(rv, schema_copy)
        _eval_rv   = <function DltSourceFactoryWrapper._wrap.<locals>._eval_rv at 0x7f96d773fba0>
        _make_schema = <function DltSourceFactoryWrapper._wrap.<locals>._make_schema at 0x7f96cf55c540>
        args       = ()
        conf_f     = <function replication_source at 0x7f96cf55c4a0>
        kwargs     = {'repl_options': {'tbl_x': {'backend': 'sqlalchemy'}, 'tbl_y': {'backend': 'sqlalchemy'}}, 'schema': 'src_pl_dataset_202501221235482995', 'slot_name': 'test_slot_a72aac18', 'table_names': ('tbl_x', 'tbl_y')}
        pipeline_name = 'src_pl'
        proxy      = <dlt.common.pipeline.PipelineContext object at 0x7f96ceb05d10>
        rv         = <generator object replication_source at 0x7f96cd7182c0>
        schema_copy = Schema replication_source at 140285669523216
        source_sections = ('sources', 'pg_legacy_replication', 'replication_source')
.venv/lib/python3.11/site-packages/dlt/extract/decorators.py:240: in _eval_rv
    _rv = list(_rv)
        _rv        = <generator object replication_source at 0x7f96cd7182c0>
        schema_copy = Schema replication_source at 140285669523216
        self       = <dlt.extract.decorators.DltSourceFactoryWrapper object at 0x7f96d7768d90>
        source_section = 'pg_legacy_replication'
sources/pg_legacy_replication/__init__.py:118: in replication_source
    yield dlt.transformer(
        credentials = <dlt.common.configuration.specs.connection_string_credentials.ConnectionStringCredentials object at 0x7f96cd7ffc50>
        flush_slot = True
        repl_options = defaultdict(<function replication_source.<locals>.<lambda> at 0x7f96cd740180>, {'tbl_x': {'backend': 'sqlalchemy'}, 'tbl_y': {'backend': 'sqlalchemy'}})
        replication_resource = <function replication_source.<locals>.replication_resource at 0x7f96cd7ea700>
        schema     = 'src_pl_dataset_202501221235482995'
        slot_name  = 'test_slot_a72aac18'
        table      = 'tbl_x'
        table_names = ('tbl_x', 'tbl_y')
        target_batch_size = 1000
        wal_reader = <dlt.extract.resource.DltResource object at 0x7f96ceba2050>
.venv/lib/python3.11/site-packages/dlt/extract/decorators.py:958: in transformer
    return resource(  # type: ignore
        _impl_cls  = <class 'dlt.extract.resource.DltResource'>
        columns    = None
        data_from  = <dlt.extract.resource.DltResource object at 0x7f96ceba2050>
        f          = BackendHandler(table='tbl_x', repl_options={'backend': 'sqlalchemy'})
        file_format = None
        max_table_nesting = None
        merge_key  = None
        name       = 'tbl_x'
        parallelized = False
        primary_key = None
        schema_contract = None
        selected   = True
        spec       = None
        standalone = False
        table_format = None
        table_name = None
        write_disposition = None
.venv/lib/python3.11/site-packages/dlt/extract/decorators.py:758: in resource
    return decorator(data)
        _impl_cls  = <class 'dlt.extract.resource.DltResource'>
        columns    = None
        data       = BackendHandler(table='tbl_x', repl_options={'backend': 'sqlalchemy'})
        data_from  = <dlt.extract.resource.DltResource object at 0x7f96ceba2050>
        decorator  = <function resource.<locals>.decorator at 0x7f96cd7ea980>
        file_format = None
        make_resource = <function resource.<locals>.make_resource at 0x7f96cd7ea840>
        max_table_nesting = None
        merge_key  = None
        name       = 'tbl_x'
        parallelized = False
        primary_key = None
        references = None
        schema_contract = None
        selected   = True
        spec       = None
        standalone = False
        table_format = None
        table_name = None
        wrap_standalone = <function resource.<locals>.wrap_standalone at 0x7f96cd7ea8e0>
        write_disposition = None
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

f = BackendHandler(table='tbl_x', repl_options={'backend': 'sqlalchemy'})

    def decorator(
        f: Callable[TResourceFunParams, Any]
    ) -> Callable[TResourceFunParams, TDltResourceImpl]:
        if not callable(f):
            if data_from:
                # raise more descriptive exception if we construct transformer
                raise InvalidTransformerDataTypeGeneratorFunctionRequired(
                    name or "<no name>", f, type(f)
                )
            raise ResourceFunctionExpected(name or "<no name>", f, type(f))
        if not standalone and callable(name):
            raise DynamicNameNotStandaloneResource(get_callable_name(f))
    
        resource_name = name if name and not callable(name) else get_callable_name(f)
    
        func_module = inspect.getmodule(f)
        source_section = _get_source_section_name(func_module)
        is_inner_resource = is_inner_callable(f)
    
        if spec is None:
            # autodetect spec
            SPEC, resolvable_fields = spec_from_signature(
                f, inspect.signature(f), include_defaults=standalone
            )
            if is_inner_resource and not standalone:
                if len(resolvable_fields) > 0:
                    # prevent required arguments to inner functions that are not standalone
                    raise ResourceInnerCallableConfigWrapDisallowed(resource_name, source_section)
                else:
                    # empty spec for inner functions - they should not be injected
                    SPEC = BaseConfiguration
        else:
            SPEC = spec
        # assign spec to "f"
        set_fun_spec(f, SPEC)
    
        # register non inner resources as source with single resource in it
        if not is_inner_resource:
            # a source function for the source wrapper, args that go to source are forwarded
            # to a single resource within
            def _source(
                name_ovr: str, section_ovr: str, args: Tuple[Any, ...], kwargs: Dict[str, Any]
            ) -> TDltResourceImpl:
                return wrap_standalone(name_ovr or resource_name, section_ovr or source_section, f)(
                    *args, **kwargs
                )
    
            # make the source module same as original resource
>           _source.__qualname__ = f.__qualname__
E           AttributeError: 'BackendHandler' object has no attribute '__qualname__'

SPEC       = <class 'sources.pg_legacy_replication.helpers.BackendHandlerConfiguration'>
_source    = <function resource.<locals>.decorator.<locals>._source at 0x7f96cd7eaa20>
data_from  = <dlt.extract.resource.DltResource object at 0x7f96ceba2050>
f          = BackendHandler(table='tbl_x', repl_options={'backend': 'sqlalchemy'})
func_module = <module 'sources.pg_legacy_replication.helpers' from '/home/drnick/repos/github/neuromantik33/verified-sources/sources/pg_legacy_replication/helpers.py'>
is_inner_resource = False
name       = 'tbl_x'
resolvable_fields = {}
resource_name = 'tbl_x'
source_section = 'helpers'
spec       = None
standalone = False
wrap_standalone = <function resource.<locals>.wrap_standalone at 0x7f96cd7ea8e0>

.venv/lib/python3.11/site-packages/dlt/extract/decorators.py:731: AttributeError
--------------------------- Captured stdout teardown ---------------------------
schema "src_pl_dataset_202501221235482995" does not exist

schema "src_pl_dataset_202501221235482995_staging" does not exist

___________________ test_core_functionality[pyarrow-duckdb] ____________________

src_config = (<dlt.pipeline.pipeline.Pipeline object at 0x7f96ceb20b10>, 'test_slot_ae9b491f')
destination_name = 'duckdb', backend = 'pyarrow'

    @pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
    @pytest.mark.parametrize("backend", ["sqlalchemy", "pyarrow"])
    def test_core_functionality(
        src_config: Tuple[dlt.Pipeline, str], destination_name: str, backend: TableBackend
    ) -> None:
        @dlt.resource(write_disposition="merge", primary_key="id_x")
        def tbl_x(data):
            yield data
    
        @dlt.resource(write_disposition="merge", primary_key="id_y")
        def tbl_y(data):
            yield data
    
        src_pl, slot_name = src_config
    
        src_pl.run(
            [
                tbl_x({"id_x": 1, "val_x": "foo"}),
                tbl_y({"id_y": 1, "val_y": True}),
            ]
        )
        add_pk(src_pl.sql_client, "tbl_x", "id_x")
        add_pk(src_pl.sql_client, "tbl_y", "id_y")
    
        snapshots = init_replication(
            slot_name=slot_name,
            schema=src_pl.dataset_name,
            table_names=("tbl_x", "tbl_y"),
            take_snapshots=True,
            table_options={
                "tbl_x": {"backend": backend},
                "tbl_y": {"backend": backend},
            },
        )
    
>       changes = replication_source(
            slot_name=slot_name,
            schema=src_pl.dataset_name,
            table_names=("tbl_x", "tbl_y"),
            repl_options={
                "tbl_x": {"backend": backend},
                "tbl_y": {"backend": backend},
            },
        )

backend    = 'pyarrow'
destination_name = 'duckdb'
slot_name  = 'test_slot_ae9b491f'
snapshots  = <dlt.extract.source.DltSource object at 0x7f96cb9de390>
src_config = (<dlt.pipeline.pipeline.Pipeline object at 0x7f96ceb20b10>, 'test_slot_ae9b491f')
src_pl     = <dlt.pipeline.pipeline.Pipeline object at 0x7f96ceb20b10>
tbl_x      = <dlt.extract.resource.DltResource object at 0x7f96ccd0c610>
tbl_y      = <dlt.extract.resource.DltResource object at 0x7f96ccd0e0d0>

tests/pg_legacy_replication/test_pg_replication.py:65: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.venv/lib/python3.11/site-packages/dlt/extract/decorators.py:195: in __call__
    source = self._deco_f(*args, **kwargs)
        args       = ()
        kwargs     = {'repl_options': {'tbl_x': {'backend': 'pyarrow'}, 'tbl_y': {'backend': 'pyarrow'}}, 'schema': 'src_pl_dataset_202501221235515171', 'slot_name': 'test_slot_ae9b491f', 'table_names': ('tbl_x', 'tbl_y')}
        self       = <dlt.extract.decorators.DltSourceFactoryWrapper object at 0x7f96d7768d90>
.venv/lib/python3.11/site-packages/dlt/extract/decorators.py:280: in _wrap
    return _eval_rv(rv, schema_copy)
        _eval_rv   = <function DltSourceFactoryWrapper._wrap.<locals>._eval_rv at 0x7f96d773fba0>
        _make_schema = <function DltSourceFactoryWrapper._wrap.<locals>._make_schema at 0x7f96cf55c540>
        args       = ()
        conf_f     = <function replication_source at 0x7f96cf55c4a0>
        kwargs     = {'repl_options': {'tbl_x': {'backend': 'pyarrow'}, 'tbl_y': {'backend': 'pyarrow'}}, 'schema': 'src_pl_dataset_202501221235515171', 'slot_name': 'test_slot_ae9b491f', 'table_names': ('tbl_x', 'tbl_y')}
        pipeline_name = 'src_pl'
        proxy      = <dlt.common.pipeline.PipelineContext object at 0x7f96ceb05d10>
        rv         = <generator object replication_source at 0x7f971204dd00>
        schema_copy = Schema replication_source at 140285658134864
        source_sections = ('sources', 'pg_legacy_replication', 'replication_source')
.venv/lib/python3.11/site-packages/dlt/extract/decorators.py:240: in _eval_rv
    _rv = list(_rv)
        _rv        = <generator object replication_source at 0x7f971204dd00>
        schema_copy = Schema replication_source at 140285658134864
        self       = <dlt.extract.decorators.DltSourceFactoryWrapper object at 0x7f96d7768d90>
        source_section = 'pg_legacy_replication'
sources/pg_legacy_replication/__init__.py:118: in replication_source
    yield dlt.transformer(
        credentials = <dlt.common.configuration.specs.connection_string_credentials.ConnectionStringCredentials object at 0x7f96cb96a0d0>
        flush_slot = True
        repl_options = defaultdict(<function replication_source.<locals>.<lambda> at 0x7f96cc3dbba0>, {'tbl_x': {'backend': 'pyarrow'}, 'tbl_y': {'backend': 'pyarrow'}})
        replication_resource = <function replication_source.<locals>.replication_resource at 0x7f96cc3d85e0>
        schema     = 'src_pl_dataset_202501221235515171'
        slot_name  = 'test_slot_ae9b491f'
        table      = 'tbl_x'
        table_names = ('tbl_x', 'tbl_y')
        target_batch_size = 1000
        wal_reader = <dlt.extract.resource.DltResource object at 0x7f96caff30d0>
.venv/lib/python3.11/site-packages/dlt/extract/decorators.py:958: in transformer
    return resource(  # type: ignore
        _impl_cls  = <class 'dlt.extract.resource.DltResource'>
        columns    = None
        data_from  = <dlt.extract.resource.DltResource object at 0x7f96caff30d0>
        f          = BackendHandler(table='tbl_x', repl_options={'backend': 'pyarrow'})
        file_format = None
        max_table_nesting = None
        merge_key  = None
        name       = 'tbl_x'
        parallelized = False
        primary_key = None
        schema_contract = None
        selected   = True
        spec       = None
        standalone = False
        table_format = None
        table_name = None
        write_disposition = None
.venv/lib/python3.11/site-packages/dlt/extract/decorators.py:758: in resource
    return decorator(data)
        _impl_cls  = <class 'dlt.extract.resource.DltResource'>
        columns    = None
        data       = BackendHandler(table='tbl_x', repl_options={'backend': 'pyarrow'})
        data_from  = <dlt.extract.resource.DltResource object at 0x7f96caff30d0>
        decorator  = <function resource.<locals>.decorator at 0x7f96cafa7740>
        file_format = None
        make_resource = <function resource.<locals>.make_resource at 0x7f96cc3d9d00>
        max_table_nesting = None
        merge_key  = None
        name       = 'tbl_x'
        parallelized = False
        primary_key = None
        references = None
        schema_contract = None
        selected   = True
        spec       = None
        standalone = False
        table_format = None
        table_name = None
        wrap_standalone = <function resource.<locals>.wrap_standalone at 0x7f96cafa6840>
        write_disposition = None
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

f = BackendHandler(table='tbl_x', repl_options={'backend': 'pyarrow'})

    def decorator(
        f: Callable[TResourceFunParams, Any]
    ) -> Callable[TResourceFunParams, TDltResourceImpl]:
        if not callable(f):
            if data_from:
                # raise more descriptive exception if we construct transformer
                raise InvalidTransformerDataTypeGeneratorFunctionRequired(
                    name or "<no name>", f, type(f)
                )
            raise ResourceFunctionExpected(name or "<no name>", f, type(f))
        if not standalone and callable(name):
            raise DynamicNameNotStandaloneResource(get_callable_name(f))
    
        resource_name = name if name and not callable(name) else get_callable_name(f)
    
        func_module = inspect.getmodule(f)
        source_section = _get_source_section_name(func_module)
        is_inner_resource = is_inner_callable(f)
    
        if spec is None:
            # autodetect spec
            SPEC, resolvable_fields = spec_from_signature(
                f, inspect.signature(f), include_defaults=standalone
            )
            if is_inner_resource and not standalone:
                if len(resolvable_fields) > 0:
                    # prevent required arguments to inner functions that are not standalone
                    raise ResourceInnerCallableConfigWrapDisallowed(resource_name, source_section)
                else:
                    # empty spec for inner functions - they should not be injected
                    SPEC = BaseConfiguration
        else:
            SPEC = spec
        # assign spec to "f"
        set_fun_spec(f, SPEC)
    
        # register non inner resources as source with single resource in it
        if not is_inner_resource:
            # a source function for the source wrapper, args that go to source are forwarded
            # to a single resource within
            def _source(
                name_ovr: str, section_ovr: str, args: Tuple[Any, ...], kwargs: Dict[str, Any]
            ) -> TDltResourceImpl:
                return wrap_standalone(name_ovr or resource_name, section_ovr or source_section, f)(
                    *args, **kwargs
                )
    
            # make the source module same as original resource
>           _source.__qualname__ = f.__qualname__
E           AttributeError: 'BackendHandler' object has no attribute '__qualname__'

SPEC       = <class 'sources.pg_legacy_replication.helpers.BackendHandlerConfiguration'>
_source    = <function resource.<locals>.decorator.<locals>._source at 0x7f96cafa76a0>
data_from  = <dlt.extract.resource.DltResource object at 0x7f96caff30d0>
f          = BackendHandler(table='tbl_x', repl_options={'backend': 'pyarrow'})
func_module = <module 'sources.pg_legacy_replication.helpers' from '/home/drnick/repos/github/neuromantik33/verified-sources/sources/pg_legacy_replication/helpers.py'>
is_inner_resource = False
name       = 'tbl_x'
resolvable_fields = {}
resource_name = 'tbl_x'
source_section = 'helpers'
spec       = None
standalone = False
wrap_standalone = <function resource.<locals>.wrap_standalone at 0x7f96cafa6840>

.venv/lib/python3.11/site-packages/dlt/extract/decorators.py:731: AttributeError
----------------------------- Captured stderr call -----------------------------
2025-01-22 13:35:54,498|[WARNING]|38699|140286848428928|dlt|source.py|register:572|A source with ref dlt.helpers.tbl_x is already registered and will be overwritten
2025-01-22 13:35:54,513|[WARNING]|38699|140286848428928|dlt|source.py|register:572|A source with ref dlt.helpers.tbl_y is already registered and will be overwritten
--------------------------- Captured stdout teardown ---------------------------
schema "src_pl_dataset_202501221235515171" does not exist

schema "src_pl_dataset_202501221235515171_staging" does not exist

=============================== warnings summary ===============================
.venv/lib/python3.11/site-packages/dlt/helpers/dbt/__init__.py:3
  /home/drnick/repos/github/neuromantik33/verified-sources/.venv/lib/python3.11/site-packages/dlt/helpers/dbt/__init__.py:3: DeprecationWarning: pkg_resources is deprecated as an API. See https://setuptools.pypa.io/en/latest/pkg_resources.html
    import pkg_resources

.venv/lib/python3.11/site-packages/dlt/common/configuration/container.py:95: 19 warnings
tests/pg_legacy_replication/test_pg_replication.py: 1958 warnings
  /home/drnick/repos/github/neuromantik33/verified-sources/.venv/lib/python3.11/site-packages/dlt/common/configuration/container.py:95: DeprecationWarning: currentThread() is deprecated, use current_thread() instead
    if m := re.match(r"dlt-pool-(\d+)-", threading.currentThread().getName()):

.venv/lib/python3.11/site-packages/dlt/common/configuration/container.py:95: 19 warnings
tests/pg_legacy_replication/test_pg_replication.py: 1958 warnings
  /home/drnick/repos/github/neuromantik33/verified-sources/.venv/lib/python3.11/site-packages/dlt/common/configuration/container.py:95: DeprecationWarning: getName() is deprecated, get the name attribute instead
    if m := re.match(r"dlt-pool-(\d+)-", threading.currentThread().getName()):

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
============================= slowest 10 durations =============================
2.99s call     tests/pg_legacy_replication/test_pg_replication.py::test_core_functionality[pyarrow-duckdb]
2.89s call     tests/pg_legacy_replication/test_pg_replication.py::test_core_functionality[sqlalchemy-duckdb]
0.17s teardown tests/pg_legacy_replication/test_pg_replication.py::test_core_functionality[pyarrow-duckdb]
0.17s teardown tests/pg_legacy_replication/test_pg_replication.py::test_core_functionality[sqlalchemy-duckdb]
0.07s setup    tests/pg_legacy_replication/test_pg_replication.py::test_core_functionality[sqlalchemy-duckdb]
0.04s setup    tests/pg_legacy_replication/test_pg_replication.py::test_core_functionality[pyarrow-duckdb]
=========================== short test summary info ============================
FAILED tests/pg_legacy_replication/test_pg_replication.py::test_core_functionality[sqlalchemy-duckdb]
FAILED tests/pg_legacy_replication/test_pg_replication.py::test_core_functionality[pyarrow-duckdb]
======================= 2 failed, 3955 warnings in 6.67s =======================

    return handler


__all__ = [
    "ReplicationOptions",
    "cleanup_snapshot_resources",
    "init_replication",
    "replication_source",
]
6 changes: 6 additions & 0 deletions sources/pg_legacy_replication/exceptions.py
@@ -0,0 +1,6 @@
# class SqlDatabaseSourceImportError(Exception):
#     def __init__(self) -> None:
#         super().__init__(
#             "Could not import `sql_database` source. Run `dlt init sql_database <dest>`"
#             " to download the source code."
#         )