mirror of
https://github.com/meteor/meteor.git
synced 2026-05-02 03:01:46 -04:00
Make DocFetcher more async.
This should use fewer fibers. nim, can you benchmark?
This commit is contained in:
@@ -1,20 +1,21 @@
|
||||
var Fiber = Npm.require('fibers');
|
||||
var Future = Npm.require('fibers/future');
|
||||
|
||||
DocFetcher = function (mongoConnection) {
|
||||
var self = this;
|
||||
self._mongoConnection = mongoConnection;
|
||||
// Map from cache key -> [Future]
|
||||
self._futuresForCacheKey = {};
|
||||
// Map from cache key -> [callback]
|
||||
self._callbacksForCacheKey = {};
|
||||
};
|
||||
|
||||
_.extend(DocFetcher.prototype, {
|
||||
// Fetches document "id" from collectionName, returning it or null if not
|
||||
// found. Throws other errors. Can yield.
|
||||
// found.
|
||||
//
|
||||
// If you make multiple calls to fetch() with the same cacheKey (a string),
|
||||
// DocFetcher may assume that they all return the same document. (It does
|
||||
// not check to see if collectionName/id match.)
|
||||
fetch: function (collectionName, id, cacheKey) {
|
||||
fetch: function (collectionName, id, cacheKey, callback) {
|
||||
var self = this;
|
||||
|
||||
check(collectionName, String);
|
||||
@@ -23,38 +24,36 @@ _.extend(DocFetcher.prototype, {
|
||||
|
||||
// If there's already an in-progress fetch for this cache key, yield until
|
||||
// it's done and return whatever it returns.
|
||||
if (_.has(self._futuresForCacheKey, cacheKey)) {
|
||||
var f = new Future;
|
||||
self._futuresForCacheKey[cacheKey].push(f);
|
||||
return f.wait();
|
||||
if (_.has(self._callbacksForCacheKey, cacheKey)) {
|
||||
self._callbacksForCacheKey[cacheKey].push(callback);
|
||||
return;
|
||||
}
|
||||
|
||||
var futures = self._futuresForCacheKey[cacheKey] = [];
|
||||
var callbacks = self._callbacksForCacheKey[cacheKey] = [callback];
|
||||
|
||||
try {
|
||||
var doc = self._mongoConnection.findOne(
|
||||
collectionName, {_id: id}) || null;
|
||||
// Return doc to all fibers that are blocking on us. Note that this array
|
||||
// can continue to grow during calls to Future.return.
|
||||
while (!_.isEmpty(futures)) {
|
||||
// Clone the document so that the various calls to fetch don't return
|
||||
// objects that are intertwingled with each other. Clone before popping
|
||||
// the future, so that if clone throws, the error gets thrown to the
|
||||
// next future instead of that fiber hanging.
|
||||
var clonedDoc = EJSON.clone(doc);
|
||||
futures.pop().return(clonedDoc);
|
||||
Fiber(function () {
|
||||
try {
|
||||
var doc = self._mongoConnection.findOne(
|
||||
collectionName, {_id: id}) || null;
|
||||
// Return doc to all relevant callbacks. Note that this array can
|
||||
// continue to grow during callback excecution.
|
||||
while (!_.isEmpty(callbacks)) {
|
||||
// Clone the document so that the various calls to fetch don't return
|
||||
// objects that are intertwingled with each other. Clone before
|
||||
// popping the future, so that if clone throws, the error gets passed
|
||||
// to the next callback.
|
||||
var clonedDoc = EJSON.clone(doc);
|
||||
callbacks.pop()(null, clonedDoc);
|
||||
}
|
||||
} catch (e) {
|
||||
while (!_.isEmpty(callbacks)) {
|
||||
callbacks.pop()(e);
|
||||
}
|
||||
} finally {
|
||||
// XXX consider keeping the doc around for a period of time before
|
||||
// removing from the cache
|
||||
delete self._callbacksForCacheKey[cacheKey];
|
||||
}
|
||||
} catch (e) {
|
||||
while (!_.isEmpty(futures)) {
|
||||
futures.pop().throw(e);
|
||||
}
|
||||
throw e;
|
||||
} finally {
|
||||
// XXX consider keeping the doc around for a period of time before
|
||||
// removing from the cache
|
||||
delete self._futuresForCacheKey[cacheKey];
|
||||
}
|
||||
|
||||
return doc;
|
||||
}).run();
|
||||
}
|
||||
});
|
||||
|
||||
@@ -1,38 +1,38 @@
|
||||
var Fiber = Npm.require('fibers');
|
||||
var Future = Npm.require('fibers/future');
|
||||
|
||||
Tinytest.add("mongo-livedata - doc fetcher", function (test) {
|
||||
var collName = "docfetcher-" + Random.id();
|
||||
var collection = new Meteor.Collection(collName);
|
||||
var id1 = collection.insert({x: 1});
|
||||
var id2 = collection.insert({y: 2});
|
||||
testAsyncMulti("mongo-livedata - doc fetcher", [
|
||||
function (test, expect) {
|
||||
var self = this;
|
||||
var collName = "docfetcher-" + Random.id();
|
||||
var collection = new Meteor.Collection(collName);
|
||||
var id1 = collection.insert({x: 1});
|
||||
var id2 = collection.insert({y: 2});
|
||||
|
||||
var fetcher = new MongoTest.DocFetcher(
|
||||
MongoInternals.defaultRemoteCollectionDriver().mongo);
|
||||
var fetcher = new MongoTest.DocFetcher(
|
||||
MongoInternals.defaultRemoteCollectionDriver().mongo);
|
||||
|
||||
// Test basic operation.
|
||||
test.equal(fetcher.fetch(collName, id1, Random.id()),
|
||||
{_id: id1, x: 1});
|
||||
test.equal(fetcher.fetch(collName, "nonexistent!", Random.id()), null);
|
||||
// Test basic operation.
|
||||
fetcher.fetch(collName, id1, Random.id(), expect(null, {_id: id1, x: 1}));
|
||||
fetcher.fetch(collName, "nonexistent!", Random.id(), expect(null, null));
|
||||
|
||||
var future = new Future;
|
||||
var fetched = false;
|
||||
var cacheKey = Random.id();
|
||||
Fiber(function () {
|
||||
var d = fetcher.fetch(collName, id2, cacheKey);
|
||||
fetched = true;
|
||||
future.return(d);
|
||||
}).run();
|
||||
// The fetcher yields:
|
||||
test.isFalse(fetched);
|
||||
var fetched = false;
|
||||
var cacheKey = Random.id();
|
||||
var expected = {_id: id2, y: 2};
|
||||
fetcher.fetch(collName, id2, cacheKey, expect(function (e, d) {
|
||||
fetched = true;
|
||||
test.isFalse(e);
|
||||
test.equal(d, expected);
|
||||
}));
|
||||
// The fetcher yields.
|
||||
test.isFalse(fetched);
|
||||
|
||||
// Now ask for another document with the same cache key. Because a fetch for
|
||||
// that cache key is in flight, we will get the other fetch's document, not
|
||||
// this random document.
|
||||
var doc2a = fetcher.fetch(collName, Random.id(), cacheKey);
|
||||
// Finally, wait for the original fetch to return:
|
||||
var doc2b = future.wait();
|
||||
var expected = {_id: id2, y: 2};
|
||||
test.equal(doc2a, expected);
|
||||
test.equal(doc2b, expected);
|
||||
});
|
||||
// Now ask for another document with the same cache key. Because a fetch for
|
||||
// that cache key is in flight, we will get the other fetch's document, not
|
||||
// this random document.
|
||||
fetcher.fetch(collName, Random.id(), cacheKey, expect(function (e, d) {
|
||||
test.isFalse(e);
|
||||
test.equal(d, expected);
|
||||
}));
|
||||
}
|
||||
]);
|
||||
|
||||
@@ -81,28 +81,31 @@ MongoConnection.prototype._observeChangesWithOplog = function (
|
||||
if (phase !== PHASE.FETCHING)
|
||||
throw new Error("Surprising phase in fetchModifiedDocuments: " + phase);
|
||||
|
||||
var futures = [];
|
||||
currentlyFetching = needToFetch;
|
||||
needToFetch = new IdMap;
|
||||
currentlyFetching.each(function (cacheKey, id) {
|
||||
// Run each until they yield. This implies that needToFetch will not be
|
||||
// updated during this loop.
|
||||
Fiber(function () {
|
||||
var f = new Future;
|
||||
futures.push(f);
|
||||
var doc = self._docFetcher.fetch(cursorDescription.collectionName, id,
|
||||
cacheKey);
|
||||
if (!stopped)
|
||||
handleDoc(id, doc);
|
||||
f.return();
|
||||
}).run();
|
||||
});
|
||||
Future.wait(futures);
|
||||
// Throw if any throw.
|
||||
// XXX this means the observe will now be stalled
|
||||
_.each(futures, function (f) {
|
||||
f.get();
|
||||
});
|
||||
var waiting = 0;
|
||||
var error = null;
|
||||
var fut = new Future;
|
||||
Fiber(function () {
|
||||
currentlyFetching.each(function (cacheKey, id) {
|
||||
// currentlyFetching will not be updated during this loop.
|
||||
waiting++;
|
||||
self._docFetcher.fetch(cursorDescription.collectionName, id, cacheKey, function (err, doc) {
|
||||
if (err) {
|
||||
if (!error)
|
||||
error = err;
|
||||
} else if (!stopped) {
|
||||
handleDoc(id, doc);
|
||||
}
|
||||
waiting--;
|
||||
if (waiting == 0)
|
||||
fut.return();
|
||||
});
|
||||
});
|
||||
}).run();
|
||||
fut.wait();
|
||||
if (error)
|
||||
throw error;
|
||||
currentlyFetching = new IdMap;
|
||||
}
|
||||
beSteady();
|
||||
|
||||
Reference in New Issue
Block a user