mirror of
https://github.com/meteor/meteor.git
synced 2026-05-02 03:01:46 -04:00
- fix tests 'mongo-livedata - doc fetcher'
This commit is contained in:
@@ -14,7 +14,7 @@ export class DocFetcher {
|
||||
//
|
||||
// You may assume that callback is never called synchronously (and in fact
|
||||
// OplogObserveDriver does so).
|
||||
async fetch(collectionName, id, op) {
|
||||
async fetch(collectionName, id, op, callback) {
|
||||
const self = this;
|
||||
|
||||
check(collectionName, String);
|
||||
@@ -22,33 +22,36 @@ export class DocFetcher {
|
||||
|
||||
// If there's already an in-progress fetch for this cache key, yield until
|
||||
// it's done and return whatever it returns.
|
||||
const inProgress = self._callbacksForOp.has(op);
|
||||
const em = self._callbacksForOp.get(op);
|
||||
const { promise: callback, emitter } = EmitterPromise.newPromiseResolver({
|
||||
emitter: em
|
||||
});
|
||||
if (!inProgress) {
|
||||
self._callbacksForOp.set(op, emitter);
|
||||
if (self._callbacksForOp.has(op)) {
|
||||
self._callbacksForOp.get(op).push(callback);
|
||||
return;
|
||||
}
|
||||
|
||||
if (inProgress) {
|
||||
return callback;
|
||||
}
|
||||
Meteor._runAsync(async function () {
|
||||
try {
|
||||
var doc = await self._mongoConnection.findOneAsync(
|
||||
collectionName, {_id: id}) || null;
|
||||
// Return doc to all relevant callbacks. Note that this array can
|
||||
// continue to grow during callback excecution.
|
||||
emitter.emit('data', doc);
|
||||
} catch (e) {
|
||||
emitter.emit('error', e);
|
||||
} finally {
|
||||
// XXX consider keeping the doc around for a period of time before
|
||||
// removing from the cache
|
||||
self._callbacksForOp.delete(op);
|
||||
const callbacks = [callback];
|
||||
self._callbacksForOp.set(op, callbacks);
|
||||
|
||||
try {
|
||||
var doc =
|
||||
(await self._mongoConnection.findOneAsync(collectionName, {
|
||||
_id: id,
|
||||
})) || null;
|
||||
// Return doc to all relevant callbacks. Note that this array can
|
||||
// continue to grow during callback excecution.
|
||||
while (callbacks.length > 0) {
|
||||
// Clone the document so that the various calls to fetch don't return
|
||||
// objects that are intertwingled with each other. Clone before
|
||||
// popping the future, so that if clone throws, the error gets passed
|
||||
// to the next callback.
|
||||
callbacks.pop()(null, EJSON.clone(doc));
|
||||
}
|
||||
});
|
||||
return callback;
|
||||
} catch (e) {
|
||||
while (callbacks.length > 0) {
|
||||
callbacks.pop()(e);
|
||||
}
|
||||
} finally {
|
||||
// XXX consider keeping the doc around for a period of time before
|
||||
// removing from the cache
|
||||
self._callbacksForOp.delete(op);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,41 +1,55 @@
|
||||
var Fiber = Npm.require('fibers');
|
||||
var Future = Npm.require('fibers/future');
|
||||
import { DocFetcher } from "./doc_fetcher.js";
|
||||
import { DocFetcher } from './doc_fetcher.js';
|
||||
|
||||
testAsyncMulti("mongo-livedata - doc fetcher", [
|
||||
function (test, expect) {
|
||||
testAsyncMulti('mongo-livedata - doc fetcher', [
|
||||
async function(test, expect) {
|
||||
var self = this;
|
||||
var collName = "docfetcher-" + Random.id();
|
||||
var collName = 'docfetcher-' + Random.id();
|
||||
var collection = new Mongo.Collection(collName);
|
||||
var id1 = collection.insert({x: 1});
|
||||
var id2 = collection.insert({y: 2});
|
||||
var id1 = await collection.insertAsync({ x: 1 });
|
||||
var id2 = await collection.insertAsync({ y: 2 });
|
||||
|
||||
var fetcher = new DocFetcher(
|
||||
MongoInternals.defaultRemoteCollectionDriver().mongo);
|
||||
MongoInternals.defaultRemoteCollectionDriver().mongo
|
||||
);
|
||||
|
||||
// Test basic operation.
|
||||
const fakeOp1 = {};
|
||||
const fakeOp2 = {};
|
||||
fetcher.fetch(collName, id1, fakeOp1, expect(null, {_id: id1, x: 1}));
|
||||
fetcher.fetch(collName, "nonexistent!", fakeOp2, expect(null, null));
|
||||
await fetcher.fetch(
|
||||
collName,
|
||||
id1,
|
||||
fakeOp1,
|
||||
expect(null, { _id: id1, x: 1 })
|
||||
);
|
||||
await fetcher.fetch(collName, 'nonexistent!', fakeOp2, expect(null, null));
|
||||
|
||||
|
||||
var fetched = false;
|
||||
var fakeOp3 = {};
|
||||
var expected = {_id: id2, y: 2};
|
||||
fetcher.fetch(collName, id2, fakeOp3, expect(function (e, d) {
|
||||
fetched = true;
|
||||
test.isFalse(e);
|
||||
test.equal(d, expected);
|
||||
}));
|
||||
var expected = { _id: id2, y: 2 };
|
||||
fetcher.fetch(
|
||||
collName,
|
||||
id2,
|
||||
fakeOp3,
|
||||
expect(function(e, d) {
|
||||
fetched = true;
|
||||
test.isFalse(e);
|
||||
test.equal(d, expected);
|
||||
})
|
||||
);
|
||||
// The fetcher yields.
|
||||
test.isFalse(fetched);
|
||||
|
||||
// Now ask for another document with the same op reference. Because a
|
||||
// fetch for that op is in flight, we will get the other fetch's
|
||||
// document, not this random document.
|
||||
fetcher.fetch(collName, Random.id(), fakeOp3, expect(function (e, d) {
|
||||
test.isFalse(e);
|
||||
test.equal(d, expected);
|
||||
}));
|
||||
}
|
||||
await fetcher.fetch(
|
||||
collName,
|
||||
Random.id(),
|
||||
fakeOp3,
|
||||
expect(function(e, d) {
|
||||
test.isFalse(e);
|
||||
test.equal(d, expected);
|
||||
})
|
||||
);
|
||||
},
|
||||
]);
|
||||
|
||||
Reference in New Issue
Block a user