From b08fecbf205173cbbd9401cafc2407c74803873b Mon Sep 17 00:00:00 2001 From: italo jose Date: Thu, 11 Dec 2025 17:28:25 -0300 Subject: [PATCH] mongo: validate and refactor reactivity driver selection - Introduce availableDrivers and DEFAULT_REACTIVITY_ORDER (uses 'polling'). - Validate Meteor.settings.packages.mongo.reactivity if provided. - Add _getConfiguredReactivityOrder to normalize/validate configured order. - Extract _selectReactivityDriver to centralize async availability checks and selection. - Simplify driverChecks and remove duplicated selection logic in _observeChanges. - Use this consistently (instead of self) and fix multiplexer onStop closure. - Ensure observe driver receives correct mongoHandle and store multiplexers on this. - Add JSDoc for compareOperationTimes in mongo_common.js. --- .github/workflows/test-packages.yml | 4 +- packages/mongo/mongo_common.js | 27 +++ packages/mongo/mongo_connection.js | 218 ++++++++++-------- .../change-streams-observer-driver.md | 8 +- 4 files changed, 150 insertions(+), 107 deletions(-) diff --git a/.github/workflows/test-packages.yml b/.github/workflows/test-packages.yml index 880da3be8d..055619c083 100644 --- a/.github/workflows/test-packages.yml +++ b/.github/workflows/test-packages.yml @@ -10,8 +10,8 @@ jobs: fail-fast: false matrix: reactivity_order: - - 'changeStreams,pooling' - - 'oplog,pooling' + - 'changeStreams,polling' + - 'oplog,polling' runs-on: ubuntu-22.04 concurrency: group: ${{ github.workflow }}-${{ github.ref }} diff --git a/packages/mongo/mongo_common.js b/packages/mongo/mongo_common.js index 9fdca6ad48..28b5a397ba 100644 --- a/packages/mongo/mongo_common.js +++ b/packages/mongo/mongo_common.js @@ -169,6 +169,33 @@ export function replaceNames(filter, thing) { return thing; } + +/** + * Compares two MongoDB operation times. + * @param {MongoDB.Timestamp|object} opTime1 - The first operation time to compare. + * @param {MongoDB.Timestamp|object} opTime2 - The second operation time to compare. + * @returns {number} - Returns a number indicating the comparison result: + * - A negative number if opTime1 is less than opTime2. + * - Zero if opTime1 is equal to opTime2. + * - A positive number if opTime1 is greater than opTime2. + */ +/** + * Compares two MongoDB operation times (opTimes). + * + * Both parameters accept any value accepted by the `MongoDB.Timestamp` constructor: + * - a `Long` (e.g., `new Timestamp(Long)`), + * - an object of the form `{ t: number, i: number }`, + * - or the legacy two-number form `low, high` (via `Timestamp(low, high)`), which is deprecated; + * prefer `{ t, i }` or a `Long`. + * + * The function constructs a `MongoDB.Timestamp` from `opTime1` and compares it to `opTime2` + * using `Timestamp#compare`. + * + * @param {MongoDB.Long|{t:number,i:number}|Array|number} opTime1 - Operation time 1; any value accepted by `MongoDB.Timestamp`. + * For the two-number form you may provide an array `[low, high]`, but passing two separate numbers to the constructor is deprecated. + * @param {MongoDB.Long|{t:number,i:number}|Array|number} opTime2 - Operation time 2; same accepted forms as `opTime1`. + * @returns {number} Comparison result: negative if `opTime1` < `opTime2`, zero if equal, positive if `opTime1` > `opTime2`. + */ export function compareOperationTimes(opTime1, opTime2) { return (new MongoDB.Timestamp(opTime1)).compare(opTime2); } diff --git a/packages/mongo/mongo_connection.js b/packages/mongo/mongo_connection.js index 30020f9371..96e89cdbe2 100644 --- a/packages/mongo/mongo_connection.js +++ b/packages/mongo/mongo_connection.js @@ -20,7 +20,18 @@ const APP_FOLDER = 'app'; const oplogCollectionWarnings = []; // Oplog continues to be the default when we do not have a specific preference; we expect to change it in the future before an oplog deprecation. -const DEFAULT_REACTIVITY_ORDER = process.env.METEOR_REACTIVITY_ORDER ? process.env.METEOR_REACTIVITY_ORDER.split(',') : ['oplog', 'changeStreams', 'pooling']; +const availableDrivers = ['oplog', 'polling', 'changeStreams'] +const DEFAULT_REACTIVITY_ORDER = process.env.METEOR_REACTIVITY_ORDER ? process.env.METEOR_REACTIVITY_ORDER.split(',') : availableDrivers; + +const reactivitySetting = Meteor.settings?.packages?.mongo?.reactivity; +if (Array.isArray(reactivitySetting)) { + for (const method of reactivitySetting) { + if (!availableDrivers.includes(method)) { + throw new Error(`Invalid Mongo reactivity method in settings: ${method}`); + } + } +} + export const MongoConnection = function (url, options) { var self = this; options = options || {}; @@ -800,22 +811,96 @@ MongoConnection.prototype.tail = function (cursorDescription, docCallback, timeo }; }; -Object.assign(MongoConnection.prototype, { - _observeChanges: async function ( +const driverClasses = { + changeStreams: ChangeStreamObserveDriver, + oplog: OplogObserveDriver, + polling: PollingObserveDriver, +}; + +function _getConfiguredReactivityOrder () { + const reactivitySetting = Meteor.settings?.packages?.mongo?.reactivity; + const isArraySetting = Array.isArray(reactivitySetting); + const isStringSetting = typeof reactivitySetting === 'string'; + const hasCustomDriverOrder = isArraySetting || isStringSetting; + + if (reactivitySetting && !hasCustomDriverOrder) { + throw new Error('Meteor.settings.packages.mongo.reactivity must be a string or an array of observer drivers'); + } + + let configuredOrder = DEFAULT_REACTIVITY_ORDER; + if (hasCustomDriverOrder) { + if (isStringSetting) { + configuredOrder = [reactivitySetting]; + } else { + configuredOrder = []; + for (const name of reactivitySetting) { + if (!configuredOrder.includes(name)) { + configuredOrder.push(name); + } + } + } + } + + const invalidDriverNames = configuredOrder.filter(name => !driverClasses[name]); + if (invalidDriverNames.length) { + throw new Error(`Invalid Mongo reactivity driver(s): ${invalidDriverNames.join(', ')}`); + } + + if (hasCustomDriverOrder && configuredOrder.length === 0) { + throw new Error('Meteor.settings.packages.mongo.reactivity must specify at least one observer driver'); + } + + return configuredOrder; +}; + +MongoConnection.prototype._selectReactivityDriver = async function (configuredOrder, driverChecks) { + const availabilityErrors = []; + let driverClass; + let matcher; + let sorter; + + for (const driverName of configuredOrder) { + const checker = driverChecks[driverName]; + + if (!checker) { + availabilityErrors.push(`Unknown driver "${driverName}"`); + continue; + } + + const result = await checker(); + + if (result.available) { + matcher = result.matcher; + sorter = result.sorter; + driverClass = driverClasses[driverName]; + break; + } + + if (result.reason) { + availabilityErrors.push(`${driverName}: ${result.reason}`); + } + } + + return { + driverClass, + matcher, + sorter, + }; +}; + +MongoConnection.prototype._observeChanges = async function ( cursorDescription, ordered, callbacks, nonMutatingCallbacks) { - var self = this; const collectionName = cursorDescription.collectionName; if (cursorDescription.options.tailable) { - return self._observeChangesTailable(cursorDescription, ordered, callbacks); + return this._observeChangesTailable(cursorDescription, ordered, callbacks); } // You may not filter out _id when observing changes, because the id is a core // part of the observeChanges API. const fieldsOptions = cursorDescription.options.projection || cursorDescription.options.fields; - if (fieldsOptions && - (fieldsOptions._id === 0 || - fieldsOptions._id === false)) { + if (fieldsOptions?._id === 0 || + fieldsOptions?._id === false) { throw Error("You may not observe a cursor with {fields: {_id: 0}}"); } @@ -828,15 +913,15 @@ Object.assign(MongoConnection.prototype, { // Find a matching ObserveMultiplexer, or create a new one. This next block is // guaranteed to not yield (and it doesn't call anything that can observe a // new query), so no other calls to this function can interleave with it. - if (observeKey in self._observeMultiplexers) { - multiplexer = self._observeMultiplexers[observeKey]; + if (observeKey in this._observeMultiplexers) { + multiplexer = this._observeMultiplexers[observeKey]; } else { firstHandle = true; // Create a new ObserveMultiplexer. multiplexer = new ObserveMultiplexer({ ordered: ordered, - onStop: function () { - delete self._observeMultiplexers[observeKey]; + onStop: () => { + delete this._observeMultiplexers[observeKey]; return observeDriver.stop(); } }); @@ -847,62 +932,23 @@ Object.assign(MongoConnection.prototype, { nonMutatingCallbacks, ); - const oplogOptions = self?._oplogHandle?._oplogOptions || {}; + const oplogOptions = (this._oplogHandle && this._oplogHandle._oplogOptions) || {}; const { includeCollections, excludeCollections } = oplogOptions; if (firstHandle) { var matcher, sorter; - const reactivitySetting = Meteor.settings?.packages?.mongo?.reactivity; - const isArraySetting = Array.isArray(reactivitySetting); - const isStringSetting = typeof reactivitySetting === 'string'; - const hasCustomDriverOrder = isArraySetting || isStringSetting; - - if (reactivitySetting && !hasCustomDriverOrder) { - throw new Error('Meteor.settings.packages.mongo.reactivity must be a string or an array of observer drivers'); - } - - const driverClasses = { - changeStreams: ChangeStreamObserveDriver, - oplog: OplogObserveDriver, - polling: PollingObserveDriver, - pooling: PollingObserveDriver, - }; - - let configuredOrder; - if (hasCustomDriverOrder) { - if (isStringSetting) { - configuredOrder = [reactivitySetting]; - } else { - configuredOrder = []; - for (const name of reactivitySetting) { - if (!configuredOrder.includes(name)) { - configuredOrder.push(name); - } - } - } - } else { - configuredOrder = DEFAULT_REACTIVITY_ORDER; - } - - const invalidDriverNames = configuredOrder.filter(name => !driverClasses[name]); - if (invalidDriverNames.length) { - throw new Error(`Invalid Mongo reactivity driver(s): ${invalidDriverNames.join(', ')}`); - } - - if (hasCustomDriverOrder && configuredOrder.length === 0) { - throw new Error('Meteor.settings.packages.mongo.reactivity must specify at least one observer driver'); - } + const configuredOrder = _getConfiguredReactivityOrder(); const driverChecks = { changeStreams: async () => { let localMatcher; const reasons = []; - if (self._supportsChangeStreams === undefined) { + if (this._supportsChangeStreams === undefined) { const serverReasons = []; try { // Change Streams require MongoDB 3.6+ and replica set or sharded cluster - const admin = self.db.admin(); + const admin = this.db.admin(); const serverInfo = await admin.serverInfo(); const isMasterPromise = admin.command({ isMaster: 1 }); const versionString = serverInfo.version || 'unknown'; @@ -930,13 +976,13 @@ Object.assign(MongoConnection.prototype, { serverReasons.push(`Error checking Change Stream support: ${error.message}`); } - self._changeStreamServerReasons = serverReasons; - self._supportsChangeStreams = serverReasons.length === 0; + this._changeStreamServerReasons = serverReasons; + this._supportsChangeStreams = serverReasons.length === 0; } - if (!self._supportsChangeStreams) { - if (self._changeStreamServerReasons?.length) { - reasons.push(...self._changeStreamServerReasons); + if (!this._supportsChangeStreams) { + if (this._changeStreamServerReasons?.length) { + reasons.push(...this._changeStreamServerReasons); } else { reasons.push('Change Streams not supported by MongoDB deployment'); } @@ -980,7 +1026,7 @@ Object.assign(MongoConnection.prototype, { let localMatcher; let localSorter; - if (!(self._oplogHandle && !ordered && !callbacks._testOnlyPollCallback)) { + if (!(this._oplogHandle && !ordered && !callbacks._testOnlyPollCallback)) { reasons.push('Oplog tailing not available for this cursor'); } @@ -1035,48 +1081,20 @@ Object.assign(MongoConnection.prototype, { }; }, polling: () => ({ available: true }), - pooling: () => ({ available: true }), }; - const availabilityErrors = []; - var driverClass; - var selectedDriverName; + const { + driverClass, + matcher: selectedMatcher, + sorter: selectedSorter, + } = await this._selectReactivityDriver(configuredOrder, driverChecks); - for (const driverName of configuredOrder) { - const checker = driverChecks[driverName]; + matcher = selectedMatcher; + sorter = selectedSorter; - if (!checker) { - availabilityErrors.push(`Unknown driver "${driverName}"`); - continue; - } - - const result = await checker(); - - if (result.available) { - selectedDriverName = driverName; - matcher = result.matcher; - sorter = result.sorter; - driverClass = driverClasses[driverName]; - break; - } - - if (result.reason) { - availabilityErrors.push(`${driverName}: ${result.reason}`); - } - } - - if (!driverClass) { - const errorDetails = availabilityErrors.length - ? ` Reasons: ${availabilityErrors.join(' | ')}` - : ''; - - throw new Error(`Unable to select a Mongo reactivity driver from configuration [${configuredOrder.join(', ')}].${errorDetails}`); - } - - // Meteor._debug(`Using ${selectedDriverName || driverClass.name} for observing changes on collection ${collectionName} (configured order: ${configuredOrder.join(', ')})`); observeDriver = new driverClass({ cursorDescription, - mongoHandle: self, + mongoHandle: this, multiplexer, ordered, matcher, // ignored by polling @@ -1091,11 +1109,9 @@ Object.assign(MongoConnection.prototype, { // This field is only set for use in tests. multiplexer._observeDriver = observeDriver; } - self._observeMultiplexers[observeKey] = multiplexer; + this._observeMultiplexers[observeKey] = multiplexer; // Blocks until the initial adds have been sent. await multiplexer.addHandleAndSendInitialAdds(observeHandle); return observeHandle; - }, - -}); + } diff --git a/v3-docs/docs/performance/change-streams-observer-driver.md b/v3-docs/docs/performance/change-streams-observer-driver.md index 409bb28f31..d88ddaa053 100644 --- a/v3-docs/docs/performance/change-streams-observer-driver.md +++ b/v3-docs/docs/performance/change-streams-observer-driver.md @@ -15,16 +15,16 @@ Before moving production traffic to Change Streams, validate that your MongoDB d ## Choosing the Reactivity Driver Order -Meteor picks the first available driver from the configured list. The default order is `oplog`, then `changeStreams`, then `pooling` (long polling). You can change this globally: +Meteor picks the first available driver from the configured list. The default order is `oplog`, then `changeStreams`, then `polling` (long polling). You can change this globally: -- Environment variable: `METEOR_REACTIVITY_ORDER=changeStreams,oplog,pooling` +- Environment variable: `METEOR_REACTIVITY_ORDER=changeStreams,oplog,polling` - Settings file: ```json { "packages": { "mongo": { - "reactivity": ["changeStreams", "oplog", "pooling"] + "reactivity": ["changeStreams", "oplog", "polling"] } } } @@ -33,7 +33,7 @@ Meteor picks the first available driver from the configured list. The default or Tips: - Put `changeStreams` first when you cannot or do not want to tail the oplog (e.g., Atlas Serverless). - Remove `changeStreams` from the list if you want to disable it. -- Valid entries are `oplog`, `changeStreams`, and `polling`/`pooling` (alias). +- Valid entries are `oplog`, `changeStreams`, and `polling`/`polling` (alias). ## Change Stream Driver Settings