Skip to content

Commit

Permalink
Merge pull request #223 from tusharmath/concurrent-stream
Browse files Browse the repository at this point in the history
Control concurrency using an observable in mergeMap( )
  • Loading branch information
tusharmath authored Nov 20, 2017
2 parents 268b8ee + 0721de7 commit 4b695d3
Show file tree
Hide file tree
Showing 10 changed files with 194 additions and 91 deletions.
2 changes: 1 addition & 1 deletion API.md
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ function mergeMap(concurrency: number,project: (s) => Observable,source: Observa
**Example:**
```ts
const $ = O.mergeMap(
1,
O.just(1),
(ev) => O.slice(0, 3, O.interval(100)),
O.fromEvent(document, 'click')
)
Expand Down
3 changes: 2 additions & 1 deletion benchmarks/bm.mergeMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import {Suite} from 'benchmark'
import {mergeMap} from '../src/operators/MergeMap'
import {just} from '../src/sources/Create'
import {fromArray} from '../src/sources/FromArray'
import {array, IDeferred, run} from './lib'

Expand All @@ -12,7 +13,7 @@ const a = array(1e3).map(i => array(1e3))
export function bm_mergeMap(suite: Suite) {
return suite.add(
'file -> mergeMap',
(d: IDeferred) => run(mergeMap(1e2, fromArray, fromArray(a)), d),
(d: IDeferred) => run(mergeMap(just(1e2), fromArray, fromArray(a)), d),
{defer: true}
)
}
2 changes: 1 addition & 1 deletion extra.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export class Air<T> implements IObservable<T> {
map<S>(fn: (t: T) => S) {
return new Air(O.map(fn, this))
}
mergeMap<S>(n: number, project: (t: T) => IObservable<S>) {
mergeMap<S>(n: IObservable<number>, project: (t: T) => IObservable<S>) {
return new Air(O.mergeMap(n, project, this))
}
multicast() {
Expand Down
8 changes: 8 additions & 0 deletions src/lib/Subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ export class CompositeSubscription implements ISubscription {
this.subscriptions = new LinkedList<ISubscription>()
}

head() {
return this.subscriptions.head()
}

tail() {
return this.subscriptions.tail()
}

add(d: ISubscription) {
return this.subscriptions.add(d)
}
Expand Down
120 changes: 79 additions & 41 deletions src/operators/MergeMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,42 @@ import {IObserver} from '../lib/Observer'
import {IScheduler} from '../lib/Scheduler'
import {CompositeSubscription, ISubscription} from '../lib/Subscription'
import {curry} from '../lib/Utils'
import {just} from '../sources/Create'

export type Project<T, S> = (t: T) => IObservable<S>

// prettier-ignore
export type mergeMapFunctionWithConcurrency = {
<T, S>(concurrency: IObservable<number>, project: Project<T, S>, source: IObservable<T>): IObservable<S>
<T, S>(concurrency: IObservable<number>): {(project: Project<T, S>, source: IObservable<T>): IObservable<S>}
<T, S>(concurrency: IObservable<number>): {(project: Project<T, S>): {(source: IObservable<T>): IObservable<S>}}
}

// prettier-ignore
export type mergeMapFunction = {
<T, S>(project: Project<T, S>, source: IObservable<T>): IObservable<S>
<T, S>(project: Project<T, S>, source: IObservable<T>): IObservable<S>
<T, S>(project: Project<T, S>): {(source: IObservable<T>): IObservable<S>}
}
// prettier-ignore
export type joinFunction = <T>(source: IObservable<IObservable<T>>) => IObservable<T>

class ConcurrencyObserver extends ErrorMixin(Virgin)
implements IObserver<number> {
constructor(
private outer: MergeMapOuterObserver<any, any>,
readonly sink: IObserver<any>
) {
super()
}

next(val: number): void {
this.outer.setConc(val)
}

complete(): void {}
}

class MergeMapInnerObserver<T, S> implements IObserver<S> {
node: LinkedListNode<ISubscription>

Expand Down Expand Up @@ -39,27 +72,43 @@ class MergeMapOuterObserver<T, S> extends ErrorMixin(Virgin)
implements IObserver<T> {
private __completed: boolean = false
private __buffer: T[] = []
private __conc = 0
readonly cSub = new CompositeSubscription()

constructor(
readonly conc: number,
conc: IObservable<number>,
readonly proj: Project<T, S>,
readonly sink: IObserver<S>,
readonly cSub: CompositeSubscription,
readonly sh: IScheduler
) {
super()
conc.subscribe(new ConcurrencyObserver(this, sink), sh)
}

private getSpace() {
return this.__conc - this.cSub.length()
}

private subscribeNew() {
while (this.__buffer.length > 0 && this.getSpace() > 0) {
this.next(this.__buffer.shift() as T)
}
}

private unsubscribeOld() {
while (this.getSpace() < 0) {
this.cSub.remove(this.cSub.head())
}
}

next(val: T): void {
if (this.cSub.length() < this.conc + 1) {
if (this.getSpace() > 0) {
const innerObserver = new MergeMapInnerObserver(this)
const node = this.cSub.add(
this.proj(val).subscribe(innerObserver, this.sh)
)
innerObserver.setup(node)
} else {
this.__buffer.push(val)
}
} else this.__buffer.push(val)
}

complete(): void {
Expand All @@ -68,61 +117,50 @@ class MergeMapOuterObserver<T, S> extends ErrorMixin(Virgin)
}

checkComplete() {
if (this.__completed && this.cSub.length() === 1) this.sink.complete()
else if (this.__buffer.length > 0) {
if (this.__buffer.length > 0) {
this.next(this.__buffer.shift() as T)
}
}
if (
this.__completed &&
this.cSub.length() === 0 &&
this.__buffer.length === 0
)
this.sink.complete()
else this.subscribeNew()
}

setConc(value: number) {
this.__conc = value
const space = this.getSpace()
if (space > 0) this.subscribeNew()
else if (space < 0) this.unsubscribeOld()
}
}

class MergeMap<T, S> implements IObservable<S> {
constructor(
private conc: number,
private conc: IObservable<number>,
private proj: Project<T, S>,
private src: IObservable<T>
) {}

subscribe(observer: IObserver<S>, scheduler: IScheduler): ISubscription {
const cSub = new CompositeSubscription()
const outerObserver = new MergeMapOuterObserver<T, S>(
this.conc,
this.proj,
observer,
cSub,
scheduler
)

// prettier-ignore
const outerObserver = new MergeMapOuterObserver<T, S>(this.conc, this.proj, observer, scheduler)
cSub.add(outerObserver.cSub)
cSub.add(this.src.subscribe(outerObserver, scheduler))
return cSub
}
}

// prettier-ignore
export type mergeMapFunctionWithConcurrency = {
<T, S>(concurrency: number, project: Project<T, S>, source: IObservable<T>): IObservable<S>
<T, S>(concurrency: number): {(project: Project<T, S>, source: IObservable<T>): IObservable<S>}
<T, S>(concurrency: number): {(project: Project<T, S>): {(source: IObservable<T>): IObservable<S>}}
}

// prettier-ignore
export type mergeMapFunction = {
<T, S>(project: Project<T, S>, source: IObservable<T>): IObservable<S>
<T, S>(project: Project<T, S>, source: IObservable<T>): IObservable<S>
<T, S>(project: Project<T, S>): {(source: IObservable<T>): IObservable<S>}
}

export type joinFunction = <T>(
source: IObservable<IObservable<T>>
) => IObservable<T>

export const mergeMap: mergeMapFunctionWithConcurrency = curry(
<T, S>(
concurrency: number,
concurrency: IObservable<number>,
project: Project<T, S>,
source: IObservable<T>
): IObservable<S> => new MergeMap(concurrency, project, source)
)
export const flatMap = mergeMap(Number.POSITIVE_INFINITY) as mergeMapFunction
export const concatMap: mergeMapFunction = mergeMap(1) as mergeMapFunction
export const flatMap = mergeMap(
just(Number.POSITIVE_INFINITY)
) as mergeMapFunction
export const concatMap: mergeMapFunction = mergeMap(just(1)) as mergeMapFunction
export const join = flatMap(i => i) as joinFunction
2 changes: 1 addition & 1 deletion src/testing/TestObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export class TestObservable<T> implements IObservable<T> {

constructor(private func: (observer: IObserver<T>) => ISubscription) {}

get marble() {
toString() {
return toMarble(this.subscriptions)
}

Expand Down
2 changes: 1 addition & 1 deletion src/testing/TestObserver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {toMarble} from './Marble'
export class TestObserver<T> implements IObserver<T> {
results: Array<IObservableEvent>

get marble() {
toString() {
return toMarble(this.results)
}

Expand Down
6 changes: 3 additions & 3 deletions test/test.ForEach.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ describe('forEach()', () => {
'#'
)

const actual = testObservable.marble
const actual = testObservable.toString()
assert.strictEqual(actual, expected)
})
})
Expand All @@ -42,7 +42,7 @@ describe('forEach()', () => {

sh.startSubscription(() => forEach(testObserver, $))

const actual = testObserver.marble
const actual = testObserver.toString()
const expected = '-1234|'

assert.strictEqual(actual, expected)
Expand All @@ -56,7 +56,7 @@ describe('forEach()', () => {

sh.startSubscription(() => forEach(testObserver, testObservable, sh))

const actual = testObserver.marble
const actual = testObserver.toString()

assert.strictEqual(actual, expected)
})
Expand Down
Loading

0 comments on commit 4b695d3

Please sign in to comment.