Convert livedata and stream from closures to objects

This commit is contained in:
Geoff Schmidt
2012-02-08 20:49:48 -08:00
parent 6c7a31b1fb
commit 5f27e47c9f
4 changed files with 727 additions and 634 deletions

View File

@@ -1,44 +1,144 @@
if (typeof Meteor === "undefined") Meteor = {};
(function () {
var collections = {}; // name -> Collection-type object
// list of subscription tokens outstanding during a
// captureDependencies run. only set when we're doing a run. The fact
// that this is a singleton means we can't do recursive
// Meteor.subscriptions(). But who wants that? What does that even mean?
// XXX namespacing
Meteor._capture_subs = null;
var subs = new Collection();
Meteor.Server = function (url) {
var self = this;
self.url = url;
self.collections = {}; // name -> Collection-type object
self.subs = new Collection;
// keyed by subs._id. value is unset or an array. if set, sub is not
// yet ready.
var sub_ready_callbacks = {};
// list of subscription tokens outstanding during a
// captureDependencies run. only set when we're doing a run. The fact
// that this is a singleton means we can't do recursive
// Meteor.subscriptions(). But who wants that? What does that even mean?
var capture_subs;
self.sub_ready_callbacks = {};
self.stream = new Meteor._Stream(self.url);
// all socket.io traffic is framed as a "livedata" message.
Meteor._stream.on('livedata', function (msg) {
self.stream.on('livedata', function (msg) {
if (typeof(msg) !== 'object' || !msg.msg) {
Meteor._debug("discarding invalid livedata message", msg);
return;
}
if (msg.msg === 'connected')
livedata_connected(msg);
self._livedata_connected(msg);
else if (msg.msg === 'data')
livedata_data(msg);
self._livedata_data(msg);
else if (msg.msg === 'nosub')
livedata_nosub(msg);
self._livedata_nosub(msg);
else if (msg.msg === 'result')
livedata_result(msg);
self._livedata_result(msg);
else
Meteor._debug("discarding unknown livedata message type", msg);
});
var livedata_connected = function (msg) {
// Meteor._debug("CONNECTED", msg);
};
self.stream.reset(function (msg_list) {
// remove all 'livedata' message except 'method'
msg_list = _.filter(msg_list, function (elem) {
return (elem && (elem[0] !== "livedata" ||
(elem[1] && elem[1].msg === "method")));
});
// Send a connect message at the beginning of the stream.
// NOTE: reset is called even on the first connection, so this is
// the only place we send this message.
msg_list.unshift(['livedata', {msg: 'connect'}]);
// add new subscriptions at the end. this way they take effect after
// the handlers and we don't see flicker.
self.subs.find().forEach(function (sub) {
msg_list.push(
['livedata',
{msg: 'sub', id: sub._id, name: sub.name, params: sub.args}]);
});
// clear out the local database!
_.each(self.collections, function (col) {
col._collection.remove({});
});
return msg_list;
});
// we never terminate the observe(), since there is no way to
// destroy a Server.. but this shouldn't matter, since we're the
// only one that holds a reference to the self.subs collection
self.subs_token = self.subs.find({}).observe({
added: function (sub) {
self.stream.emit('livedata', {
msg: 'sub', id: sub._id, name: sub.name, params: sub.args});
},
changed: function (sub) {
if (sub.count <= 0) {
// minimongo not re-entrant.
_.defer(function () { self.subs.remove({_id: sub._id}); });
}
},
removed: function (id) {
self.stream.emit('livedata', {msg: 'unsub', id: id});
}
});
};
_.extend(Meteor.Server.prototype, {
subscribe: function (name, args, callback) {
var self = this;
var id;
var existing = self.subs.find({name: name, args: args}, {reactive: false}).fetch();
if (existing && existing[0]) {
// already subbed, inc count.
id = existing[0]._id;
self.subs.update({_id: id}, {$inc: {count: 1}});
if (callback) {
if (self.sub_ready_callbacks[id])
self.sub_ready_callbacks[id].push(callback);
else
callback(); // XXX maybe _.defer?
}
} else {
// new sub, add object.
// generate our own id so we can know it w/ a find afterwards.
id = Collection.uuid();
self.subs.insert({_id: id, name: name, args: args, count: 1});
self.sub_ready_callbacks[id] = [];
if (callback)
self.sub_ready_callbacks[id].push(callback);
}
// return an object with a stop method.
var token = {stop: function () {
if (!id) return; // must have an id (local from above).
// just update the database. observe takes care of the rest.
self.subs.update({_id: id}, {$inc: {count: -1}});
}};
if (Meteor._capture_subs)
Meteor._capture_subs.push(token);
return token;
},
_livedata_connected: function (msg) {
var self = this;
// Meteor._debug("CONNECTED", msg);
},
_livedata_data: function (msg) {
var self = this;
var livedata_data = function (msg) {
if (msg.collection && msg.id) {
var meteor_coll = collections[msg.collection];
var meteor_coll = self.collections[msg.collection];
if (!meteor_coll) {
Meteor._debug(
@@ -72,235 +172,186 @@ if (typeof Meteor === "undefined") Meteor = {};
if (msg.subs) {
_.each(msg.subs, function (id) {
var arr = sub_ready_callbacks[id];
var arr = self.sub_ready_callbacks[id];
if (arr) _.each(arr, function (c) { c(); });
delete sub_ready_callbacks[id];
delete self.sub_ready_callbacks[id];
});
}
if (msg.methods) {
// Meteor._debug("METHODCOMPLETE", msg.methods);
}
};
},
var livedata_nosub = function (msg) {
_livedata_nosub: function (msg) {
var self = this;
// Meteor._debug("NOSUB", msg);
};
},
var livedata_result = function (msg) {
_livedata_result: function (msg) {
var self = this;
// Meteor._debug("RESULT", msg);
};
}
});
Meteor._stream.reset(function (msg_list) {
// remove all 'livedata' message except 'method'
msg_list = _.filter(msg_list, function (elem) {
return (elem && (elem[0] !== "livedata" ||
(elem[1] && elem[1].msg === "method")));
});
Meteor._Collection = function (name, server) {
var self = this;
// Send a connect message at the beginning of the stream.
// NOTE: reset is called even on the first connection, so this is
// the only place we send this message.
msg_list.unshift(['livedata', {msg: 'connect'}]);
if (name && (name in server.collections))
// maybe should just return server.collections[name]?
throw new Error("There is already a remote collection '" + name + "'");
// add new subscriptions at the end. this way they take effect after
// the handlers and we don't see flicker.
subs.find().forEach(function (sub) {
msg_list.push(
['livedata',
{msg: 'sub', id: sub._id, name: sub.name, params: sub.args}]);
});
self._name = name;
self._collection = new Collection;
self._server = server;
// clear out the local database!
_.each(collections, function (col) {
col._collection.remove({});
});
if (name)
server.collections[name] = self;
};
return msg_list;
});
_.extend(Meteor._Collection.prototype, {
find: function (/* selector, options */) {
var self = this;
// Collection.find() (return all docs) behaves differently
// from Collection.find(undefined) (return 0 docs). so be
// careful about preserving the length of arguments when
// descending into minimongo.
return self._collection.find.apply(self._collection, Array.prototype.slice.call(arguments));
},
var subsToken = subs.find({}).observe({
added: function (sub) {
Meteor._stream.emit('livedata', {
msg: 'sub', id: sub._id, name: sub.name, params: sub.args});
findOne: function (/* selector, options */) {
var self = this;
// as above
return self._collection.findOne.apply(self._collection, Array.prototype.slice.call(arguments));
},
insert: function (obj) {
var self = this;
// Generate an id for the object.
// XXX mutates the object passed in. that is not cool.
if (obj._id)
Meteor._debug("WARNING: trying to insert object w/ _id set");
var _id = Collection.uuid();
obj._id = _id;
if (self._name)
self._server.stream.emit('livedata', {
msg: 'method',
method: '/' + self._name + '/insert',
params: [obj], id: Meteor.uuid()});
self._collection.insert(obj);
return obj;
},
changed: function (sub) {
if (sub.count <= 0) {
// minimongo not re-entrant.
_.defer(function () { subs.remove({_id: sub._id}); });
}
},
removed: function (id) {
Meteor._stream.emit('livedata', {msg: 'unsub', id: id});
}
});
// XXX let it take a second argument, the URL of the domain that
// hosts the collection :)
Meteor.Collection = function (name) {
if (name && (name in collections))
// maybe should just return collections[name]?
throw new Error("There is already a remote collection '" + name + "'");
update: function (selector, mutator, options) {
var self = this;
if (self._name)
self._server.stream.emit('livedata', {
msg: 'method',
method: '/' + self._name + '/update',
params: [selector, mutator, options],
id: Meteor.uuid()});
self._collection.update(selector, mutator, options);
},
var ret = {
_name: name,
_collection: new Collection(),
remove: function (selector) {
var self = this;
find: function (/* selector, options */) {
// Collection.find() (return all docs) behaves differently
// from Collection.find(undefined) (return 0 docs). so be
// careful about preserving the length of arguments when
// descending into minimongo.
return this._collection.find.apply(this._collection, Array.prototype.slice.call(arguments));
},
if (arguments.length === 0)
selector = {};
findOne: function (/* selector, options */) {
// as above
return this._collection.findOne.apply(this._collection, Array.prototype.slice.call(arguments));
},
if (self._name)
self._server.stream.emit('livedata', {
msg: 'method',
method: '/' + self._name + '/remove',
params: [selector],
id: Meteor.uuid()});
self._collection.remove(selector);
},
insert: function (obj) {
// Generate an id for the object.
// XXX mutates the object passed in. that is not cool.
if (obj._id)
Meteor._debug("WARNING: trying to insert object w/ _id set");
var _id = Collection.uuid();
obj._id = _id;
schema: function () {
// XXX not implemented yet
},
if (this._name)
Meteor._stream.emit('livedata', {
api: function (methods) {
var self = this;
_.each(methods, function (func, method) {
self[method] = function (/* arguments */) {
// (must turn 'arguments' into a plain array so as not to
// confuse stringify)
var params = [].slice.call(arguments);
// run the handler ourselves
methods[method].apply(null, params);
// tell the server to run the handler
if (self._name)
self._server.stream.emit('livedata', {
msg: 'method',
method: '/' + this._name + '/insert',
params: [obj], id: Meteor.uuid()});
this._collection.insert(obj);
return obj;
},
update: function (selector, mutator, options) {
if (this._name)
Meteor._stream.emit('livedata', {
msg: 'method',
method: '/' + this._name + '/update',
params: [selector, mutator, options],
method: '/' + self._name + '/' + method,
params: params,
id: Meteor.uuid()});
this._collection.update(selector, mutator, options);
},
};
}, self);
}
});
remove: function (selector) {
if (arguments.length === 0)
selector = {};
if (this._name)
Meteor._stream.emit('livedata', {
msg: 'method',
method: '/' + this._name + '/remove',
params: [selector],
id: Meteor.uuid()});
this._collection.remove(selector);
},
App = new Meteor.Server('/');
schema: function () {
// XXX not implemented yet
},
_.extend(Meteor, {
is_server: false,
is_client: true,
api: function (methods) {
_.each(methods, function (func, method) {
this[method] = function (/* arguments */) {
// (must turn 'arguments' into a plain array so as not to
// confuse stringify)
var params = [].slice.call(arguments);
// XXX these are wrong
status: function () {
return App.stream.status();
},
// run the handler ourselves
methods[method].apply(null, params);
reconnect: function () {
return App.stream.reconnect();
},
// tell the server to run the handler
if (this._name)
Meteor._stream.emit('livedata', {
msg: 'method',
method: '/' + this._name + '/' + method,
params: params,
id: Meteor.uuid()});
};
}, this);
reset: function (callback) {
return App.stream.reset(callback);
},
publish: function() {
// ignored on the client
},
// XXX make the user create it directly, with 'new'
Collection: function (name) {
return new Meteor._Collection(name, App);
},
subscribe: function (/* arguments */) {
return App.subscribe.apply(App, _.toArray(arguments));
},
autosubscribe: function (sub_func) {
var local_subs = [];
var context = new Meteor.deps.Context();
context.on_invalidate(function () {
// recurse.
Meteor.autosubscribe(sub_func);
// unsub after re-subbing, to avoid bouncing.
_.each(local_subs, function (x) { x.stop() });
});
context.run(function () {
if (Meteor._capture_subs)
throw new Error("Meteor.autosubscribe may not be called recursively");
Meteor._capture_subs = [];
try {
sub_func();
} finally {
local_subs = Meteor._capture_subs;
Meteor._capture_subs = null;
}
};
if (name)
collections[name] = ret;
return ret;
};
_.extend(Meteor, {
is_server: false,
is_client: true,
publish: function() {
// ignored on the client
},
subscribe: function (name, args, callback) {
var id;
var existing = subs.find({name: name, args: args}, {reactive: false}).fetch();
if (existing && existing[0]) {
// already subbed, inc count.
id = existing[0]._id;
subs.update({_id: id}, {$inc: {count: 1}});
if (callback) {
if (sub_ready_callbacks[id])
sub_ready_callbacks[id].push(callback);
else
callback(); // XXX maybe _.defer?
}
} else {
// new sub, add object.
// generate our own id so we can know it w/ a find afterwards.
id = Collection.uuid();
subs.insert({_id: id, name: name, args: args, count: 1});
sub_ready_callbacks[id] = [];
if (callback)
sub_ready_callbacks[id].push(callback);
}
// return an object with a stop method.
var token = {stop: function () {
if (!id) return; // must have an id (local from above).
// just update the database. observe takes care of the rest.
subs.update({_id: id}, {$inc: {count: -1}});
}};
if (capture_subs) capture_subs.push(token);
return token;
},
autosubscribe: function (sub_func) {
var local_subs = [];
var context = new Meteor.deps.Context();
context.on_invalidate(function () {
// recurse.
Meteor.autosubscribe(sub_func);
// unsub after re-subbing, to avoid bouncing.
_.each(local_subs, function (x) { x.stop() });
});
context.run(function () {
if (capture_subs)
throw new Error("Meteor.autosubscribe may not be called recursively");
capture_subs = [];
try {
sub_func();
} finally {
local_subs = capture_subs;
capture_subs = undefined;
}
});
}
});
})();
});
}
});

View File

@@ -1,14 +1,50 @@
if (typeof Meteor === "undefined") Meteor = {};
(function () {
Meteor._LivedataServer = function () {
var self = this;
////////// Internals //////////
self.publishes = {};
self.collections = {};
self.methods = {};
self.stream_server = new Meteor._StreamServer;
var publishes = {};
var collections = {};
var methods = {};
self.stream_server.register(function (socket) {
socket.meteor = {};
socket.meteor.subs = [];
socket.meteor.cache = {};
socket.meteor.pending_method_ids = [];
socket.on('livedata', function (msg) {
if (typeof(msg) !== 'object' || !msg.msg) {
Meteor._debug("discarding invalid livedata message", msg);
return;
}
if (msg.msg === 'connect')
self._livedata_connect(socket, msg);
else if (msg.msg === 'sub')
self._livedata_sub(socket, msg);
else if (msg.msg === 'unsub')
self._livedata_unsub(socket, msg);
else if (msg.msg === 'method')
self._livedata_method(socket, msg);
else
Meteor._debug("discarding unknown livedata message type", msg);
});
// 5/sec updates tops, once every 10sec min.
socket.meteor.throttled_poll = _.throttle(function () {
self._poll_subscriptions(socket)
}, 50); // XXX only 50ms! for great speed. might want higher in prod.
socket.meteor.timer = setInterval(socket.meteor.throttled_poll, 10000);
});
};
_.extend(Meteor._LivedataServer.prototype, {
_poll_subscriptions: function (socket) {
var self = this;
var poll_subscriptions = function (socket) {
Fiber(function () {
// holds a clean copy of client's data. channel.send will
// populate new_cache, then we compute the difference with the old
@@ -40,7 +76,7 @@ if (typeof Meteor === "undefined") Meteor = {};
// actually run the subscriptions.
_.each(socket.meteor.subs, function (sub) {
var pub = publishes[sub.name];
var pub = self.publishes[sub.name];
if (!pub) {
// XXX error unknown publish
console.log("ERROR UNKNOWN PUBLISH " + sub.name);
@@ -129,15 +165,18 @@ if (typeof Meteor === "undefined") Meteor = {};
socket.meteor.pending_method_ids = [];
}).run();
};
},
var livedata_connect = function (socket, msg) {
_livedata_connect: function (socket, msg) {
var self = this;
// Always start a new session. We don't support any reconnection.
socket.emit('livedata', {msg: 'connected', session: Meteor.uuid()});
};
},
var livedata_sub = function (socket, msg) {
if (!publishes[msg.name]) {
_livedata_sub: function (socket, msg) {
var self = this;
if (!self.publishes[msg.name]) {
// can't sub to unknown publish name
// XXX error value
socket.emit('livedata', {
@@ -146,25 +185,27 @@ if (typeof Meteor === "undefined") Meteor = {};
}
socket.meteor.subs.push({_id: msg.id, name: msg.name, params: msg.params});
poll_subscriptions(socket);
};
self._poll_subscriptions(socket);
},
var livedata_unsub = function (socket, msg) {
_livedata_unsub: function (socket, msg) {
var self = this;
socket.emit('livedata', {msg: 'nosub', id: msg.id});
socket.meteor.subs = _.filter(socket.meteor.subs, function (x) {
return x._id !== msg.id;
});
poll_subscriptions(socket);
};
self._poll_subscriptions(socket);
},
var livedata_method = function (socket, msg) {
_livedata_method: function (socket, msg) {
var self = this;
// XXX note that running this in a fiber means that two serial
// requests from the client can try to execute in parallel.. we're
// going to have to think that through at some point. also, consider
// races against Meteor.Collection(), though this shouldn't happen in
// most normal use cases
Fiber(function () {
var func = msg.method && methods[msg.method];
var func = msg.method && self.methods[msg.method];
if (!func) {
socket.emit('livedata', {
msg: 'result', id: msg.id,
@@ -190,196 +231,179 @@ if (typeof Meteor === "undefined") Meteor = {};
// after the method, rerun all the subscriptions as stuff may have
// changed.
_.each(Meteor._stream.all_sockets(), function(x) {
_.each(self.stream_server.all_sockets(), function(x) {
if (x && x.meteor)
x.meteor.throttled_poll();
});
}).run();
};
},
Meteor._stream.register(function (socket) {
socket.meteor = {};
socket.meteor.subs = [];
socket.meteor.cache = {};
socket.meteor.pending_method_ids = [];
/**
* Defines a live dataset that clients can subscribe to.
*
* @param name {String} identifier for query
* @param options {Object}
*
* options to contain:
* - collection {Collection} collection; defaults to the collection
* named 'name' on disk in mongodb
* - selector {Function<args> OR Object} either a mongodb selector,
* or a function that takes the argument object passed to
* Meteor.subscribe and returns a mongodb selector. default {}
*/
publish: function (name, options) {
var self = this;
socket.on('livedata', function (msg) {
if (typeof(msg) !== 'object' || !msg.msg) {
Meteor._debug("discarding invalid livedata message", msg);
return;
}
if (msg.msg === 'connect')
livedata_connect(socket, msg);
else if (msg.msg === 'sub')
livedata_sub(socket, msg);
else if (msg.msg === 'unsub')
livedata_unsub(socket, msg);
else if (msg.msg === 'method')
livedata_method(socket, msg);
else
Meteor._debug("discarding unknown livedata message type", msg);
});
// 5/sec updates tops, once every 10sec min.
socket.meteor.throttled_poll = _.throttle(function () {
poll_subscriptions(socket)
}, 50); // XXX only 50ms! for great speed. might want higher in prod.
socket.meteor.timer = setInterval(socket.meteor.throttled_poll, 10000);
});
////////// User visible API //////////
_.extend(Meteor, {
is_server: true,
is_client: false,
/**
* Defines a live dataset that clients can subscribe to.
*
* @param name {String} identifier for query
* @param options {Object}
*
* options to contain:
* - collection {Collection} collection; defaults to the collection
* named 'name' on disk in mongodb
* - selector {Function<args> OR Object} either a mongodb selector,
* or a function that takes the argument object passed to
* Meteor.subscribe and returns a mongodb selector. default {}
*/
publish: function (name, options) {
if (name in publishes) {
// XXX error duplicate publish
console.log("ERROR DUPLICATE PUBLISH " + name);
return;
}
options = options || {};
var collection = options.collection || collections[name];
if (!collection)
throw new Error("No collection '" + name + "' found to publish. " +
"You can specify the collection explicitly with the " +
"'collection' option.");
var selector = options.selector || {};
var func = function (channel, params) {
var opt = function (key, or) {
var x = options[key] || or;
return (x instanceof Function) ? x(params) : x;
};
channel.send(collection._name, collection.find(opt("selector", {}), {
sort: opt("sort"),
skip: opt("skip"),
limit: opt("limit")
}).fetch());
};
publishes[name] = func;
},
subscribe: function () {
// ignored on server
},
autosubscribe: function () {
// ignored on server
if (name in self.publishes) {
// XXX error duplicate publish
console.log("ERROR DUPLICATE PUBLISH " + name);
return;
}
});
Meteor.Collection = function (name) {
if (!name)
// XXX maybe support this using minimongo?
throw new Error("Anonymous collections aren't allowed on the server");
var ret = {
_name: name,
_api: {},
// XXX there are probably a lot of little places where this API
// and minimongo diverge. we should track each of those down and
// kill it.
find: function (selector, options) {
if (arguments.length === 0)
selector = {};
return new Meteor._mongo_driver.Cursor(this._name, selector, options);
},
findOne: function (selector, options) {
if (arguments.length === 0)
selector = {};
// XXX when implementing observe() on the server, either
// support limit or remove this performance hack.
options = options || {};
options.limit = 1;
return this.find(selector, options).fetch()[0];
},
insert: function (doc) {
// do id allocation here, so we never end up with an ObjectID.
// This only happens if some calls this directly on the server,
// since normally ids are allocated on the client and sent over
// the wire to us.
if (! doc._id) {
// copy doc because we mess with it. only shallow copy.
new_doc = {};
_.extend(new_doc, doc);
doc = new_doc;
doc._id = Meteor.uuid();
}
Meteor._mongo_driver.insert(this._name, doc);
// return the doc w/ _id, so we can use it.
return doc;
},
update: function (selector, mod, options) {
return Meteor._mongo_driver.update(this._name, selector, mod, options);
},
remove: function (selector) {
if (arguments.length === 0)
selector = {};
return Meteor._mongo_driver.remove(this._name, selector);
},
schema: function () {
// XXX not implemented yet
},
// Backwards compatibility for old handler API.
// Still put the function in the Collection object, also make a
// method entry for calls coming in over the wire.
api: function (local_methods) {
for (var method in local_methods) {
this[method] = _.bind(local_methods[method], null);
if (name)
methods['/' + name + '/' + method] = this[method];
}
}
options = options || {};
var collection = options.collection || self.collections[name];
if (!collection)
throw new Error("No collection '" + name + "' found to publish. " +
"You can specify the collection explicitly with the " +
"'collection' option.");
var selector = options.selector || {};
var func = function (channel, params) {
var opt = function (key, or) {
var x = options[key] || or;
return (x instanceof Function) ? x(params) : x;
};
channel.send(collection._name, collection.find(opt("selector", {}), {
sort: opt("sort"),
skip: opt("skip"),
limit: opt("limit")
}).fetch());
};
if (name) {
collections[name] = ret;
// XXX temporary automatically generated methods for mongo mutators
methods['/' + name + '/insert'] = function (obj) {
ret.insert(obj);
};
methods['/' + name + '/update'] = function (selector, mutator, options) {
ret.update(selector, mutator, options);
};
methods['/' + name + '/remove'] = function (selector) {
ret.remove(selector);
};
self.publishes[name] = func;
}
});
Meteor._Collection = function (name, server) {
var self = this;
if (!name)
// XXX maybe support this using minimongo?
throw new Error("Anonymous collections aren't allowed on the server");
self._name = name;
self._api = {};
self._server = server;
if (name) {
self._server.collections[name] = self;
// XXX temporary automatically generated methods for mongo mutators
self._server.methods['/' + name + '/insert'] = function (obj) {
ret.insert(obj);
};
self._server.methods['/' + name + '/update'] = function (selector, mutator, options) {
ret.update(selector, mutator, options);
};
self._server.methods['/' + name + '/remove'] = function (selector) {
ret.remove(selector);
};
}
};
_.extend(Meteor._Collection.prototype, {
// XXX there are probably a lot of little places where this API
// and minimongo diverge. we should track each of those down and
// kill it.
find: function (selector, options) {
var self = this;
if (arguments.length === 0)
selector = {};
return new Meteor._mongo_driver.Cursor(self._name, selector, options);
},
findOne: function (selector, options) {
var self = this;
if (arguments.length === 0)
selector = {};
// XXX when implementing observe() on the server, either
// support limit or remove this performance hack.
options = options || {};
options.limit = 1;
return self.find(selector, options).fetch()[0];
},
insert: function (doc) {
var self = this;
// do id allocation here, so we never end up with an ObjectID.
// This only happens if some calls this directly on the server,
// since normally ids are allocated on the client and sent over
// the wire to us.
if (! doc._id) {
// copy doc because we mess with it. only shallow copy.
new_doc = {};
_.extend(new_doc, doc);
doc = new_doc;
doc._id = Meteor.uuid();
}
return ret;
};
})();
Meteor._mongo_driver.insert(self._name, doc);
// return the doc w/ _id, so we can use it.
return doc;
},
update: function (selector, mod, options) {
var self = this;
return Meteor._mongo_driver.update(self._name, selector, mod, options);
},
remove: function (selector) {
var self = this;
if (arguments.length === 0)
selector = {};
return Meteor._mongo_driver.remove(self._name, selector);
},
schema: function () {
// XXX not implemented yet
},
// Backwards compatibility for old handler API.
// Still put the function in the Collection object, also make a
// method entry for calls coming in over the wire.
api: function (local_methods) {
var self = this;
for (var method in local_methods) {
self[method] = _.bind(local_methods[method], null);
if (self._name)
self._server.methods['/' + self._name + '/' + method] = self[method];
}
}
});
// XXX temporary -- rename
TheServer = new Meteor._LivedataServer;
_.extend(Meteor, {
is_server: true,
is_client: false,
publish: _.bind(TheServer.publish, TheServer),
// XXX eliminate shim; have app do it directly
Collection: function (name) {
return new Meteor._Collection(name, TheServer);
},
// these are ignored on the server
subscribe: function () {},
autosubscribe: function () {}
});

View File

@@ -15,275 +15,296 @@ if (typeof Meteor === "undefined") Meteor = {};
// already:
// https://github.com/3rd-Eden/Socket.IO/tree/bugs/reconnect
Meteor._Stream = function (url) {
var self = this;
(function () {
self.url = url;
self.socket = null;
self.event_callbacks = {}; // name -> [callback]
self.reset_callbacks = [];
self.message_queue = {}; // id -> message
self.next_message_id = 0;
self.server_id = null;
////////// Constants //////////
//// Constants
// how long to wait until we declare the connection attempt
// failed. socket.io doesn't tell us sometimes.
// https://github.com/LearnBoost/socket.io-client/issues/214
// https://github.com/LearnBoost/socket.io-client/issues/311
var CONNECT_TIMEOUT = 10000;
self.CONNECT_TIMEOUT = 10000;
// extra time to make sure our timer and socket.ios timer don't
// collide.
var CONNECT_TIMEOUT_SLOP = 1000;
self.CONNECT_TIMEOUT_SLOP = 1000;
// time for initial reconnect attempt.
var RETRY_BASE_TIMEOUT = 1000;
self.RETRY_BASE_TIMEOUT = 1000;
// exponential factor to increase timeout each attempt.
var RETRY_EXPONENT = 2.2;
self.RETRY_EXPONENT = 2.2;
// maximum time between reconnects.
var RETRY_MAX_TIMEOUT = 1800000; // 30min.
self.RETRY_MAX_TIMEOUT = 1800000; // 30min.
// time to wait for the first 2 retries. this helps page reload
// speed during dev mode restarts, but doesn't hurt prod too
// much (due to CONNECT_TIMEOUT)
var RETRY_MIN_TIMEOUT = 10;
self.RETRY_MIN_TIMEOUT = 10;
// fuzz factor to randomize reconnect times by. avoid reconnect
// storms.
var RETRY_FUZZ = 0.5; // +- 25%
self.RETRY_FUZZ = 0.5; // +- 25%
////////// Internals //////////
var socket;
var event_callbacks = {}; // name -> [callback]
var reset_callbacks = [];
var message_queue = {}; // id -> message
var next_message_id = 0;
var server_id;
//// reactive status stuff
var status = {
//// Reactive status
self.status = {
status: "waiting", connected: false, retry_count: 0
};
var status_listeners = {}; // context.id -> context
var status_changed = function () {
_.each(status_listeners, function (context) {
self.status_listeners = {}; // context.id -> context
self.status_changed = function () {
_.each(self.status_listeners, function (context) {
context.invalidate();
});
};
//// retry logic
var retry_timer;
var connection_timer;
//// Retry logic
self.retry_timer = null;
self.connection_timer = null;
var connected = function (welcome_data) {
if (connection_timer) {
clearTimeout(connection_timer);
connection_timer = undefined;
//// Saving and restoring state
Meteor._reload.on_migrate('stream', function () {
return { message_list: _.toArray(self.message_queue) };
});
var migration_data = Meteor._reload.migration_data('stream');
if (migration_data && migration_data.message_list) {
_.each(migration_data.message_list, function (msg) {
self.message_queue[self.next_message_id++] = msg;
});
}
//// Kickoff!
self._launch_connection();
};
_.extend(Meteor._Stream.prototype, {
on: function (name, callback) {
var self = this;
if (!self.event_callbacks[name])
self.event_callbacks[name] = [];
self.event_callbacks[name].push(callback);
if (self.socket)
self.socket.on(name, callback);
},
emit: function (/* arguments */) {
var self = this;
var args = _.toArray(arguments);
var id = self.next_message_id++;
self.message_queue[id] = args;
if (self.status.connected) {
self.socket.json.send(args, function () {
delete self.message_queue[id];
});
}
},
// provide a hook for modules to re-initialize state upon new
// connection. callback is a function that takes a message list and
// returns a message list. modules use this to strip out unneeded
// messages and/or insert new messages. NOTE: this API is weird! We
// probably want to revisit this, potentially adding some sort of
// namespacing so multiple modules can share the stream more
// gracefully.
reset: function (callback) {
var self = this;
self.reset_callbacks.push(callback);
},
status: function () {
var self = this;
var context = Meteor.deps && Meteor.deps.Context.current;
if (context && !(context.id in self.status_listeners)) {
self.status_listeners[context.id] = context;
context.on_invalidate(function () {
delete self.status_listeners[context.id];
});
}
return self.status;
},
reconnect: function () {
var self = this;
if (self.status.connected)
return; // already connected. noop.
// if we're mid-connection, stop it.
if (self.status.status === "connecting") {
self._fake_connect_failed();
}
if (status.connected) {
if (self.retry_timer)
clearTimeout(self.retry_timer);
self.retry_timer = undefined;
self.status.retry_count -= 1; // don't count manual retries
self._retry_now();
},
_connected: function (welcome_data) {
var self = this;
if (self.connection_timer) {
clearTimeout(self.connection_timer);
self.connection_timer = undefined;
}
if (self.status.connected) {
// already connected. do nothing. this probably shouldn't happen.
return;
}
// inspect the welcome data and decide if we have to reload
if (welcome_data && welcome_data.server_id) {
if (server_id && server_id !== welcome_data.server_id) {
if (self.server_id && self.server_id !== welcome_data.server_id) {
Meteor._reload.reload();
// world's about to end, just leave the connection 'connecting'
// until it does.
return;
}
server_id = welcome_data.server_id;
self.server_id = welcome_data.server_id;
} else {
Meteor._debug("DEBUG: invalid welcome packet", welcome_data);
}
// give everyone a chance to munge the message queue.
var msg_list = _.toArray(message_queue);
_.each(reset_callbacks, function (callback) {
var msg_list = _.toArray(self.message_queue);
_.each(self.reset_callbacks, function (callback) {
msg_list = callback(msg_list);
});
message_queue = {};
self.message_queue = {};
_.each(msg_list, function (msg) {
message_queue[next_message_id++] = msg;
self.message_queue[self.next_message_id++] = msg;
});
// send the pending message queue. this should always be in
// order, since the keys are ordered numerically and they are added
// in order.
_.each(message_queue, function (msg, id) {
socket.json.send(msg, function () {
delete message_queue[id];
_.each(self.message_queue, function (msg, id) {
self.socket.json.send(msg, function () {
delete self.message_queue[id];
});
});
status.status = "connected";
status.connected = true;
status.retry_count = 0;
status_changed();
self.status.status = "connected";
self.status.connected = true;
self.status.retry_count = 0;
self.status_changed();
},
_cleanup_socket: function () {
var self = this;
};
var cleanup_socket = function () {
if (socket) {
if (self.socket) {
if (socket.$events) {
_.each(socket.$events, function (v, k) {
socket.removeAllListeners(k);
if (self.socket.$events) {
_.each(self.socket.$events, function (v, k) {
self.socket.removeAllListeners(k);
});
}
socket.disconnect();
self.socket.disconnect();
var old_socket = socket;
socket = undefined;
var old_socket = self.socket;
self.socket = null;
old_socket.on('connect', function () {
Meteor._debug("DEBUG: OLD SOCKET RECONNECTED", old_socket);
old_socket.disconnect();
});
}
};
},
var disconnected = function () {
if (connection_timer) {
clearTimeout(connection_timer);
connection_timer = undefined;
_disconnected: function () {
var self = this;
if (self.connection_timer) {
clearTimeout(self.connection_timer);
self.connection_timer = undefined;
}
cleanup_socket();
retry_later(); // sets status. no need to do it here.
};
var fake_connect_failed = function () {
self._cleanup_socket();
self._retry_later(); // sets status. no need to do it here.
},
_fake_connect_failed: function () {
var self = this;
// sometimes socket.io just doesn't tell us when it failed. we
// detect this with a timer and force failure.
cleanup_socket();
disconnected();
};
self._cleanup_socket();
self._disconnected();
},
_retry_timeout: function (count) {
var self = this;
var retry_timeout = function (count) {
if (count < 2)
return RETRY_MIN_TIMEOUT;
return self.RETRY_MIN_TIMEOUT;
var timeout = Math.min(
RETRY_MAX_TIMEOUT,
RETRY_BASE_TIMEOUT * Math.pow(RETRY_EXPONENT, count));
self.RETRY_MAX_TIMEOUT,
self.RETRY_BASE_TIMEOUT * Math.pow(self.RETRY_EXPONENT, count));
// fuzz the timeout randomly, to avoid reconnect storms when a
// server goes down.
timeout = timeout * ((Math.random() * RETRY_FUZZ) + (1 - RETRY_FUZZ/2));
timeout = timeout * ((Math.random() * self.RETRY_FUZZ) +
(1 - self.RETRY_FUZZ/2));
return timeout;
};
var retry_later = function () {
var timeout = retry_timeout(status.retry_count)
if (retry_timer) { clearTimeout(retry_timer); }
retry_timer = setTimeout(retry_now, timeout);
},
status.status = "waiting"
status.connected = false;
status.retry_time = (new Date()).getTime() + timeout;
status_changed();
};
var retry_now = function () {
status.retry_count += 1;
status.status = "connecting";
status.connected = false;
delete status.retry_time;
status_changed();
_retry_later: function () {
var self = this;
launch_connection();
};
var timeout = self._retry_timeout(self.status.retry_count)
if (self.retry_timer)
clearTimeout(self.retry_timer);
self.retry_timer = setTimeout(_.bind(self._retry_now, self), timeout);
var launch_connection = function () {
cleanup_socket(); // cleanup the old socket, if there was one.
self.status.status = "waiting"
self.status.connected = false;
self.status.retry_time = (new Date()).getTime() + timeout;
self.status_changed();
},
socket = io.connect('/', { reconnect: false,
'connect timeout': CONNECT_TIMEOUT,
'force new connection': true } );
socket.once('welcome', connected);
socket.on('disconnect', disconnected);
socket.on('connect_failed', disconnected);
_retry_now: function () {
var self = this;
_.each(event_callbacks, function (callbacks, name) {
self.status.retry_count += 1;
self.status.status = "connecting";
self.status.connected = false;
delete self.status.retry_time;
self.status_changed();
self._launch_connection();
},
_launch_connection: function () {
var self = this;
self._cleanup_socket(); // cleanup the old socket, if there was one.
self.socket = io.connect(self.url, {
reconnect: false,
'connect timeout': self.CONNECT_TIMEOUT,
'force new connection': true
});
self.socket.once('welcome', _.bind(self._connected, self));
self.socket.on('disconnect', _.bind(self._disconnected, self));
self.socket.on('connect_failed', _.bind(self._disconnected, self));
_.each(self.event_callbacks, function (callbacks, name) {
_.each(callbacks, function (callback) {
socket.on(name, callback);
self.socket.on(name, callback);
});
});
if (connection_timer) clearTimeout(connection_timer);
connection_timer = setTimeout(fake_connect_failed,
CONNECT_TIMEOUT + CONNECT_TIMEOUT_SLOP);
};
////////// Save and restore state //////////
Meteor._reload.on_migrate('stream', function () {
return { message_list: _.toArray(message_queue) };
});
var migration_data = Meteor._reload.migration_data('stream');
if (migration_data && migration_data.message_list) {
_.each(migration_data.message_list, function (msg) {
message_queue[next_message_id++] = msg;
});
if (self.connection_timer)
clearTimeout(self.connection_timer);
var timeout = self.CONNECT_TIMEOUT + self.CONNECT_TIMEOUT_SLOP;
self.connection_timer = setTimeout(_.bind(self._fake_connect_failed, self),
timeout);
}
////////// User facing API //////////
Meteor.status = function () {
var context = Meteor.deps.Context.current;
if (context && !(context.id in status_listeners)) {
status_listeners[context.id] = context;
context.on_invalidate(function () {
delete status_listeners[context.id];
});
}
return status;
};
Meteor.reconnect = function () {
if (status.connected) return; // already connected. noop.
// if we're mid-connection, stop it.
if (status.status === "connecting") {
fake_connect_failed();
}
if (retry_timer) clearTimeout(retry_timer);
retry_timer = undefined;
status.retry_count -= 1; // don't count manual retries
retry_now();
};
////////// API for other packages //////////
Meteor._stream = {
on: function (name, callback) {
if (!event_callbacks[name]) event_callbacks[name] = [];
event_callbacks[name].push(callback);
if (socket) socket.on(name, callback);
},
emit: function (/* var args */) {
var args = _.toArray(arguments);
var id = next_message_id++;
message_queue[id] = args;
if (status.connected) {
socket.json.send(args, function () {
delete message_queue[id];
});
}
},
// provide a hook for modules to re-initialize state upon new
// connection. callback is a function that takes a message list and
// returns a message list. modules use this to strip out unneeded
// messages and/or insert new messages. NOTE: this API is weird! We
// probably want to revisit this, potentially adding some sort of
// namespacing so multiple modules can share the stream more
// gracefully.
reset: function (callback) {
reset_callbacks.push(callback);
}
};
////////// Kickoff! //////////
launch_connection();
})();
});

View File

@@ -1,41 +1,39 @@
if (typeof Meteor === "undefined") Meteor = {};
(function () {
////////// Internals //////////
var registration_callbacks = [];
Meteor._StreamServer = function () {
var self = this;
self.registration_callbacks = [];
// unique id for this instantiation of the server. If this changes
// between client reconnects, the client will reload. In production,
// we might want to make this the bundle id, so that if runner restarts
// we don't force clients to reload unneccesarily. Or we could integrate
// with the bundler and have this be a hash of all the code.
var server_id = Meteor.uuid();
self.server_id = Meteor.uuid();
// basic socketio setup
// set up socket.io
var socketio = __meteor_bootstrap__.require('socket.io');
var io = socketio.listen(__meteor_bootstrap__.app);
io.configure(function() {
self.io = socketio.listen(__meteor_bootstrap__.app);
self.io.configure(function() {
// Don't serve static files from socket.io. We serve them separately
// to get gzip and other fun things.
io.set('browser client', false);
self.io.set('browser client', false);
io.set('log level', 1);
self.io.set('log level', 1);
// XXX disable websockets! they break chrome both debugging
// and node-http-proxy (used in outer app)
io.set('transports', _.without(io.transports(), 'websocket'));
self.io.set('transports', _.without(self.io.transports(), 'websocket'));
});
io.sockets.on('connection', function (socket) {
self.io.sockets.on('connection', function (socket) {
// Send a welcome message with the server_id. Client uses this to
// reload if needed.
socket.emit('welcome', {server_id: server_id});
socket.emit('welcome', {server_id: self.server_id});
// call all our callbacks when we get a new socket. they will do the
// work of setting up handlers and such for specific messages.
_.each(registration_callbacks, function (callback) {
_.each(self.registration_callbacks, function (callback) {
callback(socket);
});
@@ -45,23 +43,22 @@ if (typeof Meteor === "undefined") Meteor = {};
socket.$emit.apply(socket, msg);
});
});
};
////////// API for other packages //////////
_.extend(Meteor._StreamServer.prototype, {
// call my callback when a new socket connects.
// also call it for all current connections.
register: function (callback) {
var self = this;
self.registration_callbacks.push(callback);
_.each(self.io.sockets.sockets, function (socket) {
callback(socket);
});
},
Meteor._stream = {
// call my callback when a new socket connects.
// also call it for all current connections.
register: function (callback) {
registration_callbacks.push(callback);
_.each(io.sockets.sockets, function (socket) {
callback(socket);
});
},
// get a list of all sockets
all_sockets: function () {
return io.sockets.sockets;
}
};
})();
// get a list of all sockets
all_sockets: function () {
var self = this;
return self.io.sockets.sockets;
}
});