Method calls can now be forced to wait

Meteor.apply and Meteor._LivedataConnection.apply now receive an options
parameter which can be used to set the `wait` flag:

(Client only) If true, don't send any subsequent method calls until this
one is completed. Only run the callback for this method once all previous
method calls have completed.
This commit is contained in:
Avital Oliver
2012-05-23 10:42:15 -07:00
committed by Nick Martin
parent b7c393a9fe
commit 55b6bb0dea
9 changed files with 342 additions and 110 deletions

View File

@@ -1157,7 +1157,7 @@ Matches a particular type of event, such as 'click'.
{{#dtdd "<em>eventtype selector</em>"}}
Matches a particular type of event, but only when it appears on
an element that matches a certain CSS selector.
an element that matches a certain CSS selector.
{{/dtdd}}
{{#dtdd "<em>event1, event2</em>"}}

View File

@@ -209,7 +209,7 @@ Template.api.meteor_call = {
Template.api.meteor_apply = {
id: "meteor_apply",
name: "Meteor.apply(name, params [, asyncCallback])",
name: "Meteor.apply(name, params [, options] [, asyncCallback])",
locus: "Anywhere",
descr: ["Invoke a method passing an array of arguments."],
args: [
@@ -222,6 +222,12 @@ Template.api.meteor_apply = {
{name: "asyncCallback",
type: "Function",
descr: "Optional callback. If passed, the method runs asynchronously, instead of synchronously, and calls asyncCallback passing either the error or the result."}
],
options: [
{name: "wait",
type: "Boolean",
descr: "(Client only) If true, don't send any subsequent method calls until this one is completed. "
+ "Only run the callback for this method once all previous method calls have been completed."}
]
};
@@ -755,7 +761,7 @@ Template.api.equals = {
Template.api.httpcall = {
id: "meteor_http_call",
name: "Meteor.http.call(method, url, [options], [asyncCallback])",
name: "Meteor.http.call(method, url [, options] [, asyncCallback])",
locus: "Anywhere",
descr: ["Perform an outbound HTTP request."],
args: [

View File

@@ -4,11 +4,12 @@ Meteor._MethodInvocation = function (is_simulation, unblock) {
var self = this;
// true if we're running not the actual method, but a stub (that is,
// if we're on the client and presently running a simulation of a
// server-side method for latency compensation purposes). never true
// except in a client such as a browser, since there's no point in
// running stubs unless you have a zero-latency connection to the
// user.
// if we're on a client (which may be a browser, or in the future a
// server connecting to another server) and presently running a
// simulation of a server-side method for latency compensation
// purposes). not current true except in a client such as a browser,
// since there's usually no point in running stubs unless you have a
// zero-latency connection to the user.
this.is_simulation = is_simulation;
// call this function to allow other method invocations (from the

View File

@@ -31,6 +31,13 @@ Meteor._LivedataConnection = function (url, restart_on_update) {
self.next_method_id = 1;
// waiting for results of method
self.outstanding_methods = []; // each item has keys: msg, callback
// the sole outstanding method that needs to be waited on, or null
self.outstanding_wait_method = null; // same keys as outstanding_methods
// stores response from `outstanding_wait_method` while we wait for
// previous method calls to complete
self.outstanding_wait_method_response = null;
// methods blocked on outstanding_wait_method being completed.
self.blocked_methods = []; // each item has keys: msg, callback, wait
// waiting for data from method
self.unsatisfied_methods = {}; // map from method_id -> true
// sub was ready, is no longer (due to reconnect)
@@ -51,31 +58,16 @@ Meteor._LivedataConnection = function (url, restart_on_update) {
// just for testing
self.quiesce_callbacks = [];
// Setup auto-reload persistence.
var reload_key = "Server-" + url;
var reload_data = Meteor._reload.migration_data(reload_key);
if (typeof reload_data === "object") {
if (typeof reload_data.next_method_id === "number")
self.next_method_id = reload_data.next_method_id;
if (typeof reload_data.outstanding_methods === "object")
self.outstanding_methods = reload_data.outstanding_methods;
// pending messages will be transmitted on initial stream 'reset'
}
Meteor._reload.on_migrate(reload_key, function (retry) {
if (!self._readyToMigrate()) {
if (self.retry_migrate)
throw new Error("Two migrations in progress?");
self.retry_migrate = retry;
return false;
} else {
return [true];
}
var methods = _.map(self.outstanding_methods, function (m) {
return {msg: m.msg};
});
return [true, {next_method_id: self.next_method_id,
outstanding_methods: methods}];
});
// Setup stream (if not overriden above)
@@ -129,10 +121,7 @@ Meteor._LivedataConnection = function (url, restart_on_update) {
// immediately before disconnection.. do we need to add app-level
// acking of data messages?
// Send pending methods.
_.each(self.outstanding_methods, function (m) {
self.stream.send(JSON.stringify(m.msg));
});
self._sendOutstandingMessages();
// add new subscriptions at the end. this way they take effect after
// the handlers and we don't see flicker.
@@ -255,11 +244,23 @@ _.extend(Meteor._LivedataConnection.prototype, {
return this.apply(name, args, callback);
},
apply: function (name, args, callback) {
// @param options {Optional Object}
// wait: Boolean - Should we block subsequent method calls on this
// method's result having been received?
// (does not affect methods called from within this method)
// @param callback {Optional Function}
apply: function (name, args, options, callback) {
var self = this;
var enclosing = Meteor._CurrentInvocation.get();
if (callback)
// We were passed 3 arguments. They may be either (name, args, options)
// or (name, args, callback)
if (!callback && typeof options === 'function') {
callback = options;
options = {};
}
options = options || {};
if (callback) {
// XXX would it be better form to do the binding in stream.on,
// or caller, instead of here?
callback = Meteor.bindEnvironment(callback, function (e) {
@@ -267,8 +268,8 @@ _.extend(Meteor._LivedataConnection.prototype, {
Meteor._debug("Exception while delivering result of invoking '" +
name + "'", e.stack);
});
}
var is_simulation = enclosing && enclosing.is_simulation;
if (Meteor.is_client) {
// If on a client, run the stub, if we have one. The stub is
// supposed to make some temporary writes to the database to
@@ -296,10 +297,10 @@ _.extend(Meteor._LivedataConnection.prototype, {
}
// If we're in a simulation, stop and return the result we have,
// rather than going on to do an RPC. This can only happen on
// the client (since we only bother with stubs and simulations
// on the client.) If there was not stub, we'll end up returning
// undefined.
// rather than going on to do an RPC. If there was no stub,
// we'll end up returning undefined.
var enclosing = Meteor._CurrentInvocation.get();
var is_simulation = enclosing && enclosing.is_simulation;
if (is_simulation) {
if (callback) {
callback(exception, ret);
@@ -350,9 +351,30 @@ _.extend(Meteor._LivedataConnection.prototype, {
params: args,
id: '' + (self.next_method_id++)
};
self.outstanding_methods.push({msg: msg, callback: callback});
if (self.outstanding_wait_method) {
self.blocked_methods.push({
msg: msg,
callback: callback,
wait: options.wait
});
} else {
var method_object = {
msg: msg,
callback: callback
};
if (options.wait)
self.outstanding_wait_method = method_object;
else
self.outstanding_methods.push(method_object);
self.stream.send(JSON.stringify(msg));
}
// Even if we are waiting on other method calls mark this method
// as unsatisfied so that the user never ends up seeing
// intermediate versions of the server's datastream
self.unsatisfied_methods[msg.id] = true;
self.stream.send(JSON.stringify(msg));
// If we're using the default callback on the server,
// synchronously return the result from the remote host.
@@ -535,36 +557,73 @@ _.extend(Meteor._LivedataConnection.prototype, {
},
_livedata_result: function (msg) {
var self = this;
// id, result or error. error has error (code), reason, details
var self = this;
// find the outstanding request
// should be O(1) in nearly all realistic use cases
for (var i = 0; i < self.outstanding_methods.length; i++) {
var m = self.outstanding_methods[i];
if (m.msg.id === msg.id)
break;
var m;
if (self.outstanding_wait_method &&
self.outstanding_wait_method.msg.id === msg.id) {
m = self.outstanding_wait_method;
self.outstanding_wait_method_response = msg;
} else {
for (var i = 0; i < self.outstanding_methods.length; i++) {
m = self.outstanding_methods[i];
if (m.msg.id === msg.id)
break;
}
// remove
self.outstanding_methods.splice(i, 1);
}
if (!m) {
// XXX write a better error
Meteor._debug("Can't interpret method response message");
return;
}
// remove
self.outstanding_methods.splice(i, 1);
if (self.outstanding_wait_method) {
// Wait until we have completed all outstanding methods.
if (self.outstanding_methods.length === 0 &&
self.outstanding_wait_method_response) {
// Fire necessary outstanding method callbacks, making sure we
// only fire the outstanding wait method after all other outstanding
// methods' callbacks were fired
if (m === self.outstanding_wait_method) {
self._deliverMethodResponse(self.outstanding_wait_method,
self.outstanding_wait_method_response /*(=== msg)*/);
} else {
self._deliverMethodResponse(m, msg);
self._deliverMethodResponse(self.outstanding_wait_method,
self.outstanding_wait_method_response /*(!== msg)*/);
}
// deliver result
if (m.callback) {
// callback will have already been bindEnvironment'd by apply(),
// so no need to catch exceptions
if ('error' in msg)
m.callback(new Meteor.Error(msg.error.error, msg.error.reason,
msg.error.details));
else
// msg.result may be undefined if the method didn't return a
// value
m.callback(undefined, msg.result);
self.outstanding_wait_method = null;
self.outstanding_wait_method_response = null;
// Find first blocked method with wait: true
var i;
for (i = 0; i < self.blocked_methods.length; i++)
if (self.blocked_methods[i].wait)
break;
// Move as many blocked methods as we can into outstanding_methods
// and outstanding_wait_method if needed
self.outstanding_methods = _.first(self.blocked_methods, i);
if (i !== self.blocked_methods.length) {
self.outstanding_wait_method = self.blocked_methods[i];
self.blocked_methods = _.rest(self.blocked_methods, i+1);
}
self._sendOutstandingMessages();
} else {
if (m !== self.outstanding_wait_method)
self._deliverMethodResponse(m, msg);
}
} else {
self._deliverMethodResponse(m, msg);
}
// if we were blocking a migration, see if it's now possible to
@@ -575,21 +634,42 @@ _.extend(Meteor._LivedataConnection.prototype, {
}
},
// @param method {Object} as in `outstanding_methods`
// @param response {Object{id, result | error}}
_deliverMethodResponse: function(method, response) {
// callback will have already been bindEnvironment'd by apply(),
// so no need to catch exceptions
if ('error' in response) {
method.callback(new Meteor.Error(
response.error.error, response.error.reason,
response.error.details));
} else {
// msg.result may be undefined if the method didn't return a
// value
method.callback(undefined, response.result);
}
},
_sendOutstandingMessages: function() {
var self = this;
_.each(self.outstanding_methods, function (m) {
self.stream.send(JSON.stringify(m.msg));
});
if (self.outstanding_wait_method) {
self.stream.send(JSON.stringify(self.outstanding_wait_method.msg));
}
},
_livedata_error: function (msg) {
Meteor._debug("Received error from server: ", msg.reason);
if (msg.offending_message)
Meteor._debug("For: ", msg.offending_message);
},
// true if we're OK for a migration to happen
_readyToMigrate: function () {
var self = this;
return _.all(self.outstanding_methods, function (m) {
// Callbacks can't be preserved across migrations, so we can't
// migrate as long as there is an outstanding requests with a
// callback.
return !m.callback;
});
_readyToMigrate: function() {
return self.outstanding_methods.length === 0 &&
!self.outstanding_wait_method &&
self.blocking_methods.length === 0;
}
});

View File

@@ -12,12 +12,7 @@ var test_got_message = function (test, stream, expected) {
test.equal(got, expected);
};
var SESSION_ID = '17';
Tinytest.add("livedata stub - receive data", function (test) {
var stream = new Meteor._StubStream();
var conn = new Meteor._LivedataConnection(stream);
var startAndConnect = function(test, stream) {
stream.reset(); // initial connection start.
test_got_message(test, stream, {msg: 'connect'});
@@ -25,6 +20,15 @@ Tinytest.add("livedata stub - receive data", function (test) {
stream.receive({msg: 'connected', session: SESSION_ID});
test.length(stream.sent, 0);
};
var SESSION_ID = '17';
Tinytest.add("livedata stub - receive data", function (test) {
var stream = new Meteor._StubStream();
var conn = new Meteor._LivedataConnection(stream);
startAndConnect(test, stream);
// data comes in for unknown collection.
var coll_name = Meteor.uuid();
@@ -46,19 +50,11 @@ Tinytest.add("livedata stub - receive data", function (test) {
test.isUndefined(conn.queued[coll_name]);
});
Tinytest.add("livedata stub - subscribe", function (test) {
var stream = new Meteor._StubStream();
var conn = new Meteor._LivedataConnection(stream);
stream.reset(); // initial connection start.
test_got_message(test, stream, {msg: 'connect'});
test.length(stream.sent, 0);
stream.receive({msg: 'connected', session: SESSION_ID});
test.length(stream.sent, 0);
startAndConnect(test, stream);
// subscribe
var callback_fired = false;
@@ -82,11 +78,7 @@ Tinytest.add("livedata stub - this", function (test) {
var stream = new Meteor._StubStream();
var conn = new Meteor._LivedataConnection(stream);
stream.reset(); // initial connection start.
test_got_message(test, stream, {msg: 'connect'});
stream.receive({msg: 'connected', session: SESSION_ID});
test.length(stream.sent, 0);
startAndConnect(test, stream);
conn.methods({test_this: function() {
test.isTrue(this.is_simulation);
@@ -112,13 +104,7 @@ Tinytest.add("livedata stub - methods", function (test) {
var stream = new Meteor._StubStream();
var conn = new Meteor._LivedataConnection(stream);
stream.reset(); // initial connection start.
test_got_message(test, stream, {msg: 'connect'});
test.length(stream.sent, 0);
stream.receive({msg: 'connected', session: SESSION_ID});
test.length(stream.sent, 0);
startAndConnect(test, stream);
var coll_name = Meteor.uuid();
var coll = new Meteor.Collection(coll_name, conn);
@@ -211,13 +197,7 @@ Tinytest.add("livedata stub - sub methods", function (test) {
var stream = new Meteor._StubStream();
var conn = new Meteor._LivedataConnection(stream);
stream.reset(); // initial connection start.
test_got_message(test, stream, {msg: 'connect'});
test.length(stream.sent, 0);
stream.receive({msg: 'connected', session: SESSION_ID});
test.length(stream.sent, 0);
startAndConnect(test, stream);
var coll_name = Meteor.uuid();
var coll = new Meteor.Collection(coll_name, conn);
@@ -287,13 +267,7 @@ Tinytest.add("livedata stub - reconnect", function (test) {
var stream = new Meteor._StubStream();
var conn = new Meteor._LivedataConnection(stream);
stream.reset(); // initial connection start.
test_got_message(test, stream, {msg: 'connect'});
test.length(stream.sent, 0);
stream.receive({msg: 'connected', session: SESSION_ID});
test.length(stream.sent, 0);
startAndConnect(test, stream);
var coll_name = Meteor.uuid();
var coll = new Meteor.Collection(coll_name, conn);
@@ -342,9 +316,12 @@ Tinytest.add("livedata stub - reconnect", function (test) {
conn.call('do_something', function () {
method_callback_fired = true;
});
conn.apply('do_something', [], {wait: true});
test.isFalse(method_callback_fired);
var method_message = JSON.parse(stream.sent.shift());
var wait_method_message = JSON.parse(stream.sent.shift());
test.equal(method_message, {msg: 'method', method: 'do_something',
params: [], id:method_message.id});
@@ -354,13 +331,13 @@ Tinytest.add("livedata stub - reconnect", function (test) {
test.equal(coll.find({c:3}).count(), 0);
test.equal(counts, {added: 1, removed: 0, changed: 1, moved: 0});
// stream reset. reconnect!
// we send a connect, our pending messages, and our subs.
stream.reset();
test_got_message(test, stream, {msg: 'connect', session: SESSION_ID});
test_got_message(test, stream, method_message);
test_got_message(test, stream, wait_method_message);
test_got_message(test, stream, sub_message);
// reconnect with different session id
@@ -376,10 +353,12 @@ Tinytest.add("livedata stub - reconnect", function (test) {
test.equal(counts, {added: 1, removed: 0, changed: 1, moved: 0});
// satisfy and return method callback
stream.receive({msg: 'data', methods: [method_message.id]});
stream.receive({msg: 'data',
methods: [method_message.id, wait_method_message.id]});
test.isFalse(method_callback_fired);
stream.receive({msg: 'result', id:method_message.id, result:"bupkis"});
stream.receive({msg: 'result', id:wait_method_message.id, result:"bupkis"});
test.isTrue(method_callback_fired);
// still no update.
@@ -406,6 +385,109 @@ Tinytest.add("livedata connection - reactive userId", function (test) {
test.equal(conn.userId(), 1337);
});
Tinytest.add("livedata connection - two wait methods with reponse in order", function (test) {
var stream = new Meteor._StubStream();
var conn = new Meteor._LivedataConnection(stream);
startAndConnect(test, stream);
// setup method
conn.methods({do_something: function (x) {}});
var responses = [];
conn.apply('do_something', ['one!'], function() { responses.push('one'); });
var one_message = JSON.parse(stream.sent.shift());
test.equal(one_message.params, ['one!']);
conn.apply('do_something', ['two!'], {wait: true}, function() {
responses.push('two');
});
var two_message = JSON.parse(stream.sent.shift());
test.equal(two_message.params, ['two!']);
test.equal(responses, []);
conn.apply('do_something', ['three!'], function() {
responses.push('three');
});
conn.apply('do_something', ['four!'], {wait: true}, function() {
responses.push('four');
});
conn.apply('do_something', ['five!'], function() { responses.push('five'); });
// Verify that we did not send "three!" since we're waiting for
// "one!" and "two!" to send their response back
test.equal(stream.sent.length, 0);
stream.receive({msg: 'result', id: one_message.id});
test.equal(responses, ['one']);
test.equal(stream.sent.length, 0);
stream.receive({msg: 'result', id: two_message.id});
test.equal(responses, ['one', 'two']);
// Verify that we just sent "three!" and "four!" now that we got
// responses for "one!" and "two!"
test.equal(stream.sent.length, 2);
var three_message = JSON.parse(stream.sent.shift());
test.equal(three_message.params, ['three!']);
var four_message = JSON.parse(stream.sent.shift());
test.equal(four_message.params, ['four!']);
stream.receive({msg: 'result', id: three_message.id});
test.equal(responses, ['one', 'two', 'three']);
test.equal(stream.sent.length, 0);
stream.receive({msg: 'result', id: four_message.id});
test.equal(responses, ['one', 'two', 'three', 'four']);
// Verify that we just sent "five!"
test.equal(stream.sent.length, 1);
var five_message = JSON.parse(stream.sent.shift());
test.equal(five_message.params, ['five!']);
});
Tinytest.add("livedata connection - one wait method with response out of order", function (test) {
var stream = new Meteor._StubStream();
var conn = new Meteor._LivedataConnection(stream);
startAndConnect(test, stream);
// setup method
conn.methods({do_something: function (x) {}});
var responses = [];
conn.apply('do_something', ['one!'], function() { responses.push('one'); });
var one_message = JSON.parse(stream.sent.shift());
test.equal(one_message.params, ['one!']);
conn.apply('do_something', ['two!'], {wait: true}, function() {
responses.push('two');
});
var two_message = JSON.parse(stream.sent.shift());
test.equal(two_message.params, ['two!']);
test.equal(responses, []);
conn.apply('do_something', ['three!']);
// Verify that we did not send "three!" since we're waiting for
// "one!" and "two!" to send their response back
test.equal(stream.sent.length, 0);
stream.receive({msg: 'result', id: two_message.id});
test.equal(responses, []);
test.equal(stream.sent.length, 0);
stream.receive({msg: 'result', id: one_message.id});
test.equal(responses, ['one', 'two']); // Namely not two, one
// Verify that we just sent "three!" now that we got responses for
// "one!" and "two!"
test.equal(stream.sent.length, 1);
var three_message = JSON.parse(stream.sent.shift());
test.equal(three_message.params, ['three!']);
stream.receive({msg: 'result', id: three_message.id});
test.equal(stream.sent.length, 0);
});
// XXX also test:
// - reconnect, with session resume.
// - restart on update flag

View File

@@ -755,9 +755,19 @@ _.extend(Meteor._LivedataServer.prototype, {
return this.apply(name, args, callback);
},
apply: function (name, args, callback) {
// @param options {Optional Object}
// @param callback {Optional Function}
apply: function (name, args, options, callback) {
var self = this;
// We were passed 3 arguments. They may be either (name, args, options)
// or (name, args, callback)
if (!callback && typeof options === 'function') {
callback = options;
options = {};
}
options = options || {};
if (callback)
// It's not really necessary to do this, since we immediately
// run the callback in this fiber before returning, but we do it

View File

@@ -22,6 +22,36 @@ Meteor.methods({
}
});
// Methods to help test applying methods with `wait: true`: delayedTrue
// returns true 500ms after being run unless makeDelayedTrueImmediatelyReturnFalse
// was run in the meanwhile
if (Meteor.is_server) {
var delayed_true_future;
var delayed_true_times;
Meteor.methods({
delayedTrue: function() {
delayed_true_future = new Future();
delayed_true_times = Meteor.setTimeout(function() {
delayed_true_future['return'](true);
delayed_true_future = null;
delayed_true_times = null;
}, 500);
this.unblock();
return delayed_true_future.wait();
},
makeDelayedTrueImmediatelyReturnFalse: function() {
if (!delayed_true_future)
return; // since delayedTrue's timeout had already run
if (delayed_true_times) clearTimeout(delayed_true_times);
delayed_true_future['return'](false);
delayed_true_future = null;
delayed_true_times = null;
}
});
}
/*****/
Ledger = new Meteor.Collection("ledger");

View File

@@ -105,6 +105,26 @@ testAsyncMulti("livedata - basic method invocation", [
expect(undefined, [12, {x: 13}])), undefined);
},
// test that `wait: false` is respected
function (test, expect) {
if (Meteor.is_client) {
Meteor.apply("delayedTrue", [], {wait: false}, expect(function(err, res) {
test.equal(res, false);
}));
Meteor.apply("makeDelayedTrueImmediatelyReturnFalse", []);
}
},
// test that `wait: true` is respected
function(test, expect) {
if (Meteor.is_client) {
Meteor.apply("delayedTrue", [], {wait: true}, expect(function(err, res) {
test.equal(res, true);
}));
Meteor.apply("makeDelayedTrueImmediatelyReturnFalse", []);
}
},
function (test, expect) {
// No callback

View File

@@ -27,6 +27,9 @@
});
Meteor.bindEnvironment = function (func, onException, _this) {
// needed in order to be able to create closures inside func and
// have the closed variables not change back to their original
// values
var boundValues = _.clone(currentValues);
if (!onException)