From 4bcfa66b429b52940daab350cdc7ec6ffd9119d9 Mon Sep 17 00:00:00 2001 From: denihs Date: Mon, 13 Feb 2023 10:41:01 -0400 Subject: [PATCH] - fix tests 'mongo-livedata - doc fetcher' --- packages/mongo/doc_fetcher.js | 55 +++++++++++++------------- packages/mongo/doc_fetcher_tests.js | 60 ++++++++++++++++++----------- 2 files changed, 66 insertions(+), 49 deletions(-) diff --git a/packages/mongo/doc_fetcher.js b/packages/mongo/doc_fetcher.js index 303c3267c0..fdffcbb4d3 100644 --- a/packages/mongo/doc_fetcher.js +++ b/packages/mongo/doc_fetcher.js @@ -14,7 +14,7 @@ export class DocFetcher { // // You may assume that callback is never called synchronously (and in fact // OplogObserveDriver does so). - async fetch(collectionName, id, op) { + async fetch(collectionName, id, op, callback) { const self = this; check(collectionName, String); @@ -22,33 +22,36 @@ export class DocFetcher { // If there's already an in-progress fetch for this cache key, yield until // it's done and return whatever it returns. - const inProgress = self._callbacksForOp.has(op); - const em = self._callbacksForOp.get(op); - const { promise: callback, emitter } = EmitterPromise.newPromiseResolver({ - emitter: em - }); - if (!inProgress) { - self._callbacksForOp.set(op, emitter); + if (self._callbacksForOp.has(op)) { + self._callbacksForOp.get(op).push(callback); + return; } - if (inProgress) { - return callback; - } - Meteor._runAsync(async function () { - try { - var doc = await self._mongoConnection.findOneAsync( - collectionName, {_id: id}) || null; - // Return doc to all relevant callbacks. Note that this array can - // continue to grow during callback excecution. - emitter.emit('data', doc); - } catch (e) { - emitter.emit('error', e); - } finally { - // XXX consider keeping the doc around for a period of time before - // removing from the cache - self._callbacksForOp.delete(op); + const callbacks = [callback]; + self._callbacksForOp.set(op, callbacks); + + try { + var doc = + (await self._mongoConnection.findOneAsync(collectionName, { + _id: id, + })) || null; + // Return doc to all relevant callbacks. Note that this array can + // continue to grow during callback excecution. + while (callbacks.length > 0) { + // Clone the document so that the various calls to fetch don't return + // objects that are intertwingled with each other. Clone before + // popping the future, so that if clone throws, the error gets passed + // to the next callback. + callbacks.pop()(null, EJSON.clone(doc)); } - }); - return callback; + } catch (e) { + while (callbacks.length > 0) { + callbacks.pop()(e); + } + } finally { + // XXX consider keeping the doc around for a period of time before + // removing from the cache + self._callbacksForOp.delete(op); + } } } diff --git a/packages/mongo/doc_fetcher_tests.js b/packages/mongo/doc_fetcher_tests.js index 484b5f6d03..0a68752467 100644 --- a/packages/mongo/doc_fetcher_tests.js +++ b/packages/mongo/doc_fetcher_tests.js @@ -1,41 +1,55 @@ -var Fiber = Npm.require('fibers'); -var Future = Npm.require('fibers/future'); -import { DocFetcher } from "./doc_fetcher.js"; +import { DocFetcher } from './doc_fetcher.js'; -testAsyncMulti("mongo-livedata - doc fetcher", [ - function (test, expect) { +testAsyncMulti('mongo-livedata - doc fetcher', [ + async function(test, expect) { var self = this; - var collName = "docfetcher-" + Random.id(); + var collName = 'docfetcher-' + Random.id(); var collection = new Mongo.Collection(collName); - var id1 = collection.insert({x: 1}); - var id2 = collection.insert({y: 2}); + var id1 = await collection.insertAsync({ x: 1 }); + var id2 = await collection.insertAsync({ y: 2 }); var fetcher = new DocFetcher( - MongoInternals.defaultRemoteCollectionDriver().mongo); + MongoInternals.defaultRemoteCollectionDriver().mongo + ); // Test basic operation. const fakeOp1 = {}; const fakeOp2 = {}; - fetcher.fetch(collName, id1, fakeOp1, expect(null, {_id: id1, x: 1})); - fetcher.fetch(collName, "nonexistent!", fakeOp2, expect(null, null)); + await fetcher.fetch( + collName, + id1, + fakeOp1, + expect(null, { _id: id1, x: 1 }) + ); + await fetcher.fetch(collName, 'nonexistent!', fakeOp2, expect(null, null)); + var fetched = false; var fakeOp3 = {}; - var expected = {_id: id2, y: 2}; - fetcher.fetch(collName, id2, fakeOp3, expect(function (e, d) { - fetched = true; - test.isFalse(e); - test.equal(d, expected); - })); + var expected = { _id: id2, y: 2 }; + fetcher.fetch( + collName, + id2, + fakeOp3, + expect(function(e, d) { + fetched = true; + test.isFalse(e); + test.equal(d, expected); + }) + ); // The fetcher yields. - test.isFalse(fetched); // Now ask for another document with the same op reference. Because a // fetch for that op is in flight, we will get the other fetch's // document, not this random document. - fetcher.fetch(collName, Random.id(), fakeOp3, expect(function (e, d) { - test.isFalse(e); - test.equal(d, expected); - })); - } + await fetcher.fetch( + collName, + Random.id(), + fakeOp3, + expect(function(e, d) { + test.isFalse(e); + test.equal(d, expected); + }) + ); + }, ]);