Fix new merge box for overlapping universal subs.

We were using the subscription ID heavily inside the merge box, but universal
subs don't have one. Use a "subscription handle" instead.
This commit is contained in:
David Glasser
2013-01-10 11:26:29 -08:00
parent 7a75b8418a
commit 4740d17e1d
3 changed files with 93 additions and 27 deletions

View File

@@ -31,8 +31,8 @@ var diffObjects = function (left, right, callbacks) {
Meteor._SessionDocumentView = function () {
var self = this;
self.existsIn = {}; // set of subId
self.dataByKey = {}; // key-> [ {subscriptionId, value} by precedence]
self.existsIn = {}; // set of subscriptionHandle
self.dataByKey = {}; // key-> [ {subscriptionHandle, value} by precedence]
};
_.extend(Meteor._SessionDocumentView.prototype, {
@@ -46,7 +46,7 @@ _.extend(Meteor._SessionDocumentView.prototype, {
return ret;
},
clearField: function (subscriptionId, key, changeCollector) {
clearField: function (subscriptionHandle, key, changeCollector) {
var self = this;
// Publish API ignores _id if present in fields
if (key === "_id")
@@ -58,7 +58,7 @@ _.extend(Meteor._SessionDocumentView.prototype, {
var removedValue = undefined;
for (var i = 0; i < precedenceList.length; i++) {
var precedence = precedenceList[i];
if (precedence.subscriptionId === subscriptionId) {
if (precedence.subscriptionHandle === subscriptionHandle) {
// The view's value can only change if this subscription is the one that
// used to have precedence.
if (i === 0)
@@ -76,13 +76,15 @@ _.extend(Meteor._SessionDocumentView.prototype, {
}
},
changeField: function (subscriptionId, key, value, changeCollector, isAdd) {
changeField: function (subscriptionHandle, key, value,
changeCollector, isAdd) {
var self = this;
// Publish API ignores _id if present in fields
if (key === "_id")
return;
if (!_.has(self.dataByKey, key)) {
self.dataByKey[key] = [{subscriptionId: subscriptionId, value: value}];
self.dataByKey[key] = [{subscriptionHandle: subscriptionHandle,
value: value}];
changeCollector[key] = value;
return;
}
@@ -90,7 +92,7 @@ _.extend(Meteor._SessionDocumentView.prototype, {
var elt;
if (!isAdd) {
elt = _.find(precedenceList, function (precedence) {
return precedence.subscriptionId === subscriptionId;
return precedence.subscriptionHandle === subscriptionHandle;
});
}
@@ -102,7 +104,7 @@ _.extend(Meteor._SessionDocumentView.prototype, {
elt.value = value;
} else {
// this subscription is newly caring about this field
precedenceList.push({subscriptionId: subscriptionId, value: value});
precedenceList.push({subscriptionHandle: subscriptionHandle, value: value});
}
}
@@ -155,7 +157,7 @@ _.extend(Meteor._SessionCollectionView.prototype, {
self.callbacks.changed(self.collectionName, id, fields);
},
added: function (subscriptionId, id, fields) {
added: function (subscriptionHandle, id, fields) {
var self = this;
var docView = self.documents[id];
var added = false;
@@ -164,10 +166,11 @@ _.extend(Meteor._SessionCollectionView.prototype, {
docView = new Meteor._SessionDocumentView();
self.documents[id] = docView;
}
docView.existsIn[subscriptionId] = true;
docView.existsIn[subscriptionHandle] = true;
var changeCollector = {};
_.each(fields, function (value, key) {
docView.changeField(subscriptionId, key, value, changeCollector, true);
docView.changeField(
subscriptionHandle, key, value, changeCollector, true);
});
if (added)
self.callbacks.added(self.collectionName, id, changeCollector);
@@ -175,7 +178,7 @@ _.extend(Meteor._SessionCollectionView.prototype, {
self.callbacks.changed(self.collectionName, id, changeCollector);
},
changed: function (subscriptionId, id, changed) {
changed: function (subscriptionHandle, id, changed) {
var self = this;
var changedResult = {};
var docView = self.documents[id];
@@ -183,21 +186,21 @@ _.extend(Meteor._SessionCollectionView.prototype, {
throw new Error("Could not find element with id " + id + " to change");
_.each(changed, function (value, key) {
if (value === undefined)
docView.clearField(subscriptionId, key, changedResult);
docView.clearField(subscriptionHandle, key, changedResult);
else
docView.changeField(subscriptionId, key, value, changedResult);
docView.changeField(subscriptionHandle, key, value, changedResult);
});
self.callbacks.changed(self.collectionName, id, changedResult);
},
removed: function (subscriptionId, id) {
removed: function (subscriptionHandle, id) {
var self = this;
var docView = self.documents[id];
if (!docView) {
var err = new Error("Removed nonexistent document " + id);
throw err;
}
delete docView.existsIn[subscriptionId];
delete docView.existsIn[subscriptionHandle];
if (_.isEmpty(docView.existsIn)) {
// it is gone from everyone
self.callbacks.removed(self.collectionName, id);
@@ -207,7 +210,7 @@ _.extend(Meteor._SessionCollectionView.prototype, {
// remove this subscription from every precedence list
// and record the changes
_.each(docView.dataByKey, function (precedenceList, key) {
docView.clearField(subscriptionId, key, changed);
docView.clearField(subscriptionHandle, key, changed);
});
self.callbacks.changed(self.collectionName, id, changed);
@@ -318,25 +321,25 @@ _.extend(Meteor._LivedataSession.prototype, {
return ret;
},
added: function (subscriptionId, collectionName, id, fields) {
added: function (subscriptionHandle, collectionName, id, fields) {
var self = this;
var view = self.getCollectionView(collectionName);
view.added(subscriptionId, id, fields);
view.added(subscriptionHandle, id, fields);
},
removed: function (subscriptionId, collectionName, id) {
removed: function (subscriptionHandle, collectionName, id) {
var self = this;
var view = self.getCollectionView(collectionName);
view.removed(subscriptionId, id);
view.removed(subscriptionHandle, id);
if (view.isEmpty()) {
delete self.collectionViews[collectionName];
}
},
changed: function (subscriptionId, collectionName, id, fields) {
changed: function (subscriptionHandle, collectionName, id, fields) {
var self = this;
var view = self.getCollectionView(collectionName);
view.changed(subscriptionId, id, fields);
view.changed(subscriptionHandle, id, fields);
},
// Connect a new socket to this session, displacing (and closing)
// any socket that was previously connected
@@ -757,9 +760,18 @@ Meteor._LivedataSubscription = function (session, subscriptionId) {
// LivedataSession
self._session = session;
// my subscription ID (generated by client, null for universal subs).
// my subscription ID (generated by client, undefined for universal subs).
self._subscriptionId = subscriptionId;
// Only named subscriptions have IDs, but we need some sort of string
// internally to keep track of all subscriptions inside
// SessionDocumentViews. We use this subscriptionHandle for that.
if (self._subscriptionId) {
self._subscriptionHandle = 'N' + self._subscriptionId;
} else {
self._subscriptionHandle = 'U' + Meteor.id();
}
// has stop() been called?
self._stopped = false;
@@ -821,13 +833,13 @@ _.extend(Meteor._LivedataSubscription.prototype, {
var self = this;
id = self._idFilter.idStringify(id);
Meteor._ensure(self._documents, collectionName)[id] = true;
self._session.added(self._subscriptionId, collectionName, id, fields);
self._session.added(self._subscriptionHandle, collectionName, id, fields);
},
changed: function (collectionName, id, fields) {
var self = this;
id = self._idFilter.idStringify(id);
self._session.changed(self._subscriptionId, collectionName, id, fields);
self._session.changed(self._subscriptionHandle, collectionName, id, fields);
},
removed: function (collectionName, id) {
@@ -836,11 +848,13 @@ _.extend(Meteor._LivedataSubscription.prototype, {
// We don't bother to delete sets of things in a collection if the
// collection is empty. It could break _removeAllDocuments.
delete self._documents[collectionName][id];
self._session.removed(self._subscriptionId, collectionName, id);
self._session.removed(self._subscriptionHandle, collectionName, id);
},
complete: function () {
var self = this;
if (!self._subscriptionId)
throw new Error("Can't complete a universal subscription");
if (!self._complete) {
self._session.sendComplete([self._subscriptionId]);
self._complete = true;

View File

@@ -166,4 +166,42 @@ if (Meteor.isServer) {
}
});
}
/*****/
/// Helper for "livedata - overlapping universal subs"
if (Meteor.isServer) {
(function(){
var collName = "overlappingUniversalSubs";
var universalSubscribers = [[], []];
_.each([0, 1], function (index) {
Meteor.publish(null, function () {
var sub = this;
universalSubscribers[index].push(sub);
sub.onStop(function () {
universalSubscribers[index] = _.without(
universalSubscribers[index], sub);
});
});
});
Meteor.methods({
testOverlappingSubs: function (token) {
_.each(universalSubscribers[0], function (sub) {
sub.added(collName, token, {});
});
_.each(universalSubscribers[1], function (sub) {
sub.added(collName, token, {});
});
_.each(universalSubscribers[0], function (sub) {
sub.removed(collName, token);
});
}
});
})();
}
})();

View File

@@ -397,6 +397,20 @@ Tinytest.add("livedata - setUserId error when called from server", function(test
}
});
if (Meteor.isClient) {
testAsyncMulti("livedata - overlapping universal subs", [
function (test, expect) {
var coll = new Meteor.Collection("overlappingUniversalSubs");
var token = Meteor.uuid();
test.isFalse(coll.findOne(token));
Meteor.call("testOverlappingSubs", token, expect(function (err) {
test.isFalse(err);
test.isTrue(coll.findOne(token));
}));
}
]);
}
// XXX some things to test in greater detail:
// staying in simulation mode
// time warp