Merge pull request #421 from dlt-hub/d#/379-round-robin-pipe-iterator
sh-rp authored Jun 16, 2023
2 parents 302ef3f + d1fefc6 commit 9010fe8
Showing 3 changed files with 141 additions and 18 deletions.
87 changes: 73 additions & 14 deletions dlt/extract/pipe.py
@@ -6,7 +6,7 @@
 from concurrent.futures import ThreadPoolExecutor
 from copy import copy
 from threading import Thread
-from typing import Any, ContextManager, Optional, Sequence, Union, Callable, Iterable, Iterator, List, NamedTuple, Awaitable, Tuple, Type, TYPE_CHECKING
+from typing import Any, ContextManager, Optional, Sequence, Union, Callable, Iterable, Iterator, List, NamedTuple, Awaitable, Tuple, Type, TYPE_CHECKING, Literal

 from dlt.common import sleep
 from dlt.common.configuration import configspec
@@ -65,6 +65,8 @@ class SourcePipeItem(NamedTuple):
     Callable[[TDataItems, Optional[Any]], Iterator[ResolvablePipeItem]]
 ]

+TPipeNextItemMode = Union[Literal["fifo"], Literal["round_robin"]]
+

 class ForkPipe:
     def __init__(self, pipe: "Pipe", step: int = -1, copy_on_fork: bool = False) -> None:
@@ -424,23 +426,27 @@ class PipeIteratorConfiguration(BaseConfiguration):
         workers: int = 5
         futures_poll_interval: float = 0.01
         copy_on_fork: bool = False
+        next_item_mode: str = "fifo"

         __section__ = "extract"

-    def __init__(self, max_parallel_items: int, workers: int, futures_poll_interval: float) -> None:
+    def __init__(self, max_parallel_items: int, workers: int, futures_poll_interval: float, next_item_mode: TPipeNextItemMode) -> None:
         self.max_parallel_items = max_parallel_items
         self.workers = workers
         self.futures_poll_interval = futures_poll_interval

+        self._round_robin_index: int = -1
+        self._initial_sources_count: int = 0
         self._async_pool: asyncio.AbstractEventLoop = None
         self._async_pool_thread: Thread = None
         self._thread_pool: ThreadPoolExecutor = None
         self._sources: List[SourcePipeItem] = []
         self._futures: List[FuturePipeItem] = []
+        self._next_item_mode = next_item_mode

     @classmethod
     @with_config(spec=PipeIteratorConfiguration)
-    def from_pipe(cls, pipe: Pipe, *, max_parallel_items: int = 20, workers: int = 5, futures_poll_interval: float = 0.01) -> "PipeIterator":
+    def from_pipe(cls, pipe: Pipe, *, max_parallel_items: int = 20, workers: int = 5, futures_poll_interval: float = 0.01, next_item_mode: TPipeNextItemMode = "fifo") -> "PipeIterator":
         # join all dependent pipes
         if pipe.parent:
             pipe = pipe.full_pipe()
@@ -450,9 +456,10 @@ def from_pipe(cls, pipe: Pipe, *, max_parallel_items: int = 20, workers: int = 5
         pipe.evaluate_gen()
         assert isinstance(pipe.gen, Iterator)
         # create extractor
-        extract = cls(max_parallel_items, workers, futures_poll_interval)
+        extract = cls(max_parallel_items, workers, futures_poll_interval, next_item_mode)
         # add as first source
         extract._sources.append(SourcePipeItem(pipe.gen, 0, pipe, None))
+        extract._initial_sources_count = 1
         return extract

     @classmethod
@@ -465,13 +472,16 @@ def from_pipes(
         max_parallel_items: int = 20,
         workers: int = 5,
         futures_poll_interval: float = 0.01,
-        copy_on_fork: bool = False
+        copy_on_fork: bool = False,
+        next_item_mode: TPipeNextItemMode = "fifo"
     ) -> "PipeIterator":

         # print(f"max_parallel_items: {max_parallel_items} workers: {workers}")
-        extract = cls(max_parallel_items, workers, futures_poll_interval)
+        extract = cls(max_parallel_items, workers, futures_poll_interval, next_item_mode)
         # clone all pipes before iterating (recursively) as we will fork them (this adds steps) and evaluate gens
         pipes = PipeIterator.clone_pipes(pipes)

         def _fork_pipeline(pipe: Pipe) -> None:
             if pipe.parent:
                 # fork the parent pipe
@@ -490,9 +500,14 @@ def _fork_pipeline(pipe: Pipe) -> None:
             if not any(i.pipe == pipe for i in extract._sources):
                 extract._sources.append(SourcePipeItem(pipe.gen, 0, pipe, None))

-        for pipe in reversed(pipes):
+        # fifo mode takes sources from the back, so reverse the pipes to preserve their order
+        if next_item_mode == "fifo":
+            pipes.reverse()
+        for pipe in pipes:
             _fork_pipeline(pipe)

+        extract._initial_sources_count = len(extract._sources)
+
         return extract

     def __next__(self) -> PipeItem:
@@ -667,16 +682,21 @@ def _resolve_futures(self) -> ResolvablePipeItem:
             return ResolvablePipeItem(item, step, pipe, meta)

     def _get_source_item(self) -> ResolvablePipeItem:
+        if self._next_item_mode == "fifo":
+            return self._get_source_item_current()
+        elif self._next_item_mode == "round_robin":
+            return self._get_source_item_round_robin()
+
+    def _get_source_item_current(self) -> ResolvablePipeItem:
         # no more sources to iterate
         if len(self._sources) == 0:
             return None

-        # get items from last added iterator, this makes the overall Pipe as close to FIFO as possible
-        gen, step, pipe, meta = self._sources[-1]
-        # print(f"got {pipe.name} {pipe._pipe_id}")
-        # register current pipe name during the execution of gen
-        set_current_pipe_name(pipe.name)
         try:
+            # get items from last added iterator, this makes the overall Pipe as close to FIFO as possible
+            gen, step, pipe, meta = self._sources[-1]
+            # print(f"got {pipe.name} {pipe._pipe_id}")
+            # register current pipe name during the execution of gen
+            set_current_pipe_name(pipe.name)
             item = next(gen)
             # full pipe item may be returned, this is used by ForkPipe step
             # to redirect execution of an item to another pipe
@@ -697,8 +717,47 @@ def _get_source_item(self) -> ResolvablePipeItem:
         except Exception as ex:
             raise ResourceExtractionError(pipe.name, gen, str(ex), "generator") from ex

+    def _get_source_item_round_robin(self) -> ResolvablePipeItem:
+        # no more sources to iterate
+        sources_count = len(self._sources)
+        if sources_count == 0:
+            return None
+        # if there are currently more sources than added initially, we need to process the new ones first
+        if sources_count > self._initial_sources_count:
+            return self._get_source_item_current()
+        try:
+            # get items from the next source in round robin order
+            self._round_robin_index = (self._round_robin_index + 1) % sources_count
+            gen, step, pipe, meta = self._sources[self._round_robin_index]
+            # print(f"got {pipe.name} {pipe._pipe_id}")
+            # register current pipe name during the execution of gen
+            set_current_pipe_name(pipe.name)
+            item = next(gen)
+            # full pipe item may be returned, this is used by ForkPipe step
+            # to redirect execution of an item to another pipe
+            if isinstance(item, ResolvablePipeItem):
+                return item
+            else:
+                # keep the item assigned step and pipe when creating resolvable item
+                if isinstance(item, DataItemWithMeta):
+                    return ResolvablePipeItem(item.data, step, pipe, item.meta)
+                else:
+                    return ResolvablePipeItem(item, step, pipe, meta)
+        except StopIteration:
+            # remove empty iterator and try another source
+            self._sources.pop(self._round_robin_index)
+            # we need to decrease the index to keep the round robin order
+            self._round_robin_index -= 1
+            # since we popped an initial source, we need to decrease the initial sources count
+            self._initial_sources_count -= 1
+            return self._get_source_item()
+        except (PipelineException, ExtractorException, DltSourceException, PipeException):
+            raise
+        except Exception as ex:
+            raise ResourceExtractionError(pipe.name, gen, str(ex), "generator") from ex
+
     @staticmethod
-    def clone_pipes(pipes: Sequence[Pipe]) -> Sequence[Pipe]:
+    def clone_pipes(pipes: Sequence[Pipe]) -> List[Pipe]:
         """This will clone pipes and fix the parent/dependent references"""
         cloned_pipes = [p._clone() for p in pipes]
         cloned_pairs = {id(p): c for p, c in zip(pipes, cloned_pipes)}
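The heart of the change is the scheduling loop above. As a minimal standalone sketch of the same round-robin idea, detached from dlt's internals (the `round_robin` function below is illustrative, not part of dlt's API):

```
from typing import Any, Iterator, List

def round_robin(sources: List[Iterator[Any]]) -> Iterator[Any]:
    """Take one item from each source per round, dropping exhausted sources."""
    index = -1
    while sources:
        index = (index + 1) % len(sources)
        try:
            yield next(sources[index])
        except StopIteration:
            # remove the exhausted source and step back so the next source is not skipped
            sources.pop(index)
            index -= 1

print(list(round_robin([iter([1, 2, 3]), iter(["a", "b"])])))  # [1, 'a', 2, 'b', 3]
```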
37 changes: 33 additions & 4 deletions docs/website/docs/reference/performance.md
@@ -14,13 +14,22 @@ If you can, yield pages when producing data. This makes some processes more effe

dlt likes resources that yield data because it can collect the data in a buffer before processing and releasing it. This makes it possible to manage the amount of resources used. To configure this option, you can specify the buffer size via env variables or by adding it to config.toml:

-globally: `DATA_WRITER__BUFFER_MAX_ITEMS=100`
+globally in your config.toml:
+
+```
+[data_writer]
+buffer_max_items=100
+```

-or specifically:
-
-`NORMALIZE__DATA_WRITER__BUFFER_MAX_ITEMS = 100`
-
-`SOURCES__DATA_WRITER__BUFFER_MAX_ITEMS = 100`
+or specifically for the normalization and source:
+
+```
+[normalize.data_writer]
+buffer_max_items=100
+[sources.data_writer]
+buffer_max_items=200
+```

The default buffer is set to a moderately low value, so unless you are trying to run dlt on IoT sensors or other tiny infrastructures, you might actually want to increase it to speed up processing.
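As a hedged illustration of the "yield pages" advice above, a resource can batch rows before yielding them (`fetch_user_rows` and the page size of 1000 are hypothetical stand-ins, not dlt APIs):

```
import dlt

def fetch_user_rows():
    # hypothetical stand-in for a paginated API or database cursor
    yield from ({"id": i} for i in range(2500))

@dlt.resource
def users():
    page = []
    for row in fetch_user_rows():
        page.append(row)
        if len(page) == 1000:  # assumed page size
            yield page  # a whole page lands in dlt's buffer at once
            page = []
    if page:
        yield page
```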

@@ -31,3 +40,23 @@ To troubleshoot memory usage you can add the env var `PROGRESS=log`.
### Parallelism

Parallelism can be limited with the config option `max_parallel_items = 5` that you can place under a source. As dlt is a library, you can also leverage parallelism outside of dlt, for example by placing tasks in parallel in a DAG.

+```
+[extract] # global setting
+max_parallel_items=5
+[sources.my_pipeline.extract] # setting for the "my_pipeline" pipeline
+max_parallel_items=5
+```

+### Resources loading, fifo vs. round robin
+When extracting from resources, you have two options to determine the order in which your resources are queried: `fifo` and `round_robin`. `fifo` is the default option and will result in each resource being fully extracted before the next one, in the order that you added them to your source. `round_robin` will result in the extraction of one item from the first resource, then one item from the second resource, and so on, doing as many rounds as necessary until all resources are fully extracted. You can change this setting in your config.toml as follows:
+
+```
+[extract] # global setting
+next_item_mode="round_robin"
+[sources.my_pipeline.extract] # setting for the "my_pipeline" pipeline
+next_item_mode="fifo"
+```
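To see the two modes side by side, here is a minimal sketch against the internal `Pipe`/`PipeIterator` API introduced above, mirroring the new test below (the pipe names and data are made up; ordinary pipelines would set `next_item_mode` via config instead of calling `PipeIterator` directly):

```
from dlt.extract.pipe import Pipe, PipeIterator

def letters():
    yield from ["a1", "a2", "a3"]

def numbers():
    yield from [1, 2]

pipes = [Pipe.from_data("letters", letters()), Pipe.from_data("numbers", numbers())]
items = [pi.item for pi in PipeIterator.from_pipes(pipes, next_item_mode="round_robin")]
print(items)  # expected: ['a1', 1, 'a2', 2, 'a3']
```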
35 changes: 35 additions & 0 deletions tests/extract/test_extract_pipe.py
@@ -15,6 +15,41 @@
 # from tests.utils import preserve_environ


+def test_next_item_mode() -> None:
+
+    def nested_gen_level_2():
+        yield from [88, 89]
+
+    def nested_gen():
+        yield from [55, 56, 77, nested_gen_level_2()]
+
+    def source_gen1():
+        yield from [1, 2, nested_gen(), 3, 4]
+
+    def source_gen2():
+        yield from range(11, 16)
+
+    def source_gen3():
+        yield from range(20, 22)
+
+    def get_pipes():
+        return [
+            Pipe.from_data("data1", source_gen1()),
+            Pipe.from_data("data2", source_gen2()),
+            Pipe.from_data("data3", source_gen3()),
+        ]
+
+    # default mode is "fifo"
+    _l = list(PipeIterator.from_pipes(get_pipes(), next_item_mode="fifo"))
+    # items will be in order of the pipes, nested iterator items appear inline
+    assert [pi.item for pi in _l] == [1, 2, 55, 56, 77, 88, 89, 3, 4, 11, 12, 13, 14, 15, 20, 21]
+
+    # round robin mode
+    _l = list(PipeIterator.from_pipes(get_pipes(), next_item_mode="round_robin"))
+    # items will be round robin, nested iterators are fully iterated and appear inline as soon as they are encountered
+    assert [pi.item for pi in _l] == [1, 11, 20, 2, 12, 21, 55, 56, 77, 88, 89, 13, 3, 14, 4, 15]
+
+
 def test_add_step() -> None:
     data = [1, 2, 3]
     data_iter = iter(data)
