Refactor Mongo driver code to be less callbacky.

This commit is contained in:
David Glasser
2013-04-01 22:59:56 -07:00
parent 0b61d878da
commit b464ebcd60

View File

@@ -123,16 +123,18 @@ _Mongo = function (url) {
});
};
// callback: lambda (err, collection) called when
// collection is ready to go, or on error.
_Mongo.prototype._withCollection = function(collection_name, callback) {
// Returns the Mongo Collection object; may yield.
_Mongo.prototype._getCollection = function(collectionName) {
var self = this;
var future = new Future;
if (self.db) {
self.db.collection(collection_name, callback);
self.db.collection(collectionName, future.resolver());
} else {
self.collection_queue.push({name: collection_name, callback: callback});
self.collection_queue.push({name: collectionName,
callback: future.resolver()});
}
return future.wait();
};
// This should be called synchronously with a write, to create a
@@ -178,25 +180,17 @@ _Mongo.prototype.insert = function (collection_name, document) {
var write = self._maybeBeginWrite();
var future = new Future;
self._withCollection(collection_name, function (err, collection) {
if (err) {
future.ret(err);
return;
}
try {
var collection = self._getCollection(collection_name);
var future = new Future;
collection.insert(replaceTypes(document, replaceMeteorAtomWithMongo),
{safe: true}, function (err) {
future.ret(err);
});
});
var err = future.wait();
// XXX do we need this to run this at all on error?
Meteor.refresh({collection: collection_name, id: document._id});
write.committed();
if (err)
throw err;
{safe: true}, future.resolver());
future.wait();
// XXX We don't have to run this on error, right?
Meteor.refresh({collection: collection_name, id: document._id});
} finally {
write.committed();
}
};
// Cause queries that may be affected by the selector to poll in this write
@@ -229,24 +223,17 @@ _Mongo.prototype.remove = function (collection_name, selector) {
var write = self._maybeBeginWrite();
var future = new Future;
self._withCollection(collection_name, function (err, collection) {
if (err) {
future.ret(err);
return;
}
try {
var collection = self._getCollection(collection_name);
var future = new Future;
collection.remove(replaceTypes(selector, replaceMeteorAtomWithMongo),
{safe: true}, function (err) {
future.ret(err);
});
});
var err = future.wait();
self._refresh(collection_name, selector);
write.committed();
if (err)
throw err;
{safe: true}, future.resolver());
future.wait();
// XXX We don't have to run this on error, right?
self._refresh(collection_name, selector);
} finally {
write.committed();
}
};
_Mongo.prototype.update = function (collection_name, selector, mod, options) {
@@ -266,34 +253,24 @@ _Mongo.prototype.update = function (collection_name, selector, mod, options) {
if (!mod || typeof mod !== 'object')
throw new Error("Invalid modifier. Modifier must be an object.");
var write = self._maybeBeginWrite();
if (!options) options = {};
var future = new Future;
self._withCollection(collection_name, function (err, collection) {
if (err) {
future.ret(err);
return;
}
var opts = {safe: true};
var write = self._maybeBeginWrite();
try {
var collection = self._getCollection(collection_name);
var mongoOpts = {safe: true};
// explictly enumerate options that minimongo supports
if (options.upsert) opts.upsert = true;
if (options.multi) opts.multi = true;
if (options.upsert) mongoOpts.upsert = true;
if (options.multi) mongoOpts.multi = true;
var future = new Future;
collection.update(replaceTypes(selector, replaceMeteorAtomWithMongo),
replaceTypes(mod, replaceMeteorAtomWithMongo),
opts, function (err) {
future.ret(err);
});
});
var err = future.wait();
self._refresh(collection_name, selector);
write.committed();
if (err)
throw err;
mongoOpts, future.resolver());
future.wait();
self._refresh(collection_name, selector);
} finally {
write.committed();
}
};
_Mongo.prototype.find = function (collectionName, selector, options) {
@@ -323,21 +300,9 @@ _Mongo.prototype._ensureIndex = function (collectionName, index, options) {
// We expect this function to be called at startup, not from within a method,
// so we don't interact with the write fence.
var collection = self._getCollection(collectionName);
var future = new Future;
self._withCollection(collectionName, function (err, collection) {
if (err) {
future.throw(err);
return;
}
// XXX do we have to bindEnv or Fiber.run this callback?
collection.ensureIndex(index, options, function (err, indexName) {
if (err) {
future.throw(err);
return;
}
future.ret();
});
});
var indexName = collection.ensureIndex(index, options, future.resolver());
future.wait();
};
_Mongo.prototype._dropIndex = function (collectionName, index) {
@@ -345,20 +310,9 @@ _Mongo.prototype._dropIndex = function (collectionName, index) {
// This function is only used by test code, not within a method, so we don't
// interact with the write fence.
var collection = self._getCollection(collectionName);
var future = new Future;
self._withCollection(collectionName, function (err, collection) {
if (err) {
future.throw(err);
return;
}
collection.dropIndex(index, function (err) {
if (err) {
future.throw(err);
return;
}
future.ret();
});
});
var indexName = collection.dropIndex(index, future.resolver());
future.wait();
};
@@ -473,29 +427,17 @@ _Mongo.prototype._createSynchronousCursor = function (cursorDescription,
useTransform) {
var self = this;
var future = new Future;
self._withCollection(
cursorDescription.collectionName, function (err, collection) {
if (err) {
future.ret([false, err]);
return;
}
var options = cursorDescription.options;
var dbCursor = collection.find(
replaceTypes(cursorDescription.selector, replaceMeteorAtomWithMongo),
options.fields, {
sort: options.sort,
limit: options.limit,
skip: options.skip
});
future.ret([true, dbCursor]);
var collection = self._getCollection(cursorDescription.collectionName);
var options = cursorDescription.options;
var dbCursor = collection.find(
replaceTypes(cursorDescription.selector, replaceMeteorAtomWithMongo),
options.fields, {
sort: options.sort,
limit: options.limit,
skip: options.skip
});
var result = future.wait();
if (!result[0])
throw result[1];
return new SynchronousCursor(result[1],
return new SynchronousCursor(dbCursor,
useTransform &&
cursorDescription.options &&
cursorDescription.options.transform);