diff --git a/History.md b/History.md index fdba2405..b7c59cf0 100644 --- a/History.md +++ b/History.md @@ -1,3 +1,7 @@ +3.0.1 / 2016-07-06 +================== + - Fix find cursor pause and resume + 3.0.0 / 2016-07-06 ================== - remove Mongoskin dependency diff --git a/lib/collection.js b/lib/collection.js index 240679b0..5cfa4158 100644 --- a/lib/collection.js +++ b/lib/collection.js @@ -333,7 +333,6 @@ Collection.prototype.find = function (query, opts, fn) { }).then(thenFn(fn)).catch(catchFn(fn)) } - var didClose = false var promise = this.executeWhenOpened().then(function (col) { return col.find(query, opts) }).then(function (cursor) { @@ -341,37 +340,41 @@ Collection.prototype.find = function (query, opts, fn) { return cursor.toArray().then(thenFn(fn)).catch(catchFn(fn)) } + if (typeof opts.stream === 'function') { + promise.eachListener = opts.stream + } + + var didClose = false + function close () { didClose = true - cursor = cursor.cursor || cursor cursor.close() } return new Promise(function (resolve, reject) { - cursor.each(function (err, doc) { - if (didClose && !err) { - // emit success - err = doc = null - } - - if (err) { - if (fn) { - fn(err) - } - reject(err) - } else if (doc) { + cursor.on('data', function (doc) { + if (!didClose) { promise.eachListener(doc, { close: close, - pause: cursor.pause, - resume: cursor.resume + pause: cursor.pause.bind(cursor), + resume: cursor.resume.bind(cursor) }) - } else { - if (fn) { - fn() - } - resolve() } }) + + cursor.on('end', function () { + if (fn) { + fn() + } + resolve() + }) + + cursor.on('error', function (err) { + if (fn) { + fn(err) + } + reject(err) + }) }) }) diff --git a/package.json b/package.json index 00e13158..3eca179f 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "monk", - "version": "3.0.0", + "version": "3.0.1", "main": "lib/monk.js", "tags": [ "mongodb", diff --git a/test/collection.js b/test/collection.js index b64b7fa4..7511519c 100644 --- a/test/collection.js +++ b/test/collection.js @@ -235,23 +235,41 @@ test('find > should allow stream cursor destroy', (t) => { const query = { cursor: { $exists: true } } let found = 0 return users.insert([{ cursor: true }, { cursor: true }, { cursor: true }, { cursor: true }]).then(() => { - return users.count(query).then((total) => { - if (total <= 1) throw new Error('Bad test') - return users.find(query) - .each((doc, {close}) => { - t.not(doc.cursor, null) - found++ - if (found === 2) close() - }) - .then(() => { - return new Promise((resolve) => { - setTimeout(() => { - t.is(found, 2) - resolve() - }, 100) - }) + return users.find(query) + .each((doc, {close}) => { + console.log(found) + t.not(doc.cursor, null) + found++ + if (found === 2) close() + }) + .then(() => { + return new Promise((resolve) => { + setTimeout(() => { + t.is(found, 2) + resolve() + }, 100) }) - }) + }) + }) +}) + +test('find > stream pause and continue', (t) => { + const query = { stream: 4 } + return users.insert([{ stream: 4 }, { stream: 4 }, { stream: 4 }, { stream: 4 }]).then(() => { + const start = Date.now() + let index = 0 + return users.find(query) + .each((doc, {pause, resume}) => { + pause() + const duration = Date.now() - start + t.true(duration > index * 1000) + index += 1 + setTimeout(resume, 1000) + }) + .then(() => { + const duration = Date.now() - start + t.true(duration > 3000) + }) }) })