Client-side changes so ddp-pre1 affects minimongo

This commit is contained in:
Naomi Seyfer
2012-12-12 16:50:20 -08:00
committed by David Glasser
parent 070bfc72ba
commit cc2a5d3ffb
2 changed files with 115 additions and 68 deletions

View File

@@ -87,6 +87,7 @@ Meteor._LivedataConnection = function (url, options) {
// - "document": the version of the document according the
// server (ie, the snapshot before a stub wrote it, amended by any changes
// received from the server)
// It is undefined if we think the document does not exist
// - "writtenByStubs": a set of method IDs whose stubs wrote to the document
// whose "data done" messages have not yet been processed
self._serverDocuments = {};
@@ -176,7 +177,7 @@ Meteor._LivedataConnection = function (url, options) {
if (msg.msg === 'connected')
self._livedata_connected(msg);
else if (msg.msg === 'data')
else if (_.include(['added', 'changed', 'removed', 'complete', 'updated'], msg.msg))
self._livedata_data(msg);
else if (msg.msg === 'nosub')
self._livedata_nosub(msg);
@@ -850,6 +851,14 @@ _.extend(Meteor._LivedataConnection.prototype, {
}
},
_processOneDataMessage: function (msg, updates) {
var self = this;
// Using underscore here so as not to need to capitalize.
self['_process_' + msg.msg](msg, updates);
},
_livedata_data: function (msg) {
var self = this;
@@ -918,51 +927,81 @@ _.extend(Meteor._LivedataConnection.prototype, {
// reconnect-quiescence time.
_runAfterUpdateCallbacks: function () {
var self = this;
_.each(self._afterUpdateCallbacks, function (c) {
var callbacks = self._afterUpdateCallbacks;
self._afterUpdateCallbacks = [];
_.each(callbacks, function (c) {
c();
});
self._afterUpdateCallbacks = [];
},
// Process a single "data" message. Stores updates (set/unset/replace) in the
// "updates" object (map from collection name to array of updates). Processes
// "method data done" and "sub ready" declarations and schedules the relevant
// callbacks to occur after all currently buffered docs are written to the
// local cache.
_processOneDataMessage: function (msg, updates) {
var self = this;
// Apply writes (set/unset) from the message.
if (msg.collection && msg.id) {
var serverDoc = Meteor._get(
self._serverDocuments, msg.collection, msg.id);
if (serverDoc) {
// A client stub wrote this document, so we have to apply this change to
// the snapshot in serverDoc rather than directly to the database.
// First apply unset (assuming that there are any fields at all.
if (serverDoc.document) {
_.each(msg.unset, function (propname) {
delete serverDoc.document[propname];
});
}
// Now apply set.
_.each(msg.set, function (value, propname) {
if (!serverDoc.document)
serverDoc.document = {};
serverDoc.document[propname] = value;
});
// Now erase the document if it has become empty.
if (serverDoc.document &&
_.isEmpty(_.without(_.keys(serverDoc.document), '_id')))
delete serverDoc.document;
} else {
// No client stub wrote this document, so we can apply it
// directly to the database.
if (!updates[msg.collection])
updates[msg.collection] = [];
updates[msg.collection].push(msg);
}
_pushUpdate: function (updates, collection, msg) {
if (!_.has(updates, collection)) {
updates[collection] = [];
}
updates[collection].push(msg);
},
_process_added: function (msg, updates) {
var self = this;
var serverDoc = Meteor._get(self._serverDocuments, msg.collection, msg.id);
if (serverDoc) {
// Some outstanding stub wrote here.
if (serverDoc.document !== undefined) {
throw new Error("It doesn't make sense to be adding something we know exists: "
+ msg.id);
}
serverDoc.document = msg.fields;
serverDoc.document._id = msg.id;
} else {
self._pushUpdate(updates, msg.collection, msg);
}
},
_process_changed: function (msg, updates) {
var self = this;
var serverDoc = Meteor._get(self._serverDocuments, msg.collection, msg.id);
if (serverDoc) {
if (serverDoc.document === undefined) {
throw new Error("It doesn't make sense to be changing something we don't think exists: "
+ msg.id);
}
_.each(msg.fields, function (value, key) {
serverDoc.document[key] = value;
});
_.each(msg.cleared, function (clearedKey) {
delete serverDoc.document[clearedKey];
});
} else {
self._pushUpdate(updates, msg.collection, msg);
}
},
_process_removed: function (msg, updates) {
var self = this;
var idsRemovedNow = [];
_.each(msg.ids, function (removedId) {
var serverDoc = Meteor._get(self._serverDocuments, msg.collection, removedId);
if (serverDoc) {
// Some outstanding stub wrote here.
if (serverDoc.document === undefined) {
throw new Error("It doesn't make sense to be deleting something we don't know exists: "
+ removedId);
}
serverDoc.document = undefined;
} else {
idsRemovedNow.push(removedId);
}
});
if (!_.isEmpty(idsRemovedNow))
self._pushUpdate(updates, msg.collection, {
msg: 'removed',
collection: msg.collection,
ids: idsRemovedNow
});
},
_process_updated: function (msg, updates) {
var self = this;
// Process "method done" messages.
_.each(msg.methods, function (methodId) {
_.each(self._documentsWrittenByStub[methodId], function (written) {
@@ -979,10 +1018,14 @@ _.extend(Meteor._LivedataConnection.prototype, {
// now copy the saved document to the database (reverting the stub's
// change if the server did not write to this object, or applying the
// server's writes if it did).
if (!updates[written.collection])
updates[written.collection] = [];
updates[written.collection].push({id: written.id,
replace: serverDoc.document});
// This is a fake ddp 'replace' message. It's just for talking between
// livedata connections and minimongo.
self._pushUpdate(updates, written.collection, {
msg: 'replace',
id: written.id,
replace: serverDoc.document
});
// Call all flush callbacks.
_.each(serverDoc.flushCallbacks, function (c) {
c();
@@ -1004,7 +1047,9 @@ _.extend(Meteor._LivedataConnection.prototype, {
self._runWhenAllServerDocsAreFlushed(
_.bind(callbackInvoker.dataVisible, callbackInvoker));
});
},
_process_complete: function (msg, updates) {
// Process "sub ready" messages. "sub ready" messages don't take effect
// until all current server documents have been flushed to the local
// database. We can use a write fence to implement this.

View File

@@ -73,42 +73,44 @@ Meteor.Collection = function (name, options) {
// Is this a "replace the whole doc" message coming from the quiescence
// of method writes to an object? (Note that 'undefined' is a valid
// value meaning "remove it".)
if (_.has(msg, 'replace')) {
if (msg.msg === 'replace') {
var replace = msg.replace;
// An empty doc is equivalent to a nonexistent doc.
if (replace && _.isEmpty(_.without(_.keys(replace), '_id')))
replace = undefined;
if (!replace) {
if (doc)
self._collection.remove(msg.id);
} else if (!doc) {
self._collection.insert(_.extend({_id: msg.id}, replace));
self._collection.insert(replace);
} else {
// XXX check that replace has no $ ops
self._collection.update(msg.id, replace);
}
return;
} else if (msg.msg === 'added') {
if (doc)
throw new Error("Expected not to find a document already present for an add");
self._collection.insert(_.extend({_id: msg.id}, msg.fields));
} else if (msg.msg === 'removed') {
if (!doc)
throw new Error("Expected to find a document to remove");
_.each(msg.ids, function (removedId) {
self._collection.remove(removedId);
});
} else if (msg.msg === 'changed') {
if (!doc)
throw new Error("Expected to find a document to change");
var modifier = {};
if (!_.isEmpty(msg.fields))
modifier.$set = msg.fields;
if (!_.isEmpty(msg.cleared)) {
_.each(msg.cleared, function (propname) {
modifier.$unset[propname] = 1;
});
}
self._collection.update(msg.id, modifier);
} else {
throw new Error("I don't know how to deal with this message");
}
// ... otherwise we're applying set/unset messages against specific
// fields.
if (doc
&& (!msg.set)
&& _.difference(_.keys(doc), msg.unset, ['_id']).length === 0) {
// what's left is empty, just remove it. cannot fail.
self._collection.remove(msg.id);
} else if (doc) {
var mutator = {$set: msg.set, $unset: {}};
_.each(msg.unset, function (propname) {
mutator.$unset[propname] = 1;
});
// XXX error check return value from update.
self._collection.update(msg.id, mutator);
} else {
// XXX error check return value from insert.
if (msg.set)
self._collection.insert(_.extend({_id: msg.id}, msg.set));
}
},
// Called at the end of a batch of updates.