diff --git a/packages/livedata/livedata_client.js b/packages/livedata/livedata_client.js index bfe99178da..1ef23fe579 100644 --- a/packages/livedata/livedata_client.js +++ b/packages/livedata/livedata_client.js @@ -13,43 +13,78 @@ if (typeof Meteor === "undefined") Meteor = {}; // Meteor.subscriptions(). But who wants that? What does that even mean? var capture_subs; - Meteor._stream.on('published', function (data) { - _.each(data, function (changes, collection_name) { - var coll = collections[collection_name]; - if (!coll) { - Meteor._debug( - "discarding data received for unknown collection " + - JSON.stringify(collection_name)); - return; - } + // all socket.io traffic is framed as a "livedata" message. + Meteor._stream.on('livedata', function (msg) { + // connected + // data + // nosub + // result - // XXX this is all a little whack. Need to think about how we handle - // removes, etc. - _.each(changes.inserted || [], function (elt) { - if (!coll.findOne(elt._id)) { - coll._collection.insert(elt); - } else { - // we already added it locally! this is the case after an insert - // handler. - coll._collection.update({_id: elt._id}, elt); - } - }); - _.each(changes.updated || [], function (elt) { - coll._collection.update({_id: elt._id}, elt); - }); - _.each(changes.removed || [], function (id) { - coll._collection.remove({_id: id}); - }); - }); + if (typeof(msg) !== 'object' || !msg.msg) { + Meteor._debug("discarding invalid livedata message", msg); + return; + } + + if (msg.msg === 'connected') + livedata_connected(msg); + else if (msg.msg === 'data') + livedata_data(msg); + else if (msg.msg === 'nosub') + livedata_nosub(msg); + else if (msg.msg === 'result') + livedata_result(msg); + else + Meteor._debug("discarding unknown livedata message type", msg); }); + var livedata_connected = function (msg) { + }; + + var livedata_data = function (msg) { + var meteor_coll = msg.collection && collections[msg.collection]; + + if (!meteor_coll) { + Meteor._debug( + "discarding data received for unknown collection " + + JSON.stringify(msg.collection)); + return; + } + + // do all the work against underlying minimongo collection. + var coll = meteor_coll._collection; + + var doc = coll.findOne(msg.id); + + if (doc + && (!msg.set || msg.set.length === 0) + && _.difference(_.keys(doc), msg.unset, ['_id']).length === 0) { + // what's left is empty, just remove it. cannot fail. + coll.remove(msg.id); + } else if (doc) { + var mutator = {$set: msg.set, $unset: {}}; + _.each(msg.unset, function (propname) { + mutator.$unset[propname] = 1; + }); + // XXX error check return value from update. + coll.update(msg.id, mutator); + } else { + // XXX error check return value from insert. + coll.insert(_.extend({_id: msg.id}, msg.set)); + } + }; + + var livedata_nosub = function (msg) { + }; + + var livedata_result = function (msg) { + }; + Meteor._stream.on('subscription_ready', function (id) { var arr = sub_ready_callbacks[id]; if (arr) _.each(arr, function (c) { c(); }); delete sub_ready_callbacks[id]; }); - Meteor._stream.reset(function (msg_list) { // remove existing subscribe and unsubscribe msg_list = _.reject(msg_list, function (elem) { diff --git a/packages/livedata/livedata_server.js b/packages/livedata/livedata_server.js index 5f4a87a201..86b6459d03 100644 --- a/packages/livedata/livedata_server.js +++ b/packages/livedata/livedata_server.js @@ -9,29 +9,18 @@ if (typeof Meteor === "undefined") Meteor = {}; var poll_subscriptions = function (socket) { Fiber(function () { - // what we send to the client. - // {collection_name: - // {inserted: [objects], updated: [objects], removed: [ids]}} - var results = {}; - // set of keys we touched in the update. things in the cache that - // are not touched are removed. - var touched_keys = {}; - - var add_to_results = function (collection, which, id) { - if (!(collection in results)) - results[collection] = {}; - if (!(which in results[collection])) - results[collection][which] = []; - results[collection][which].push(id); - }; + // holds a clean copy of client's data. channel.send will + // populate new_cache, then we compute the difference with the old + // cache, send the delta. + var new_cache = {}; // setup a channel object var channel = { + // this gets called by publish lambda with each object. send + // populates the server's copy of what the client has. send: function(collection_name, obj) { if (!(obj instanceof Array)) obj = [obj]; - if (obj.length === 0) - return; _.each(obj, function (o) { if (!o._id) { @@ -42,19 +31,8 @@ if (typeof Meteor === "undefined") Meteor = {}; // | not allowed in collection name? var key = collection_name + "|" + o._id; - var cached = socket.meteor.cache[key]; - socket.meteor.cache[key] = o; - touched_keys[key] = true; - - if (!cached) - add_to_results(collection_name, 'inserted', o); - else if (JSON.stringify(o) !== JSON.stringify(cached)) - // Not canonical order comparison or anything, but close - // enough I hope. We may send some spurious updates? - add_to_results(collection_name, 'updated', o); - else { - // cache hit. do nothing. - } + // insert or extend new_cache with 'o' object + new_cache[key] = _.extend(new_cache[key] || {}, o); }); } }; @@ -71,9 +49,50 @@ if (typeof Meteor === "undefined") Meteor = {}; pub(channel, sub.args); }); - // compute the removed keys. + // emit deltas for each item in the new cache (any object + // created in this poll cycle). + _.each(new_cache, function (new_obj, key) { + var old_obj = socket.meteor.cache[key]; + + // XXX parsing from the string is so ugly. + var parts = key.split("|"); + if (!parts || parts.length !== 2) return; + var collection_name = parts[0]; + var id = parts[1]; + + var msg = {msg: 'data', collection: collection_name, id: id}; + + if (!old_obj) { + var obj_to_send = _.extend({}, new_obj); + delete obj_to_send._id; + msg.set = obj_to_send; + socket.emit('livedata', msg); + + } else { + var set = {}; + var unset = []; + + _.each(new_obj, function (v, k) { + // Not canonical order comparison or anything, but close + // enough I hope. We may send some spurious updates? + if (JSON.stringify(v) !== JSON.stringify(old_obj[k])) + set[k] = v; + }); + + unset = _.difference(_.keys(old_obj), _.keys(new_obj)); + + if (_.keys(set).length > 0) + msg.set = set; + if (unset.length > 0) + msg.unset = unset; + + socket.emit('livedata', msg); + } + }); + + // emit deltas for items in the old cache that no longer exist. var removed_keys = _.difference(_.keys(socket.meteor.cache), - _.keys(touched_keys)); + _.keys(new_cache)); _.each(removed_keys, function (key) { // XXX parsing from the string is so ugly. var parts = key.split("|"); @@ -81,15 +100,13 @@ if (typeof Meteor === "undefined") Meteor = {}; var collection_name = parts[0]; var id = parts[1]; - add_to_results(collection_name, 'removed', id); - delete socket.meteor.cache[key]; + var msg = {msg: 'data', collection: collection_name, id: id}; + msg.unset = _.without(_.keys(socket.meteor.cache[key]), '_id'); + socket.emit('livedata', msg); }); - // if (and only if) any changes, send to client - for (var x in results) { - socket.emit('published', results); - break; - } + // promote new_cache to old_cache + socket.meteor.cache = new_cache; // inform the client that the subscription is ready to go _.each(socket.meteor.subs, function (sub) { @@ -102,7 +119,6 @@ if (typeof Meteor === "undefined") Meteor = {}; }).run(); }; - var register_subscription = function (socket, data) { socket.meteor.subs.push(data); poll_subscriptions(socket); @@ -221,8 +237,8 @@ if (typeof Meteor === "undefined") Meteor = {}; var func = function (channel, params) { var opt = function (key, or) { var x = options[key] || or; - return (x instanceof Function) ? x(params) : x - } + return (x instanceof Function) ? x(params) : x; + }; channel.send(collection._name, collection.find(opt("selector", {}), { sort: opt("sort"), skip: opt("skip"), @@ -313,7 +329,7 @@ if (typeof Meteor === "undefined") Meteor = {}; this._api[method] = methods[method]; } } - } + }; if (name) collections[name] = ret;