Skip to content

Commit

Permalink
Merge pull request #238 from tusharmath/frames-scheduler
Browse files Browse the repository at this point in the history
Frames scheduler
  • Loading branch information
tusharmath authored Mar 20, 2018
2 parents 326d61c + ff1d9d0 commit 2ae171b
Show file tree
Hide file tree
Showing 16 changed files with 132 additions and 61 deletions.
3 changes: 2 additions & 1 deletion .prettierrc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"printWidth": 80,
"semi": false,
"singleQuote": true
"singleQuote": true,
"bracketSpacing": false
}
2 changes: 1 addition & 1 deletion API.md
Original file line number Diff line number Diff line change
Expand Up @@ -549,8 +549,8 @@ Takes in multiple sources and a sample source and returns a new observable which

```ts
O.sample(
O.interval(1000),
(a, b) => [a, b],
O.interval(1000),
[
O.mapTo('A', O.interval(100)),
O.mapTo('B', O.interval(200))
Expand Down
4 changes: 3 additions & 1 deletion benchmarks/bm.concat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ export function bm_concat(suite: Suite) {
return suite.add(
'(file, file) -> concat',
(d: IDeferred) => run(concat(fromArray(a), fromArray(b)), d),
{defer: true}
{
defer: true
}
)
}
4 changes: 3 additions & 1 deletion benchmarks/bm.mergeMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ export function bm_mergeMap(suite: Suite) {
return suite.add(
'file -> mergeMap',
(d: IDeferred) => run(mergeMap(just(1e2), fromArray, fromArray(a)), d),
{defer: true}
{
defer: true
}
)
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"build": "rollup -c ./config/rollup.config.js",
"lint": "git ls-files | grep '.ts$' | xargs tslint",
"lint:fix": "git ls-files | grep '.ts$' | xargs tslint --fix",
"prettier": "git ls-files | grep '.ts$' | xargs prettier --print-width 80 --write --single-quote --no-semi --no-bracket-spacing",
"prettier": "git ls-files | grep '.ts$' | xargs prettier --write --config=.prettierrc",
"travis-deploy-once": "travis-deploy-once",
"semantic-release": "semantic-release"
},
Expand Down
20 changes: 20 additions & 0 deletions src/lib/Periodic.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import {ISafeObserver} from './SafeObserver'
import {ISubscription} from './Subscription'

export class Periodic implements ISubscription {
protected sub: ISubscription
protected sink: ISafeObserver<void>

onEvent() {
this.sink.next(undefined)
if (this.sink.erred) this.unsubscribe()
}

unsubscribe(): void {
this.sub.unsubscribe()
}

get closed() {
return this.sub.closed
}
}
13 changes: 7 additions & 6 deletions src/lib/SafeObserver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@ import {CompleteMixin, ErrorMixin, Virgin} from './Mixins'
import {IObserver} from './Observer'
import {ISafeFunction, tryCatch} from './Utils'

export interface ISafeObserver<T> extends IObserver<T> {
erred: boolean
}
class SafeObserver<T> extends ErrorMixin(CompleteMixin(Virgin))
implements IObserver<T> {
private _next: ISafeFunction<void, IObserver<T>>
erred = false

constructor(public sink: IObserver<T>) {
super()
Expand All @@ -16,17 +20,14 @@ class SafeObserver<T> extends ErrorMixin(CompleteMixin(Virgin))

next(val: T): void {
const r = this._next.call(this.sink, val)
if (r.isError()) this.sink.error(r.getError())
if (r.isError) this.error(r.getError())
}

error(err: Error): void {
this.erred = true
this.sink.error(err)
}

complete(): void {
this.sink.complete()
}
}

export const safeObserver = <T>(observer: IObserver<T>): IObserver<T> =>
export const safeObserver = <T>(observer: IObserver<T>): ISafeObserver<T> =>
observer instanceof SafeObserver ? observer : new SafeObserver(observer)
38 changes: 37 additions & 1 deletion src/lib/Scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ import {ISubscription} from './Subscription'

export interface IScheduler {
delay(task: () => void, relativeTime: number): ISubscription

periodic(task: () => void, interval: number): ISubscription

frame(task: () => void): ISubscription

frames(task: () => void): ISubscription

asap(task: () => void): ISubscription

now(): number
}

Expand All @@ -33,6 +39,7 @@ class Periodic implements ISubscription {
clearInterval(this.id)
}
}

class Delay implements ISubscription {
closed = false

Expand All @@ -52,6 +59,7 @@ class Delay implements ISubscription {
}
}
}

class ASAP implements ISubscription {
closed = false

Expand All @@ -69,7 +77,8 @@ class ASAP implements ISubscription {
if (this.closed === false) this.closed = true
}
}
class Frames implements ISubscription {

class Frame implements ISubscription {
closed = false
private frame: number

Expand All @@ -89,8 +98,34 @@ class Frames implements ISubscription {
cancelAnimationFrame(this.frame)
}
}

class Frames implements ISubscription {
closed = false
private frame: number

constructor(private task: () => void) {
this.onEvent = this.onEvent.bind(this)
this.frame = requestAnimationFrame(this.onEvent)
}

onEvent() {
this.task()
this.frame = requestAnimationFrame(this.onEvent)
}

unsubscribe(): void {
if (this.closed) return
this.closed = true
cancelAnimationFrame(this.frame)
}
}

class Scheduler implements IScheduler {
frame(task: () => void): ISubscription {
return new Frame(task)
}

frames(task: () => void): ISubscription {
return new Frames(task)
}

Expand All @@ -110,4 +145,5 @@ class Scheduler implements IScheduler {
return Date.now()
}
}

export const createScheduler = (): IScheduler => new Scheduler()
12 changes: 4 additions & 8 deletions src/lib/Utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export function curry(f: Function, l: number = f.length): any {
}

export interface ISafeValue<T> {
isError(): boolean
isError: boolean
getValue(): T
getError(): Error
}
Expand All @@ -22,11 +22,7 @@ export interface ISafeFunction<V, C> {
}

class Guarded<T> implements ISafeValue<T> {
constructor(private value: Error | T) {}

isError(): boolean {
return this.value instanceof Error
}
constructor(private value: Error | T, readonly isError: boolean) {}

getValue() {
return this.value as T
Expand All @@ -43,9 +39,9 @@ class BaseSafeFunction<T extends Function, V, C>

call(ctx: C, ...t: any[]): ISafeValue<V> {
try {
return new Guarded(this.f.apply(ctx, t))
return new Guarded(this.f.apply(ctx, t), false)
} catch (e) {
return new Guarded(e)
return new Guarded(e, true)
}
}
}
Expand Down
29 changes: 7 additions & 22 deletions src/sources/Frames.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,15 @@
*/
import {IObservable} from '../lib/Observable'
import {IObserver} from '../lib/Observer'
import {safeObserver} from '../lib/SafeObserver'
import {Periodic} from '../lib/Periodic'
import {ISafeObserver, safeObserver} from '../lib/SafeObserver'
import {IScheduler} from '../lib/Scheduler'
import {ISubscription} from '../lib/Subscription'

class RAFSubscription implements ISubscription {
observer: IObserver<number>
subscription: ISubscription
closed = false

constructor(private sink: IObserver<void>, private scheduler: IScheduler) {
this.schedule()
}

private schedule() {
this.subscription = this.scheduler.frame(this.onFrame)
}

onFrame = () => {
if (this.closed) return
this.sink.next(undefined)
this.schedule()
}

unsubscribe(): void {
this.closed = true
class RAFSubscription extends Periodic {
constructor(readonly sink: ISafeObserver<void>, scheduler: IScheduler) {
super()
this.sub = scheduler.frames(this.onEvent.bind(this))
}
}

Expand All @@ -36,6 +20,7 @@ class FrameObservable implements IObservable<void> {
return new RAFSubscription(safeObserver(observer), scheduler)
}
}

export function frames(): IObservable<void> {
return new FrameObservable()
}
3 changes: 2 additions & 1 deletion src/sources/FromArray.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ class FromArraySubscription<T> implements ISubscription {
this.subscription = scheduler.asap(this.executeSafely.bind(this))
}

// TODO: use mixins
private executeSafely() {
const r = tryCatch(this.execute).call(this)
if (r.isError()) this.sink.error(r.getError())
if (r.isError) this.sink.error(r.getError())
}

execute() {
Expand Down
22 changes: 6 additions & 16 deletions src/sources/Interval.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,19 @@
*/
import {IObservable} from '../lib/Observable'
import {IObserver} from '../lib/Observer'
import {safeObserver} from '../lib/SafeObserver'
import {Periodic} from '../lib/Periodic'
import {ISafeObserver, safeObserver} from '../lib/SafeObserver'
import {IScheduler} from '../lib/Scheduler'
import {ISubscription} from '../lib/Subscription'

class TimerSubscription implements ISubscription {
closed = false
subscription: ISubscription

class TimerSubscription extends Periodic {
constructor(
private sink: IObserver<void>,
readonly sink: ISafeObserver<void>,
scheduler: IScheduler,
interval: number
) {
this.subscription = scheduler.periodic(this.onFrame.bind(this), interval)
}

onFrame() {
this.sink.next(undefined)
}

unsubscribe(): void {
this.closed = true
this.subscription.unsubscribe()
super()
this.sub = scheduler.periodic(this.onEvent.bind(this), interval)
}
}

Expand Down
8 changes: 7 additions & 1 deletion src/testing/TestScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ export class TestScheduler implements IScheduler {
return this.delay(task, this.now() + this.rafTimeout, 0)
}

frames(task: () => void): ISubscription {
return this.periodic(task, this.rafTimeout)
}

periodic(task: () => void, interval: number): ISubscription {
var closed = false
const repeatedTask = () => {
Expand All @@ -97,7 +101,9 @@ export class TestScheduler implements IScheduler {
}
this.delay(repeatedTask, interval)
return {
closed,
get closed() {
return closed
},
unsubscribe() {
closed = true
}
Expand Down
11 changes: 11 additions & 0 deletions test/test.Frames.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,22 @@ import {scan} from '../src/operators/Scan'
import {frames} from '../src/sources/Frames'
import {toMarble} from '../src/testing/Marble'
import {createTestScheduler} from '../src/testing/TestScheduler'
import {throwError} from '../src/testing/Thrower'

describe('frames()', () => {
it('should emit requestAnimationFrame events', () => {
const sh = createTestScheduler(10)
const {results} = sh.start(() => scan(i => i + 1, -1, frames()), 200, 250)
t.strictEqual(toMarble(results), '-0123')
})

it('should capture internal errors', () => {
const sh = createTestScheduler(20)
const reduce = (i: number) => {
if (i === 5) throwError('Yo Air')
return i + 1
}
const {results} = sh.start(() => scan(reduce, -1, frames()), 200, 500)
t.strictEqual(toMarble(results), '--0-1-2-3-4-5-#')
})
})
11 changes: 11 additions & 0 deletions test/test.FromArray.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,22 @@ import {ERROR_MESSAGE, throwError} from '../src/testing/Thrower'
const {next, error} = EVENT

describe('fromArray()', () => {
const throwMessage = (message: string) => {
throw message
}
it('should emit array values as events', () => {
const sh = createTestScheduler()
const testFunction = (x: any) =>
x === 2 ? throwError(ERROR_MESSAGE) : x * 100
const {results} = sh.start(() => map(testFunction, fromArray([1, 2, 3])))
t.deepEqual(results, [next(201, 100), error(201, new Error(ERROR_MESSAGE))])
})

it('should handle thrown non Error exceptions', () => {
const sh = createTestScheduler()
const testFunction = (x: any) =>
x === 2 ? throwMessage(ERROR_MESSAGE) : x * 100
const {results} = sh.start(() => map(testFunction, fromArray([1, 2, 3])))
t.deepEqual(results, [next(201, 100), error(201, ERROR_MESSAGE as any)])
})
})
Loading

0 comments on commit 2ae171b

Please sign in to comment.