diff --git a/.prettierrc b/.prettierrc index 3cc6dac..97ef6d6 100644 --- a/.prettierrc +++ b/.prettierrc @@ -1,5 +1,6 @@ { "printWidth": 80, "semi": false, - "singleQuote": true + "singleQuote": true, + "bracketSpacing": false } \ No newline at end of file diff --git a/API.md b/API.md index 57602dc..18d2291 100644 --- a/API.md +++ b/API.md @@ -549,8 +549,8 @@ Takes in multiple sources and a sample source and returns a new observable which ```ts O.sample( - O.interval(1000), (a, b) => [a, b], + O.interval(1000), [ O.mapTo('A', O.interval(100)), O.mapTo('B', O.interval(200)) diff --git a/benchmarks/bm.concat.ts b/benchmarks/bm.concat.ts index 59fcf0a..7b4ca32 100644 --- a/benchmarks/bm.concat.ts +++ b/benchmarks/bm.concat.ts @@ -13,6 +13,8 @@ export function bm_concat(suite: Suite) { return suite.add( '(file, file) -> concat', (d: IDeferred) => run(concat(fromArray(a), fromArray(b)), d), - {defer: true} + { + defer: true + } ) } diff --git a/benchmarks/bm.mergeMap.ts b/benchmarks/bm.mergeMap.ts index 2a7719c..80d4fa5 100644 --- a/benchmarks/bm.mergeMap.ts +++ b/benchmarks/bm.mergeMap.ts @@ -14,6 +14,8 @@ export function bm_mergeMap(suite: Suite) { return suite.add( 'file -> mergeMap', (d: IDeferred) => run(mergeMap(just(1e2), fromArray, fromArray(a)), d), - {defer: true} + { + defer: true + } ) } diff --git a/package.json b/package.json index 8384601..ae89d75 100644 --- a/package.json +++ b/package.json @@ -15,7 +15,7 @@ "build": "rollup -c ./config/rollup.config.js", "lint": "git ls-files | grep '.ts$' | xargs tslint", "lint:fix": "git ls-files | grep '.ts$' | xargs tslint --fix", - "prettier": "git ls-files | grep '.ts$' | xargs prettier --print-width 80 --write --single-quote --no-semi --no-bracket-spacing", + "prettier": "git ls-files | grep '.ts$' | xargs prettier --write --config=.prettierrc", "travis-deploy-once": "travis-deploy-once", "semantic-release": "semantic-release" }, diff --git a/src/lib/Periodic.ts b/src/lib/Periodic.ts new file mode 100644 index 0000000..c5b641c --- /dev/null +++ b/src/lib/Periodic.ts @@ -0,0 +1,20 @@ +import {ISafeObserver} from './SafeObserver' +import {ISubscription} from './Subscription' + +export class Periodic implements ISubscription { + protected sub: ISubscription + protected sink: ISafeObserver + + onEvent() { + this.sink.next(undefined) + if (this.sink.erred) this.unsubscribe() + } + + unsubscribe(): void { + this.sub.unsubscribe() + } + + get closed() { + return this.sub.closed + } +} diff --git a/src/lib/SafeObserver.ts b/src/lib/SafeObserver.ts index 9b8f235..a9ac2c5 100644 --- a/src/lib/SafeObserver.ts +++ b/src/lib/SafeObserver.ts @@ -5,9 +5,13 @@ import {CompleteMixin, ErrorMixin, Virgin} from './Mixins' import {IObserver} from './Observer' import {ISafeFunction, tryCatch} from './Utils' +export interface ISafeObserver extends IObserver { + erred: boolean +} class SafeObserver extends ErrorMixin(CompleteMixin(Virgin)) implements IObserver { private _next: ISafeFunction> + erred = false constructor(public sink: IObserver) { super() @@ -16,17 +20,14 @@ class SafeObserver extends ErrorMixin(CompleteMixin(Virgin)) next(val: T): void { const r = this._next.call(this.sink, val) - if (r.isError()) this.sink.error(r.getError()) + if (r.isError) this.error(r.getError()) } error(err: Error): void { + this.erred = true this.sink.error(err) } - - complete(): void { - this.sink.complete() - } } -export const safeObserver = (observer: IObserver): IObserver => +export const safeObserver = (observer: IObserver): ISafeObserver => observer instanceof SafeObserver ? observer : new SafeObserver(observer) diff --git a/src/lib/Scheduler.ts b/src/lib/Scheduler.ts index 93b3e00..3b805a8 100644 --- a/src/lib/Scheduler.ts +++ b/src/lib/Scheduler.ts @@ -5,9 +5,15 @@ import {ISubscription} from './Subscription' export interface IScheduler { delay(task: () => void, relativeTime: number): ISubscription + periodic(task: () => void, interval: number): ISubscription + frame(task: () => void): ISubscription + + frames(task: () => void): ISubscription + asap(task: () => void): ISubscription + now(): number } @@ -33,6 +39,7 @@ class Periodic implements ISubscription { clearInterval(this.id) } } + class Delay implements ISubscription { closed = false @@ -52,6 +59,7 @@ class Delay implements ISubscription { } } } + class ASAP implements ISubscription { closed = false @@ -69,7 +77,8 @@ class ASAP implements ISubscription { if (this.closed === false) this.closed = true } } -class Frames implements ISubscription { + +class Frame implements ISubscription { closed = false private frame: number @@ -89,8 +98,34 @@ class Frames implements ISubscription { cancelAnimationFrame(this.frame) } } + +class Frames implements ISubscription { + closed = false + private frame: number + + constructor(private task: () => void) { + this.onEvent = this.onEvent.bind(this) + this.frame = requestAnimationFrame(this.onEvent) + } + + onEvent() { + this.task() + this.frame = requestAnimationFrame(this.onEvent) + } + + unsubscribe(): void { + if (this.closed) return + this.closed = true + cancelAnimationFrame(this.frame) + } +} + class Scheduler implements IScheduler { frame(task: () => void): ISubscription { + return new Frame(task) + } + + frames(task: () => void): ISubscription { return new Frames(task) } @@ -110,4 +145,5 @@ class Scheduler implements IScheduler { return Date.now() } } + export const createScheduler = (): IScheduler => new Scheduler() diff --git a/src/lib/Utils.ts b/src/lib/Utils.ts index e345d4b..3ffeeab 100644 --- a/src/lib/Utils.ts +++ b/src/lib/Utils.ts @@ -12,7 +12,7 @@ export function curry(f: Function, l: number = f.length): any { } export interface ISafeValue { - isError(): boolean + isError: boolean getValue(): T getError(): Error } @@ -22,11 +22,7 @@ export interface ISafeFunction { } class Guarded implements ISafeValue { - constructor(private value: Error | T) {} - - isError(): boolean { - return this.value instanceof Error - } + constructor(private value: Error | T, readonly isError: boolean) {} getValue() { return this.value as T @@ -43,9 +39,9 @@ class BaseSafeFunction call(ctx: C, ...t: any[]): ISafeValue { try { - return new Guarded(this.f.apply(ctx, t)) + return new Guarded(this.f.apply(ctx, t), false) } catch (e) { - return new Guarded(e) + return new Guarded(e, true) } } } diff --git a/src/sources/Frames.ts b/src/sources/Frames.ts index 73a8fd4..4ddd2c0 100644 --- a/src/sources/Frames.ts +++ b/src/sources/Frames.ts @@ -3,31 +3,15 @@ */ import {IObservable} from '../lib/Observable' import {IObserver} from '../lib/Observer' -import {safeObserver} from '../lib/SafeObserver' +import {Periodic} from '../lib/Periodic' +import {ISafeObserver, safeObserver} from '../lib/SafeObserver' import {IScheduler} from '../lib/Scheduler' import {ISubscription} from '../lib/Subscription' -class RAFSubscription implements ISubscription { - observer: IObserver - subscription: ISubscription - closed = false - - constructor(private sink: IObserver, private scheduler: IScheduler) { - this.schedule() - } - - private schedule() { - this.subscription = this.scheduler.frame(this.onFrame) - } - - onFrame = () => { - if (this.closed) return - this.sink.next(undefined) - this.schedule() - } - - unsubscribe(): void { - this.closed = true +class RAFSubscription extends Periodic { + constructor(readonly sink: ISafeObserver, scheduler: IScheduler) { + super() + this.sub = scheduler.frames(this.onEvent.bind(this)) } } @@ -36,6 +20,7 @@ class FrameObservable implements IObservable { return new RAFSubscription(safeObserver(observer), scheduler) } } + export function frames(): IObservable { return new FrameObservable() } diff --git a/src/sources/FromArray.ts b/src/sources/FromArray.ts index a99b68d..b569eda 100644 --- a/src/sources/FromArray.ts +++ b/src/sources/FromArray.ts @@ -19,9 +19,10 @@ class FromArraySubscription implements ISubscription { this.subscription = scheduler.asap(this.executeSafely.bind(this)) } + // TODO: use mixins private executeSafely() { const r = tryCatch(this.execute).call(this) - if (r.isError()) this.sink.error(r.getError()) + if (r.isError) this.sink.error(r.getError()) } execute() { diff --git a/src/sources/Interval.ts b/src/sources/Interval.ts index 0f89f07..4eabdd2 100644 --- a/src/sources/Interval.ts +++ b/src/sources/Interval.ts @@ -3,29 +3,19 @@ */ import {IObservable} from '../lib/Observable' import {IObserver} from '../lib/Observer' -import {safeObserver} from '../lib/SafeObserver' +import {Periodic} from '../lib/Periodic' +import {ISafeObserver, safeObserver} from '../lib/SafeObserver' import {IScheduler} from '../lib/Scheduler' import {ISubscription} from '../lib/Subscription' -class TimerSubscription implements ISubscription { - closed = false - subscription: ISubscription - +class TimerSubscription extends Periodic { constructor( - private sink: IObserver, + readonly sink: ISafeObserver, scheduler: IScheduler, interval: number ) { - this.subscription = scheduler.periodic(this.onFrame.bind(this), interval) - } - - onFrame() { - this.sink.next(undefined) - } - - unsubscribe(): void { - this.closed = true - this.subscription.unsubscribe() + super() + this.sub = scheduler.periodic(this.onEvent.bind(this), interval) } } diff --git a/src/testing/TestScheduler.ts b/src/testing/TestScheduler.ts index d40cc38..ca5cec1 100644 --- a/src/testing/TestScheduler.ts +++ b/src/testing/TestScheduler.ts @@ -88,6 +88,10 @@ export class TestScheduler implements IScheduler { return this.delay(task, this.now() + this.rafTimeout, 0) } + frames(task: () => void): ISubscription { + return this.periodic(task, this.rafTimeout) + } + periodic(task: () => void, interval: number): ISubscription { var closed = false const repeatedTask = () => { @@ -97,7 +101,9 @@ export class TestScheduler implements IScheduler { } this.delay(repeatedTask, interval) return { - closed, + get closed() { + return closed + }, unsubscribe() { closed = true } diff --git a/test/test.Frames.ts b/test/test.Frames.ts index f7358dc..b4fa18b 100644 --- a/test/test.Frames.ts +++ b/test/test.Frames.ts @@ -6,6 +6,7 @@ import {scan} from '../src/operators/Scan' import {frames} from '../src/sources/Frames' import {toMarble} from '../src/testing/Marble' import {createTestScheduler} from '../src/testing/TestScheduler' +import {throwError} from '../src/testing/Thrower' describe('frames()', () => { it('should emit requestAnimationFrame events', () => { @@ -13,4 +14,14 @@ describe('frames()', () => { const {results} = sh.start(() => scan(i => i + 1, -1, frames()), 200, 250) t.strictEqual(toMarble(results), '-0123') }) + + it('should capture internal errors', () => { + const sh = createTestScheduler(20) + const reduce = (i: number) => { + if (i === 5) throwError('Yo Air') + return i + 1 + } + const {results} = sh.start(() => scan(reduce, -1, frames()), 200, 500) + t.strictEqual(toMarble(results), '--0-1-2-3-4-5-#') + }) }) diff --git a/test/test.FromArray.ts b/test/test.FromArray.ts index d0a3e41..6fb41f4 100644 --- a/test/test.FromArray.ts +++ b/test/test.FromArray.ts @@ -12,6 +12,9 @@ import {ERROR_MESSAGE, throwError} from '../src/testing/Thrower' const {next, error} = EVENT describe('fromArray()', () => { + const throwMessage = (message: string) => { + throw message + } it('should emit array values as events', () => { const sh = createTestScheduler() const testFunction = (x: any) => @@ -19,4 +22,12 @@ describe('fromArray()', () => { const {results} = sh.start(() => map(testFunction, fromArray([1, 2, 3]))) t.deepEqual(results, [next(201, 100), error(201, new Error(ERROR_MESSAGE))]) }) + + it('should handle thrown non Error exceptions', () => { + const sh = createTestScheduler() + const testFunction = (x: any) => + x === 2 ? throwMessage(ERROR_MESSAGE) : x * 100 + const {results} = sh.start(() => map(testFunction, fromArray([1, 2, 3]))) + t.deepEqual(results, [next(201, 100), error(201, ERROR_MESSAGE as any)]) + }) }) diff --git a/test/test.Interval.ts b/test/test.Interval.ts index a23e430..a9b44a8 100644 --- a/test/test.Interval.ts +++ b/test/test.Interval.ts @@ -8,7 +8,7 @@ import {interval} from '../src/sources/Interval' import {EVENT, EventError} from '../src/testing/Events' import {toMarble} from '../src/testing/Marble' import {createTestScheduler} from '../src/testing/TestScheduler' -import {ERROR_MESSAGE, thrower} from '../src/testing/Thrower' +import {ERROR_MESSAGE, thrower, throwError} from '../src/testing/Thrower' const {error} = EVENT describe('interval()', () => { @@ -32,4 +32,13 @@ describe('interval()', () => { ERROR_MESSAGE ) }) + + it('should stop after error', () => { + const sh = createTestScheduler() + const {results} = sh.start(() => + scan(i => (i === 5 ? throwError('Yay!') : i + 1), 0, interval(20)) + ) + const expected = '--1-2-3-4-5-#' + t.strictEqual(toMarble(results), expected) + }) })