Make LocalCollection and _Mongo writes meteor-style async.

Also fixes a mongo-livedata test to pass the required 2 arguments to update.
This commit is contained in:
Emily Stark
2013-06-28 23:16:38 -07:00
parent ca497d5440
commit ad97d4df44
6 changed files with 73 additions and 34 deletions

View File

@@ -74,7 +74,7 @@ _.extend(Meteor, {
var self = this;
var callback;
var fut;
var newArgs = Array.prototype.slice.call(arguments);
var newArgs = _.toArray(arguments);
var logErr = function (err) {
if (err)

View File

@@ -61,3 +61,15 @@ testAsyncMulti("environment - wrapAsync async", [
test.equal(wrapped4(cb(3)), undefined);
}
]);
Tinytest.addAsync("environment - wrapAsync callback is " +
"in fiber", function (test, onComplete) {
var cb = function (err, result) {
if (Meteor.isServer) {
var Fiber = Npm.require('fibers');
test.isTrue(Fiber.current);
}
onComplete();
};
wrapped1(3, cb);
});

View File

@@ -451,6 +451,8 @@ LocalCollection.prototype.insert = function (doc, callback) {
LocalCollection._recomputeResults(self.queries[qid]);
});
self._observeQueue.drain();
// Defer in case the callback returns on a future; gives the caller time to
// wait on the future.
if (callback) Meteor.defer(function () { callback(null, doc._id); });
return doc._id;
};
@@ -509,6 +511,8 @@ LocalCollection.prototype.remove = function (selector, callback) {
LocalCollection._recomputeResults(query);
});
self._observeQueue.drain();
// Defer in case the callback returns on a future; gives the caller time to
// wait on the future.
if (callback) Meteor.defer(callback);
};
@@ -557,6 +561,8 @@ LocalCollection.prototype.update = function (selector, mod, options, callback) {
qidToOriginalResults[qid]);
});
self._observeQueue.drain();
// Defer in case the callback returns on a future; gives the caller time to
// wait on the future.
if (callback) Meteor.defer(callback);
};

View File

@@ -376,22 +376,14 @@ _.each(["insert", "update", "remove"], function (name) {
throwIfSelectorIsNotId(args[0], name);
}
if (wrappedCallback) {
// asynchronous: on success, callback should return ret
// (document ID for insert, undefined for update and
// remove), not the method's result.
self._connection.apply(self._prefix + name, args, wrappedCallback);
} else {
// synchronous: propagate exception
self._connection.apply(self._prefix + name, args);
}
self._connection.apply(self._prefix + name, args, wrappedCallback);
} else {
// it's my collection. descend into the collection object
// and propagate any exception.
args.push(wrappedCallback);
try {
Meteor._wrapAsync(self._collection[name]).apply(self._collection, args);
self._collection[name].apply(self._collection, args);
} catch (e) {
if (callback) {
callback(e);
@@ -575,8 +567,8 @@ Meteor.Collection.prototype._defineMutationMethods = function() {
self[validatedMethodName].apply(self, argsWithUserId);
} else if (self._isInsecure()) {
// In insecure mode, allow any mutation (with a simple selector).
Meteor._wrapAsync(self._collection[method]).
apply(self._collection, _.toArray(arguments));
self._collection[method].apply(self._collection,
_.toArray(arguments));
} else {
// In secure mode, if we haven't called allow or deny, then nothing
// is permitted.

View File

@@ -176,11 +176,10 @@ _Mongo.prototype._maybeBeginWrite = function () {
//////////// Public API //////////
// The write methods block until the database has confirmed the write
// (it may not be replicated or stable on disk, but one server has
// confirmed it.) (In the future we might have an option to turn this
// off, ie, to enqueue the request on the wire and return
// immediately.) They return nothing on success, and raise an
// The write methods block until the database has confirmed the write (it may
// not be replicated or stable on disk, but one server has confirmed it) if no
// callback is provided. If a callback is provided, then they call the callback
// when the write is confirmed. They return nothing on success, and raise an
// exception on failure.
//
// After making a write (with insert, update, remove), observers are
@@ -202,17 +201,22 @@ var writeCallback = function (write, refresh, callback) {
write.committed();
if (callback)
callback(err, result);
else
throw err;
}, function (err) {
Meteor._debug("Error in Mongo write:", err.stack);
});
};
_Mongo.prototype.insert = function (collection_name, document, callback) {
_Mongo.prototype._insert = function (collection_name, document, callback) {
var self = this;
if (collection_name === "___meteor_failure_test_collection") {
var e = new Error("Failure test");
e.expected = true;
throw e;
if (callback)
return callback(e);
else
throw e;
}
var write = self._maybeBeginWrite();
@@ -226,6 +230,7 @@ _Mongo.prototype.insert = function (collection_name, document, callback) {
{safe: true}, callback);
} catch (e) {
write.committed();
throw e;
}
};
@@ -248,13 +253,16 @@ _Mongo.prototype._refresh = function (collectionName, selector) {
}
};
_Mongo.prototype.remove = function (collection_name, selector, callback) {
_Mongo.prototype._remove = function (collection_name, selector, callback) {
var self = this;
if (collection_name === "___meteor_failure_test_collection") {
var e = new Error("Failure test");
e.expected = true;
throw e;
if (callback)
return callback(e);
else
throw e;
}
var write = self._maybeBeginWrite();
@@ -265,29 +273,32 @@ _Mongo.prototype.remove = function (collection_name, selector, callback) {
try {
var collection = self._getCollection(collection_name);
var future = new Future;
collection.remove(replaceTypes(selector, replaceMeteorAtomWithMongo),
{safe: true}, callback);
} catch (e) {
write.committed();
throw e;
}
};
_Mongo.prototype.update = function (collection_name, selector, mod,
_Mongo.prototype._update = function (collection_name, selector, mod,
options, callback) {
var self = this;
if (collection_name === "___meteor_failure_test_collection") {
var e = new Error("Failure test");
e.expected = true;
throw e;
}
if (! callback && options instanceof Function) {
callback = options;
options = null;
}
if (collection_name === "___meteor_failure_test_collection") {
var e = new Error("Failure test");
e.expected = true;
if (callback)
return callback(e);
else
throw e;
}
// explicit safety check. null and undefined can crash the mongo
// driver. Although the node driver and minimongo do 'support'
// non-object modifier in that they don't crash, they are not
@@ -314,9 +325,17 @@ _Mongo.prototype.update = function (collection_name, selector, mod,
mongoOpts, callback);
} catch (e) {
write.committed();
throw e;
}
};
_.each(["insert", "update", "remove"], function (method) {
_Mongo.prototype[method] = function (/* arguments */) {
var self = this;
return Meteor._wrapAsync(self["_" + method]).apply(self, arguments);
};
});
_Mongo.prototype.find = function (collectionName, selector, options) {
var self = this;

View File

@@ -63,20 +63,30 @@ testAsyncMulti("mongo-livedata - database error reporting. " + idGeneration, [
_.each(["insert", "remove", "update"], function (op) {
var arg = (op === "insert" ? {} : 'bla');
var arg2 = {};
var callOp = function (callback) {
if (op === "update") {
ftc[op](arg, arg2, callback);
} else {
ftc[op](arg, callback);
}
};
if (Meteor.isServer) {
test.throws(function () {
ftc[op](arg);
callOp();
});
ftc[op](arg, expect(exception));
callOp(expect(exception));
}
if (Meteor.isClient) {
ftc[op](arg, expect(exception));
callOp(expect(exception));
// This would log to console in normal operation.
Meteor._suppress_log(1);
ftc[op](arg);
callOp();
}
});
}