diff --git a/packages/ddp-client/common/livedata_connection.js b/packages/ddp-client/common/livedata_connection.js index 57e48df1c3..efa68a06d0 100644 --- a/packages/ddp-client/common/livedata_connection.js +++ b/packages/ddp-client/common/livedata_connection.js @@ -41,381 +41,382 @@ class MongoIDMap extends IdMap { // fails. We should have better usability in the latter case (while // still transparently reconnecting if it's just a transient failure // or the server migrating us). -var Connection = function(url, options) { - var self = this; - options = _.extend( - { - onConnected: function() {}, - onDDPVersionNegotiationFailure: function(description) { - Meteor._debug(description); +class Connection { + constructor(url, options) { + var self = this; + options = _.extend( + { + onConnected() {}, + onDDPVersionNegotiationFailure(description) { + Meteor._debug(description); + }, + heartbeatInterval: 17500, + heartbeatTimeout: 15000, + npmFayeOptions: {}, + // These options are only for testing. + reloadWithOutstanding: false, + supportedDDPVersions: DDPCommon.SUPPORTED_DDP_VERSIONS, + retry: true, + respondToPings: true, + // When updates are coming within this ms interval, batch them together. + bufferedWritesInterval: 5, + // Flush buffers immediately if writes are happening continuously for more than this many ms. + bufferedWritesMaxAge: 500 }, - heartbeatInterval: 17500, - heartbeatTimeout: 15000, - npmFayeOptions: {}, - // These options are only for testing. - reloadWithOutstanding: false, - supportedDDPVersions: DDPCommon.SUPPORTED_DDP_VERSIONS, - retry: true, - respondToPings: true, - // When updates are coming within this ms interval, batch them together. - bufferedWritesInterval: 5, - // Flush buffers immediately if writes are happening continuously for more than this many ms. - bufferedWritesMaxAge: 500 - }, - options - ); + options + ); - // If set, called when we reconnect, queuing method calls _before_ the - // existing outstanding ones. - // NOTE: This feature has been preserved for backwards compatibility. The - // preferred method of setting a callback on reconnect is to use - // DDP.onReconnect. - self.onReconnect = null; + // If set, called when we reconnect, queuing method calls _before_ the + // existing outstanding ones. + // NOTE: This feature has been preserved for backwards compatibility. The + // preferred method of setting a callback on reconnect is to use + // DDP.onReconnect. + self.onReconnect = null; - // as a test hook, allow passing a stream instead of a url. - if (typeof url === 'object') { - self._stream = url; - } else { - self._stream = new LivedataTest.ClientStream(url, { - retry: options.retry, - headers: options.headers, - _sockjsOptions: options._sockjsOptions, - // Used to keep some tests quiet, or for other cases in which - // the right thing to do with connection errors is to silently - // fail (e.g. sending package usage stats). At some point we - // should have a real API for handling client-stream-level - // errors. - _dontPrintErrors: options._dontPrintErrors, - connectTimeoutMs: options.connectTimeoutMs, - npmFayeOptions: options.npmFayeOptions - }); - } - - self._lastSessionId = null; - self._versionSuggestion = null; // The last proposed DDP version. - self._version = null; // The DDP version agreed on by client and server. - self._stores = {}; // name -> object with methods - self._methodHandlers = {}; // name -> func - self._nextMethodId = 1; - self._supportedDDPVersions = options.supportedDDPVersions; - - self._heartbeatInterval = options.heartbeatInterval; - self._heartbeatTimeout = options.heartbeatTimeout; - - // Tracks methods which the user has tried to call but which have not yet - // called their user callback (ie, they are waiting on their result or for all - // of their writes to be written to the local cache). Map from method ID to - // MethodInvoker object. - self._methodInvokers = {}; - - // Tracks methods which the user has called but whose result messages have not - // arrived yet. - // - // _outstandingMethodBlocks is an array of blocks of methods. Each block - // represents a set of methods that can run at the same time. The first block - // represents the methods which are currently in flight; subsequent blocks - // must wait for previous blocks to be fully finished before they can be sent - // to the server. - // - // Each block is an object with the following fields: - // - methods: a list of MethodInvoker objects - // - wait: a boolean; if true, this block had a single method invoked with - // the "wait" option - // - // There will never be adjacent blocks with wait=false, because the only thing - // that makes methods need to be serialized is a wait method. - // - // Methods are removed from the first block when their "result" is - // received. The entire first block is only removed when all of the in-flight - // methods have received their results (so the "methods" list is empty) *AND* - // all of the data written by those methods are visible in the local cache. So - // it is possible for the first block's methods list to be empty, if we are - // still waiting for some objects to quiesce. - // - // Example: - // _outstandingMethodBlocks = [ - // {wait: false, methods: []}, - // {wait: true, methods: []}, - // {wait: false, methods: [, - // ]}] - // This means that there were some methods which were sent to the server and - // which have returned their results, but some of the data written by - // the methods may not be visible in the local cache. Once all that data is - // visible, we will send a 'login' method. Once the login method has returned - // and all the data is visible (including re-running subs if userId changes), - // we will send the 'foo' and 'bar' methods in parallel. - self._outstandingMethodBlocks = []; - - // method ID -> array of objects with keys 'collection' and 'id', listing - // documents written by a given method's stub. keys are associated with - // methods whose stub wrote at least one document, and whose data-done message - // has not yet been received. - self._documentsWrittenByStub = {}; - // collection -> IdMap of "server document" object. A "server document" has: - // - "document": the version of the document according the - // server (ie, the snapshot before a stub wrote it, amended by any changes - // received from the server) - // It is undefined if we think the document does not exist - // - "writtenByStubs": a set of method IDs whose stubs wrote to the document - // whose "data done" messages have not yet been processed - self._serverDocuments = {}; - - // Array of callbacks to be called after the next update of the local - // cache. Used for: - // - Calling methodInvoker.dataVisible and sub ready callbacks after - // the relevant data is flushed. - // - Invoking the callbacks of "half-finished" methods after reconnect - // quiescence. Specifically, methods whose result was received over the old - // connection (so we don't re-send it) but whose data had not been made - // visible. - self._afterUpdateCallbacks = []; - - // In two contexts, we buffer all incoming data messages and then process them - // all at once in a single update: - // - During reconnect, we buffer all data messages until all subs that had - // been ready before reconnect are ready again, and all methods that are - // active have returned their "data done message"; then - // - During the execution of a "wait" method, we buffer all data messages - // until the wait method gets its "data done" message. (If the wait method - // occurs during reconnect, it doesn't get any special handling.) - // all data messages are processed in one update. - // - // The following fields are used for this "quiescence" process. - - // This buffers the messages that aren't being processed yet. - self._messagesBufferedUntilQuiescence = []; - // Map from method ID -> true. Methods are removed from this when their - // "data done" message is received, and we will not quiesce until it is - // empty. - self._methodsBlockingQuiescence = {}; - // map from sub ID -> true for subs that were ready (ie, called the sub - // ready callback) before reconnect but haven't become ready again yet - self._subsBeingRevived = {}; // map from sub._id -> true - // if true, the next data update should reset all stores. (set during - // reconnect.) - self._resetStores = false; - - // name -> array of updates for (yet to be created) collections - self._updatesForUnknownStores = {}; - // if we're blocking a migration, the retry func - self._retryMigrate = null; - - self.__flushBufferedWrites = Meteor.bindEnvironment( - self._flushBufferedWrites, - 'flushing DDP buffered writes', - self - ); - // Collection name -> array of messages. - self._bufferedWrites = {}; - // When current buffer of updates must be flushed at, in ms timestamp. - self._bufferedWritesFlushAt = null; - // Timeout handle for the next processing of all pending writes - self._bufferedWritesFlushHandle = null; - - self._bufferedWritesInterval = options.bufferedWritesInterval; - self._bufferedWritesMaxAge = options.bufferedWritesMaxAge; - - // metadata for subscriptions. Map from sub ID to object with keys: - // - id - // - name - // - params - // - inactive (if true, will be cleaned up if not reused in re-run) - // - ready (has the 'ready' message been received?) - // - readyCallback (an optional callback to call when ready) - // - errorCallback (an optional callback to call if the sub terminates with - // an error, XXX COMPAT WITH 1.0.3.1) - // - stopCallback (an optional callback to call when the sub terminates - // for any reason, with an error argument if an error triggered the stop) - self._subscriptions = {}; - - // Reactive userId. - self._userId = null; - self._userIdDeps = new Tracker.Dependency(); - - // Block auto-reload while we're waiting for method responses. - if (Meteor.isClient && Package.reload && !options.reloadWithOutstanding) { - Package.reload.Reload._onMigrate(function(retry) { - if (!self._readyToMigrate()) { - if (self._retryMigrate) throw new Error('Two migrations in progress?'); - self._retryMigrate = retry; - return false; - } else { - return [true]; - } - }); - } - - var onMessage = function(raw_msg) { - try { - var msg = DDPCommon.parseDDP(raw_msg); - } catch (e) { - Meteor._debug('Exception while parsing DDP', e); - return; - } - - // Any message counts as receiving a pong, as it demonstrates that - // the server is still alive. - if (self._heartbeat) { - self._heartbeat.messageReceived(); - } - - if (msg === null || !msg.msg) { - // XXX COMPAT WITH 0.6.6. ignore the old welcome message for back - // compat. Remove this 'if' once the server stops sending welcome - // messages (stream_server.js). - if (!(msg && msg.server_id)) - Meteor._debug('discarding invalid livedata message', msg); - return; - } - - if (msg.msg === 'connected') { - self._version = self._versionSuggestion; - self._livedata_connected(msg); - options.onConnected(); - } else if (msg.msg === 'failed') { - if (_.contains(self._supportedDDPVersions, msg.version)) { - self._versionSuggestion = msg.version; - self._stream.reconnect({ _force: true }); - } else { - var description = - 'DDP version negotiation failed; server requested version ' + - msg.version; - self._stream.disconnect({ _permanent: true, _error: description }); - options.onDDPVersionNegotiationFailure(description); - } - } else if (msg.msg === 'ping' && options.respondToPings) { - self._send({ msg: 'pong', id: msg.id }); - } else if (msg.msg === 'pong') { - // noop, as we assume everything's a pong - } else if ( - _.include(['added', 'changed', 'removed', 'ready', 'updated'], msg.msg) - ) - self._livedata_data(msg); - else if (msg.msg === 'nosub') self._livedata_nosub(msg); - else if (msg.msg === 'result') self._livedata_result(msg); - else if (msg.msg === 'error') self._livedata_error(msg); - else Meteor._debug('discarding unknown livedata message type', msg); - }; - - var onReset = function() { - // Send a connect message at the beginning of the stream. - // NOTE: reset is called even on the first connection, so this is - // the only place we send this message. - var msg = { msg: 'connect' }; - if (self._lastSessionId) msg.session = self._lastSessionId; - msg.version = self._versionSuggestion || self._supportedDDPVersions[0]; - self._versionSuggestion = msg.version; - msg.support = self._supportedDDPVersions; - self._send(msg); - - // Mark non-retry calls as failed. This has to be done early as getting these methods out of the - // current block is pretty important to making sure that quiescence is properly calculated, as - // well as possibly moving on to another useful block. - - // Only bother testing if there is an outstandingMethodBlock (there might not be, especially if - // we are connecting for the first time. - if (self._outstandingMethodBlocks.length > 0) { - // If there is an outstanding method block, we only care about the first one as that is the - // one that could have already sent messages with no response, that are not allowed to retry. - const currentMethodBlock = self._outstandingMethodBlocks[0].methods; - self._outstandingMethodBlocks[0].methods = currentMethodBlock.filter( - methodInvoker => { - // Methods with 'noRetry' option set are not allowed to re-send after - // recovering dropped connection. - if (methodInvoker.sentMessage && methodInvoker.noRetry) { - // Make sure that the method is told that it failed. - methodInvoker.receiveResult( - new Meteor.Error( - 'invocation-failed', - 'Method invocation might have failed due to dropped connection. ' + - 'Failing because `noRetry` option was passed to Meteor.apply.' - ) - ); - } - - // Only keep a method if it wasn't sent or it's allowed to retry. - // This may leave the block empty, but we don't move on to the next - // block until the callback has been delivered, in _outstandingMethodFinished. - return !(methodInvoker.sentMessage && methodInvoker.noRetry); - } - ); - } - - // Now, to minimize setup latency, go ahead and blast out all of - // our pending methods ands subscriptions before we've even taken - // the necessary RTT to know if we successfully reconnected. (1) - // They're supposed to be idempotent, and where they are not, - // they can block retry in apply; (2) even if we did reconnect, - // we're not sure what messages might have gotten lost - // (in either direction) since we were disconnected (TCP being - // sloppy about that.) - - // If the current block of methods all got their results (but didn't all get - // their data visible), discard the empty block now. - if ( - !_.isEmpty(self._outstandingMethodBlocks) && - _.isEmpty(self._outstandingMethodBlocks[0].methods) - ) { - self._outstandingMethodBlocks.shift(); - } - - // Mark all messages as unsent, they have not yet been sent on this - // connection. - _.each(self._methodInvokers, function(m) { - m.sentMessage = false; - }); - - // If an `onReconnect` handler is set, call it first. Go through - // some hoops to ensure that methods that are called from within - // `onReconnect` get executed _before_ ones that were originally - // outstanding (since `onReconnect` is used to re-establish auth - // certificates) - self._callOnReconnectAndSendAppropriateOutstandingMethods(); - - // add new subscriptions at the end. this way they take effect after - // the handlers and we don't see flicker. - _.each(self._subscriptions, function(sub, id) { - self._send({ - msg: 'sub', - id: id, - name: sub.name, - params: sub.params + // as a test hook, allow passing a stream instead of a url. + if (typeof url === 'object') { + self._stream = url; + } else { + self._stream = new LivedataTest.ClientStream(url, { + retry: options.retry, + headers: options.headers, + _sockjsOptions: options._sockjsOptions, + // Used to keep some tests quiet, or for other cases in which + // the right thing to do with connection errors is to silently + // fail (e.g. sending package usage stats). At some point we + // should have a real API for handling client-stream-level + // errors. + _dontPrintErrors: options._dontPrintErrors, + connectTimeoutMs: options.connectTimeoutMs, + npmFayeOptions: options.npmFayeOptions }); - }); - }; - - var onDisconnect = function() { - if (self._heartbeat) { - self._heartbeat.stop(); - self._heartbeat = null; } - }; - if (Meteor.isServer) { - self._stream.on( - 'message', - Meteor.bindEnvironment(onMessage, 'handling DDP message') + self._lastSessionId = null; + self._versionSuggestion = null; // The last proposed DDP version. + self._version = null; // The DDP version agreed on by client and server. + self._stores = {}; // name -> object with methods + self._methodHandlers = {}; // name -> func + self._nextMethodId = 1; + self._supportedDDPVersions = options.supportedDDPVersions; + + self._heartbeatInterval = options.heartbeatInterval; + self._heartbeatTimeout = options.heartbeatTimeout; + + // Tracks methods which the user has tried to call but which have not yet + // called their user callback (ie, they are waiting on their result or for all + // of their writes to be written to the local cache). Map from method ID to + // MethodInvoker object. + self._methodInvokers = {}; + + // Tracks methods which the user has called but whose result messages have not + // arrived yet. + // + // _outstandingMethodBlocks is an array of blocks of methods. Each block + // represents a set of methods that can run at the same time. The first block + // represents the methods which are currently in flight; subsequent blocks + // must wait for previous blocks to be fully finished before they can be sent + // to the server. + // + // Each block is an object with the following fields: + // - methods: a list of MethodInvoker objects + // - wait: a boolean; if true, this block had a single method invoked with + // the "wait" option + // + // There will never be adjacent blocks with wait=false, because the only thing + // that makes methods need to be serialized is a wait method. + // + // Methods are removed from the first block when their "result" is + // received. The entire first block is only removed when all of the in-flight + // methods have received their results (so the "methods" list is empty) *AND* + // all of the data written by those methods are visible in the local cache. So + // it is possible for the first block's methods list to be empty, if we are + // still waiting for some objects to quiesce. + // + // Example: + // _outstandingMethodBlocks = [ + // {wait: false, methods: []}, + // {wait: true, methods: []}, + // {wait: false, methods: [, + // ]}] + // This means that there were some methods which were sent to the server and + // which have returned their results, but some of the data written by + // the methods may not be visible in the local cache. Once all that data is + // visible, we will send a 'login' method. Once the login method has returned + // and all the data is visible (including re-running subs if userId changes), + // we will send the 'foo' and 'bar' methods in parallel. + self._outstandingMethodBlocks = []; + + // method ID -> array of objects with keys 'collection' and 'id', listing + // documents written by a given method's stub. keys are associated with + // methods whose stub wrote at least one document, and whose data-done message + // has not yet been received. + self._documentsWrittenByStub = {}; + // collection -> IdMap of "server document" object. A "server document" has: + // - "document": the version of the document according the + // server (ie, the snapshot before a stub wrote it, amended by any changes + // received from the server) + // It is undefined if we think the document does not exist + // - "writtenByStubs": a set of method IDs whose stubs wrote to the document + // whose "data done" messages have not yet been processed + self._serverDocuments = {}; + + // Array of callbacks to be called after the next update of the local + // cache. Used for: + // - Calling methodInvoker.dataVisible and sub ready callbacks after + // the relevant data is flushed. + // - Invoking the callbacks of "half-finished" methods after reconnect + // quiescence. Specifically, methods whose result was received over the old + // connection (so we don't re-send it) but whose data had not been made + // visible. + self._afterUpdateCallbacks = []; + + // In two contexts, we buffer all incoming data messages and then process them + // all at once in a single update: + // - During reconnect, we buffer all data messages until all subs that had + // been ready before reconnect are ready again, and all methods that are + // active have returned their "data done message"; then + // - During the execution of a "wait" method, we buffer all data messages + // until the wait method gets its "data done" message. (If the wait method + // occurs during reconnect, it doesn't get any special handling.) + // all data messages are processed in one update. + // + // The following fields are used for this "quiescence" process. + + // This buffers the messages that aren't being processed yet. + self._messagesBufferedUntilQuiescence = []; + // Map from method ID -> true. Methods are removed from this when their + // "data done" message is received, and we will not quiesce until it is + // empty. + self._methodsBlockingQuiescence = {}; + // map from sub ID -> true for subs that were ready (ie, called the sub + // ready callback) before reconnect but haven't become ready again yet + self._subsBeingRevived = {}; // map from sub._id -> true + // if true, the next data update should reset all stores. (set during + // reconnect.) + self._resetStores = false; + + // name -> array of updates for (yet to be created) collections + self._updatesForUnknownStores = {}; + // if we're blocking a migration, the retry func + self._retryMigrate = null; + + self.__flushBufferedWrites = Meteor.bindEnvironment( + self._flushBufferedWrites, + 'flushing DDP buffered writes', + self ); - self._stream.on( - 'reset', - Meteor.bindEnvironment(onReset, 'handling DDP reset') - ); - self._stream.on( - 'disconnect', - Meteor.bindEnvironment(onDisconnect, 'handling DDP disconnect') - ); - } else { - self._stream.on('message', onMessage); - self._stream.on('reset', onReset); - self._stream.on('disconnect', onDisconnect); + // Collection name -> array of messages. + self._bufferedWrites = {}; + // When current buffer of updates must be flushed at, in ms timestamp. + self._bufferedWritesFlushAt = null; + // Timeout handle for the next processing of all pending writes + self._bufferedWritesFlushHandle = null; + + self._bufferedWritesInterval = options.bufferedWritesInterval; + self._bufferedWritesMaxAge = options.bufferedWritesMaxAge; + + // metadata for subscriptions. Map from sub ID to object with keys: + // - id + // - name + // - params + // - inactive (if true, will be cleaned up if not reused in re-run) + // - ready (has the 'ready' message been received?) + // - readyCallback (an optional callback to call when ready) + // - errorCallback (an optional callback to call if the sub terminates with + // an error, XXX COMPAT WITH 1.0.3.1) + // - stopCallback (an optional callback to call when the sub terminates + // for any reason, with an error argument if an error triggered the stop) + self._subscriptions = {}; + + // Reactive userId. + self._userId = null; + self._userIdDeps = new Tracker.Dependency(); + + // Block auto-reload while we're waiting for method responses. + if (Meteor.isClient && Package.reload && !options.reloadWithOutstanding) { + Package.reload.Reload._onMigrate(function(retry) { + if (!self._readyToMigrate()) { + if (self._retryMigrate) + throw new Error('Two migrations in progress?'); + self._retryMigrate = retry; + return false; + } else { + return [true]; + } + }); + } + + var onMessage = function(raw_msg) { + try { + var msg = DDPCommon.parseDDP(raw_msg); + } catch (e) { + Meteor._debug('Exception while parsing DDP', e); + return; + } + + // Any message counts as receiving a pong, as it demonstrates that + // the server is still alive. + if (self._heartbeat) { + self._heartbeat.messageReceived(); + } + + if (msg === null || !msg.msg) { + // XXX COMPAT WITH 0.6.6. ignore the old welcome message for back + // compat. Remove this 'if' once the server stops sending welcome + // messages (stream_server.js). + if (!(msg && msg.server_id)) + Meteor._debug('discarding invalid livedata message', msg); + return; + } + + if (msg.msg === 'connected') { + self._version = self._versionSuggestion; + self._livedata_connected(msg); + options.onConnected(); + } else if (msg.msg === 'failed') { + if (_.contains(self._supportedDDPVersions, msg.version)) { + self._versionSuggestion = msg.version; + self._stream.reconnect({ _force: true }); + } else { + var description = + 'DDP version negotiation failed; server requested version ' + + msg.version; + self._stream.disconnect({ _permanent: true, _error: description }); + options.onDDPVersionNegotiationFailure(description); + } + } else if (msg.msg === 'ping' && options.respondToPings) { + self._send({ msg: 'pong', id: msg.id }); + } else if (msg.msg === 'pong') { + // noop, as we assume everything's a pong + } else if ( + _.include(['added', 'changed', 'removed', 'ready', 'updated'], msg.msg) + ) + self._livedata_data(msg); + else if (msg.msg === 'nosub') self._livedata_nosub(msg); + else if (msg.msg === 'result') self._livedata_result(msg); + else if (msg.msg === 'error') self._livedata_error(msg); + else Meteor._debug('discarding unknown livedata message type', msg); + }; + + var onReset = function() { + // Send a connect message at the beginning of the stream. + // NOTE: reset is called even on the first connection, so this is + // the only place we send this message. + var msg = { msg: 'connect' }; + if (self._lastSessionId) msg.session = self._lastSessionId; + msg.version = self._versionSuggestion || self._supportedDDPVersions[0]; + self._versionSuggestion = msg.version; + msg.support = self._supportedDDPVersions; + self._send(msg); + + // Mark non-retry calls as failed. This has to be done early as getting these methods out of the + // current block is pretty important to making sure that quiescence is properly calculated, as + // well as possibly moving on to another useful block. + + // Only bother testing if there is an outstandingMethodBlock (there might not be, especially if + // we are connecting for the first time. + if (self._outstandingMethodBlocks.length > 0) { + // If there is an outstanding method block, we only care about the first one as that is the + // one that could have already sent messages with no response, that are not allowed to retry. + const currentMethodBlock = self._outstandingMethodBlocks[0].methods; + self._outstandingMethodBlocks[0].methods = currentMethodBlock.filter( + methodInvoker => { + // Methods with 'noRetry' option set are not allowed to re-send after + // recovering dropped connection. + if (methodInvoker.sentMessage && methodInvoker.noRetry) { + // Make sure that the method is told that it failed. + methodInvoker.receiveResult( + new Meteor.Error( + 'invocation-failed', + 'Method invocation might have failed due to dropped connection. ' + + 'Failing because `noRetry` option was passed to Meteor.apply.' + ) + ); + } + + // Only keep a method if it wasn't sent or it's allowed to retry. + // This may leave the block empty, but we don't move on to the next + // block until the callback has been delivered, in _outstandingMethodFinished. + return !(methodInvoker.sentMessage && methodInvoker.noRetry); + } + ); + } + + // Now, to minimize setup latency, go ahead and blast out all of + // our pending methods ands subscriptions before we've even taken + // the necessary RTT to know if we successfully reconnected. (1) + // They're supposed to be idempotent, and where they are not, + // they can block retry in apply; (2) even if we did reconnect, + // we're not sure what messages might have gotten lost + // (in either direction) since we were disconnected (TCP being + // sloppy about that.) + + // If the current block of methods all got their results (but didn't all get + // their data visible), discard the empty block now. + if ( + !_.isEmpty(self._outstandingMethodBlocks) && + _.isEmpty(self._outstandingMethodBlocks[0].methods) + ) { + self._outstandingMethodBlocks.shift(); + } + + // Mark all messages as unsent, they have not yet been sent on this + // connection. + _.each(self._methodInvokers, function(m) { + m.sentMessage = false; + }); + + // If an `onReconnect` handler is set, call it first. Go through + // some hoops to ensure that methods that are called from within + // `onReconnect` get executed _before_ ones that were originally + // outstanding (since `onReconnect` is used to re-establish auth + // certificates) + self._callOnReconnectAndSendAppropriateOutstandingMethods(); + + // add new subscriptions at the end. this way they take effect after + // the handlers and we don't see flicker. + _.each(self._subscriptions, function(sub, id) { + self._send({ + msg: 'sub', + id: id, + name: sub.name, + params: sub.params + }); + }); + }; + + var onDisconnect = function() { + if (self._heartbeat) { + self._heartbeat.stop(); + self._heartbeat = null; + } + }; + + if (Meteor.isServer) { + self._stream.on( + 'message', + Meteor.bindEnvironment(onMessage, 'handling DDP message') + ); + self._stream.on( + 'reset', + Meteor.bindEnvironment(onReset, 'handling DDP reset') + ); + self._stream.on( + 'disconnect', + Meteor.bindEnvironment(onDisconnect, 'handling DDP disconnect') + ); + } else { + self._stream.on('message', onMessage); + self._stream.on('reset', onReset); + self._stream.on('disconnect', onDisconnect); + } } -}; -_.extend(Connection.prototype, { // 'name' is the name of the data on the wire that should go in the // store. 'wrappedStore' should be an object with methods beginUpdate, update, // endUpdate, saveOriginals, retrieveOriginals. see Collection for an example. - registerStore: function(name, wrappedStore) { + registerStore(name, wrappedStore) { var self = this; if (name in self._stores) return false; @@ -455,7 +456,7 @@ _.extend(Connection.prototype, { } return true; - }, + } /** * @memberOf Meteor @@ -472,7 +473,7 @@ _.extend(Connection.prototype, { * argument to `onStop`. If a function is passed instead of an object, it * is interpreted as an `onReady` callback. */ - subscribe: function(name /* .. [arguments] .. (callback|callbacks) */) { + subscribe(name /* .. [arguments] .. (callback|callbacks) */) { var self = this; var params = Array.prototype.slice.call(arguments, 1); @@ -566,11 +567,11 @@ _.extend(Connection.prototype, { errorCallback: callbacks.onError, stopCallback: callbacks.onStop, connection: self, - remove: function() { + remove() { delete this.connection._subscriptions[this.id]; this.ready && this.readyDeps.changed(); }, - stop: function() { + stop() { this.connection._send({ msg: 'unsub', id: id }); this.remove(); @@ -584,12 +585,12 @@ _.extend(Connection.prototype, { // return a handle to the application. var handle = { - stop: function() { + stop() { if (!_.has(self._subscriptions, id)) return; self._subscriptions[id].stop(); }, - ready: function() { + ready() { // return false if we've unsubscribed. if (!_.has(self._subscriptions, id)) return false; var record = self._subscriptions[id]; @@ -621,23 +622,23 @@ _.extend(Connection.prototype, { } return handle; - }, + } // options: // - onLateError {Function(error)} called if an error was received after the ready event. // (errors received before ready cause an error to be thrown) - _subscribeAndWait: function(name, args, options) { + _subscribeAndWait(name, args, options) { var self = this; var f = new Future(); var ready = false; var handle; args = args || []; args.push({ - onReady: function() { + onReady() { ready = true; f['return'](); }, - onError: function(e) { + onError(e) { if (!ready) f['throw'](e); else options && options.onLateError && options.onLateError(e); } @@ -646,9 +647,9 @@ _.extend(Connection.prototype, { handle = self.subscribe.apply(self, [name].concat(args)); f.wait(); return handle; - }, + } - methods: function(methods) { + methods(methods) { var self = this; _.each(methods, function(func, name) { if (typeof func !== 'function') @@ -657,7 +658,7 @@ _.extend(Connection.prototype, { throw new Error("A method named '" + name + "' is already defined"); self._methodHandlers[name] = func; }); - }, + } /** * @memberOf Meteor @@ -668,14 +669,14 @@ _.extend(Connection.prototype, { * @param {EJSONable} [arg1,arg2...] Optional method arguments * @param {Function} [asyncCallback] Optional callback, which is called asynchronously with the error or result after the method is complete. If not provided, the method runs synchronously if possible (see below). */ - call: function(name /* .. [arguments] .. callback */) { + call(name /* .. [arguments] .. callback */) { // if it's a function, the last argument is the result callback, // not a parameter to the remote method. var args = Array.prototype.slice.call(arguments, 1); if (args.length && typeof args[args.length - 1] === 'function') var callback = args.pop(); return this.apply(name, args, callback); - }, + } // @param options {Optional Object} // wait: Boolean - Should we wait to call this until all current methods @@ -712,7 +713,7 @@ _.extend(Connection.prototype, { * @param {Boolean} options.throwStubExceptions (Client only) If true, exceptions thrown by method stubs will be thrown instead of logged, and the method will not be invoked on the server. * @param {Function} [asyncCallback] Optional callback; same semantics as in [`Meteor.call`](#meteor_call). */ - apply: function(name, args, options, callback) { + apply(name, args, options, callback) { var self = this; // We were passed 3 arguments. They may be either (name, args, options) @@ -789,7 +790,7 @@ _.extend(Connection.prototype, { isSimulation: true, userId: self.userId(), setUserId: setUserId, - randomSeed: function() { + randomSeed() { return randomSeedGenerator(); } }); @@ -924,22 +925,22 @@ _.extend(Connection.prototype, { return future.wait(); } return options.returnStubValue ? stubReturnValue : undefined; - }, + } // Before calling a method stub, prepare all stores to track changes and allow // _retrieveAndStoreOriginals to get the original versions of changed // documents. - _saveOriginals: function() { + _saveOriginals() { var self = this; if (!self._waitingForQuiescence()) self._flushBufferedWrites(); _.each(self._stores, function(s) { s.saveOriginals(); }); - }, + } // Retrieves the original versions of all documents modified by the stub for // method 'methodId' from all stores and saves them to _serverDocuments (keyed // by document) and _documentsWrittenByStub (keyed by method ID). - _retrieveAndStoreOriginals: function(methodId) { + _retrieveAndStoreOriginals(methodId) { var self = this; if (self._documentsWrittenByStub[methodId]) throw new Error('Duplicate methodId in _retrieveAndStoreOriginals'); @@ -970,11 +971,11 @@ _.extend(Connection.prototype, { if (!_.isEmpty(docsWritten)) { self._documentsWrittenByStub[methodId] = docsWritten; } - }, + } // This is very much a private function we use to make the tests // take up fewer server resources after they complete. - _unsubscribeAll: function() { + _unsubscribeAll() { var self = this; _.each(_.clone(self._subscriptions), function(sub, id) { // Avoid killing the autoupdate subscription so that developers @@ -987,21 +988,21 @@ _.extend(Connection.prototype, { self._subscriptions[id].stop(); } }); - }, + } // Sends the DDP stringification of the given message object - _send: function(obj) { + _send(obj) { var self = this; self._stream.send(DDPCommon.stringifyDDP(obj)); - }, + } // We detected via DDP-level heartbeats that we've lost the // connection. Unlike `disconnect` or `close`, a lost connection // will be automatically retried. - _lostConnection: function(error) { + _lostConnection(error) { var self = this; self._stream._lostConnection(error); - }, + } /** * @summary Get the current connection status. A reactive data source. @@ -1009,10 +1010,10 @@ _.extend(Connection.prototype, { * @memberOf Meteor * @importFromPackage meteor */ - status: function(/*passthrough args*/) { + status(/*passthrough args*/) { var self = this; return self._stream.status.apply(self._stream, arguments); - }, + } /** * @summary Force an immediate reconnection attempt if the client is not connected to the server. @@ -1022,10 +1023,10 @@ _.extend(Connection.prototype, { * @memberOf Meteor * @importFromPackage meteor */ - reconnect: function(/*passthrough args*/) { + reconnect(/*passthrough args*/) { var self = this; return self._stream.reconnect.apply(self._stream, arguments); - }, + } /** * @summary Disconnect the client from the server. @@ -1033,64 +1034,64 @@ _.extend(Connection.prototype, { * @memberOf Meteor * @importFromPackage meteor */ - disconnect: function(/*passthrough args*/) { + disconnect(/*passthrough args*/) { var self = this; return self._stream.disconnect.apply(self._stream, arguments); - }, + } - close: function() { + close() { var self = this; return self._stream.disconnect({ _permanent: true }); - }, + } /// /// Reactive user system /// - userId: function() { + userId() { var self = this; if (self._userIdDeps) self._userIdDeps.depend(); return self._userId; - }, + } - setUserId: function(userId) { + setUserId(userId) { var self = this; // Avoid invalidating dependents if setUserId is called with current value. if (self._userId === userId) return; self._userId = userId; if (self._userIdDeps) self._userIdDeps.changed(); - }, + } // Returns true if we are in a state after reconnect of waiting for subs to be // revived or early methods to finish their data, or we are waiting for a // "wait" method to finish. - _waitingForQuiescence: function() { + _waitingForQuiescence() { var self = this; return ( !_.isEmpty(self._subsBeingRevived) || !_.isEmpty(self._methodsBlockingQuiescence) ); - }, + } // Returns true if any method whose message has been sent to the server has // not yet invoked its user callback. - _anyMethodsAreOutstanding: function() { + _anyMethodsAreOutstanding() { var self = this; return _.any(_.pluck(self._methodInvokers, 'sentMessage')); - }, + } - _livedata_connected: function(msg) { + _livedata_connected(msg) { var self = this; if (self._version !== 'pre1' && self._heartbeatInterval !== 0) { self._heartbeat = new DDPCommon.Heartbeat({ heartbeatInterval: self._heartbeatInterval, heartbeatTimeout: self._heartbeatTimeout, - onTimeout: function() { + onTimeout() { self._lostConnection( new DDP.ConnectionError('DDP heartbeat timed out') ); }, - sendPing: function() { + sendPing() { self._send({ msg: 'ping' }); } }); @@ -1184,15 +1185,15 @@ _.extend(Connection.prototype, { } self._runAfterUpdateCallbacks(); } - }, + } - _processOneDataMessage: function(msg, updates) { + _processOneDataMessage(msg, updates) { var self = this; // Using underscore here so as not to need to capitalize. self['_process_' + msg.msg](msg, updates); - }, + } - _livedata_data: function(msg) { + _livedata_data(msg) { var self = this; if (self._waitingForQuiescence()) { @@ -1244,9 +1245,9 @@ _.extend(Connection.prototype, { self.__flushBufferedWrites, self._bufferedWritesInterval ); - }, + } - _flushBufferedWrites: function() { + _flushBufferedWrites() { var self = this; if (self._bufferedWritesFlushHandle) { clearTimeout(self._bufferedWritesFlushHandle); @@ -1260,9 +1261,9 @@ _.extend(Connection.prototype, { var writes = self._bufferedWrites; self._bufferedWrites = {}; self._performWrites(writes); - }, + } - _performWrites: function(updates) { + _performWrites(updates) { var self = this; if (self._resetStores || !_.isEmpty(updates)) { @@ -1303,36 +1304,36 @@ _.extend(Connection.prototype, { } self._runAfterUpdateCallbacks(); - }, + } // Call any callbacks deferred with _runWhenAllServerDocsAreFlushed whose // relevant docs have been flushed, as well as dataVisible callbacks at // reconnect-quiescence time. - _runAfterUpdateCallbacks: function() { + _runAfterUpdateCallbacks() { var self = this; var callbacks = self._afterUpdateCallbacks; self._afterUpdateCallbacks = []; _.each(callbacks, function(c) { c(); }); - }, + } - _pushUpdate: function(updates, collection, msg) { + _pushUpdate(updates, collection, msg) { var self = this; if (!_.has(updates, collection)) { updates[collection] = []; } updates[collection].push(msg); - }, + } - _getServerDoc: function(collection, id) { + _getServerDoc(collection, id) { var self = this; if (!_.has(self._serverDocuments, collection)) return null; var serverDocsForCollection = self._serverDocuments[collection]; return serverDocsForCollection.get(id) || null; - }, + } - _process_added: function(msg, updates) { + _process_added(msg, updates) { var self = this; var id = MongoID.idParse(msg.id); var serverDoc = self._getServerDoc(msg.collection, id); @@ -1358,9 +1359,9 @@ _.extend(Connection.prototype, { } else { self._pushUpdate(updates, msg.collection, msg); } - }, + } - _process_changed: function(msg, updates) { + _process_changed(msg, updates) { var self = this; var serverDoc = self._getServerDoc(msg.collection, MongoID.idParse(msg.id)); if (serverDoc) { @@ -1370,9 +1371,9 @@ _.extend(Connection.prototype, { } else { self._pushUpdate(updates, msg.collection, msg); } - }, + } - _process_removed: function(msg, updates) { + _process_removed(msg, updates) { var self = this; var serverDoc = self._getServerDoc(msg.collection, MongoID.idParse(msg.id)); if (serverDoc) { @@ -1387,9 +1388,9 @@ _.extend(Connection.prototype, { id: msg.id }); } - }, + } - _process_updated: function(msg, updates) { + _process_updated(msg, updates) { var self = this; // Process "method done" messages. _.each(msg.methods, function(methodId) { @@ -1441,9 +1442,9 @@ _.extend(Connection.prototype, { _.bind(callbackInvoker.dataVisible, callbackInvoker) ); }); - }, + } - _process_ready: function(msg, updates) { + _process_ready(msg, updates) { var self = this; // Process "sub ready" messages. "sub ready" messages don't take effect // until all current server documents have been flushed to the local @@ -1460,12 +1461,12 @@ _.extend(Connection.prototype, { subRecord.readyDeps.changed(); }); }); - }, + } // Ensures that "f" will be called after all documents currently in // _serverDocuments have been written to the local cache. f will not be called // if the connection is lost before then! - _runWhenAllServerDocsAreFlushed: function(f) { + _runWhenAllServerDocsAreFlushed(f) { var self = this; var runFAfterUpdates = function() { self._afterUpdateCallbacks.push(f); @@ -1499,9 +1500,9 @@ _.extend(Connection.prototype, { // round of updates is applied! runFAfterUpdates(); } - }, + } - _livedata_nosub: function(msg) { + _livedata_nosub(msg) { var self = this; // First pass it through _livedata_data, which only uses it to help get @@ -1540,17 +1541,17 @@ _.extend(Connection.prototype, { if (stopCallback) { stopCallback(meteorErrorFromMsg(msg)); } - }, + } - _process_nosub: function() { + _process_nosub() { // This is called as part of the "buffer until quiescence" process, but // nosub's effect is always immediate. It only goes in the buffer at all // because it's possible for a nosub to be the thing that triggers // quiescence, if we were waiting for a sub to be revived and it dies // instead. - }, + } - _livedata_result: function(msg) { + _livedata_result(msg) { // id, result or error. error has error (code), reason, details var self = this; @@ -1592,12 +1593,12 @@ _.extend(Connection.prototype, { // value m.receiveResult(undefined, msg.result); } - }, + } // Called by MethodInvoker after a method's callback is invoked. If this was // the last outstanding method in the current block, runs the next block. If // there are no more methods, consider accepting a hot code push. - _outstandingMethodFinished: function() { + _outstandingMethodFinished() { var self = this; if (self._anyMethodsAreOutstanding()) return; @@ -1619,24 +1620,24 @@ _.extend(Connection.prototype, { // Maybe accept a hot code push. self._maybeMigrate(); - }, + } // Sends messages for all the methods in the first block in // _outstandingMethodBlocks. - _sendOutstandingMethods: function() { + _sendOutstandingMethods() { var self = this; if (_.isEmpty(self._outstandingMethodBlocks)) return; _.each(self._outstandingMethodBlocks[0].methods, function(m) { m.sendMessage(); }); - }, + } - _livedata_error: function(msg) { + _livedata_error(msg) { Meteor._debug('Received error from server: ', msg.reason); if (msg.offendingMessage) Meteor._debug('For: ', msg.offendingMessage); - }, + } - _callOnReconnectAndSendAppropriateOutstandingMethods: function() { + _callOnReconnectAndSendAppropriateOutstandingMethods() { var self = this; var oldOutstandingMethodBlocks = self._outstandingMethodBlocks; self._outstandingMethodBlocks = []; @@ -1679,24 +1680,24 @@ _.extend(Connection.prototype, { _.each(oldOutstandingMethodBlocks, function(block) { self._outstandingMethodBlocks.push(block); }); - }, + } // We can accept a hot code push if there are no methods in flight. - _readyToMigrate: function() { + _readyToMigrate() { var self = this; return _.isEmpty(self._methodInvokers); - }, + } // If we were blocking a migration, see if it's now possible to continue. // Call whenever the set of outstanding/blocked methods shrinks. - _maybeMigrate: function() { + _maybeMigrate() { var self = this; if (self._retryMigrate && self._readyToMigrate()) { self._retryMigrate(); self._retryMigrate = null; } } -}); +} LivedataTest.Connection = Connection;