diff --git a/packages/mongo-livedata/doc_fetcher.js b/packages/mongo-livedata/doc_fetcher.js new file mode 100644 index 0000000000..11051b6129 --- /dev/null +++ b/packages/mongo-livedata/doc_fetcher.js @@ -0,0 +1,60 @@ +var Future = Npm.require('fibers/future'); + +DocFetcher = function (mongoConnection) { + var self = this; + self._mongoConnection = mongoConnection; + // Map from cache key -> [Future] + self._futuresForCacheKey = {}; +}; + +_.extend(DocFetcher.prototype, { + // Fetches document "id" from collectionName, returning it or null if not + // found. Throws other errors. Can yield. + // + // If you make multiple calls to fetch() with the same cacheKey (a string), + // DocFetcher may assume that they all return the same document. (It does + // not check to see if collectionName/id match.) + fetch: function (collectionName, id, cacheKey) { + var self = this; + + check(collectionName, String); + // id is some sort of scalar + check(cacheKey, String); + + // If there's already an in-progress fetch for this cache key, yield until + // it's done and return whatever it returns. + if (_.has(self._futuresForCacheKey, cacheKey)) { + var f = new Future; + self._futuresForCacheKey.push(f); + return f.wait(); + } + + var futures = self._futuresForCacheKey[cacheKey] = []; + + try { + var doc = self._mongoConnection.findOne( + collectionName, {_id: id}) || null; + // Return doc to all fibers that are blocking on us. Note that this array + // can continue to grow during calls to Future.return. + while (!_.isEmpty(futures)) { + // Clone the document so that the various calls to fetch don't return + // objects that are intertwingled with each other. Clone before popping + // the future, so that if clone throws, the error gets thrown to the + // next future instead of that fiber hanging. + var clonedDoc = EJSON.clone(doc); + futures.pop().return(clonedDoc); + } + } catch (e) { + while (!_.isEmpty(futures)) { + futures.pop().throw(e); + } + throw e; + } finally { + // XXX consider keeping the doc around for a period of time before + // removing from the cache + delete self._futuresForCacheKey[cacheKey]; + } + + return doc; + } +}); diff --git a/packages/mongo-livedata/id_map.js b/packages/mongo-livedata/id_map.js index 160ee505c4..57ee9cd8d9 100644 --- a/packages/mongo-livedata/id_map.js +++ b/packages/mongo-livedata/id_map.js @@ -1,36 +1,36 @@ IdMap = function () { var self = this; - self.map = {}; + self._map = {}; }; _.extend(IdMap.prototype, { get: function (id) { var self = this; var key = LocalCollection._idStringify(id); - return self.map[key]; + return self._map[key]; }, set: function (id, value) { var self = this; var key = LocalCollection._idStringify(id); - self.map[key] = value; + self._map[key] = value; }, remove: function (id) { var self = this; var key = LocalCollection._idStringify(id); - delete self.map[key]; + delete self._map[key]; }, has: function (id) { var self = this; var key = LocalCollection._idStringify(id); - return _.has(self.map, key); + return _.has(self._map, key); }, // XXX used? setDefault: function (id, def) { var self = this; var key = LocalCollection._idStringify(id); - if (_.has(self.map, key)) - return self.map[key]; - self.map[key] = def; + if (_.has(self._map, key)) + return self._map[key]; + self._map[key] = def; return def; } }); diff --git a/packages/mongo-livedata/package.js b/packages/mongo-livedata/package.js index 3827dbb929..5389cd1157 100644 --- a/packages/mongo-livedata/package.js +++ b/packages/mongo-livedata/package.js @@ -38,7 +38,7 @@ Package.on_use(function (api) { // For tests only. api.export('MongoTest', 'server'); - api.add_files('id_map.js', 'server'); + api.add_files(['id_map.js', 'doc_fetcher.js'], 'server'); api.add_files('mongo_driver.js', 'server'); api.add_files('local_collection_driver.js', ['client', 'server']); api.add_files('remote_collection_driver.js', 'server');