Fix method calls during reconnect

User could enter irrecoverable broken state after doing any action while
reconnecting (specifically between stream reset and quiescence).
In this state some or all documents in collections would be missing,
because the loop processing buffered messages is interrupted with an
exception in _process_added.

This commonly happened on slow/bad networks, such as mobile.
This commit is contained in:
Oleksandr Chekhovskyi
2015-09-04 15:07:34 +02:00
parent 7abbceabdf
commit c6ea93f42e
3 changed files with 93 additions and 4 deletions

View File

@@ -440,7 +440,7 @@ _.extend(Connection.prototype, {
// implemented by 'store' into a no-op.
var store = {};
_.each(['update', 'beginUpdate', 'endUpdate', 'saveOriginals',
'retrieveOriginals'], function (method) {
'retrieveOriginals', 'getDoc'], function (method) {
store[method] = function () {
return (wrappedStore[method]
? wrappedStore[method].apply(wrappedStore, arguments)
@@ -1279,10 +1279,24 @@ _.extend(Connection.prototype, {
var serverDoc = self._getServerDoc(msg.collection, id);
if (serverDoc) {
// Some outstanding stub wrote here.
if (serverDoc.document !== undefined)
throw new Error("Server sent add for existing id: " + msg.id);
var isExisting = (serverDoc.document !== undefined);
serverDoc.document = msg.fields || {};
serverDoc.document._id = id;
if (self._resetStores) {
// During reconnect the server is sending adds for existing ids.
// Always push an update so that document stays in the store after
// reset. Use current version of the document for this update, so
// that stub-written values are preserved.
var currentDoc = self._stores[msg.collection].getDoc(msg.id);
if (currentDoc !== undefined)
msg.fields = currentDoc;
self._pushUpdate(updates, msg.collection, msg);
} else if (isExisting) {
throw new Error("Server sent add for existing id: " + msg.id);
}
} else {
self._pushUpdate(updates, msg.collection, msg);
}

View File

@@ -1803,6 +1803,76 @@ if (Meteor.isClient) {
test.equal(coll.find().count(), 0);
});
}
if (Meteor.isClient) {
Tinytest.add("livedata stub - method call between reset and quiescence", function (test) {
var stream = new StubStream();
var conn = newConnection(stream);
startAndConnect(test, stream);
var collName = Random.id();
var coll = new Mongo.Collection(collName, {connection: conn});
conn.methods({
update_value: function () {
coll.update('aaa', {value: 222});
}
});
// Set up test subscription.
var sub = conn.subscribe('test_data');
var subMessage = JSON.parse(stream.sent.shift());
test.equal(subMessage, {msg: 'sub', name: 'test_data',
params: [], id:subMessage.id});
test.length(stream.sent, 0);
var subDocMessage = {msg: 'added', collection: collName,
id: 'aaa', fields: {value: 111}};
var subReadyMessage = {msg: 'ready', 'subs': [subMessage.id]};
stream.receive(subDocMessage);
stream.receive(subReadyMessage);
test.isTrue(coll.findOne('aaa').value == 111);
// Initiate reconnect.
stream.reset();
testGotMessage(test, stream, makeConnectMessage(SESSION_ID));
testGotMessage(test, stream, subMessage);
stream.receive({msg: 'connected', session: SESSION_ID + 1});
// Now in reconnect, can still see the document.
test.isTrue(coll.findOne('aaa').value == 111);
conn.call('update_value');
// Observe the stub-written value.
test.isTrue(coll.findOne('aaa').value == 222);
var methodMessage = JSON.parse(stream.sent.shift());
test.equal(methodMessage, {msg: 'method', method: 'update_value',
params: [], id:methodMessage.id});
test.length(stream.sent, 0);
stream.receive(subDocMessage);
stream.receive(subReadyMessage);
// By this point quiescence is reached and stores have been reset.
// The stub-written value is still there.
test.isTrue(coll.findOne('aaa').value == 222);
stream.receive({msg: 'changed', collection: collName,
id: 'aaa', fields: {value: 333}});
stream.receive({msg: 'updated', 'methods': [methodMessage.id]});
stream.receive({msg: 'result', id:methodMessage.id, result:null});
// Server wrote a different value, make sure it's visible now.
test.isTrue(coll.findOne('aaa').value == 333);
});
}
// XXX also test:
// - reconnect, with session resume.
// - restart on update flag

View File

@@ -199,7 +199,12 @@ Mongo.Collection = function (name, options) {
},
retrieveOriginals: function () {
return self._collection.retrieveOriginals();
}
},
// Used to preserve current versions of documents across a store reset.
getDoc: function(id) {
return self.findOne(id);
},
});
if (!ok)