From 4fa09c32cdc5a63ade9038f0e773f8ddfe35fc58 Mon Sep 17 00:00:00 2001 From: David Glasser Date: Tue, 26 Nov 2013 16:31:18 -0800 Subject: [PATCH] extract MongoPollster to its own file --- packages/mongo-livedata/mongo_driver.js | 186 ---------------------- packages/mongo-livedata/mongo_pollster.js | 185 +++++++++++++++++++++ packages/mongo-livedata/package.js | 2 +- 3 files changed, 186 insertions(+), 187 deletions(-) create mode 100644 packages/mongo-livedata/mongo_pollster.js diff --git a/packages/mongo-livedata/mongo_driver.js b/packages/mongo-livedata/mongo_driver.js index f382ae9af9..e4010e9740 100644 --- a/packages/mongo-livedata/mongo_driver.js +++ b/packages/mongo-livedata/mongo_driver.js @@ -1231,192 +1231,6 @@ forEachTrigger = function (cursorDescription, triggerCallback) { } }; -var MongoPollster = function (cursorDescription, mongoHandle, ordered, - multiplexer, testOnlyPollCallback) { - var self = this; - - self._cursorDescription = cursorDescription; - self._mongoHandle = mongoHandle; - self._ordered = ordered; - self._multiplexer = multiplexer; - self._stopCallbacks = []; - self._stopped = false; - - // This constructor cannot yield, so we don't create the synchronousCursor yet - // (since that can yield). - self._synchronousCursor = null; - - // previous results snapshot. on each poll cycle, diffs against - // results drives the callbacks. - self._results = null; - - // The number of _pollMongo calls that have been added to self._taskQueue but - // have not started running. Used to make sure we never schedule more than one - // _pollMongo (other than possibly the one that is currently running). It's - // also used by _suspendPolling to pretend there's a poll scheduled. Usually, - // it's either 0 (for "no polls scheduled other than maybe one currently - // running") or 1 (for "a poll scheduled that isn't running yet"), but it can - // also be 2 if incremented by _suspendPolling. - self._pollsScheduledButNotStarted = 0; - self._pendingWrites = []; // people to notify when polling completes - - // Make sure to create a separately throttled function for each MongoPollster - // object. - self._ensurePollIsScheduled = _.throttle( - self._unthrottledEnsurePollIsScheduled, 50 /* ms */); - - // XXX figure out if we still need a queue - self._taskQueue = new Meteor._SynchronousQueue(); - - var listenersHandle = listenAll( - cursorDescription, function (notification, complete) { - // When someone does a transaction that might affect us, schedule a poll - // of the database. If that transaction happens inside of a write fence, - // block the fence until we've polled and notified observers. - var fence = DDPServer._CurrentWriteFence.get(); - if (fence) - self._pendingWrites.push(fence.beginWrite()); - // Ensure a poll is scheduled... but if we already know that one is, - // don't hit the throttled _ensurePollIsScheduled function (which might - // lead to us calling it unnecessarily in 50ms). - if (self._pollsScheduledButNotStarted === 0) - self._ensurePollIsScheduled(); - complete(); - } - ); - self._stopCallbacks.push(function () { listenersHandle.stop(); }); - - // every once and a while, poll even if we don't think we're dirty, for - // eventual consistency with database writes from outside the Meteor - // universe. - // - // For testing, there's an undocumented callback argument to observeChanges - // which disables time-based polling and gets called at the beginning of each - // poll. - if (testOnlyPollCallback) { - self._testOnlyPollCallback = testOnlyPollCallback; - } else { - var intervalHandle = Meteor.setInterval( - _.bind(self._ensurePollIsScheduled, self), 10 * 1000); - self._stopCallbacks.push(function () { - Meteor.clearInterval(intervalHandle); - }); - } - - // Make sure we actually poll soon! - self._unthrottledEnsurePollIsScheduled(); - - Package.facts && Package.facts.Facts.incrementServerFact( - "mongo-livedata", "mongo-pollsters", 1); -}; - -_.extend(MongoPollster.prototype, { - // This is always called through _.throttle (except once at startup). - _unthrottledEnsurePollIsScheduled: function () { - var self = this; - if (self._pollsScheduledButNotStarted > 0) - return; - ++self._pollsScheduledButNotStarted; - self._taskQueue.queueTask(function () { - self._pollMongo(); - }); - }, - - // test-only interface for controlling polling. - // - // _suspendPolling blocks until any currently running and scheduled polls are - // done, and prevents any further polls from being scheduled. (new - // ObserveHandles can be added and receive their initial added callbacks, - // though.) - // - // _resumePolling immediately polls, and allows further polls to occur. - _suspendPolling: function() { - var self = this; - // Pretend that there's another poll scheduled (which will prevent - // _ensurePollIsScheduled from queueing any more polls). - ++self._pollsScheduledButNotStarted; - // Now block until all currently running or scheduled polls are done. - self._taskQueue.runTask(function() {}); - - // Confirm that there is only one "poll" (the fake one we're pretending to - // have) scheduled. - if (self._pollsScheduledButNotStarted !== 1) - throw new Error("_pollsScheduledButNotStarted is " + - self._pollsScheduledButNotStarted); - }, - _resumePolling: function() { - var self = this; - // We should be in the same state as in the end of _suspendPolling. - if (self._pollsScheduledButNotStarted !== 1) - throw new Error("_pollsScheduledButNotStarted is " + - self._pollsScheduledButNotStarted); - // Run a poll synchronously (which will counteract the - // ++_pollsScheduledButNotStarted from _suspendPolling). - self._taskQueue.runTask(function () { - self._pollMongo(); - }); - }, - - _pollMongo: function () { - var self = this; - --self._pollsScheduledButNotStarted; - - var first = false; - if (!self._results) { - first = true; - // XXX maybe use _IdMap/OrderedDict instead? - self._results = self.ordered ? [] : {}; - } - - self._testOnlyPollCallback && self._testOnlyPollCallback(); - - // Save the list of pending writes which this round will commit. - var writesForCycle = self._pendingWrites; - self._pendingWrites = []; - - // Get the new query results. (These calls can yield.) - if (self._synchronousCursor) { - self._synchronousCursor.rewind(); - } else { - self._synchronousCursor = self._mongoHandle._createSynchronousCursor( - self._cursorDescription); - } - var newResults = self._synchronousCursor.getRawObjects(self._ordered); - var oldResults = self._results; - - // Run diffs. (This can yield too.) - if (!self._stopped) { - LocalCollection._diffQueryChanges( - self._ordered, oldResults, newResults, self._multiplexer); - } - - // Replace self._results atomically. - self._results = newResults; - - // Signals the multiplexer to call all initial adds. - if (first) - self._multiplexer.ready(); - - // Once the ObserveMultiplexer has processed everything we've done in this - // round, mark all the writes which existed before this call as - // commmitted. (If new writes have shown up in the meantime, there'll - // already be another _pollMongo task scheduled.) - self._multiplexer.onFlush(function () { - _.each(writesForCycle, function (w) { - w.committed(); - }); - }); - }, - - stop: function () { - var self = this; - self._stopped = true; - _.each(self._stopCallbacks, function (c) { c(); }); - Package.facts && Package.facts.Facts.incrementServerFact( - "mongo-livedata", "mongo-pollsters", -1); - } -}); - // observeChanges for tailable cursors on capped collections. // // Some differences from normal cursors: diff --git a/packages/mongo-livedata/mongo_pollster.js b/packages/mongo-livedata/mongo_pollster.js new file mode 100644 index 0000000000..b956d20cb5 --- /dev/null +++ b/packages/mongo-livedata/mongo_pollster.js @@ -0,0 +1,185 @@ +MongoPollster = function (cursorDescription, mongoHandle, ordered, + multiplexer, testOnlyPollCallback) { + var self = this; + + self._cursorDescription = cursorDescription; + self._mongoHandle = mongoHandle; + self._ordered = ordered; + self._multiplexer = multiplexer; + self._stopCallbacks = []; + self._stopped = false; + + // This constructor cannot yield, so we don't create the synchronousCursor yet + // (since that can yield). + self._synchronousCursor = null; + + // previous results snapshot. on each poll cycle, diffs against + // results drives the callbacks. + self._results = null; + + // The number of _pollMongo calls that have been added to self._taskQueue but + // have not started running. Used to make sure we never schedule more than one + // _pollMongo (other than possibly the one that is currently running). It's + // also used by _suspendPolling to pretend there's a poll scheduled. Usually, + // it's either 0 (for "no polls scheduled other than maybe one currently + // running") or 1 (for "a poll scheduled that isn't running yet"), but it can + // also be 2 if incremented by _suspendPolling. + self._pollsScheduledButNotStarted = 0; + self._pendingWrites = []; // people to notify when polling completes + + // Make sure to create a separately throttled function for each MongoPollster + // object. + self._ensurePollIsScheduled = _.throttle( + self._unthrottledEnsurePollIsScheduled, 50 /* ms */); + + // XXX figure out if we still need a queue + self._taskQueue = new Meteor._SynchronousQueue(); + + var listenersHandle = listenAll( + cursorDescription, function (notification, complete) { + // When someone does a transaction that might affect us, schedule a poll + // of the database. If that transaction happens inside of a write fence, + // block the fence until we've polled and notified observers. + var fence = DDPServer._CurrentWriteFence.get(); + if (fence) + self._pendingWrites.push(fence.beginWrite()); + // Ensure a poll is scheduled... but if we already know that one is, + // don't hit the throttled _ensurePollIsScheduled function (which might + // lead to us calling it unnecessarily in 50ms). + if (self._pollsScheduledButNotStarted === 0) + self._ensurePollIsScheduled(); + complete(); + } + ); + self._stopCallbacks.push(function () { listenersHandle.stop(); }); + + // every once and a while, poll even if we don't think we're dirty, for + // eventual consistency with database writes from outside the Meteor + // universe. + // + // For testing, there's an undocumented callback argument to observeChanges + // which disables time-based polling and gets called at the beginning of each + // poll. + if (testOnlyPollCallback) { + self._testOnlyPollCallback = testOnlyPollCallback; + } else { + var intervalHandle = Meteor.setInterval( + _.bind(self._ensurePollIsScheduled, self), 10 * 1000); + self._stopCallbacks.push(function () { + Meteor.clearInterval(intervalHandle); + }); + } + + // Make sure we actually poll soon! + self._unthrottledEnsurePollIsScheduled(); + + Package.facts && Package.facts.Facts.incrementServerFact( + "mongo-livedata", "mongo-pollsters", 1); +}; + +_.extend(MongoPollster.prototype, { + // This is always called through _.throttle (except once at startup). + _unthrottledEnsurePollIsScheduled: function () { + var self = this; + if (self._pollsScheduledButNotStarted > 0) + return; + ++self._pollsScheduledButNotStarted; + self._taskQueue.queueTask(function () { + self._pollMongo(); + }); + }, + + // test-only interface for controlling polling. + // + // _suspendPolling blocks until any currently running and scheduled polls are + // done, and prevents any further polls from being scheduled. (new + // ObserveHandles can be added and receive their initial added callbacks, + // though.) + // + // _resumePolling immediately polls, and allows further polls to occur. + _suspendPolling: function() { + var self = this; + // Pretend that there's another poll scheduled (which will prevent + // _ensurePollIsScheduled from queueing any more polls). + ++self._pollsScheduledButNotStarted; + // Now block until all currently running or scheduled polls are done. + self._taskQueue.runTask(function() {}); + + // Confirm that there is only one "poll" (the fake one we're pretending to + // have) scheduled. + if (self._pollsScheduledButNotStarted !== 1) + throw new Error("_pollsScheduledButNotStarted is " + + self._pollsScheduledButNotStarted); + }, + _resumePolling: function() { + var self = this; + // We should be in the same state as in the end of _suspendPolling. + if (self._pollsScheduledButNotStarted !== 1) + throw new Error("_pollsScheduledButNotStarted is " + + self._pollsScheduledButNotStarted); + // Run a poll synchronously (which will counteract the + // ++_pollsScheduledButNotStarted from _suspendPolling). + self._taskQueue.runTask(function () { + self._pollMongo(); + }); + }, + + _pollMongo: function () { + var self = this; + --self._pollsScheduledButNotStarted; + + var first = false; + if (!self._results) { + first = true; + // XXX maybe use _IdMap/OrderedDict instead? + self._results = self.ordered ? [] : {}; + } + + self._testOnlyPollCallback && self._testOnlyPollCallback(); + + // Save the list of pending writes which this round will commit. + var writesForCycle = self._pendingWrites; + self._pendingWrites = []; + + // Get the new query results. (These calls can yield.) + if (self._synchronousCursor) { + self._synchronousCursor.rewind(); + } else { + self._synchronousCursor = self._mongoHandle._createSynchronousCursor( + self._cursorDescription); + } + var newResults = self._synchronousCursor.getRawObjects(self._ordered); + var oldResults = self._results; + + // Run diffs. (This can yield too.) + if (!self._stopped) { + LocalCollection._diffQueryChanges( + self._ordered, oldResults, newResults, self._multiplexer); + } + + // Replace self._results atomically. + self._results = newResults; + + // Signals the multiplexer to call all initial adds. + if (first) + self._multiplexer.ready(); + + // Once the ObserveMultiplexer has processed everything we've done in this + // round, mark all the writes which existed before this call as + // commmitted. (If new writes have shown up in the meantime, there'll + // already be another _pollMongo task scheduled.) + self._multiplexer.onFlush(function () { + _.each(writesForCycle, function (w) { + w.committed(); + }); + }); + }, + + stop: function () { + var self = this; + self._stopped = true; + _.each(self._stopCallbacks, function (c) { c(); }); + Package.facts && Package.facts.Facts.incrementServerFact( + "mongo-livedata", "mongo-pollsters", -1); + } +}); diff --git a/packages/mongo-livedata/package.js b/packages/mongo-livedata/package.js index 6cb718001a..2b0061ef15 100644 --- a/packages/mongo-livedata/package.js +++ b/packages/mongo-livedata/package.js @@ -43,7 +43,7 @@ Package.on_use(function (api) { api.export('MongoTest', 'server', {testOnly: true}); api.add_files(['doc_fetcher.js', 'mongo_driver.js', 'observe_multiplex.js', - 'oplog.js'], 'server'); + 'mongo_pollster.js', 'oplog.js'], 'server'); api.add_files('local_collection_driver.js', ['client', 'server']); api.add_files('remote_collection_driver.js', 'server'); api.add_files('collection.js', ['client', 'server']);