diff --git a/packages/mongo/doc_fetcher.js b/packages/mongo/doc_fetcher.js index 3d4740bf34..c7e20f8a41 100644 --- a/packages/mongo/doc_fetcher.js +++ b/packages/mongo/doc_fetcher.js @@ -1,3 +1,5 @@ +import { EventEmitter } from 'events'; + export class DocFetcher { constructor(mongoConnection) { this._mongoConnection = mongoConnection; @@ -14,7 +16,7 @@ export class DocFetcher { // // You may assume that callback is never called synchronously (and in fact // OplogObserveDriver does so). - fetch(collectionName, id, op, callback) { + async fetch(collectionName, id, op) { const self = this; check(collectionName, String); @@ -22,36 +24,48 @@ export class DocFetcher { // If there's already an in-progress fetch for this cache key, yield until // it's done and return whatever it returns. - if (self._callbacksForOp.has(op)) { - self._callbacksForOp.get(op).push(callback); - return; + // console.log(self._callbacksForOp, {op}); + // if (self._callbacksForOp.has(op)) { + // self._callbacksForOp.get(op).push(callback); + // return callback; + // } + // self._callbacksForOp.set(op, [callback]); + const inProgress = self._callbacksForOp.has(op); + if (!inProgress) { + self._callbacksForOp.set(op, new EventEmitter()); } - - const callbacks = [callback]; - self._callbacksForOp.set(op, callbacks); - - Meteor._runAsync(async function () { - try { - var doc = await self._mongoConnection.findOne( - collectionName, {_id: id}) || null; - // Return doc to all relevant callbacks. Note that this array can - // continue to grow during callback excecution. - while (callbacks.length > 0) { - // Clone the document so that the various calls to fetch don't return - // objects that are intertwingled with each other. Clone before - // popping the future, so that if clone throws, the error gets passed - // to the next callback. - await callbacks.pop()(null, EJSON.clone(doc)); - } - } catch (e) { - while (callbacks.length > 0) { - await callbacks.pop()(e); - } - } finally { - // XXX consider keeping the doc around for a period of time before - // removing from the cache - self._callbacksForOp.delete(op); - } + const emitter = self._callbacksForOp.get(op); + const callback = new Promise((resolve, reject) => { + emitter.once('data', (data) => { + resolve(data); + }); + emitter.once('error', reject); }); + if (inProgress) { + return callback; + } + Meteor._runAsync(async function () { + try { + var doc = await self._mongoConnection.findOne( + collectionName, {_id: id}) || null; + // Return doc to all relevant callbacks. Note that this array can + // continue to grow during callback excecution. + const evEmmiter = self._callbacksForOp.get(op); + if (evEmmiter) { + evEmmiter.emit('data', doc); + } + } catch (e) { + const evEmmiter = self._callbacksForOp.get(op); + if (evEmmiter) { + evEmmiter.emit('error', e); + } + } finally { + // XXX consider keeping the doc around for a period of time before + // removing from the cache + self._callbacksForOp.delete(op); + } + } + ); + return callback; } } diff --git a/packages/mongo/doc_fetcher_tests.js b/packages/mongo/doc_fetcher_tests.js index 40cb214884..1ad4cbcff9 100644 --- a/packages/mongo/doc_fetcher_tests.js +++ b/packages/mongo/doc_fetcher_tests.js @@ -2,7 +2,6 @@ import { DocFetcher } from "./doc_fetcher.js"; testAsyncMulti("mongo-livedata - doc fetcher", [ async function (test, expect) { - var self = this; var collName = "docfetcher-" + Random.id(); var collection = new Mongo.Collection(collName); var id1 = await collection.insert({x: 1}); @@ -14,26 +13,27 @@ testAsyncMulti("mongo-livedata - doc fetcher", [ // Test basic operation. const fakeOp1 = {}; const fakeOp2 = {}; - // fetcher.fetch(collName, id1, fakeOp1, expect(null, {_id: id1, x: 1})); - // fetcher.fetch(collName, "nonexistent!", fakeOp2, expect(null, null)); - // - // var fetched = false; - // var fakeOp3 = {}; - // var expected = {_id: id2, y: 2}; - // fetcher.fetch(collName, id2, fakeOp3, expect(function (e, d) { - // fetched = true; - // test.isFalse(e); - // test.equal(d, expected); - // })); - // // The fetcher yields. - // test.isFalse(fetched); - // - // // Now ask for another document with the same op reference. Because a - // // fetch for that op is in flight, we will get the other fetch's - // // document, not this random document. - // fetcher.fetch(collName, Random.id(), fakeOp3, expect(function (e, d) { - // test.isFalse(e); - // test.equal(d, expected); - // })); + const promise1 = await fetcher.fetch(collName, id1, fakeOp1).then(expect({_id: id1, x: 1})); + const promise2 = fetcher.fetch(collName, "nonexistent!", fakeOp2).then(expect(null)); + + var fetched = false; + var fakeOp3 = {}; + var expected = {_id: id2, y: 2}; + const promise3 = fetcher.fetch(collName, id2, fakeOp3).then(expect(function (d) { + fetched = true; + test.equal(d, expected); + })).catch(expect(function (e) { + test.isFalse(e); + })); + test.isFalse(fetched); + // Now ask for another document with the same op reference. Because a + // fetch for that op is in flight, we will get the other fetch's + // document, not this random document. + const promise4 = fetcher.fetch(collName, Random.id(), fakeOp3).then(expect(function (d) { + test.equal(d, expected); + })).catch(expect(function (e) { + test.isFalse(e); + })); + await Promise.all([promise1, promise2, promise3, promise4]); } ]);