diff --git a/API.md b/API.md index b228fa5..43c6332 100644 --- a/API.md +++ b/API.md @@ -418,7 +418,7 @@ function mergeMap(concurrency: number,project: (s) => Observable,source: Observa **Example:** ```ts const $ = O.mergeMap( - 1, + O.just(1), (ev) => O.slice(0, 3, O.interval(100)), O.fromEvent(document, 'click') ) diff --git a/benchmarks/bm.mergeMap.ts b/benchmarks/bm.mergeMap.ts index 6aba6d2..2a7719c 100644 --- a/benchmarks/bm.mergeMap.ts +++ b/benchmarks/bm.mergeMap.ts @@ -4,6 +4,7 @@ import {Suite} from 'benchmark' import {mergeMap} from '../src/operators/MergeMap' +import {just} from '../src/sources/Create' import {fromArray} from '../src/sources/FromArray' import {array, IDeferred, run} from './lib' @@ -12,7 +13,7 @@ const a = array(1e3).map(i => array(1e3)) export function bm_mergeMap(suite: Suite) { return suite.add( 'file -> mergeMap', - (d: IDeferred) => run(mergeMap(1e2, fromArray, fromArray(a)), d), + (d: IDeferred) => run(mergeMap(just(1e2), fromArray, fromArray(a)), d), {defer: true} ) } diff --git a/extra.ts b/extra.ts index 871c200..daa4ad8 100644 --- a/extra.ts +++ b/extra.ts @@ -50,7 +50,7 @@ export class Air implements IObservable { map(fn: (t: T) => S) { return new Air(O.map(fn, this)) } - mergeMap(n: number, project: (t: T) => IObservable) { + mergeMap(n: IObservable, project: (t: T) => IObservable) { return new Air(O.mergeMap(n, project, this)) } multicast() { diff --git a/src/lib/Subscription.ts b/src/lib/Subscription.ts index 7588b9d..89a0f87 100644 --- a/src/lib/Subscription.ts +++ b/src/lib/Subscription.ts @@ -33,6 +33,14 @@ export class CompositeSubscription implements ISubscription { this.subscriptions = new LinkedList() } + head() { + return this.subscriptions.head() + } + + tail() { + return this.subscriptions.tail() + } + add(d: ISubscription) { return this.subscriptions.add(d) } diff --git a/src/operators/MergeMap.ts b/src/operators/MergeMap.ts index 6691c68..c9c29c6 100644 --- a/src/operators/MergeMap.ts +++ b/src/operators/MergeMap.ts @@ -9,9 +9,42 @@ import {IObserver} from '../lib/Observer' import {IScheduler} from '../lib/Scheduler' import {CompositeSubscription, ISubscription} from '../lib/Subscription' import {curry} from '../lib/Utils' +import {just} from '../sources/Create' export type Project = (t: T) => IObservable +// prettier-ignore +export type mergeMapFunctionWithConcurrency = { + (concurrency: IObservable, project: Project, source: IObservable): IObservable + (concurrency: IObservable): {(project: Project, source: IObservable): IObservable} + (concurrency: IObservable): {(project: Project): {(source: IObservable): IObservable}} +} + +// prettier-ignore +export type mergeMapFunction = { + (project: Project, source: IObservable): IObservable + (project: Project, source: IObservable): IObservable + (project: Project): {(source: IObservable): IObservable} +} +// prettier-ignore +export type joinFunction = (source: IObservable>) => IObservable + +class ConcurrencyObserver extends ErrorMixin(Virgin) + implements IObserver { + constructor( + private outer: MergeMapOuterObserver, + readonly sink: IObserver + ) { + super() + } + + next(val: number): void { + this.outer.setConc(val) + } + + complete(): void {} +} + class MergeMapInnerObserver implements IObserver { node: LinkedListNode @@ -39,27 +72,43 @@ class MergeMapOuterObserver extends ErrorMixin(Virgin) implements IObserver { private __completed: boolean = false private __buffer: T[] = [] + private __conc = 0 + readonly cSub = new CompositeSubscription() constructor( - readonly conc: number, + conc: IObservable, readonly proj: Project, readonly sink: IObserver, - readonly cSub: CompositeSubscription, readonly sh: IScheduler ) { super() + conc.subscribe(new ConcurrencyObserver(this, sink), sh) + } + + private getSpace() { + return this.__conc - this.cSub.length() + } + + private subscribeNew() { + while (this.__buffer.length > 0 && this.getSpace() > 0) { + this.next(this.__buffer.shift() as T) + } + } + + private unsubscribeOld() { + while (this.getSpace() < 0) { + this.cSub.remove(this.cSub.head()) + } } next(val: T): void { - if (this.cSub.length() < this.conc + 1) { + if (this.getSpace() > 0) { const innerObserver = new MergeMapInnerObserver(this) const node = this.cSub.add( this.proj(val).subscribe(innerObserver, this.sh) ) innerObserver.setup(node) - } else { - this.__buffer.push(val) - } + } else this.__buffer.push(val) } complete(): void { @@ -68,61 +117,50 @@ class MergeMapOuterObserver extends ErrorMixin(Virgin) } checkComplete() { - if (this.__completed && this.cSub.length() === 1) this.sink.complete() - else if (this.__buffer.length > 0) { - if (this.__buffer.length > 0) { - this.next(this.__buffer.shift() as T) - } - } + if ( + this.__completed && + this.cSub.length() === 0 && + this.__buffer.length === 0 + ) + this.sink.complete() + else this.subscribeNew() + } + + setConc(value: number) { + this.__conc = value + const space = this.getSpace() + if (space > 0) this.subscribeNew() + else if (space < 0) this.unsubscribeOld() } } class MergeMap implements IObservable { constructor( - private conc: number, + private conc: IObservable, private proj: Project, private src: IObservable ) {} subscribe(observer: IObserver, scheduler: IScheduler): ISubscription { const cSub = new CompositeSubscription() - const outerObserver = new MergeMapOuterObserver( - this.conc, - this.proj, - observer, - cSub, - scheduler - ) + + // prettier-ignore + const outerObserver = new MergeMapOuterObserver(this.conc, this.proj, observer, scheduler) + cSub.add(outerObserver.cSub) cSub.add(this.src.subscribe(outerObserver, scheduler)) return cSub } } -// prettier-ignore -export type mergeMapFunctionWithConcurrency = { - (concurrency: number, project: Project, source: IObservable): IObservable - (concurrency: number): {(project: Project, source: IObservable): IObservable} - (concurrency: number): {(project: Project): {(source: IObservable): IObservable}} -} - -// prettier-ignore -export type mergeMapFunction = { - (project: Project, source: IObservable): IObservable - (project: Project, source: IObservable): IObservable - (project: Project): {(source: IObservable): IObservable} -} - -export type joinFunction = ( - source: IObservable> -) => IObservable - export const mergeMap: mergeMapFunctionWithConcurrency = curry( ( - concurrency: number, + concurrency: IObservable, project: Project, source: IObservable ): IObservable => new MergeMap(concurrency, project, source) ) -export const flatMap = mergeMap(Number.POSITIVE_INFINITY) as mergeMapFunction -export const concatMap: mergeMapFunction = mergeMap(1) as mergeMapFunction +export const flatMap = mergeMap( + just(Number.POSITIVE_INFINITY) +) as mergeMapFunction +export const concatMap: mergeMapFunction = mergeMap(just(1)) as mergeMapFunction export const join = flatMap(i => i) as joinFunction diff --git a/src/testing/TestObservable.ts b/src/testing/TestObservable.ts index 54d0da5..1a92715 100644 --- a/src/testing/TestObservable.ts +++ b/src/testing/TestObservable.ts @@ -13,7 +13,7 @@ export class TestObservable implements IObservable { constructor(private func: (observer: IObserver) => ISubscription) {} - get marble() { + toString() { return toMarble(this.subscriptions) } diff --git a/src/testing/TestObserver.ts b/src/testing/TestObserver.ts index fcb4bf5..f65aa00 100644 --- a/src/testing/TestObserver.ts +++ b/src/testing/TestObserver.ts @@ -9,7 +9,7 @@ import {toMarble} from './Marble' export class TestObserver implements IObserver { results: Array - get marble() { + toString() { return toMarble(this.results) } diff --git a/test/test.ForEach.ts b/test/test.ForEach.ts index cc04de6..2f8df19 100644 --- a/test/test.ForEach.ts +++ b/test/test.ForEach.ts @@ -28,7 +28,7 @@ describe('forEach()', () => { '#' ) - const actual = testObservable.marble + const actual = testObservable.toString() assert.strictEqual(actual, expected) }) }) @@ -42,7 +42,7 @@ describe('forEach()', () => { sh.startSubscription(() => forEach(testObserver, $)) - const actual = testObserver.marble + const actual = testObserver.toString() const expected = '-1234|' assert.strictEqual(actual, expected) @@ -56,7 +56,7 @@ describe('forEach()', () => { sh.startSubscription(() => forEach(testObserver, testObservable, sh)) - const actual = testObserver.marble + const actual = testObserver.toString() assert.strictEqual(actual, expected) }) diff --git a/test/test.MergeMap.ts b/test/test.MergeMap.ts index f5c0ed2..46db16e 100644 --- a/test/test.MergeMap.ts +++ b/test/test.MergeMap.ts @@ -1,7 +1,7 @@ /** * Created by tushar on 31/08/17. */ -import * as t from 'assert' +import * as assert from 'assert' import {mergeMap} from '../src/operators/MergeMap' import {EVENT} from '../src/testing/Events' import {createTestScheduler} from '../src/testing/TestScheduler' @@ -11,50 +11,106 @@ describe('mergeMap()', () => { context('when concurrency is Infinity', () => { it('should work like flatMap()', () => { const sh = createTestScheduler() - const sa$$ = sh.Cold([ - next(10, 'A0'), - next(20, 'A1'), - next(30, 'A2'), - complete(40) - ]) - - const sb$$ = sh.Cold([ - next(10, 'B0'), - next(20, 'B1'), - next(30, 'B2'), - complete(40) - ]) - - const s$$ = sh.Cold(next(10, sa$$), next(20, sb$$), complete(100)) - const {results} = sh.start(() => mergeMap(Infinity, (i: any) => i, s$$)) - - t.deepEqual(results, [ - next(220, 'A0'), - next(230, 'A1'), - next(230, 'B0'), - next(240, 'A2'), - next(240, 'B1'), - next(250, 'B2'), - complete(300) - ]) + const conc = ' -3| ' + const a = ' --A---A| ' + const b = ' ---B---B---B---B| ' + const c = ' ----C---C---C---C---C---|' + const expected = '--ABC-ABC--BC--BC---C---|' + + const observer = sh.start(() => { + const conc$ = sh.Hot(conc) + const a$ = sh.Hot(a) + const b$ = sh.Hot(b) + const c$ = sh.Hot(c) + const source$$ = sh.Hot( + next(200, a$), + next(215, b$), + next(220, c$), + complete(300) + ) + return mergeMap(conc$, (i: any) => i, source$$) + }) + assert.deepEqual(observer.toString(), expected) }) }) context('when concurrency is 1', () => { it('should work like concatMap()', () => { const sh = createTestScheduler() - const s$$ = sh.Cold( - next(10, sh.Cold(next(1, 'A0'), complete(50))), - next(20, sh.Cold(next(1, 'B0'), complete(50))), - next(30, sh.Cold(next(1, 'C0'), complete(50))), - complete(600) - ) - const {results} = sh.start(() => mergeMap(1, (i: any) => i, s$$)) - t.deepEqual(results, [ - next(211, 'A0'), - next(261, 'B0'), - next(311, 'C0'), - complete(800) - ]) + + const conc = ' -1| ' + const a = ' --A---A| ' + const b = ' ---B---B---B---B| ' + const c = ' ----C---C---C---C---C---|' + const expected = '--A---AB---B---BC---C---|' + + const observer = sh.start(() => { + const conc$ = sh.Hot(conc) + const a$ = sh.Hot(a) + const b$ = sh.Hot(b) + const c$ = sh.Hot(c) + const source$$ = sh.Hot( + next(200, a$), + next(215, b$), + next(220, c$), + complete(300) + ) + return mergeMap(conc$, (i: any) => i, source$$) + }) + assert.deepEqual(observer.toString(), expected) + }) + }) + + context('when concurrency increases', () => { + it('should automatically subscribe from buffer', () => { + const sh = createTestScheduler() + const conc = ' -1-----------3----------|' + const a = ' --A---A---A---A---A---A-|' + const b = ' ---B---B---B---B---B---B|' + const c = ' ----C---C---C---C---C---|' + const expected = '--A---A---A---ABC-ABC-AB|' + + const observer = sh.start(() => { + const concurr$ = sh.Hot(conc) + const a$ = sh.Hot(a) + const b$ = sh.Hot(b) + const c$ = sh.Hot(c) + const source$$ = sh.Hot( + next(200, a$), + next(215, b$), + next(220, c$), + complete(300) + ) + return mergeMap(concurr$, (i: any) => i, source$$) + }) + + assert.strictEqual(observer.toString(), expected) + }) + }) + + context('when concurrency decreases', () => { + it('should automatically unsubscribe the oldest', () => { + const sh = createTestScheduler() + const conc = ' -3-----------1----------|' + const a = ' --A---A---A---A---A---A-|' + const b = ' ---B---B---B---B---B---B|' + const c = ' ----C---C---C---C---C---|' + const expected = '--ABC-ABC-ABC---C---C---|' + + const observer = sh.start(() => { + const concurr$ = sh.Hot(conc) + const a$ = sh.Hot(a) + const b$ = sh.Hot(b) + const c$ = sh.Hot(c) + const source$$ = sh.Hot( + next(200, a$), + next(215, b$), + next(220, c$), + complete(300) + ) + return mergeMap(concurr$, (i: any) => i, source$$) + }) + + assert.strictEqual(observer.toString(), expected) }) }) }) diff --git a/test/test.Unique.ts b/test/test.Unique.ts index d98a99d..0ddb66d 100644 --- a/test/test.Unique.ts +++ b/test/test.Unique.ts @@ -9,7 +9,7 @@ describe('unique', () => { it('should return emit unique value', () => { const sh = createTestScheduler() const source = sh.Hot('-a-b-c-a--b-|') - const {marble} = sh.start(() => unique(source)) - assert.strictEqual(marble, '-a-b-c------|') + const observer = sh.start(() => unique(source)) + assert.strictEqual(observer.toString(), '-a-b-c------|') }) })