From b464ebcd60d761541db4139699dc5a6f2b103994 Mon Sep 17 00:00:00 2001 From: David Glasser Date: Mon, 1 Apr 2013 22:59:56 -0700 Subject: [PATCH] Refactor Mongo driver code to be less callbacky. --- packages/mongo-livedata/mongo_driver.js | 164 ++++++++---------------- 1 file changed, 53 insertions(+), 111 deletions(-) diff --git a/packages/mongo-livedata/mongo_driver.js b/packages/mongo-livedata/mongo_driver.js index e501786cef..4ed5bf83cd 100644 --- a/packages/mongo-livedata/mongo_driver.js +++ b/packages/mongo-livedata/mongo_driver.js @@ -123,16 +123,18 @@ _Mongo = function (url) { }); }; -// callback: lambda (err, collection) called when -// collection is ready to go, or on error. -_Mongo.prototype._withCollection = function(collection_name, callback) { +// Returns the Mongo Collection object; may yield. +_Mongo.prototype._getCollection = function(collectionName) { var self = this; + var future = new Future; if (self.db) { - self.db.collection(collection_name, callback); + self.db.collection(collectionName, future.resolver()); } else { - self.collection_queue.push({name: collection_name, callback: callback}); + self.collection_queue.push({name: collectionName, + callback: future.resolver()}); } + return future.wait(); }; // This should be called synchronously with a write, to create a @@ -178,25 +180,17 @@ _Mongo.prototype.insert = function (collection_name, document) { var write = self._maybeBeginWrite(); - var future = new Future; - self._withCollection(collection_name, function (err, collection) { - if (err) { - future.ret(err); - return; - } - + try { + var collection = self._getCollection(collection_name); + var future = new Future; collection.insert(replaceTypes(document, replaceMeteorAtomWithMongo), - {safe: true}, function (err) { - future.ret(err); - }); - }); - - var err = future.wait(); - // XXX do we need this to run this at all on error? - Meteor.refresh({collection: collection_name, id: document._id}); - write.committed(); - if (err) - throw err; + {safe: true}, future.resolver()); + future.wait(); + // XXX We don't have to run this on error, right? + Meteor.refresh({collection: collection_name, id: document._id}); + } finally { + write.committed(); + } }; // Cause queries that may be affected by the selector to poll in this write @@ -229,24 +223,17 @@ _Mongo.prototype.remove = function (collection_name, selector) { var write = self._maybeBeginWrite(); - var future = new Future; - self._withCollection(collection_name, function (err, collection) { - if (err) { - future.ret(err); - return; - } - + try { + var collection = self._getCollection(collection_name); + var future = new Future; collection.remove(replaceTypes(selector, replaceMeteorAtomWithMongo), - {safe: true}, function (err) { - future.ret(err); - }); - }); - - var err = future.wait(); - self._refresh(collection_name, selector); - write.committed(); - if (err) - throw err; + {safe: true}, future.resolver()); + future.wait(); + // XXX We don't have to run this on error, right? + self._refresh(collection_name, selector); + } finally { + write.committed(); + } }; _Mongo.prototype.update = function (collection_name, selector, mod, options) { @@ -266,34 +253,24 @@ _Mongo.prototype.update = function (collection_name, selector, mod, options) { if (!mod || typeof mod !== 'object') throw new Error("Invalid modifier. Modifier must be an object."); - var write = self._maybeBeginWrite(); - if (!options) options = {}; - var future = new Future; - self._withCollection(collection_name, function (err, collection) { - if (err) { - future.ret(err); - return; - } - - var opts = {safe: true}; + var write = self._maybeBeginWrite(); + try { + var collection = self._getCollection(collection_name); + var mongoOpts = {safe: true}; // explictly enumerate options that minimongo supports - if (options.upsert) opts.upsert = true; - if (options.multi) opts.multi = true; - + if (options.upsert) mongoOpts.upsert = true; + if (options.multi) mongoOpts.multi = true; + var future = new Future; collection.update(replaceTypes(selector, replaceMeteorAtomWithMongo), replaceTypes(mod, replaceMeteorAtomWithMongo), - opts, function (err) { - future.ret(err); - }); - }); - - var err = future.wait(); - self._refresh(collection_name, selector); - write.committed(); - if (err) - throw err; + mongoOpts, future.resolver()); + future.wait(); + self._refresh(collection_name, selector); + } finally { + write.committed(); + } }; _Mongo.prototype.find = function (collectionName, selector, options) { @@ -323,21 +300,9 @@ _Mongo.prototype._ensureIndex = function (collectionName, index, options) { // We expect this function to be called at startup, not from within a method, // so we don't interact with the write fence. + var collection = self._getCollection(collectionName); var future = new Future; - self._withCollection(collectionName, function (err, collection) { - if (err) { - future.throw(err); - return; - } - // XXX do we have to bindEnv or Fiber.run this callback? - collection.ensureIndex(index, options, function (err, indexName) { - if (err) { - future.throw(err); - return; - } - future.ret(); - }); - }); + var indexName = collection.ensureIndex(index, options, future.resolver()); future.wait(); }; _Mongo.prototype._dropIndex = function (collectionName, index) { @@ -345,20 +310,9 @@ _Mongo.prototype._dropIndex = function (collectionName, index) { // This function is only used by test code, not within a method, so we don't // interact with the write fence. + var collection = self._getCollection(collectionName); var future = new Future; - self._withCollection(collectionName, function (err, collection) { - if (err) { - future.throw(err); - return; - } - collection.dropIndex(index, function (err) { - if (err) { - future.throw(err); - return; - } - future.ret(); - }); - }); + var indexName = collection.dropIndex(index, future.resolver()); future.wait(); }; @@ -473,29 +427,17 @@ _Mongo.prototype._createSynchronousCursor = function (cursorDescription, useTransform) { var self = this; - var future = new Future; - self._withCollection( - cursorDescription.collectionName, function (err, collection) { - if (err) { - future.ret([false, err]); - return; - } - var options = cursorDescription.options; - var dbCursor = collection.find( - replaceTypes(cursorDescription.selector, replaceMeteorAtomWithMongo), - options.fields, { - sort: options.sort, - limit: options.limit, - skip: options.skip - }); - future.ret([true, dbCursor]); + var collection = self._getCollection(cursorDescription.collectionName); + var options = cursorDescription.options; + var dbCursor = collection.find( + replaceTypes(cursorDescription.selector, replaceMeteorAtomWithMongo), + options.fields, { + sort: options.sort, + limit: options.limit, + skip: options.skip }); - var result = future.wait(); - if (!result[0]) - throw result[1]; - - return new SynchronousCursor(result[1], + return new SynchronousCursor(dbCursor, useTransform && cursorDescription.options && cursorDescription.options.transform);