Implement Live Data protocol: 'data'.

server now sends attribute diffs to the client, rather than sets of
documents.
This commit is contained in:
matt debergalis
2012-02-07 14:51:33 -08:00
parent 7a88230e48
commit d23b7ff197
2 changed files with 122 additions and 71 deletions

View File

@@ -13,43 +13,78 @@ if (typeof Meteor === "undefined") Meteor = {};
// Meteor.subscriptions(). But who wants that? What does that even mean?
var capture_subs;
Meteor._stream.on('published', function (data) {
_.each(data, function (changes, collection_name) {
var coll = collections[collection_name];
if (!coll) {
Meteor._debug(
"discarding data received for unknown collection " +
JSON.stringify(collection_name));
return;
}
// all socket.io traffic is framed as a "livedata" message.
Meteor._stream.on('livedata', function (msg) {
// connected
// data
// nosub
// result
// XXX this is all a little whack. Need to think about how we handle
// removes, etc.
_.each(changes.inserted || [], function (elt) {
if (!coll.findOne(elt._id)) {
coll._collection.insert(elt);
} else {
// we already added it locally! this is the case after an insert
// handler.
coll._collection.update({_id: elt._id}, elt);
}
});
_.each(changes.updated || [], function (elt) {
coll._collection.update({_id: elt._id}, elt);
});
_.each(changes.removed || [], function (id) {
coll._collection.remove({_id: id});
});
});
if (typeof(msg) !== 'object' || !msg.msg) {
Meteor._debug("discarding invalid livedata message", msg);
return;
}
if (msg.msg === 'connected')
livedata_connected(msg);
else if (msg.msg === 'data')
livedata_data(msg);
else if (msg.msg === 'nosub')
livedata_nosub(msg);
else if (msg.msg === 'result')
livedata_result(msg);
else
Meteor._debug("discarding unknown livedata message type", msg);
});
var livedata_connected = function (msg) {
};
var livedata_data = function (msg) {
var meteor_coll = msg.collection && collections[msg.collection];
if (!meteor_coll) {
Meteor._debug(
"discarding data received for unknown collection " +
JSON.stringify(msg.collection));
return;
}
// do all the work against underlying minimongo collection.
var coll = meteor_coll._collection;
var doc = coll.findOne(msg.id);
if (doc
&& (!msg.set || msg.set.length === 0)
&& _.difference(_.keys(doc), msg.unset, ['_id']).length === 0) {
// what's left is empty, just remove it. cannot fail.
coll.remove(msg.id);
} else if (doc) {
var mutator = {$set: msg.set, $unset: {}};
_.each(msg.unset, function (propname) {
mutator.$unset[propname] = 1;
});
// XXX error check return value from update.
coll.update(msg.id, mutator);
} else {
// XXX error check return value from insert.
coll.insert(_.extend({_id: msg.id}, msg.set));
}
};
var livedata_nosub = function (msg) {
};
var livedata_result = function (msg) {
};
Meteor._stream.on('subscription_ready', function (id) {
var arr = sub_ready_callbacks[id];
if (arr) _.each(arr, function (c) { c(); });
delete sub_ready_callbacks[id];
});
Meteor._stream.reset(function (msg_list) {
// remove existing subscribe and unsubscribe
msg_list = _.reject(msg_list, function (elem) {

View File

@@ -9,29 +9,18 @@ if (typeof Meteor === "undefined") Meteor = {};
var poll_subscriptions = function (socket) {
Fiber(function () {
// what we send to the client.
// {collection_name:
// {inserted: [objects], updated: [objects], removed: [ids]}}
var results = {};
// set of keys we touched in the update. things in the cache that
// are not touched are removed.
var touched_keys = {};
var add_to_results = function (collection, which, id) {
if (!(collection in results))
results[collection] = {};
if (!(which in results[collection]))
results[collection][which] = [];
results[collection][which].push(id);
};
// holds a clean copy of client's data. channel.send will
// populate new_cache, then we compute the difference with the old
// cache, send the delta.
var new_cache = {};
// setup a channel object
var channel = {
// this gets called by publish lambda with each object. send
// populates the server's copy of what the client has.
send: function(collection_name, obj) {
if (!(obj instanceof Array))
obj = [obj];
if (obj.length === 0)
return;
_.each(obj, function (o) {
if (!o._id) {
@@ -42,19 +31,8 @@ if (typeof Meteor === "undefined") Meteor = {};
// | not allowed in collection name?
var key = collection_name + "|" + o._id;
var cached = socket.meteor.cache[key];
socket.meteor.cache[key] = o;
touched_keys[key] = true;
if (!cached)
add_to_results(collection_name, 'inserted', o);
else if (JSON.stringify(o) !== JSON.stringify(cached))
// Not canonical order comparison or anything, but close
// enough I hope. We may send some spurious updates?
add_to_results(collection_name, 'updated', o);
else {
// cache hit. do nothing.
}
// insert or extend new_cache with 'o' object
new_cache[key] = _.extend(new_cache[key] || {}, o);
});
}
};
@@ -71,9 +49,50 @@ if (typeof Meteor === "undefined") Meteor = {};
pub(channel, sub.args);
});
// compute the removed keys.
// emit deltas for each item in the new cache (any object
// created in this poll cycle).
_.each(new_cache, function (new_obj, key) {
var old_obj = socket.meteor.cache[key];
// XXX parsing from the string is so ugly.
var parts = key.split("|");
if (!parts || parts.length !== 2) return;
var collection_name = parts[0];
var id = parts[1];
var msg = {msg: 'data', collection: collection_name, id: id};
if (!old_obj) {
var obj_to_send = _.extend({}, new_obj);
delete obj_to_send._id;
msg.set = obj_to_send;
socket.emit('livedata', msg);
} else {
var set = {};
var unset = [];
_.each(new_obj, function (v, k) {
// Not canonical order comparison or anything, but close
// enough I hope. We may send some spurious updates?
if (JSON.stringify(v) !== JSON.stringify(old_obj[k]))
set[k] = v;
});
unset = _.difference(_.keys(old_obj), _.keys(new_obj));
if (_.keys(set).length > 0)
msg.set = set;
if (unset.length > 0)
msg.unset = unset;
socket.emit('livedata', msg);
}
});
// emit deltas for items in the old cache that no longer exist.
var removed_keys = _.difference(_.keys(socket.meteor.cache),
_.keys(touched_keys));
_.keys(new_cache));
_.each(removed_keys, function (key) {
// XXX parsing from the string is so ugly.
var parts = key.split("|");
@@ -81,15 +100,13 @@ if (typeof Meteor === "undefined") Meteor = {};
var collection_name = parts[0];
var id = parts[1];
add_to_results(collection_name, 'removed', id);
delete socket.meteor.cache[key];
var msg = {msg: 'data', collection: collection_name, id: id};
msg.unset = _.without(_.keys(socket.meteor.cache[key]), '_id');
socket.emit('livedata', msg);
});
// if (and only if) any changes, send to client
for (var x in results) {
socket.emit('published', results);
break;
}
// promote new_cache to old_cache
socket.meteor.cache = new_cache;
// inform the client that the subscription is ready to go
_.each(socket.meteor.subs, function (sub) {
@@ -102,7 +119,6 @@ if (typeof Meteor === "undefined") Meteor = {};
}).run();
};
var register_subscription = function (socket, data) {
socket.meteor.subs.push(data);
poll_subscriptions(socket);
@@ -221,8 +237,8 @@ if (typeof Meteor === "undefined") Meteor = {};
var func = function (channel, params) {
var opt = function (key, or) {
var x = options[key] || or;
return (x instanceof Function) ? x(params) : x
}
return (x instanceof Function) ? x(params) : x;
};
channel.send(collection._name, collection.find(opt("selector", {}), {
sort: opt("sort"),
skip: opt("skip"),
@@ -313,7 +329,7 @@ if (typeof Meteor === "undefined") Meteor = {};
this._api[method] = methods[method];
}
}
}
};
if (name)
collections[name] = ret;