Merge branch 'release-3.5' into mongo-collation-support

This commit is contained in:
Michael Pfeiffer
2026-04-27 09:00:20 -05:00
committed by GitHub
3 changed files with 2272 additions and 1931 deletions

View File

@@ -10,10 +10,9 @@ import { compareOperationTimes } from './mongo_common';
const SUPPORTED_OPERATIONS = ['insert', 'update', 'replace', 'delete'];
/**
* ChangeStreamObserveDriver - MongoDB Change Streams based observe driver
*
*
* Uses MongoDB Change Streams to watch for real-time changes to a collection.
* Implements a stop callback system similar to PollingObserveDriver for proper
* resource cleanup when the driver is stopped.
@@ -54,23 +53,23 @@ export class ChangeStreamObserveDriver {
return fields;
};
}
this._startListening();
this._startWatching();
}
_sendMultiplexerAdded(id, projectedDoc) {
// Apply EJSON transformation before sending to client
projectedDoc = replaceTypes(projectedDoc, replaceMongoAtomWithMeteor);
try {
this._multiplexer.added(id, projectedDoc);
} catch (error) {
console.error('[ChangeStreams] Error sending added document:', error);
}
// Apply EJSON transformation before sending to client
projectedDoc = replaceTypes(projectedDoc, replaceMongoAtomWithMeteor);
try {
this._multiplexer.added(id, projectedDoc);
} catch (error) {
console.error('[ChangeStreams] Error sending added document:', error);
}
}
async _startListening() {
// Register a listener to be notified when writes happen
// This follows the same pattern as OplogObserveDriver
const stopHandle = await listenAll(
@@ -80,31 +79,34 @@ export class ChangeStreamObserveDriver {
const fence = DDPServer._getCurrentFence();
if (!fence || fence.fired)
return;
if (fence._changeStreamObserveDrivers) {
fence._changeStreamObserveDrivers[this._id] = this;
return;
}
fence._changeStreamObserveDrivers = {};
fence._changeStreamObserveDrivers[this._id] = this;
fence.onBeforeFire(async () => {
const drivers = fence._changeStreamObserveDrivers;
delete fence._changeStreamObserveDrivers;
// Process each driver that needs to be synchronized with the fence
for (const driver of Object.values(drivers)) {
if (driver._stopped) continue;
const write = await fence.beginWrite();
// Wait for the change stream to catch up with any pending operations
await driver._waitUntilCaughtUp();
// Wait for the change stream to catch up with any pending operations.
// Pass the fence explicitly: fence.fire() runs outside the
// AsyncLocalStorage context, so DDPServer._getCurrentFence() would
// return undefined here and miss the fence._csTargetTs annotation.
await driver._waitUntilCaughtUp(fence);
// Process any pending writes immediately
driver._flushPendingWrites();
// If the driver is ready (initial adds complete), ensure all writes are committed
if (driver._isReady) {
await driver._multiplexer.onFlush(async () => {
@@ -115,14 +117,19 @@ export class ChangeStreamObserveDriver {
driver._writesToCommitWhenReady.push(write);
}
}
// Release the per-collection write-timestamp map now that every
// driver on this fence has caught up. The fence object is about
// to be discarded, but clearing explicitly prevents any stray
// read of a now-stale target.
delete fence._csTargetTsByCollection;
});
}
);
// Register the stop handle
this._addStopCallback(() => stopHandle.stop());
}
_addStopCallback(callback) {
@@ -133,9 +140,9 @@ export class ChangeStreamObserveDriver {
}
async _startWatching() {
if (this._stopped) return;
try {
const collection = this._mongoHandle.rawCollection(this._cursorDescription.collectionName);
@@ -178,7 +185,7 @@ export class ChangeStreamObserveDriver {
this._setLastProcessedOperationTime(change.clusterTime);
}
this._handleChange(change);
// Check if we're in a fence
const fence = DDPServer._getCurrentFence();
if (fence && !fence.fired) {
@@ -204,7 +211,7 @@ export class ChangeStreamObserveDriver {
this._restartChangeStream();
}
}, Meteor?.settings?.packages?.mongo?.changeStream?.delay?.error || 100);
// Register timeout cleanup
this._addStopCallback(() => {
clearTimeout(timeoutId);
@@ -219,20 +226,20 @@ export class ChangeStreamObserveDriver {
this._restartChangeStream();
}
}, Meteor?.settings?.packages?.mongo?.changeStream?.delay?.close || 100);
// Register timeout cleanup
this._addStopCallback(() => {
clearTimeout(timeoutId);
});
}
}));
// Now we can allow queued fence writes to commit safely
this._isReady = true;
await this._flushWritesToCommit();
// Remove the defer that was calling _flushPendingWrites
} catch (error) {
console.error('Failed to start ChangeStream:', error);
throw error;
@@ -241,21 +248,21 @@ export class ChangeStreamObserveDriver {
async _sendInitialAdds(collection) {
if (this._stopped) return;
try {
// Build the same selector and options that the cursor would use
const selector = this._cursorDescription.selector || {};
const options = { ...this._cursorDescription.options };
// Find all existing documents
const cursor = collection.find(selector, options);
// Follow oplog driver pattern: get current fence and store write for later commit
const fence = DDPServer._getCurrentFence();
if (fence) {
this._writesToCommitWhenReady.push(fence.beginWrite());
}
// Send 'added' for each existing document that matches our matcher
let docCount = 0;
for await (const doc of cursor) {
@@ -265,9 +272,9 @@ export class ChangeStreamObserveDriver {
this._sendMultiplexerAdded(id, projectedDoc);
docCount++;
}
// DON'T call ready() or flush here - let _startWatching handle it
} catch (error) {
console.error('Error sending initial adds for ChangeStream:', error);
throw error;
@@ -279,8 +286,8 @@ export class ChangeStreamObserveDriver {
// Close current stream using stop callbacks if they exist
if (this._changeStream) {
// Find and execute the change stream stop callback
const changeStreamCallback = this._stopCallbacks.find(cb =>
typeof cb._changeStream === 'function'
const changeStreamCallback = this._stopCallbacks.find(cb =>
typeof cb._changeStream === 'function'
);
if (changeStreamCallback) {
await changeStreamCallback();
@@ -298,12 +305,12 @@ export class ChangeStreamObserveDriver {
// For now, use a simple pipeline that watches all operations
// We'll filter using our matcher in _handleChange
const selector = this._cursorDescription.selector;
if (!selector || Object.keys(selector).length === 0) {
// No selector, watch all changes
return [];
}
// Simple pipeline that just filters by operation type
// More complex selector filtering will be done in _handleChange
return [
@@ -317,7 +324,7 @@ export class ChangeStreamObserveDriver {
async _handleChange(change) {
if (this._stopped) return;
const { operationType, documentKey, fullDocument, fullDocumentBeforeChange, clusterTime } = change;
if (!SUPPORTED_OPERATIONS.includes(operationType)) {
@@ -328,12 +335,12 @@ export class ChangeStreamObserveDriver {
if (typeof documentKey._id?.toHexString === 'function') {
id = new MongoID.ObjectID(documentKey._id.toHexString());
}
// Update last processed operation time (redundant with early update, but safe)
if (clusterTime) {
this._setLastProcessedOperationTime(clusterTime);
}
// Store callback to be executed later when fence processes writes
// Don't try to capture fence here - it will be handled in onBeforeFire
const callbackData = {
@@ -343,7 +350,7 @@ export class ChangeStreamObserveDriver {
fullDocumentBeforeChange,
change
};
this._pendingWrites.push(callbackData);
}
@@ -380,7 +387,7 @@ export class ChangeStreamObserveDriver {
const res = await commands[index]();
return res?.operationTime || res?.$clusterTime?.clusterTime || null;
} catch (error) {
if (!error) {
if (!error) {
return false;
}
@@ -433,7 +440,7 @@ export class ChangeStreamObserveDriver {
// Similar to oplog driver's _beSteady method
const writes = this._writesToCommitWhenReady;
this._writesToCommitWhenReady = [];
if (writes.length > 0) {
await this._multiplexer.onFlush(async () => {
for (const write of writes) {
@@ -506,12 +513,29 @@ export class ChangeStreamObserveDriver {
}
}
async _waitUntilCaughtUp() {
async _waitUntilCaughtUp(fenceOverride) {
// Wait until our change stream has processed events up to the
// server's current operation time. Mirrors oplog's wait logic.
if (this._stopped) return;
const targetTs = await this._getServerOperationTime();
// Prefer the exact clusterTime of the write(s) that triggered this fence,
// when the write path annotated it on the fence (see
// mongo_connection._annotateFenceWithWriteTs). Falls back to asking the
// server for its current operationTime, which races ahead of the write's
// ts and historically caused every fence to wait the full 1000ms timeout.
// The fence must be passed explicitly because fence.fire() runs outside
// the AsyncLocalStorage context where _getCurrentFence() would find it.
// Target is looked up per-collection: this driver only observes events
// from its own collection, so waiting on a ts from a different
// collection's write would always time out.
const fence = fenceOverride || DDPServer._getCurrentFence();
const { collectionName } = this._cursorDescription;
const { _csTargetTsByCollection } = fence || {};
let targetTs = _csTargetTsByCollection && collectionName ? _csTargetTsByCollection[collectionName] : undefined;
if (!targetTs) {
targetTs = await this._getServerOperationTime();
}
if (!targetTs) {
// Best-effort fallback: yield to I/O but don't artificially delay
await new Promise((r) => setImmediate(r));
@@ -532,7 +556,13 @@ export class ChangeStreamObserveDriver {
let timeoutId = null;
const entry = { ts: targetTs, resolver: null };
const timeoutMs = Meteor?.settings?.packages?.mongo?.changeStream?.waitUntilCaughtUpTimeoutMs ?? 1000;
// With fence._csTargetTsByCollection annotated by the write path, the
// resolver fires as soon as the driver's own change event arrives,
// typically <100ms. The timeout is a safety valve for edge cases
// (stream stalled, write outside a fence, etc.). 250ms is small
// enough that a regression in the annotation path surfaces quickly
// in benchmarks rather than hiding behind a one-second wait.
const timeoutMs = Meteor?.settings?.packages?.mongo?.changeStream?.waitUntilCaughtUpTimeoutMs ?? 250;
await new Promise((resolve) => {
entry.resolver = () => {
@@ -555,9 +585,9 @@ export class ChangeStreamObserveDriver {
async stop() {
if (this._stopped) return;
this._stopped = true;
// Execute all stop callbacks
for (const callback of this._stopCallbacks) {
try {
@@ -566,20 +596,20 @@ export class ChangeStreamObserveDriver {
console.error('Error in stop callback:', error);
}
}
// Handle any remaining pending writes (following oplog driver pattern)
for (const write of this._pendingWrites) {
if(!write || typeof write.committed !== 'function') continue;
if (!write || typeof write.committed !== 'function') continue;
await write.committed();
}
this._pendingWrites = [];
// Handle any remaining writes to commit
for (const write of this._writesToCommitWhenReady) {
await write.committed();
}
this._writesToCommitWhenReady = [];
// Clear callbacks array
this._stopCallbacks = [];
}

View File

@@ -6,7 +6,7 @@ import { AsynchronousCursor } from './asynchronous_cursor';
import { Cursor } from './cursor';
import { CursorDescription } from './cursor_description';
import { DocFetcher } from './doc_fetcher';
import { MongoDB, replaceMeteorAtomWithMongo, replaceTypes, transformResult } from './mongo_common';
import { MongoDB, compareOperationTimes, replaceMeteorAtomWithMongo, replaceTypes, transformResult } from './mongo_common';
import { ObserveHandle } from './observe_handle';
import { ObserveMultiplexer } from './observe_multiplex';
import { OplogObserveDriver } from './oplog_observe_driver';
@@ -165,6 +165,26 @@ MongoConnection.prototype._maybeBeginWrite = function () {
}
};
// Record the clusterTime of a write on the current DDP write fence so the
// ChangeStreamObserveDriver can wait for that exact timestamp instead of
// polling the server for a "current" time that may not be echoed by the
// stream until the next heartbeat (~1s).
//
// The target is per-collection: each change stream driver watches a single
// collection and will only observe clusterTimes from events in that
// collection. A fence may cover writes across multiple collections (e.g.
// creating a card also writes to activities), so picking a single "max ts"
// for the whole fence would stall drivers whose collection never sees
// that specific ts. We therefore keep the max ts per collection.
// Record the clusterTime of a write on the given fence, keyed by collection
// name, keeping only the newest timestamp seen per collection.
//
// The map is a null-prototype object so that collection names that collide
// with Object.prototype keys (e.g. "constructor", "toString") cannot resolve
// `prev` to an inherited value and corrupt the max-timestamp comparison.
function _annotateFenceWithWriteTs(fence, collectionName, writeTs) {
  // Best-effort: nothing to do without a fence to annotate, a timestamp to
  // record, or a collection name to key it under.
  if (!fence || !writeTs || !collectionName) return;
  if (!fence._csTargetTsByCollection) {
    fence._csTargetTsByCollection = Object.create(null);
  }
  const prev = fence._csTargetTsByCollection[collectionName];
  // Keep the maximum ts per collection: a fence may cover several writes to
  // the same collection and the driver must wait for the latest of them.
  if (!prev || compareOperationTimes(writeTs, prev) > 0) {
    fence._csTargetTsByCollection[collectionName] = writeTs;
  }
}
// Internal interface: adds a callback which is called when the Mongo primary
// changes. Returns a stop handle.
MongoConnection.prototype._onFailover = function (callback) {
@@ -189,16 +209,21 @@ MongoConnection.prototype.insertAsync = async function (collection_name, documen
var refresh = async function () {
await Meteor.refresh({collection: collection_name, id: document._id });
};
const session = self.client.startSession();
return self.rawCollection(collection_name).insertOne(
replaceTypes(document, replaceMeteorAtomWithMongo),
{
safe: true,
session,
}
).then(async ({insertedId}) => {
_annotateFenceWithWriteTs(DDPServer._getCurrentFence(), collection_name, session.operationTime);
await session.endSession();
await refresh();
await write.committed();
return insertedId;
}).catch(async e => {
try { await session.endSession(); } catch (_) { /* ignore */ }
await write.committed();
throw e;
});
@@ -237,15 +262,20 @@ MongoConnection.prototype.removeAsync = async function (collection_name, selecto
await self._refresh(collection_name, selector);
};
const session = self.client.startSession();
return self.rawCollection(collection_name)
.deleteMany(replaceTypes(selector, replaceMeteorAtomWithMongo), {
safe: true,
session,
})
.then(async ({ deletedCount }) => {
_annotateFenceWithWriteTs(DDPServer._getCurrentFence(), collection_name, session.operationTime);
await session.endSession();
await refresh();
await write.committed();
return transformResult({ result : {modifiedCount : deletedCount} }).numberAffected;
}).catch(async (err) => {
try { await session.endSession(); } catch (_) { /* ignore */ }
await write.committed();
throw err;
});
@@ -264,15 +294,19 @@ MongoConnection.prototype.dropCollectionAsync = async function(collectionName) {
});
};
const session = self.client.startSession();
return self
.rawCollection(collectionName)
.drop()
.drop({ session })
.then(async result => {
_annotateFenceWithWriteTs(DDPServer._getCurrentFence(), collectionName, session.operationTime);
await session.endSession();
await refresh();
await write.committed();
return result;
})
.catch(async e => {
try { await session.endSession(); } catch (_) { /* ignore */ }
await write.committed();
throw e;
});
@@ -334,7 +368,8 @@ MongoConnection.prototype.updateAsync = async function (collection_name, selecto
};
var collection = self.rawCollection(collection_name);
var mongoOpts = {safe: true};
const session = self.client.startSession();
var mongoOpts = {safe: true, session};
// Add support for filtered positional operator
if (options.arrayFilters !== undefined) mongoOpts.arrayFilters = options.arrayFilters;
// explicitly enumerate options that minimongo supports
@@ -386,8 +421,10 @@ MongoConnection.prototype.updateAsync = async function (collection_name, selecto
// - The id is defined by query or mod we can just add it to the replacement doc
// - The user did not specify any id preference and the id is a Mongo ObjectId,
// then we can just let Mongo generate the id
return await simulateUpsertWithInsertedId(collection, mongoSelector, mongoMod, options)
return await simulateUpsertWithInsertedId(collection, mongoSelector, mongoMod, options, session)
.then(async result => {
_annotateFenceWithWriteTs(DDPServer._getCurrentFence(), collection_name, session.operationTime);
await session.endSession();
await refresh();
await write.committed();
if (result && ! options._returnObject) {
@@ -395,6 +432,9 @@ MongoConnection.prototype.updateAsync = async function (collection_name, selecto
} else {
return result;
}
}).catch(async err => {
try { await session.endSession(); } catch (_) { /* ignore */ }
throw err;
});
} else {
if (options.upsert && !knownId && options.insertedId && isModify) {
@@ -414,6 +454,8 @@ MongoConnection.prototype.updateAsync = async function (collection_name, selecto
return collection[updateMethod]
.bind(collection)(mongoSelector, mongoMod, mongoOpts)
.then(async result => {
_annotateFenceWithWriteTs(DDPServer._getCurrentFence(), collection_name, session.operationTime);
await session.endSession();
var meteorResult = transformResult({result});
if (meteorResult && options._returnObject) {
// If this was an upsertAsync() call, and we ended up
@@ -435,6 +477,7 @@ MongoConnection.prototype.updateAsync = async function (collection_name, selecto
return meteorResult.numberAffected;
}
}).catch(async (err) => {
try { await session.endSession(); } catch (_) { /* ignore */ }
await write.committed();
throw err;
});
@@ -561,7 +604,7 @@ var NUM_OPTIMISTIC_TRIES = 3;
var simulateUpsertWithInsertedId = async function (collection, selector, mod, options) {
var simulateUpsertWithInsertedId = async function (collection, selector, mod, options, session) {
// STRATEGY: First try doing an upsert with a generated ID.
// If this throws an error about changing the ID on an existing document
// then without affecting the database, we know we should probably try
@@ -578,11 +621,13 @@ var simulateUpsertWithInsertedId = async function (collection, selector, mod, op
var insertedId = options.insertedId; // must exist
var mongoOptsForUpdate = {
safe: true,
multi: options.multi
multi: options.multi,
session,
};
var mongoOptsForInsert = {
safe: true,
upsert: true
upsert: true,
session,
};
var replacementWithId = Object.assign(

File diff suppressed because it is too large Load Diff