Skip to content

Commit

Permalink
fix find cursor pause and resume
Browse files Browse the repository at this point in the history
  • Loading branch information
mathieudutour committed Jul 6, 2016
1 parent afb3b55 commit c98e9a8
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 38 deletions.
4 changes: 4 additions & 0 deletions History.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
3.0.1 / 2016-07-06
==================
- Fix find cursor pause and resume

3.0.0 / 2016-07-06
==================
- remove Mongoskin dependency
Expand Down
45 changes: 24 additions & 21 deletions lib/collection.js
Original file line number Diff line number Diff line change
Expand Up @@ -333,45 +333,48 @@ Collection.prototype.find = function (query, opts, fn) {
}).then(thenFn(fn)).catch(catchFn(fn))
}

var didClose = false
var promise = this.executeWhenOpened().then(function (col) {
return col.find(query, opts)
}).then(function (cursor) {
if (!opts.stream && !promise.eachListener) {
return cursor.toArray().then(thenFn(fn)).catch(catchFn(fn))
}

if (typeof opts.stream === 'function') {
promise.eachListener = opts.stream
}

var didClose = false

function close () {
didClose = true
cursor = cursor.cursor || cursor
cursor.close()
}

return new Promise(function (resolve, reject) {
cursor.each(function (err, doc) {
if (didClose && !err) {
// emit success
err = doc = null
}

if (err) {
if (fn) {
fn(err)
}
reject(err)
} else if (doc) {
cursor.on('data', function (doc) {
if (!didClose) {
promise.eachListener(doc, {
close: close,
pause: cursor.pause,
resume: cursor.resume
pause: cursor.pause.bind(cursor),
resume: cursor.resume.bind(cursor)
})
} else {
if (fn) {
fn()
}
resolve()
}
})

cursor.on('end', function () {
if (fn) {
fn()
}
resolve()
})

cursor.on('error', function (err) {
if (fn) {
fn(err)
}
reject(err)
})
})
})

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "monk",
"version": "3.0.0",
"version": "3.0.1",
"main": "lib/monk.js",
"tags": [
"mongodb",
Expand Down
50 changes: 34 additions & 16 deletions test/collection.js
Original file line number Diff line number Diff line change
Expand Up @@ -235,23 +235,41 @@ test('find > should allow stream cursor destroy', (t) => {
const query = { cursor: { $exists: true } }
let found = 0
return users.insert([{ cursor: true }, { cursor: true }, { cursor: true }, { cursor: true }]).then(() => {
return users.count(query).then((total) => {
if (total <= 1) throw new Error('Bad test')
return users.find(query)
.each((doc, {close}) => {
t.not(doc.cursor, null)
found++
if (found === 2) close()
})
.then(() => {
return new Promise((resolve) => {
setTimeout(() => {
t.is(found, 2)
resolve()
}, 100)
})
return users.find(query)
.each((doc, {close}) => {
console.log(found)
t.not(doc.cursor, null)
found++
if (found === 2) close()
})
.then(() => {
return new Promise((resolve) => {
setTimeout(() => {
t.is(found, 2)
resolve()
}, 100)
})
})
})
})
})

test('find > stream pause and continue', (t) => {
const query = { stream: 4 }
return users.insert([{ stream: 4 }, { stream: 4 }, { stream: 4 }, { stream: 4 }]).then(() => {
const start = Date.now()
let index = 0
return users.find(query)
.each((doc, {pause, resume}) => {
pause()
const duration = Date.now() - start
t.true(duration > index * 1000)
index += 1
setTimeout(resume, 1000)
})
.then(() => {
const duration = Date.now() - start
t.true(duration > 3000)
})
})
})

Expand Down

0 comments on commit c98e9a8

Please sign in to comment.