diff --git a/reactivex/operators/__init__.py b/reactivex/operators/__init__.py index 0983dfa2..f62f4515 100644 --- a/reactivex/operators/__init__.py +++ b/reactivex/operators/__init__.py @@ -3228,16 +3228,16 @@ def starmap_indexed( invoking the indexed mapper function with unpacked elements of the source. """ - from ._map import map_ + from ._map import map_indexed_ if mapper is None: return compose(identity) - def starred(values: Tuple[Any, ...]) -> Any: + def starred(values: Tuple[Any, ...], i: int) -> Any: assert mapper # mypy is paranoid - return mapper(*values) + return mapper(*values, i) - return compose(map_(starred)) + return compose(map_indexed_(starred)) def start_with(*args: _T) -> Callable[[Observable[_T]], Observable[_T]]: diff --git a/reactivex/operators/_map.py b/reactivex/operators/_map.py index 48b4c094..25009cd7 100644 --- a/reactivex/operators/_map.py +++ b/reactivex/operators/_map.py @@ -65,7 +65,7 @@ def _identity(value: _T1, _: int) -> _T2: return compose( ops.zip_with_iterable(infinite()), - ops.starmap_indexed(_mapper_indexed), + ops.starmap(_mapper_indexed), ) diff --git a/tests/test_observable/test_starmap.py b/tests/test_observable/test_starmap.py index a4ba460e..6fd3ac76 100644 --- a/tests/test_observable/test_starmap.py +++ b/tests/test_observable/test_starmap.py @@ -405,6 +405,211 @@ def mapper(x, y): assert xs.subscriptions == [subscribe(200, 290)] assert invoked[0] == 3 + def test_starmap_with_index_throws(self): + with self.assertRaises(RxException): + mapper = ops.starmap_indexed(lambda x, y, index: x) + + return return_value((1, 10)).pipe(mapper).subscribe(lambda x: _raise("ex")) + + with self.assertRaises(RxException): + return ( + throw("ex").pipe(mapper).subscribe(lambda x: x, lambda ex: _raise(ex)) + ) + + with self.assertRaises(RxException): + return ( + empty() + .pipe(mapper) + .subscribe(lambda x: x, lambda ex: None, lambda: _raise("ex")) + ) + + with self.assertRaises(RxException): + return create(lambda o, s: _raise("ex")).pipe(mapper).subscribe() + + def test_starmap_with_index_dispose_inside_mapper(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable( + on_next(100, (4, 40)), on_next(200, (3, 30)), on_next(500, (2, 20)), on_next(600, (1, 10)) + ) + invoked = [0] + results = scheduler.create_observer() + d = SerialDisposable() + + def projection(x, y, index): + invoked[0] += 1 + if scheduler.clock > 400: + d.dispose() + + return x + y + index * 100 + + d.disposable = xs.pipe(ops.starmap_indexed(projection)).subscribe(results) + + def action(scheduler, state): + return d.dispose() + + scheduler.schedule_absolute(disposed, action) + scheduler.start() + assert results.messages == [on_next(100, 44), on_next(200, 133)] + assert xs.subscriptions == [subscribe(0, 500)] + assert invoked[0] == 3 + + def test_starmap_with_index_completed(self): + scheduler = TestScheduler() + invoked = [0] + xs = scheduler.create_hot_observable( + on_next(180, (5, 50)), + on_next(210, (4, 40)), + on_next(240, (3, 30)), + on_next(290, (2, 20)), + on_next(350, (1, 10)), + on_completed(400), + on_next(410, (-1, -10)), + on_completed(420), + on_error(430, "ex"), + ) + + def factory(): + def projection(x, y, index): + invoked[0] += 1 + return (x + 1) + (y + 10) + (index * 100) + + return xs.pipe(ops.starmap_indexed(projection)) + + results = scheduler.start(factory) + assert results.messages == [ + on_next(210, 55), + on_next(240, 144), + on_next(290, 233), + on_next(350, 322), + on_completed(400), + ] + assert xs.subscriptions == [subscribe(200, 400)] + assert invoked[0] == 4 + + def test_starmap_with_index_default_mapper(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable( + on_next(180, (5, 50)), + on_next(210, (4, 40)), + on_next(240, (3, 30)), + on_next(290, (2, 20)), + on_next(350, (1, 10)), + on_completed(400), + on_next(410, (-1, -10)), + on_completed(420), + on_error(430, "ex"), + ) + + def factory(): + return xs.pipe(ops.starmap_indexed()) + + results = scheduler.start(factory) + assert results.messages == [ + on_next(210, (4, 40)), + on_next(240, (3, 30)), + on_next(290, (2, 20)), + on_next(350, (1, 10)), + on_completed(400), + ] + + assert xs.subscriptions == [subscribe(200, 400)] + + def test_starmap_with_index_not_completed(self): + scheduler = TestScheduler() + invoked = [0] + xs = scheduler.create_hot_observable( + on_next(180, (5, 50)), + on_next(210, (4, 40)), + on_next(240, (3, 30)), + on_next(290, (2, 20)), + on_next(350, (1, 10)), + ) + + def factory(): + def projection(x, y, index): + invoked[0] += 1 + return (x + 1) + (y + 10) + (index * 100) + + return xs.pipe(ops.starmap_indexed(projection)) + + results = scheduler.start(factory) + assert results.messages == [ + on_next(210, 55), + on_next(240, 144), + on_next(290, 233), + on_next(350, 322), + ] + assert xs.subscriptions == [subscribe(200, 1000)] + assert invoked[0] == 4 + + def test_starmap_with_index_error(self): + scheduler = TestScheduler() + ex = "ex" + invoked = [0] + xs = scheduler.create_hot_observable( + on_next(180, (5, 50)), + on_next(210, (4, 40)), + on_next(240, (3, 30)), + on_next(290, (2, 20)), + on_next(350, (1, 10)), + on_error(400, ex), + on_next(410, (-1, -10)), + on_completed(420), + on_error(430, "ex"), + ) + + def factory(): + def projection(x, y, index): + invoked[0] += 1 + return (x + 1) + (y + 10) + (index * 100) + + return xs.pipe(ops.starmap_indexed(projection)) + + results = scheduler.start(factory) + + assert results.messages == [ + on_next(210, 55), + on_next(240, 144), + on_next(290, 233), + on_next(350, 322), + on_error(400, ex), + ] + assert xs.subscriptions == [subscribe(200, 400)] + + def test_starmap_with_index_mapper_throws(self): + scheduler = TestScheduler() + invoked = [0] + ex = "ex" + xs = scheduler.create_hot_observable( + on_next(180, (5, 50)), + on_next(210, (4, 40)), + on_next(240, (3, 30)), + on_next(290, (2, 20)), + on_next(350, (1, 10)), + on_completed(400), + on_next(410, (-1, -10)), + on_completed(420), + on_error(430, "ex"), + ) + + def factory(): + def projection(x, y, index): + invoked[0] += 1 + if invoked[0] == 3: + raise Exception(ex) + return (x + 1) + (y + 10) + (index * 100) + + return xs.pipe(ops.starmap_indexed(projection)) + + results = scheduler.start(factory) + assert results.messages == [ + on_next(210, 55), + on_next(240, 144), + on_error(290, ex), + ] + assert xs.subscriptions == [subscribe(200, 290)] + assert invoked[0] == 3 + if __name__ == "__main__": unittest.main()