From cc2a5d3ffb3cbcbc449f21b3cebe09aa20ec54b1 Mon Sep 17 00:00:00 2001 From: Naomi Seyfer Date: Wed, 12 Dec 2012 16:50:20 -0800 Subject: [PATCH] Client-side changes so ddp-pre1 affects minimongo --- packages/livedata/livedata_connection.js | 133 +++++++++++++++-------- packages/mongo-livedata/collection.js | 50 +++++---- 2 files changed, 115 insertions(+), 68 deletions(-) diff --git a/packages/livedata/livedata_connection.js b/packages/livedata/livedata_connection.js index 9e2fdc1738..9d5918ce89 100644 --- a/packages/livedata/livedata_connection.js +++ b/packages/livedata/livedata_connection.js @@ -87,6 +87,7 @@ Meteor._LivedataConnection = function (url, options) { // - "document": the version of the document according the // server (ie, the snapshot before a stub wrote it, amended by any changes // received from the server) + // It is undefined if we think the document does not exist // - "writtenByStubs": a set of method IDs whose stubs wrote to the document // whose "data done" messages have not yet been processed self._serverDocuments = {}; @@ -176,7 +177,7 @@ Meteor._LivedataConnection = function (url, options) { if (msg.msg === 'connected') self._livedata_connected(msg); - else if (msg.msg === 'data') + else if (_.include(['added', 'changed', 'removed', 'complete', 'updated'], msg.msg)) self._livedata_data(msg); else if (msg.msg === 'nosub') self._livedata_nosub(msg); @@ -850,6 +851,14 @@ _.extend(Meteor._LivedataConnection.prototype, { } }, + + _processOneDataMessage: function (msg, updates) { + var self = this; + // Using underscore here so as not to need to capitalize. + self['_process_' + msg.msg](msg, updates); + }, + + _livedata_data: function (msg) { var self = this; @@ -918,51 +927,81 @@ _.extend(Meteor._LivedataConnection.prototype, { // reconnect-quiescence time. _runAfterUpdateCallbacks: function () { var self = this; - _.each(self._afterUpdateCallbacks, function (c) { + var callbacks = self._afterUpdateCallbacks; + self._afterUpdateCallbacks = []; + _.each(callbacks, function (c) { c(); }); - self._afterUpdateCallbacks = []; }, - // Process a single "data" message. Stores updates (set/unset/replace) in the - // "updates" object (map from collection name to array of updates). Processes - // "method data done" and "sub ready" declarations and schedules the relevant - // callbacks to occur after all currently buffered docs are written to the - // local cache. - _processOneDataMessage: function (msg, updates) { - var self = this; - // Apply writes (set/unset) from the message. - if (msg.collection && msg.id) { - var serverDoc = Meteor._get( - self._serverDocuments, msg.collection, msg.id); - if (serverDoc) { - // A client stub wrote this document, so we have to apply this change to - // the snapshot in serverDoc rather than directly to the database. - // First apply unset (assuming that there are any fields at all. - if (serverDoc.document) { - _.each(msg.unset, function (propname) { - delete serverDoc.document[propname]; - }); - } - // Now apply set. - _.each(msg.set, function (value, propname) { - if (!serverDoc.document) - serverDoc.document = {}; - serverDoc.document[propname] = value; - }); - // Now erase the document if it has become empty. - if (serverDoc.document && - _.isEmpty(_.without(_.keys(serverDoc.document), '_id'))) - delete serverDoc.document; - } else { - // No client stub wrote this document, so we can apply it - // directly to the database. - if (!updates[msg.collection]) - updates[msg.collection] = []; - updates[msg.collection].push(msg); - } + _pushUpdate: function (updates, collection, msg) { + if (!_.has(updates, collection)) { + updates[collection] = []; } + updates[collection].push(msg); + }, + _process_added: function (msg, updates) { + var self = this; + var serverDoc = Meteor._get(self._serverDocuments, msg.collection, msg.id); + if (serverDoc) { + // Some outstanding stub wrote here. + if (serverDoc.document !== undefined) { + throw new Error("It doesn't make sense to be adding something we know exists: " + + msg.id); + } + serverDoc.document = msg.fields; + serverDoc.document._id = msg.id; + } else { + self._pushUpdate(updates, msg.collection, msg); + } + }, + + _process_changed: function (msg, updates) { + var self = this; + var serverDoc = Meteor._get(self._serverDocuments, msg.collection, msg.id); + if (serverDoc) { + if (serverDoc.document === undefined) { + throw new Error("It doesn't make sense to be changing something we don't think exists: " + + msg.id); + } + _.each(msg.fields, function (value, key) { + serverDoc.document[key] = value; + }); + _.each(msg.cleared, function (clearedKey) { + delete serverDoc.document[clearedKey]; + }); + } else { + self._pushUpdate(updates, msg.collection, msg); + } + }, + + _process_removed: function (msg, updates) { + var self = this; + var idsRemovedNow = []; + _.each(msg.ids, function (removedId) { + var serverDoc = Meteor._get(self._serverDocuments, msg.collection, removedId); + if (serverDoc) { + // Some outstanding stub wrote here. + if (serverDoc.document === undefined) { + throw new Error("It doesn't make sense to be deleting something we don't know exists: " + + removedId); + } + serverDoc.document = undefined; + } else { + idsRemovedNow.push(removedId); + } + }); + if (!_.isEmpty(idsRemovedNow)) + self._pushUpdate(updates, msg.collection, { + msg: 'removed', + collection: msg.collection, + ids: idsRemovedNow + }); + }, + + _process_updated: function (msg, updates) { + var self = this; // Process "method done" messages. _.each(msg.methods, function (methodId) { _.each(self._documentsWrittenByStub[methodId], function (written) { @@ -979,10 +1018,14 @@ _.extend(Meteor._LivedataConnection.prototype, { // now copy the saved document to the database (reverting the stub's // change if the server did not write to this object, or applying the // server's writes if it did). - if (!updates[written.collection]) - updates[written.collection] = []; - updates[written.collection].push({id: written.id, - replace: serverDoc.document}); + + // This is a fake ddp 'replace' message. It's just for talking between + // livedata connections and minimongo. + self._pushUpdate(updates, written.collection, { + msg: 'replace', + id: written.id, + replace: serverDoc.document + }); // Call all flush callbacks. _.each(serverDoc.flushCallbacks, function (c) { c(); @@ -1004,7 +1047,9 @@ _.extend(Meteor._LivedataConnection.prototype, { self._runWhenAllServerDocsAreFlushed( _.bind(callbackInvoker.dataVisible, callbackInvoker)); }); + }, + _process_complete: function (msg, updates) { // Process "sub ready" messages. "sub ready" messages don't take effect // until all current server documents have been flushed to the local // database. We can use a write fence to implement this. diff --git a/packages/mongo-livedata/collection.js b/packages/mongo-livedata/collection.js index a719675820..13eb4ec0d1 100644 --- a/packages/mongo-livedata/collection.js +++ b/packages/mongo-livedata/collection.js @@ -73,42 +73,44 @@ Meteor.Collection = function (name, options) { // Is this a "replace the whole doc" message coming from the quiescence // of method writes to an object? (Note that 'undefined' is a valid // value meaning "remove it".) - if (_.has(msg, 'replace')) { + if (msg.msg === 'replace') { var replace = msg.replace; - // An empty doc is equivalent to a nonexistent doc. - if (replace && _.isEmpty(_.without(_.keys(replace), '_id'))) - replace = undefined; if (!replace) { if (doc) self._collection.remove(msg.id); } else if (!doc) { - self._collection.insert(_.extend({_id: msg.id}, replace)); + self._collection.insert(replace); } else { // XXX check that replace has no $ ops self._collection.update(msg.id, replace); } return; + } else if (msg.msg === 'added') { + if (doc) + throw new Error("Expected not to find a document already present for an add"); + self._collection.insert(_.extend({_id: msg.id}, msg.fields)); + } else if (msg.msg === 'removed') { + if (!doc) + throw new Error("Expected to find a document to remove"); + _.each(msg.ids, function (removedId) { + self._collection.remove(removedId); + }); + } else if (msg.msg === 'changed') { + if (!doc) + throw new Error("Expected to find a document to change"); + var modifier = {}; + if (!_.isEmpty(msg.fields)) + modifier.$set = msg.fields; + if (!_.isEmpty(msg.cleared)) { + _.each(msg.cleared, function (propname) { + modifier.$unset[propname] = 1; + }); + } + self._collection.update(msg.id, modifier); + } else { + throw new Error("I don't know how to deal with this message"); } - // ... otherwise we're applying set/unset messages against specific - // fields. - if (doc - && (!msg.set) - && _.difference(_.keys(doc), msg.unset, ['_id']).length === 0) { - // what's left is empty, just remove it. cannot fail. - self._collection.remove(msg.id); - } else if (doc) { - var mutator = {$set: msg.set, $unset: {}}; - _.each(msg.unset, function (propname) { - mutator.$unset[propname] = 1; - }); - // XXX error check return value from update. - self._collection.update(msg.id, mutator); - } else { - // XXX error check return value from insert. - if (msg.set) - self._collection.insert(_.extend({_id: msg.id}, msg.set)); - } }, // Called at the end of a batch of updates.