-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathsource-test.js
78 lines (71 loc) · 2.14 KB
/
source-test.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
var S = require('pull-stream')
// the source stream takes two params: abort and cb
// it calls the cb with two params: (end, value)
// the source stream has a contract: if it is passed a truthy `abort`
// param, it must call the callback with a truthy `end` param
// Registers three tests against a pull-stream source factory, one per sink
// defined in this file (aborting, async, slow).
//
// test:   function (name, cb) used to register a test; cb receives the test
//         object `t` that is handed to the sink constructors.
// source: zero-argument factory; it is called once per test, inside the test
//         callback, so every sink gets its own stream.
function SourceTest (test, source) {
  // The sink is always built first (which is when it calls t.plan) and only
  // then is a fresh stream created and handed to it.
  function feed (sink) {
    sink(source())
  }

  test('sink that aborts', function (t) {
    feed(AbortingSink(t))
  })

  test('Async Sink', function (t) {
    feed(AsyncSink(t))
  })

  test('Slow Sink', function (t) {
    feed(SlowSink(t))
  })
}
// Builds a drain sink that accepts `count` items and then returns false from
// its data handler, which is how the handler asks S.drain to abort (per
// pull-stream's drain contract -- see its docs). The test passes as soon as
// drain reports the end of the stream.
//
// t:     test object; plan(1) is called immediately, pass() once on end.
// count: number of items to let through before aborting (defaults to 0, i.e.
//        abort on the very first item).
function AbortingSink (t, count) {
  t.plan(1)

  var limit = count || 0
  var seen = 0

  function onData (d) {
    if (seen === limit) return false
    seen += 1
  }

  // Whatever `err` is, reaching the end callback is what this test checks.
  function onEnd (err) {
    t.pass('should end after the sink aborts')
  }

  return S.drain(onData, onEnd)
}
// Builds a sink that reads one item per process.nextTick and checks that the
// source only calls back when it has been asked to.
//
// t: test object; plan(1) is called immediately. fail() is reported when a
//    callback arrives while the previous one is still being handled, and
//    pass() when the source signals a truthy `end`.
//
// Returns a function taking the source `(abort, cb)` to read from.
function AsyncSink (t) {
  t.plan(1)

  return function asyncSink (source) {
    // True from the moment a callback arrives until the next read is issued.
    var busy = false

    function pull () {
      source(null, handle)
    }

    function release () {
      busy = false
      pull()
    }

    function handle (end, data) {
      if (busy) {
        t.fail('should not callback before the sink is ready')
      }
      busy = true
      if (end) {
        return t.pass('should not callback out of turn')
      }
      process.nextTick(release)
    }

    // The first read is deferred too, so the source is never read
    // synchronously from within this call.
    process.nextTick(pull)
  }
}
// Builds a sink that waits `time` milliseconds before every read and checks
// that the source only calls back when it has been asked to.
//
// t:    test object; plan(1) is called immediately. fail() is reported when a
//       callback arrives while the previous one is still being handled, and
//       pass() when the source signals a truthy `end`.
// time: delay in milliseconds before each read. Defaults to 50 when omitted
//       (undefined or null); an explicit 0 is honoured.
//
// Returns a function taking the source `(abort, cb)` to read from.
function SlowSink (t, time) {
  t.plan(1)

  // Only fall back when no delay was supplied: `time || 50` used to turn an
  // explicit 0 into 50.
  var delay = time == null ? 50 : time

  return function slowSink (source) {
    // True from the moment a callback arrives until the next read is issued.
    var isResolving = false

    function onNext (end, data) {
      if (isResolving) {
        t.fail('should not callback before the sink is ready')
      }
      isResolving = true
      if (end) return t.pass('should not callback out of turn')
      setTimeout(function () {
        isResolving = false
        source(null, onNext)
      }, delay)
    }

    // The first read is delayed as well.
    setTimeout(function () {
      source(null, onNext)
    }, delay)
  }
}
// The module's export is the SourceTest function itself; the individual sink
// constructors are attached to it as properties so they can be used on their
// own.
module.exports = SourceTest
module.exports.AbortingSink = AbortingSink
module.exports.AsyncSink = AsyncSink
module.exports.SlowSink = SlowSink