From 5f27e47c9f03c6ed2ac4110c86d6ea46aa8056eb Mon Sep 17 00:00:00 2001 From: Geoff Schmidt Date: Wed, 8 Feb 2012 20:49:48 -0800 Subject: [PATCH] Convert livedata and stream from closures to objects --- packages/livedata/livedata_client.js | 489 +++++++++++++++------------ packages/livedata/livedata_server.js | 420 ++++++++++++----------- packages/stream/stream_client.js | 389 +++++++++++---------- packages/stream/stream_server.js | 63 ++-- 4 files changed, 727 insertions(+), 634 deletions(-) diff --git a/packages/livedata/livedata_client.js b/packages/livedata/livedata_client.js index 2b6dfd5624..aed864e7a4 100644 --- a/packages/livedata/livedata_client.js +++ b/packages/livedata/livedata_client.js @@ -1,44 +1,144 @@ if (typeof Meteor === "undefined") Meteor = {}; -(function () { - var collections = {}; // name -> Collection-type object +// list of subscription tokens outstanding during a +// captureDependencies run. only set when we're doing a run. The fact +// that this is a singleton means we can't do recursive +// Meteor.subscriptions(). But who wants that? What does that even mean? +// XXX namespacing +Meteor._capture_subs = null; - var subs = new Collection(); +Meteor.Server = function (url) { + var self = this; + self.url = url; + self.collections = {}; // name -> Collection-type object + + self.subs = new Collection; // keyed by subs._id. value is unset or an array. if set, sub is not // yet ready. - var sub_ready_callbacks = {}; - // list of subscription tokens outstanding during a - // captureDependencies run. only set when we're doing a run. The fact - // that this is a singleton means we can't do recursive - // Meteor.subscriptions(). But who wants that? What does that even mean? - var capture_subs; + self.sub_ready_callbacks = {}; + + self.stream = new Meteor._Stream(self.url); // all socket.io traffic is framed as a "livedata" message. - Meteor._stream.on('livedata', function (msg) { + self.stream.on('livedata', function (msg) { if (typeof(msg) !== 'object' || !msg.msg) { Meteor._debug("discarding invalid livedata message", msg); return; } if (msg.msg === 'connected') - livedata_connected(msg); + self._livedata_connected(msg); else if (msg.msg === 'data') - livedata_data(msg); + self._livedata_data(msg); else if (msg.msg === 'nosub') - livedata_nosub(msg); + self._livedata_nosub(msg); else if (msg.msg === 'result') - livedata_result(msg); + self._livedata_result(msg); else Meteor._debug("discarding unknown livedata message type", msg); }); - var livedata_connected = function (msg) { - // Meteor._debug("CONNECTED", msg); - }; + self.stream.reset(function (msg_list) { + // remove all 'livedata' message except 'method' + msg_list = _.filter(msg_list, function (elem) { + return (elem && (elem[0] !== "livedata" || + (elem[1] && elem[1].msg === "method"))); + }); + + // Send a connect message at the beginning of the stream. + // NOTE: reset is called even on the first connection, so this is + // the only place we send this message. + msg_list.unshift(['livedata', {msg: 'connect'}]); + + // add new subscriptions at the end. this way they take effect after + // the handlers and we don't see flicker. + self.subs.find().forEach(function (sub) { + msg_list.push( + ['livedata', + {msg: 'sub', id: sub._id, name: sub.name, params: sub.args}]); + }); + + // clear out the local database! + _.each(self.collections, function (col) { + col._collection.remove({}); + }); + + return msg_list; + }); + + // we never terminate the observe(), since there is no way to + // destroy a Server.. but this shouldn't matter, since we're the + // only one that holds a reference to the self.subs collection + self.subs_token = self.subs.find({}).observe({ + added: function (sub) { + self.stream.emit('livedata', { + msg: 'sub', id: sub._id, name: sub.name, params: sub.args}); + }, + changed: function (sub) { + if (sub.count <= 0) { + // minimongo not re-entrant. + _.defer(function () { self.subs.remove({_id: sub._id}); }); + } + }, + removed: function (id) { + self.stream.emit('livedata', {msg: 'unsub', id: id}); + } + }); +}; + +_.extend(Meteor.Server.prototype, { + subscribe: function (name, args, callback) { + var self = this; + var id; + var existing = self.subs.find({name: name, args: args}, {reactive: false}).fetch(); + + if (existing && existing[0]) { + // already subbed, inc count. + id = existing[0]._id; + self.subs.update({_id: id}, {$inc: {count: 1}}); + + if (callback) { + if (self.sub_ready_callbacks[id]) + self.sub_ready_callbacks[id].push(callback); + else + callback(); // XXX maybe _.defer? + } + } else { + // new sub, add object. + // generate our own id so we can know it w/ a find afterwards. + id = Collection.uuid(); + self.subs.insert({_id: id, name: name, args: args, count: 1}); + + self.sub_ready_callbacks[id] = []; + + if (callback) + self.sub_ready_callbacks[id].push(callback); + } + + // return an object with a stop method. + var token = {stop: function () { + if (!id) return; // must have an id (local from above). + // just update the database. observe takes care of the rest. + self.subs.update({_id: id}, {$inc: {count: -1}}); + }}; + + if (Meteor._capture_subs) + Meteor._capture_subs.push(token); + + return token; + }, + + _livedata_connected: function (msg) { + var self = this; + + // Meteor._debug("CONNECTED", msg); + }, + + _livedata_data: function (msg) { + var self = this; - var livedata_data = function (msg) { if (msg.collection && msg.id) { - var meteor_coll = collections[msg.collection]; + var meteor_coll = self.collections[msg.collection]; if (!meteor_coll) { Meteor._debug( @@ -72,235 +172,186 @@ if (typeof Meteor === "undefined") Meteor = {}; if (msg.subs) { _.each(msg.subs, function (id) { - var arr = sub_ready_callbacks[id]; + var arr = self.sub_ready_callbacks[id]; if (arr) _.each(arr, function (c) { c(); }); - delete sub_ready_callbacks[id]; + delete self.sub_ready_callbacks[id]; }); } if (msg.methods) { // Meteor._debug("METHODCOMPLETE", msg.methods); } - }; + }, - var livedata_nosub = function (msg) { + _livedata_nosub: function (msg) { + var self = this; // Meteor._debug("NOSUB", msg); - }; + }, - var livedata_result = function (msg) { + _livedata_result: function (msg) { + var self = this; // Meteor._debug("RESULT", msg); - }; + } +}); - Meteor._stream.reset(function (msg_list) { - // remove all 'livedata' message except 'method' - msg_list = _.filter(msg_list, function (elem) { - return (elem && (elem[0] !== "livedata" || - (elem[1] && elem[1].msg === "method"))); - }); +Meteor._Collection = function (name, server) { + var self = this; - // Send a connect message at the beginning of the stream. - // NOTE: reset is called even on the first connection, so this is - // the only place we send this message. - msg_list.unshift(['livedata', {msg: 'connect'}]); + if (name && (name in server.collections)) + // maybe should just return server.collections[name]? + throw new Error("There is already a remote collection '" + name + "'"); - // add new subscriptions at the end. this way they take effect after - // the handlers and we don't see flicker. - subs.find().forEach(function (sub) { - msg_list.push( - ['livedata', - {msg: 'sub', id: sub._id, name: sub.name, params: sub.args}]); - }); + self._name = name; + self._collection = new Collection; + self._server = server; - // clear out the local database! - _.each(collections, function (col) { - col._collection.remove({}); - }); + if (name) + server.collections[name] = self; +}; - return msg_list; - }); +_.extend(Meteor._Collection.prototype, { + find: function (/* selector, options */) { + var self = this; + // Collection.find() (return all docs) behaves differently + // from Collection.find(undefined) (return 0 docs). so be + // careful about preserving the length of arguments when + // descending into minimongo. + return self._collection.find.apply(self._collection, Array.prototype.slice.call(arguments)); + }, - var subsToken = subs.find({}).observe({ - added: function (sub) { - Meteor._stream.emit('livedata', { - msg: 'sub', id: sub._id, name: sub.name, params: sub.args}); + findOne: function (/* selector, options */) { + var self = this; + // as above + return self._collection.findOne.apply(self._collection, Array.prototype.slice.call(arguments)); + }, + + insert: function (obj) { + var self = this; + // Generate an id for the object. + // XXX mutates the object passed in. that is not cool. + if (obj._id) + Meteor._debug("WARNING: trying to insert object w/ _id set"); + var _id = Collection.uuid(); + obj._id = _id; + + if (self._name) + self._server.stream.emit('livedata', { + msg: 'method', + method: '/' + self._name + '/insert', + params: [obj], id: Meteor.uuid()}); + self._collection.insert(obj); + + return obj; }, - changed: function (sub) { - if (sub.count <= 0) { - // minimongo not re-entrant. - _.defer(function () { subs.remove({_id: sub._id}); }); - } - }, - removed: function (id) { - Meteor._stream.emit('livedata', {msg: 'unsub', id: id}); - } - }); - // XXX let it take a second argument, the URL of the domain that - // hosts the collection :) - Meteor.Collection = function (name) { - if (name && (name in collections)) - // maybe should just return collections[name]? - throw new Error("There is already a remote collection '" + name + "'"); + update: function (selector, mutator, options) { + var self = this; + if (self._name) + self._server.stream.emit('livedata', { + msg: 'method', + method: '/' + self._name + '/update', + params: [selector, mutator, options], + id: Meteor.uuid()}); + self._collection.update(selector, mutator, options); + }, - var ret = { - _name: name, - _collection: new Collection(), + remove: function (selector) { + var self = this; - find: function (/* selector, options */) { - // Collection.find() (return all docs) behaves differently - // from Collection.find(undefined) (return 0 docs). so be - // careful about preserving the length of arguments when - // descending into minimongo. - return this._collection.find.apply(this._collection, Array.prototype.slice.call(arguments)); - }, + if (arguments.length === 0) + selector = {}; - findOne: function (/* selector, options */) { - // as above - return this._collection.findOne.apply(this._collection, Array.prototype.slice.call(arguments)); - }, + if (self._name) + self._server.stream.emit('livedata', { + msg: 'method', + method: '/' + self._name + '/remove', + params: [selector], + id: Meteor.uuid()}); + self._collection.remove(selector); + }, - insert: function (obj) { - // Generate an id for the object. - // XXX mutates the object passed in. that is not cool. - if (obj._id) - Meteor._debug("WARNING: trying to insert object w/ _id set"); - var _id = Collection.uuid(); - obj._id = _id; + schema: function () { + // XXX not implemented yet + }, - if (this._name) - Meteor._stream.emit('livedata', { + api: function (methods) { + var self = this; + + _.each(methods, function (func, method) { + self[method] = function (/* arguments */) { + // (must turn 'arguments' into a plain array so as not to + // confuse stringify) + var params = [].slice.call(arguments); + + // run the handler ourselves + methods[method].apply(null, params); + + // tell the server to run the handler + if (self._name) + self._server.stream.emit('livedata', { msg: 'method', - method: '/' + this._name + '/insert', - params: [obj], id: Meteor.uuid()}); - this._collection.insert(obj); - - return obj; - }, - - update: function (selector, mutator, options) { - if (this._name) - Meteor._stream.emit('livedata', { - msg: 'method', - method: '/' + this._name + '/update', - params: [selector, mutator, options], + method: '/' + self._name + '/' + method, + params: params, id: Meteor.uuid()}); - this._collection.update(selector, mutator, options); - }, + }; + }, self); + } +}); - remove: function (selector) { - if (arguments.length === 0) - selector = {}; - if (this._name) - Meteor._stream.emit('livedata', { - msg: 'method', - method: '/' + this._name + '/remove', - params: [selector], - id: Meteor.uuid()}); - this._collection.remove(selector); - }, +App = new Meteor.Server('/'); - schema: function () { - // XXX not implemented yet - }, +_.extend(Meteor, { + is_server: false, + is_client: true, - api: function (methods) { - _.each(methods, function (func, method) { - this[method] = function (/* arguments */) { - // (must turn 'arguments' into a plain array so as not to - // confuse stringify) - var params = [].slice.call(arguments); + // XXX these are wrong + status: function () { + return App.stream.status(); + }, - // run the handler ourselves - methods[method].apply(null, params); + reconnect: function () { + return App.stream.reconnect(); + }, - // tell the server to run the handler - if (this._name) - Meteor._stream.emit('livedata', { - msg: 'method', - method: '/' + this._name + '/' + method, - params: params, - id: Meteor.uuid()}); - }; - }, this); + reset: function (callback) { + return App.stream.reset(callback); + }, + + publish: function() { + // ignored on the client + }, + + // XXX make the user create it directly, with 'new' + Collection: function (name) { + return new Meteor._Collection(name, App); + }, + + subscribe: function (/* arguments */) { + return App.subscribe.apply(App, _.toArray(arguments)); + }, + + autosubscribe: function (sub_func) { + var local_subs = []; + var context = new Meteor.deps.Context(); + + context.on_invalidate(function () { + // recurse. + Meteor.autosubscribe(sub_func); + // unsub after re-subbing, to avoid bouncing. + _.each(local_subs, function (x) { x.stop() }); + }); + + context.run(function () { + if (Meteor._capture_subs) + throw new Error("Meteor.autosubscribe may not be called recursively"); + + Meteor._capture_subs = []; + try { + sub_func(); + } finally { + local_subs = Meteor._capture_subs; + Meteor._capture_subs = null; } - }; - - if (name) - collections[name] = ret; - - return ret; - }; - - _.extend(Meteor, { - is_server: false, - is_client: true, - - publish: function() { - // ignored on the client - }, - - subscribe: function (name, args, callback) { - var id; - var existing = subs.find({name: name, args: args}, {reactive: false}).fetch(); - - if (existing && existing[0]) { - // already subbed, inc count. - id = existing[0]._id; - subs.update({_id: id}, {$inc: {count: 1}}); - - if (callback) { - if (sub_ready_callbacks[id]) - sub_ready_callbacks[id].push(callback); - else - callback(); // XXX maybe _.defer? - } - } else { - // new sub, add object. - // generate our own id so we can know it w/ a find afterwards. - id = Collection.uuid(); - subs.insert({_id: id, name: name, args: args, count: 1}); - - sub_ready_callbacks[id] = []; - - if (callback) - sub_ready_callbacks[id].push(callback); - } - - // return an object with a stop method. - var token = {stop: function () { - if (!id) return; // must have an id (local from above). - // just update the database. observe takes care of the rest. - subs.update({_id: id}, {$inc: {count: -1}}); - }}; - - if (capture_subs) capture_subs.push(token); - - return token; - }, - - autosubscribe: function (sub_func) { - var local_subs = []; - var context = new Meteor.deps.Context(); - - context.on_invalidate(function () { - // recurse. - Meteor.autosubscribe(sub_func); - // unsub after re-subbing, to avoid bouncing. - _.each(local_subs, function (x) { x.stop() }); - }); - - context.run(function () { - if (capture_subs) - throw new Error("Meteor.autosubscribe may not be called recursively"); - - capture_subs = []; - try { - sub_func(); - } finally { - local_subs = capture_subs; - capture_subs = undefined; - } - }); - } - }); -})(); + }); + } +}); diff --git a/packages/livedata/livedata_server.js b/packages/livedata/livedata_server.js index 9d1189c4a1..58d35b46b1 100644 --- a/packages/livedata/livedata_server.js +++ b/packages/livedata/livedata_server.js @@ -1,14 +1,50 @@ if (typeof Meteor === "undefined") Meteor = {}; -(function () { +Meteor._LivedataServer = function () { + var self = this; - ////////// Internals ////////// + self.publishes = {}; + self.collections = {}; + self.methods = {}; + self.stream_server = new Meteor._StreamServer; - var publishes = {}; - var collections = {}; - var methods = {}; + self.stream_server.register(function (socket) { + socket.meteor = {}; + socket.meteor.subs = []; + socket.meteor.cache = {}; + socket.meteor.pending_method_ids = []; + + socket.on('livedata', function (msg) { + if (typeof(msg) !== 'object' || !msg.msg) { + Meteor._debug("discarding invalid livedata message", msg); + return; + } + + if (msg.msg === 'connect') + self._livedata_connect(socket, msg); + else if (msg.msg === 'sub') + self._livedata_sub(socket, msg); + else if (msg.msg === 'unsub') + self._livedata_unsub(socket, msg); + else if (msg.msg === 'method') + self._livedata_method(socket, msg); + else + Meteor._debug("discarding unknown livedata message type", msg); + }); + + + // 5/sec updates tops, once every 10sec min. + socket.meteor.throttled_poll = _.throttle(function () { + self._poll_subscriptions(socket) + }, 50); // XXX only 50ms! for great speed. might want higher in prod. + socket.meteor.timer = setInterval(socket.meteor.throttled_poll, 10000); + }); +}; + +_.extend(Meteor._LivedataServer.prototype, { + _poll_subscriptions: function (socket) { + var self = this; - var poll_subscriptions = function (socket) { Fiber(function () { // holds a clean copy of client's data. channel.send will // populate new_cache, then we compute the difference with the old @@ -40,7 +76,7 @@ if (typeof Meteor === "undefined") Meteor = {}; // actually run the subscriptions. _.each(socket.meteor.subs, function (sub) { - var pub = publishes[sub.name]; + var pub = self.publishes[sub.name]; if (!pub) { // XXX error unknown publish console.log("ERROR UNKNOWN PUBLISH " + sub.name); @@ -129,15 +165,18 @@ if (typeof Meteor === "undefined") Meteor = {}; socket.meteor.pending_method_ids = []; }).run(); - }; + }, - var livedata_connect = function (socket, msg) { + _livedata_connect: function (socket, msg) { + var self = this; // Always start a new session. We don't support any reconnection. socket.emit('livedata', {msg: 'connected', session: Meteor.uuid()}); - }; + }, - var livedata_sub = function (socket, msg) { - if (!publishes[msg.name]) { + _livedata_sub: function (socket, msg) { + var self = this; + + if (!self.publishes[msg.name]) { // can't sub to unknown publish name // XXX error value socket.emit('livedata', { @@ -146,25 +185,27 @@ if (typeof Meteor === "undefined") Meteor = {}; } socket.meteor.subs.push({_id: msg.id, name: msg.name, params: msg.params}); - poll_subscriptions(socket); - }; + self._poll_subscriptions(socket); + }, - var livedata_unsub = function (socket, msg) { + _livedata_unsub: function (socket, msg) { + var self = this; socket.emit('livedata', {msg: 'nosub', id: msg.id}); socket.meteor.subs = _.filter(socket.meteor.subs, function (x) { return x._id !== msg.id; }); - poll_subscriptions(socket); - }; + self._poll_subscriptions(socket); + }, - var livedata_method = function (socket, msg) { + _livedata_method: function (socket, msg) { + var self = this; // XXX note that running this in a fiber means that two serial // requests from the client can try to execute in parallel.. we're // going to have to think that through at some point. also, consider // races against Meteor.Collection(), though this shouldn't happen in // most normal use cases Fiber(function () { - var func = msg.method && methods[msg.method]; + var func = msg.method && self.methods[msg.method]; if (!func) { socket.emit('livedata', { msg: 'result', id: msg.id, @@ -190,196 +231,179 @@ if (typeof Meteor === "undefined") Meteor = {}; // after the method, rerun all the subscriptions as stuff may have // changed. - _.each(Meteor._stream.all_sockets(), function(x) { + _.each(self.stream_server.all_sockets(), function(x) { if (x && x.meteor) x.meteor.throttled_poll(); }); }).run(); - }; + }, - Meteor._stream.register(function (socket) { - socket.meteor = {}; - socket.meteor.subs = []; - socket.meteor.cache = {}; - socket.meteor.pending_method_ids = []; + /** + * Defines a live dataset that clients can subscribe to. + * + * @param name {String} identifier for query + * @param options {Object} + * + * options to contain: + * - collection {Collection} collection; defaults to the collection + * named 'name' on disk in mongodb + * - selector {Function OR Object} either a mongodb selector, + * or a function that takes the argument object passed to + * Meteor.subscribe and returns a mongodb selector. default {} + */ + publish: function (name, options) { + var self = this; - socket.on('livedata', function (msg) { - if (typeof(msg) !== 'object' || !msg.msg) { - Meteor._debug("discarding invalid livedata message", msg); - return; - } - - if (msg.msg === 'connect') - livedata_connect(socket, msg); - else if (msg.msg === 'sub') - livedata_sub(socket, msg); - else if (msg.msg === 'unsub') - livedata_unsub(socket, msg); - else if (msg.msg === 'method') - livedata_method(socket, msg); - else - Meteor._debug("discarding unknown livedata message type", msg); - }); - - - // 5/sec updates tops, once every 10sec min. - socket.meteor.throttled_poll = _.throttle(function () { - poll_subscriptions(socket) - }, 50); // XXX only 50ms! for great speed. might want higher in prod. - socket.meteor.timer = setInterval(socket.meteor.throttled_poll, 10000); - }); - - - ////////// User visible API ////////// - - _.extend(Meteor, { - is_server: true, - is_client: false, - - /** - * Defines a live dataset that clients can subscribe to. - * - * @param name {String} identifier for query - * @param options {Object} - * - * options to contain: - * - collection {Collection} collection; defaults to the collection - * named 'name' on disk in mongodb - * - selector {Function OR Object} either a mongodb selector, - * or a function that takes the argument object passed to - * Meteor.subscribe and returns a mongodb selector. default {} - */ - publish: function (name, options) { - if (name in publishes) { - // XXX error duplicate publish - console.log("ERROR DUPLICATE PUBLISH " + name); - return; - } - - options = options || {}; - var collection = options.collection || collections[name]; - if (!collection) - throw new Error("No collection '" + name + "' found to publish. " + - "You can specify the collection explicitly with the " + - "'collection' option."); - var selector = options.selector || {}; - var func = function (channel, params) { - var opt = function (key, or) { - var x = options[key] || or; - return (x instanceof Function) ? x(params) : x; - }; - channel.send(collection._name, collection.find(opt("selector", {}), { - sort: opt("sort"), - skip: opt("skip"), - limit: opt("limit") - }).fetch()); - }; - - publishes[name] = func; - }, - - subscribe: function () { - // ignored on server - }, - - autosubscribe: function () { - // ignored on server + if (name in self.publishes) { + // XXX error duplicate publish + console.log("ERROR DUPLICATE PUBLISH " + name); + return; } - }); - Meteor.Collection = function (name) { - if (!name) - // XXX maybe support this using minimongo? - throw new Error("Anonymous collections aren't allowed on the server"); - - var ret = { - _name: name, - _api: {}, - - // XXX there are probably a lot of little places where this API - // and minimongo diverge. we should track each of those down and - // kill it. - - find: function (selector, options) { - if (arguments.length === 0) - selector = {}; - - return new Meteor._mongo_driver.Cursor(this._name, selector, options); - }, - - findOne: function (selector, options) { - if (arguments.length === 0) - selector = {}; - - // XXX when implementing observe() on the server, either - // support limit or remove this performance hack. - options = options || {}; - options.limit = 1; - return this.find(selector, options).fetch()[0]; - }, - - insert: function (doc) { - // do id allocation here, so we never end up with an ObjectID. - // This only happens if some calls this directly on the server, - // since normally ids are allocated on the client and sent over - // the wire to us. - if (! doc._id) { - // copy doc because we mess with it. only shallow copy. - new_doc = {}; - _.extend(new_doc, doc); - doc = new_doc; - doc._id = Meteor.uuid(); - } - - Meteor._mongo_driver.insert(this._name, doc); - - // return the doc w/ _id, so we can use it. - return doc; - }, - - update: function (selector, mod, options) { - return Meteor._mongo_driver.update(this._name, selector, mod, options); - }, - - remove: function (selector) { - if (arguments.length === 0) - selector = {}; - - return Meteor._mongo_driver.remove(this._name, selector); - }, - - schema: function () { - // XXX not implemented yet - }, - - // Backwards compatibility for old handler API. - // Still put the function in the Collection object, also make a - // method entry for calls coming in over the wire. - api: function (local_methods) { - for (var method in local_methods) { - this[method] = _.bind(local_methods[method], null); - if (name) - methods['/' + name + '/' + method] = this[method]; - } - } + options = options || {}; + var collection = options.collection || self.collections[name]; + if (!collection) + throw new Error("No collection '" + name + "' found to publish. " + + "You can specify the collection explicitly with the " + + "'collection' option."); + var selector = options.selector || {}; + var func = function (channel, params) { + var opt = function (key, or) { + var x = options[key] || or; + return (x instanceof Function) ? x(params) : x; + }; + channel.send(collection._name, collection.find(opt("selector", {}), { + sort: opt("sort"), + skip: opt("skip"), + limit: opt("limit") + }).fetch()); }; - if (name) { - collections[name] = ret; - // XXX temporary automatically generated methods for mongo mutators - methods['/' + name + '/insert'] = function (obj) { - ret.insert(obj); - }; - methods['/' + name + '/update'] = function (selector, mutator, options) { - ret.update(selector, mutator, options); - }; - methods['/' + name + '/remove'] = function (selector) { - ret.remove(selector); - }; + self.publishes[name] = func; + } +}); +Meteor._Collection = function (name, server) { + var self = this; + + if (!name) + // XXX maybe support this using minimongo? + throw new Error("Anonymous collections aren't allowed on the server"); + + self._name = name; + self._api = {}; + self._server = server; + + if (name) { + self._server.collections[name] = self; + // XXX temporary automatically generated methods for mongo mutators + self._server.methods['/' + name + '/insert'] = function (obj) { + ret.insert(obj); + }; + self._server.methods['/' + name + '/update'] = function (selector, mutator, options) { + ret.update(selector, mutator, options); + }; + self._server.methods['/' + name + '/remove'] = function (selector) { + ret.remove(selector); + }; + } +}; + +_.extend(Meteor._Collection.prototype, { + // XXX there are probably a lot of little places where this API + // and minimongo diverge. we should track each of those down and + // kill it. + + find: function (selector, options) { + var self = this; + + if (arguments.length === 0) + selector = {}; + + return new Meteor._mongo_driver.Cursor(self._name, selector, options); + }, + + findOne: function (selector, options) { + var self = this; + + if (arguments.length === 0) + selector = {}; + + // XXX when implementing observe() on the server, either + // support limit or remove this performance hack. + options = options || {}; + options.limit = 1; + return self.find(selector, options).fetch()[0]; + }, + + insert: function (doc) { + var self = this; + + // do id allocation here, so we never end up with an ObjectID. + // This only happens if some calls this directly on the server, + // since normally ids are allocated on the client and sent over + // the wire to us. + if (! doc._id) { + // copy doc because we mess with it. only shallow copy. + new_doc = {}; + _.extend(new_doc, doc); + doc = new_doc; + doc._id = Meteor.uuid(); } - return ret; - }; -})(); + Meteor._mongo_driver.insert(self._name, doc); + + // return the doc w/ _id, so we can use it. + return doc; + }, + + update: function (selector, mod, options) { + var self = this; + return Meteor._mongo_driver.update(self._name, selector, mod, options); + }, + + remove: function (selector) { + var self = this; + + if (arguments.length === 0) + selector = {}; + + return Meteor._mongo_driver.remove(self._name, selector); + }, + + schema: function () { + // XXX not implemented yet + }, + + // Backwards compatibility for old handler API. + // Still put the function in the Collection object, also make a + // method entry for calls coming in over the wire. + api: function (local_methods) { + var self = this; + for (var method in local_methods) { + self[method] = _.bind(local_methods[method], null); + if (self._name) + self._server.methods['/' + self._name + '/' + method] = self[method]; + } + } +}); + +// XXX temporary -- rename +TheServer = new Meteor._LivedataServer; + +_.extend(Meteor, { + is_server: true, + is_client: false, + + publish: _.bind(TheServer.publish, TheServer), + + // XXX eliminate shim; have app do it directly + Collection: function (name) { + return new Meteor._Collection(name, TheServer); + }, + + // these are ignored on the server + subscribe: function () {}, + autosubscribe: function () {} +}); diff --git a/packages/stream/stream_client.js b/packages/stream/stream_client.js index 03feeea967..555dc9bf50 100644 --- a/packages/stream/stream_client.js +++ b/packages/stream/stream_client.js @@ -15,275 +15,296 @@ if (typeof Meteor === "undefined") Meteor = {}; // already: // https://github.com/3rd-Eden/Socket.IO/tree/bugs/reconnect +Meteor._Stream = function (url) { + var self = this; -(function () { + self.url = url; + self.socket = null; + self.event_callbacks = {}; // name -> [callback] + self.reset_callbacks = []; + self.message_queue = {}; // id -> message + self.next_message_id = 0; + self.server_id = null; - ////////// Constants ////////// + //// Constants // how long to wait until we declare the connection attempt // failed. socket.io doesn't tell us sometimes. // https://github.com/LearnBoost/socket.io-client/issues/214 // https://github.com/LearnBoost/socket.io-client/issues/311 - var CONNECT_TIMEOUT = 10000; + self.CONNECT_TIMEOUT = 10000; // extra time to make sure our timer and socket.ios timer don't // collide. - var CONNECT_TIMEOUT_SLOP = 1000; + self.CONNECT_TIMEOUT_SLOP = 1000; // time for initial reconnect attempt. - var RETRY_BASE_TIMEOUT = 1000; + self.RETRY_BASE_TIMEOUT = 1000; // exponential factor to increase timeout each attempt. - var RETRY_EXPONENT = 2.2; + self.RETRY_EXPONENT = 2.2; // maximum time between reconnects. - var RETRY_MAX_TIMEOUT = 1800000; // 30min. + self.RETRY_MAX_TIMEOUT = 1800000; // 30min. // time to wait for the first 2 retries. this helps page reload // speed during dev mode restarts, but doesn't hurt prod too // much (due to CONNECT_TIMEOUT) - var RETRY_MIN_TIMEOUT = 10; - + self.RETRY_MIN_TIMEOUT = 10; // fuzz factor to randomize reconnect times by. avoid reconnect // storms. - var RETRY_FUZZ = 0.5; // +- 25% + self.RETRY_FUZZ = 0.5; // +- 25% - ////////// Internals ////////// - - var socket; - var event_callbacks = {}; // name -> [callback] - var reset_callbacks = []; - var message_queue = {}; // id -> message - var next_message_id = 0; - var server_id; - - //// reactive status stuff - var status = { + //// Reactive status + self.status = { status: "waiting", connected: false, retry_count: 0 }; - var status_listeners = {}; // context.id -> context - var status_changed = function () { - _.each(status_listeners, function (context) { + self.status_listeners = {}; // context.id -> context + self.status_changed = function () { + _.each(self.status_listeners, function (context) { context.invalidate(); }); }; - //// retry logic - var retry_timer; - var connection_timer; + //// Retry logic + self.retry_timer = null; + self.connection_timer = null; - var connected = function (welcome_data) { - if (connection_timer) { - clearTimeout(connection_timer); - connection_timer = undefined; + //// Saving and restoring state + Meteor._reload.on_migrate('stream', function () { + return { message_list: _.toArray(self.message_queue) }; + }); + + var migration_data = Meteor._reload.migration_data('stream'); + if (migration_data && migration_data.message_list) { + _.each(migration_data.message_list, function (msg) { + self.message_queue[self.next_message_id++] = msg; + }); + } + + //// Kickoff! + self._launch_connection(); +}; + +_.extend(Meteor._Stream.prototype, { + on: function (name, callback) { + var self = this; + + if (!self.event_callbacks[name]) + self.event_callbacks[name] = []; + self.event_callbacks[name].push(callback); + if (self.socket) + self.socket.on(name, callback); + }, + + emit: function (/* arguments */) { + var self = this; + + var args = _.toArray(arguments); + var id = self.next_message_id++; + self.message_queue[id] = args; + + if (self.status.connected) { + self.socket.json.send(args, function () { + delete self.message_queue[id]; + }); + } + }, + + // provide a hook for modules to re-initialize state upon new + // connection. callback is a function that takes a message list and + // returns a message list. modules use this to strip out unneeded + // messages and/or insert new messages. NOTE: this API is weird! We + // probably want to revisit this, potentially adding some sort of + // namespacing so multiple modules can share the stream more + // gracefully. + reset: function (callback) { + var self = this; + self.reset_callbacks.push(callback); + }, + + status: function () { + var self = this; + var context = Meteor.deps && Meteor.deps.Context.current; + if (context && !(context.id in self.status_listeners)) { + self.status_listeners[context.id] = context; + context.on_invalidate(function () { + delete self.status_listeners[context.id]; + }); + } + return self.status; + }, + + reconnect: function () { + var self = this; + if (self.status.connected) + return; // already connected. noop. + + // if we're mid-connection, stop it. + if (self.status.status === "connecting") { + self._fake_connect_failed(); } - if (status.connected) { + if (self.retry_timer) + clearTimeout(self.retry_timer); + self.retry_timer = undefined; + self.status.retry_count -= 1; // don't count manual retries + self._retry_now(); + }, + + _connected: function (welcome_data) { + var self = this; + + if (self.connection_timer) { + clearTimeout(self.connection_timer); + self.connection_timer = undefined; + } + + if (self.status.connected) { // already connected. do nothing. this probably shouldn't happen. return; } // inspect the welcome data and decide if we have to reload if (welcome_data && welcome_data.server_id) { - if (server_id && server_id !== welcome_data.server_id) { + if (self.server_id && self.server_id !== welcome_data.server_id) { Meteor._reload.reload(); // world's about to end, just leave the connection 'connecting' // until it does. return; } - server_id = welcome_data.server_id; + self.server_id = welcome_data.server_id; } else { Meteor._debug("DEBUG: invalid welcome packet", welcome_data); } // give everyone a chance to munge the message queue. - var msg_list = _.toArray(message_queue); - _.each(reset_callbacks, function (callback) { + var msg_list = _.toArray(self.message_queue); + _.each(self.reset_callbacks, function (callback) { msg_list = callback(msg_list); }); - message_queue = {}; + self.message_queue = {}; _.each(msg_list, function (msg) { - message_queue[next_message_id++] = msg; + self.message_queue[self.next_message_id++] = msg; }); // send the pending message queue. this should always be in // order, since the keys are ordered numerically and they are added // in order. - _.each(message_queue, function (msg, id) { - socket.json.send(msg, function () { - delete message_queue[id]; + _.each(self.message_queue, function (msg, id) { + self.socket.json.send(msg, function () { + delete self.message_queue[id]; }); }); - status.status = "connected"; - status.connected = true; - status.retry_count = 0; - status_changed(); + self.status.status = "connected"; + self.status.connected = true; + self.status.retry_count = 0; + self.status_changed(); + }, + _cleanup_socket: function () { + var self = this; - }; - var cleanup_socket = function () { - if (socket) { + if (self.socket) { - if (socket.$events) { - _.each(socket.$events, function (v, k) { - socket.removeAllListeners(k); + if (self.socket.$events) { + _.each(self.socket.$events, function (v, k) { + self.socket.removeAllListeners(k); }); } - socket.disconnect(); + self.socket.disconnect(); - var old_socket = socket; - socket = undefined; + var old_socket = self.socket; + self.socket = null; old_socket.on('connect', function () { Meteor._debug("DEBUG: OLD SOCKET RECONNECTED", old_socket); old_socket.disconnect(); }); } - }; + }, - var disconnected = function () { - if (connection_timer) { - clearTimeout(connection_timer); - connection_timer = undefined; + _disconnected: function () { + var self = this; + + if (self.connection_timer) { + clearTimeout(self.connection_timer); + self.connection_timer = undefined; } - cleanup_socket(); - retry_later(); // sets status. no need to do it here. - }; - var fake_connect_failed = function () { + self._cleanup_socket(); + self._retry_later(); // sets status. no need to do it here. + }, + + _fake_connect_failed: function () { + var self = this; // sometimes socket.io just doesn't tell us when it failed. we // detect this with a timer and force failure. - cleanup_socket(); - disconnected(); - }; + self._cleanup_socket(); + self._disconnected(); + }, + + _retry_timeout: function (count) { + var self = this; - var retry_timeout = function (count) { if (count < 2) - return RETRY_MIN_TIMEOUT; + return self.RETRY_MIN_TIMEOUT; var timeout = Math.min( - RETRY_MAX_TIMEOUT, - RETRY_BASE_TIMEOUT * Math.pow(RETRY_EXPONENT, count)); + self.RETRY_MAX_TIMEOUT, + self.RETRY_BASE_TIMEOUT * Math.pow(self.RETRY_EXPONENT, count)); // fuzz the timeout randomly, to avoid reconnect storms when a // server goes down. - timeout = timeout * ((Math.random() * RETRY_FUZZ) + (1 - RETRY_FUZZ/2)); - + timeout = timeout * ((Math.random() * self.RETRY_FUZZ) + + (1 - self.RETRY_FUZZ/2)); return timeout; - }; - var retry_later = function () { - var timeout = retry_timeout(status.retry_count) - if (retry_timer) { clearTimeout(retry_timer); } - retry_timer = setTimeout(retry_now, timeout); + }, - status.status = "waiting" - status.connected = false; - status.retry_time = (new Date()).getTime() + timeout; - status_changed(); - }; - var retry_now = function () { - status.retry_count += 1; - status.status = "connecting"; - status.connected = false; - delete status.retry_time; - status_changed(); + _retry_later: function () { + var self = this; - launch_connection(); - }; + var timeout = self._retry_timeout(self.status.retry_count) + if (self.retry_timer) + clearTimeout(self.retry_timer); + self.retry_timer = setTimeout(_.bind(self._retry_now, self), timeout); - var launch_connection = function () { - cleanup_socket(); // cleanup the old socket, if there was one. + self.status.status = "waiting" + self.status.connected = false; + self.status.retry_time = (new Date()).getTime() + timeout; + self.status_changed(); + }, - socket = io.connect('/', { reconnect: false, - 'connect timeout': CONNECT_TIMEOUT, - 'force new connection': true } ); - socket.once('welcome', connected); - socket.on('disconnect', disconnected); - socket.on('connect_failed', disconnected); + _retry_now: function () { + var self = this; - _.each(event_callbacks, function (callbacks, name) { + self.status.retry_count += 1; + self.status.status = "connecting"; + self.status.connected = false; + delete self.status.retry_time; + self.status_changed(); + + self._launch_connection(); + }, + + _launch_connection: function () { + var self = this; + self._cleanup_socket(); // cleanup the old socket, if there was one. + + self.socket = io.connect(self.url, { + reconnect: false, + 'connect timeout': self.CONNECT_TIMEOUT, + 'force new connection': true + }); + self.socket.once('welcome', _.bind(self._connected, self)); + self.socket.on('disconnect', _.bind(self._disconnected, self)); + self.socket.on('connect_failed', _.bind(self._disconnected, self)); + + _.each(self.event_callbacks, function (callbacks, name) { _.each(callbacks, function (callback) { - socket.on(name, callback); + self.socket.on(name, callback); }); }); - if (connection_timer) clearTimeout(connection_timer); - connection_timer = setTimeout(fake_connect_failed, - CONNECT_TIMEOUT + CONNECT_TIMEOUT_SLOP); - - }; - - ////////// Save and restore state ////////// - - Meteor._reload.on_migrate('stream', function () { - return { message_list: _.toArray(message_queue) }; - }); - - var migration_data = Meteor._reload.migration_data('stream'); - if (migration_data && migration_data.message_list) { - _.each(migration_data.message_list, function (msg) { - message_queue[next_message_id++] = msg; - }); + if (self.connection_timer) + clearTimeout(self.connection_timer); + var timeout = self.CONNECT_TIMEOUT + self.CONNECT_TIMEOUT_SLOP; + self.connection_timer = setTimeout(_.bind(self._fake_connect_failed, self), + timeout); } - - ////////// User facing API ////////// - - Meteor.status = function () { - var context = Meteor.deps.Context.current; - if (context && !(context.id in status_listeners)) { - status_listeners[context.id] = context; - context.on_invalidate(function () { - delete status_listeners[context.id]; - }); - } - return status; - }; - - Meteor.reconnect = function () { - if (status.connected) return; // already connected. noop. - - // if we're mid-connection, stop it. - if (status.status === "connecting") { - fake_connect_failed(); - } - - if (retry_timer) clearTimeout(retry_timer); - retry_timer = undefined; - status.retry_count -= 1; // don't count manual retries - retry_now(); - }; - - - ////////// API for other packages ////////// - - Meteor._stream = { - on: function (name, callback) { - if (!event_callbacks[name]) event_callbacks[name] = []; - event_callbacks[name].push(callback); - if (socket) socket.on(name, callback); - }, - - emit: function (/* var args */) { - var args = _.toArray(arguments); - var id = next_message_id++; - message_queue[id] = args; - - if (status.connected) { - socket.json.send(args, function () { - delete message_queue[id]; - }); - } - }, - - // provide a hook for modules to re-initialize state upon new - // connection. callback is a function that takes a message list and - // returns a message list. modules use this to strip out unneeded - // messages and/or insert new messages. NOTE: this API is weird! We - // probably want to revisit this, potentially adding some sort of - // namespacing so multiple modules can share the stream more - // gracefully. - reset: function (callback) { - reset_callbacks.push(callback); - } - }; - - - ////////// Kickoff! ////////// - launch_connection(); - -})(); +}); diff --git a/packages/stream/stream_server.js b/packages/stream/stream_server.js index 0ad8c2d91a..e946b45f85 100644 --- a/packages/stream/stream_server.js +++ b/packages/stream/stream_server.js @@ -1,41 +1,39 @@ if (typeof Meteor === "undefined") Meteor = {}; -(function () { - - ////////// Internals ////////// - - var registration_callbacks = []; +Meteor._StreamServer = function () { + var self = this; + self.registration_callbacks = []; // unique id for this instantiation of the server. If this changes // between client reconnects, the client will reload. In production, // we might want to make this the bundle id, so that if runner restarts // we don't force clients to reload unneccesarily. Or we could integrate // with the bundler and have this be a hash of all the code. - var server_id = Meteor.uuid(); + self.server_id = Meteor.uuid(); - // basic socketio setup + // set up socket.io var socketio = __meteor_bootstrap__.require('socket.io'); - var io = socketio.listen(__meteor_bootstrap__.app); - io.configure(function() { + self.io = socketio.listen(__meteor_bootstrap__.app); + self.io.configure(function() { // Don't serve static files from socket.io. We serve them separately // to get gzip and other fun things. - io.set('browser client', false); + self.io.set('browser client', false); - io.set('log level', 1); + self.io.set('log level', 1); // XXX disable websockets! they break chrome both debugging // and node-http-proxy (used in outer app) - io.set('transports', _.without(io.transports(), 'websocket')); + self.io.set('transports', _.without(self.io.transports(), 'websocket')); }); - io.sockets.on('connection', function (socket) { + self.io.sockets.on('connection', function (socket) { // Send a welcome message with the server_id. Client uses this to // reload if needed. - socket.emit('welcome', {server_id: server_id}); + socket.emit('welcome', {server_id: self.server_id}); // call all our callbacks when we get a new socket. they will do the // work of setting up handlers and such for specific messages. - _.each(registration_callbacks, function (callback) { + _.each(self.registration_callbacks, function (callback) { callback(socket); }); @@ -45,23 +43,22 @@ if (typeof Meteor === "undefined") Meteor = {}; socket.$emit.apply(socket, msg); }); }); +}; - ////////// API for other packages ////////// +_.extend(Meteor._StreamServer.prototype, { + // call my callback when a new socket connects. + // also call it for all current connections. + register: function (callback) { + var self = this; + self.registration_callbacks.push(callback); + _.each(self.io.sockets.sockets, function (socket) { + callback(socket); + }); + }, - Meteor._stream = { - // call my callback when a new socket connects. - // also call it for all current connections. - register: function (callback) { - registration_callbacks.push(callback); - _.each(io.sockets.sockets, function (socket) { - callback(socket); - }); - }, - - // get a list of all sockets - all_sockets: function () { - return io.sockets.sockets; - } - }; - -})(); + // get a list of all sockets + all_sockets: function () { + var self = this; + return self.io.sockets.sockets; + } +});