mongo: validate and refactor reactivity driver selection

- Introduce availableDrivers and DEFAULT_REACTIVITY_ORDER (uses 'polling').
- Validate Meteor.settings.packages.mongo.reactivity if provided.
- Add _getConfiguredReactivityOrder to normalize/validate configured order.
- Extract _selectReactivityDriver to centralize async availability checks and selection.
- Simplify driverChecks and remove duplicated selection logic in _observeChanges.
- Use this consistently (instead of self) and fix multiplexer onStop closure.
- Ensure observe driver receives correct mongoHandle and store multiplexers on this.
- Add JSDoc for compareOperationTimes in mongo_common.js.
This commit is contained in:
italo jose
2025-12-11 17:28:25 -03:00
parent 98ab215894
commit b08fecbf20
4 changed files with 150 additions and 107 deletions

View File

@@ -10,8 +10,8 @@ jobs:
fail-fast: false
matrix:
reactivity_order:
- 'changeStreams,pooling'
- 'oplog,pooling'
- 'changeStreams,polling'
- 'oplog,polling'
runs-on: ubuntu-22.04
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}

View File

@@ -169,6 +169,33 @@ export function replaceNames(filter, thing) {
return thing;
}
/**
* Compares two MongoDB operation times.
* @param {MongoDB.Timestamp|object} opTime1 - The first operation time to compare.
* @param {MongoDB.Timestamp|object} opTime2 - The second operation time to compare.
* @returns {number} - Returns a number indicating the comparison result:
* - A negative number if opTime1 is less than opTime2.
* - Zero if opTime1 is equal to opTime2.
* - A positive number if opTime1 is greater than opTime2.
*/
/**
* Compares two MongoDB operation times (opTimes).
*
* Both parameters accept any value accepted by the `MongoDB.Timestamp` constructor:
* - a `Long` (e.g., `new Timestamp(Long)`),
* - an object of the form `{ t: number, i: number }`,
* - or the legacy two-number form `low, high` (via `Timestamp(low, high)`), which is deprecated;
* prefer `{ t, i }` or a `Long`.
*
* The function constructs a `MongoDB.Timestamp` from `opTime1` and compares it to `opTime2`
* using `Timestamp#compare`.
*
* @param {MongoDB.Long|{t:number,i:number}|Array<number>|number} opTime1 - Operation time 1; any value accepted by `MongoDB.Timestamp`.
* For the two-number form you may provide an array `[low, high]`, but passing two separate numbers to the constructor is deprecated.
* @param {MongoDB.Long|{t:number,i:number}|Array<number>|number} opTime2 - Operation time 2; same accepted forms as `opTime1`.
* @returns {number} Comparison result: negative if `opTime1` < `opTime2`, zero if equal, positive if `opTime1` > `opTime2`.
*/
export function compareOperationTimes(opTime1, opTime2) {
return (new MongoDB.Timestamp(opTime1)).compare(opTime2);
}

View File

@@ -20,7 +20,18 @@ const APP_FOLDER = 'app';
const oplogCollectionWarnings = [];
// Oplog continues to be the default when we do not have a specific preference; we expect to change it in the future before an oplog deprecation.
const DEFAULT_REACTIVITY_ORDER = process.env.METEOR_REACTIVITY_ORDER ? process.env.METEOR_REACTIVITY_ORDER.split(',') : ['oplog', 'changeStreams', 'pooling'];
const availableDrivers = ['oplog', 'polling', 'changeStreams']
const DEFAULT_REACTIVITY_ORDER = process.env.METEOR_REACTIVITY_ORDER ? process.env.METEOR_REACTIVITY_ORDER.split(',') : availableDrivers;
const reactivitySetting = Meteor.settings?.packages?.mongo?.reactivity;
if (Array.isArray(reactivitySetting)) {
for (const method of reactivitySetting) {
if (!availableDrivers.includes(method)) {
throw new Error(`Invalid Mongo reactivity method in settings: ${method}`);
}
}
}
export const MongoConnection = function (url, options) {
var self = this;
options = options || {};
@@ -800,22 +811,96 @@ MongoConnection.prototype.tail = function (cursorDescription, docCallback, timeo
};
};
Object.assign(MongoConnection.prototype, {
_observeChanges: async function (
const driverClasses = {
changeStreams: ChangeStreamObserveDriver,
oplog: OplogObserveDriver,
polling: PollingObserveDriver,
};
function _getConfiguredReactivityOrder () {
const reactivitySetting = Meteor.settings?.packages?.mongo?.reactivity;
const isArraySetting = Array.isArray(reactivitySetting);
const isStringSetting = typeof reactivitySetting === 'string';
const hasCustomDriverOrder = isArraySetting || isStringSetting;
if (reactivitySetting && !hasCustomDriverOrder) {
throw new Error('Meteor.settings.packages.mongo.reactivity must be a string or an array of observer drivers');
}
let configuredOrder = DEFAULT_REACTIVITY_ORDER;
if (hasCustomDriverOrder) {
if (isStringSetting) {
configuredOrder = [reactivitySetting];
} else {
configuredOrder = [];
for (const name of reactivitySetting) {
if (!configuredOrder.includes(name)) {
configuredOrder.push(name);
}
}
}
}
const invalidDriverNames = configuredOrder.filter(name => !driverClasses[name]);
if (invalidDriverNames.length) {
throw new Error(`Invalid Mongo reactivity driver(s): ${invalidDriverNames.join(', ')}`);
}
if (hasCustomDriverOrder && configuredOrder.length === 0) {
throw new Error('Meteor.settings.packages.mongo.reactivity must specify at least one observer driver');
}
return configuredOrder;
};
MongoConnection.prototype._selectReactivityDriver = async function (configuredOrder, driverChecks) {
const availabilityErrors = [];
let driverClass;
let matcher;
let sorter;
for (const driverName of configuredOrder) {
const checker = driverChecks[driverName];
if (!checker) {
availabilityErrors.push(`Unknown driver "${driverName}"`);
continue;
}
const result = await checker();
if (result.available) {
matcher = result.matcher;
sorter = result.sorter;
driverClass = driverClasses[driverName];
break;
}
if (result.reason) {
availabilityErrors.push(`${driverName}: ${result.reason}`);
}
}
return {
driverClass,
matcher,
sorter,
};
};
MongoConnection.prototype._observeChanges = async function (
cursorDescription, ordered, callbacks, nonMutatingCallbacks) {
var self = this;
const collectionName = cursorDescription.collectionName;
if (cursorDescription.options.tailable) {
return self._observeChangesTailable(cursorDescription, ordered, callbacks);
return this._observeChangesTailable(cursorDescription, ordered, callbacks);
}
// You may not filter out _id when observing changes, because the id is a core
// part of the observeChanges API.
const fieldsOptions = cursorDescription.options.projection || cursorDescription.options.fields;
if (fieldsOptions &&
(fieldsOptions._id === 0 ||
fieldsOptions._id === false)) {
if (fieldsOptions?._id === 0 ||
fieldsOptions?._id === false) {
throw Error("You may not observe a cursor with {fields: {_id: 0}}");
}
@@ -828,15 +913,15 @@ Object.assign(MongoConnection.prototype, {
// Find a matching ObserveMultiplexer, or create a new one. This next block is
// guaranteed to not yield (and it doesn't call anything that can observe a
// new query), so no other calls to this function can interleave with it.
if (observeKey in self._observeMultiplexers) {
multiplexer = self._observeMultiplexers[observeKey];
if (observeKey in this._observeMultiplexers) {
multiplexer = this._observeMultiplexers[observeKey];
} else {
firstHandle = true;
// Create a new ObserveMultiplexer.
multiplexer = new ObserveMultiplexer({
ordered: ordered,
onStop: function () {
delete self._observeMultiplexers[observeKey];
onStop: () => {
delete this._observeMultiplexers[observeKey];
return observeDriver.stop();
}
});
@@ -847,62 +932,23 @@ Object.assign(MongoConnection.prototype, {
nonMutatingCallbacks,
);
const oplogOptions = self?._oplogHandle?._oplogOptions || {};
const oplogOptions = (this._oplogHandle && this._oplogHandle._oplogOptions) || {};
const { includeCollections, excludeCollections } = oplogOptions;
if (firstHandle) {
var matcher, sorter;
const reactivitySetting = Meteor.settings?.packages?.mongo?.reactivity;
const isArraySetting = Array.isArray(reactivitySetting);
const isStringSetting = typeof reactivitySetting === 'string';
const hasCustomDriverOrder = isArraySetting || isStringSetting;
if (reactivitySetting && !hasCustomDriverOrder) {
throw new Error('Meteor.settings.packages.mongo.reactivity must be a string or an array of observer drivers');
}
const driverClasses = {
changeStreams: ChangeStreamObserveDriver,
oplog: OplogObserveDriver,
polling: PollingObserveDriver,
pooling: PollingObserveDriver,
};
let configuredOrder;
if (hasCustomDriverOrder) {
if (isStringSetting) {
configuredOrder = [reactivitySetting];
} else {
configuredOrder = [];
for (const name of reactivitySetting) {
if (!configuredOrder.includes(name)) {
configuredOrder.push(name);
}
}
}
} else {
configuredOrder = DEFAULT_REACTIVITY_ORDER;
}
const invalidDriverNames = configuredOrder.filter(name => !driverClasses[name]);
if (invalidDriverNames.length) {
throw new Error(`Invalid Mongo reactivity driver(s): ${invalidDriverNames.join(', ')}`);
}
if (hasCustomDriverOrder && configuredOrder.length === 0) {
throw new Error('Meteor.settings.packages.mongo.reactivity must specify at least one observer driver');
}
const configuredOrder = _getConfiguredReactivityOrder();
const driverChecks = {
changeStreams: async () => {
let localMatcher;
const reasons = [];
if (self._supportsChangeStreams === undefined) {
if (this._supportsChangeStreams === undefined) {
const serverReasons = [];
try {
// Change Streams require MongoDB 3.6+ and replica set or sharded cluster
const admin = self.db.admin();
const admin = this.db.admin();
const serverInfo = await admin.serverInfo();
const isMasterPromise = admin.command({ isMaster: 1 });
const versionString = serverInfo.version || 'unknown';
@@ -930,13 +976,13 @@ Object.assign(MongoConnection.prototype, {
serverReasons.push(`Error checking Change Stream support: ${error.message}`);
}
self._changeStreamServerReasons = serverReasons;
self._supportsChangeStreams = serverReasons.length === 0;
this._changeStreamServerReasons = serverReasons;
this._supportsChangeStreams = serverReasons.length === 0;
}
if (!self._supportsChangeStreams) {
if (self._changeStreamServerReasons?.length) {
reasons.push(...self._changeStreamServerReasons);
if (!this._supportsChangeStreams) {
if (this._changeStreamServerReasons?.length) {
reasons.push(...this._changeStreamServerReasons);
} else {
reasons.push('Change Streams not supported by MongoDB deployment');
}
@@ -980,7 +1026,7 @@ Object.assign(MongoConnection.prototype, {
let localMatcher;
let localSorter;
if (!(self._oplogHandle && !ordered && !callbacks._testOnlyPollCallback)) {
if (!(this._oplogHandle && !ordered && !callbacks._testOnlyPollCallback)) {
reasons.push('Oplog tailing not available for this cursor');
}
@@ -1035,48 +1081,20 @@ Object.assign(MongoConnection.prototype, {
};
},
polling: () => ({ available: true }),
pooling: () => ({ available: true }),
};
const availabilityErrors = [];
var driverClass;
var selectedDriverName;
const {
driverClass,
matcher: selectedMatcher,
sorter: selectedSorter,
} = await this._selectReactivityDriver(configuredOrder, driverChecks);
for (const driverName of configuredOrder) {
const checker = driverChecks[driverName];
matcher = selectedMatcher;
sorter = selectedSorter;
if (!checker) {
availabilityErrors.push(`Unknown driver "${driverName}"`);
continue;
}
const result = await checker();
if (result.available) {
selectedDriverName = driverName;
matcher = result.matcher;
sorter = result.sorter;
driverClass = driverClasses[driverName];
break;
}
if (result.reason) {
availabilityErrors.push(`${driverName}: ${result.reason}`);
}
}
if (!driverClass) {
const errorDetails = availabilityErrors.length
? ` Reasons: ${availabilityErrors.join(' | ')}`
: '';
throw new Error(`Unable to select a Mongo reactivity driver from configuration [${configuredOrder.join(', ')}].${errorDetails}`);
}
// Meteor._debug(`Using ${selectedDriverName || driverClass.name} for observing changes on collection ${collectionName} (configured order: ${configuredOrder.join(', ')})`);
observeDriver = new driverClass({
cursorDescription,
mongoHandle: self,
mongoHandle: this,
multiplexer,
ordered,
matcher, // ignored by polling
@@ -1091,11 +1109,9 @@ Object.assign(MongoConnection.prototype, {
// This field is only set for use in tests.
multiplexer._observeDriver = observeDriver;
}
self._observeMultiplexers[observeKey] = multiplexer;
this._observeMultiplexers[observeKey] = multiplexer;
// Blocks until the initial adds have been sent.
await multiplexer.addHandleAndSendInitialAdds(observeHandle);
return observeHandle;
},
});
}

View File

@@ -15,16 +15,16 @@ Before moving production traffic to Change Streams, validate that your MongoDB d
## Choosing the Reactivity Driver Order
Meteor picks the first available driver from the configured list. The default order is `oplog`, then `changeStreams`, then `pooling` (long polling). You can change this globally:
Meteor picks the first available driver from the configured list. The default order is `oplog`, then `changeStreams`, then `polling` (long polling). You can change this globally:
- Environment variable: `METEOR_REACTIVITY_ORDER=changeStreams,oplog,pooling`
- Environment variable: `METEOR_REACTIVITY_ORDER=changeStreams,oplog,polling`
- Settings file:
```json
{
"packages": {
"mongo": {
"reactivity": ["changeStreams", "oplog", "pooling"]
"reactivity": ["changeStreams", "oplog", "polling"]
}
}
}
@@ -33,7 +33,7 @@ Meteor picks the first available driver from the configured list. The default or
Tips:
- Put `changeStreams` first when you cannot or do not want to tail the oplog (e.g., Atlas Serverless).
- Remove `changeStreams` from the list if you want to disable it.
- Valid entries are `oplog`, `changeStreams`, and `polling`/`pooling` (alias).
- Valid entries are `oplog`, `changeStreams`, and `polling`/`polling` (alias).
## Change Stream Driver Settings