diff --git a/packages/mongo/changestream_observe_driver.js b/packages/mongo/changestream_observe_driver.js index 3c09ef6ca3..cf0c5f49c8 100644 --- a/packages/mongo/changestream_observe_driver.js +++ b/packages/mongo/changestream_observe_driver.js @@ -10,10 +10,9 @@ import { compareOperationTimes } from './mongo_common'; const SUPPORTED_OPERATIONS = ['insert', 'update', 'replace', 'delete']; - /** * ChangeStreamObserveDriver - MongoDB Change Streams based observe driver - * + * * Uses MongoDB Change Streams to watch for real-time changes to a collection. * Implements a stop callback system similar to PollingObserveDriver for proper * resource cleanup when the driver is stopped. @@ -54,23 +53,23 @@ export class ChangeStreamObserveDriver { return fields; }; } - + this._startListening(); this._startWatching(); } _sendMultiplexerAdded(id, projectedDoc) { - // Apply EJSON transformation before sending to client - projectedDoc = replaceTypes(projectedDoc, replaceMongoAtomWithMeteor); - try { - this._multiplexer.added(id, projectedDoc); - } catch (error) { - console.error('[ChangeStreams] Error sending added document:', error); - } + // Apply EJSON transformation before sending to client + projectedDoc = replaceTypes(projectedDoc, replaceMongoAtomWithMeteor); + try { + this._multiplexer.added(id, projectedDoc); + } catch (error) { + console.error('[ChangeStreams] Error sending added document:', error); + } } - + async _startListening() { - + // Register a listener to be notified when writes happen // This follows the same pattern as OplogObserveDriver const stopHandle = await listenAll( @@ -80,31 +79,34 @@ export class ChangeStreamObserveDriver { const fence = DDPServer._getCurrentFence(); if (!fence || fence.fired) return; - + if (fence._changeStreamObserveDrivers) { fence._changeStreamObserveDrivers[this._id] = this; return; } - + fence._changeStreamObserveDrivers = {}; fence._changeStreamObserveDrivers[this._id] = this; - + fence.onBeforeFire(async () => { const drivers = fence._changeStreamObserveDrivers; delete fence._changeStreamObserveDrivers; - + // Process each driver that needs to be synchronized with the fence for (const driver of Object.values(drivers)) { if (driver._stopped) continue; - + const write = await fence.beginWrite(); - - // Wait for the change stream to catch up with any pending operations - await driver._waitUntilCaughtUp(); - + + // Wait for the change stream to catch up with any pending operations. + // Pass the fence explicitly: fence.fire() runs outside the + // AsyncLocalStorage context, so DDPServer._getCurrentFence() would + // return undefined here and miss the fence._csTargetTs annotation. + await driver._waitUntilCaughtUp(fence); + // Process any pending writes immediately driver._flushPendingWrites(); - + // If the driver is ready (initial adds complete), ensure all writes are committed if (driver._isReady) { await driver._multiplexer.onFlush(async () => { @@ -115,14 +117,19 @@ export class ChangeStreamObserveDriver { driver._writesToCommitWhenReady.push(write); } } + // Release the per-collection write-timestamp map now that every + // driver on this fence has caught up. The fence object is about + // to be discarded, but clearing explicitly prevents any stray + // read of a now-stale target. + delete fence._csTargetTsByCollection; }); } ); - + // Register the stop handle this._addStopCallback(() => stopHandle.stop()); } - + _addStopCallback(callback) { @@ -133,9 +140,9 @@ export class ChangeStreamObserveDriver { } async _startWatching() { - + if (this._stopped) return; - + try { const collection = this._mongoHandle.rawCollection(this._cursorDescription.collectionName); @@ -178,7 +185,7 @@ export class ChangeStreamObserveDriver { this._setLastProcessedOperationTime(change.clusterTime); } this._handleChange(change); - + // Check if we're in a fence const fence = DDPServer._getCurrentFence(); if (fence && !fence.fired) { @@ -204,7 +211,7 @@ export class ChangeStreamObserveDriver { this._restartChangeStream(); } }, Meteor?.settings?.packages?.mongo?.changeStream?.delay?.error || 100); - + // Register timeout cleanup this._addStopCallback(() => { clearTimeout(timeoutId); @@ -219,20 +226,20 @@ export class ChangeStreamObserveDriver { this._restartChangeStream(); } }, Meteor?.settings?.packages?.mongo?.changeStream?.delay?.close || 100); - + // Register timeout cleanup this._addStopCallback(() => { clearTimeout(timeoutId); }); } })); - + // Now we can allow queued fence writes to commit safely this._isReady = true; await this._flushWritesToCommit(); - + // Remove the defer that was calling _flushPendingWrites - + } catch (error) { console.error('Failed to start ChangeStream:', error); throw error; @@ -241,21 +248,21 @@ export class ChangeStreamObserveDriver { async _sendInitialAdds(collection) { if (this._stopped) return; - + try { // Build the same selector and options that the cursor would use const selector = this._cursorDescription.selector || {}; const options = { ...this._cursorDescription.options }; - + // Find all existing documents const cursor = collection.find(selector, options); - + // Follow oplog driver pattern: get current fence and store write for later commit const fence = DDPServer._getCurrentFence(); if (fence) { this._writesToCommitWhenReady.push(fence.beginWrite()); } - + // Send 'added' for each existing document that matches our matcher let docCount = 0; for await (const doc of cursor) { @@ -265,9 +272,9 @@ export class ChangeStreamObserveDriver { this._sendMultiplexerAdded(id, projectedDoc); docCount++; } - + // DON'T call ready() or flush here - let _startWatching handle it - + } catch (error) { console.error('Error sending initial adds for ChangeStream:', error); throw error; @@ -279,8 +286,8 @@ export class ChangeStreamObserveDriver { // Close current stream using stop callbacks if they exist if (this._changeStream) { // Find and execute the change stream stop callback - const changeStreamCallback = this._stopCallbacks.find(cb => - typeof cb._changeStream === 'function' + const changeStreamCallback = this._stopCallbacks.find(cb => + typeof cb._changeStream === 'function' ); if (changeStreamCallback) { await changeStreamCallback(); @@ -298,12 +305,12 @@ export class ChangeStreamObserveDriver { // For now, use a simple pipeline that watches all operations // We'll filter using our matcher in _handleChange const selector = this._cursorDescription.selector; - + if (!selector || Object.keys(selector).length === 0) { // No selector, watch all changes return []; } - + // Simple pipeline that just filters by operation type // More complex selector filtering will be done in _handleChange return [ @@ -317,7 +324,7 @@ export class ChangeStreamObserveDriver { async _handleChange(change) { if (this._stopped) return; - + const { operationType, documentKey, fullDocument, fullDocumentBeforeChange, clusterTime } = change; if (!SUPPORTED_OPERATIONS.includes(operationType)) { @@ -328,12 +335,12 @@ export class ChangeStreamObserveDriver { if (typeof documentKey._id?.toHexString === 'function') { id = new MongoID.ObjectID(documentKey._id.toHexString()); } - + // Update last processed operation time (redundant with early update, but safe) if (clusterTime) { this._setLastProcessedOperationTime(clusterTime); } - + // Store callback to be executed later when fence processes writes // Don't try to capture fence here - it will be handled in onBeforeFire const callbackData = { @@ -343,7 +350,7 @@ export class ChangeStreamObserveDriver { fullDocumentBeforeChange, change }; - + this._pendingWrites.push(callbackData); } @@ -380,7 +387,7 @@ export class ChangeStreamObserveDriver { const res = await commands[index](); return res?.operationTime || res?.$clusterTime?.clusterTime || null; } catch (error) { - if (!error) { + if (!error) { return false; } @@ -433,7 +440,7 @@ export class ChangeStreamObserveDriver { // Similar to oplog driver's _beSteady method const writes = this._writesToCommitWhenReady; this._writesToCommitWhenReady = []; - + if (writes.length > 0) { await this._multiplexer.onFlush(async () => { for (const write of writes) { @@ -506,12 +513,29 @@ export class ChangeStreamObserveDriver { } } - async _waitUntilCaughtUp() { + async _waitUntilCaughtUp(fenceOverride) { // Wait until our change stream has processed events up to the // server's current operation time. Mirrors oplog's wait logic. if (this._stopped) return; - const targetTs = await this._getServerOperationTime(); + // Prefer the exact clusterTime of the write(s) that triggered this fence, + // when the write path annotated it on the fence (see + // mongo_connection._annotateFenceWithWriteTs). Falls back to asking the + // server for its current operationTime, which races ahead of the write's + // ts and historically caused every fence to wait the full 1000ms timeout. + // The fence must be passed explicitly because fence.fire() runs outside + // the AsyncLocalStorage context where _getCurrentFence() would find it. + // Target is looked up per-collection: this driver only observes events + // from its own collection, so waiting on a ts from a different + // collection's write would always time out. + const fence = fenceOverride || DDPServer._getCurrentFence(); + const { collectionName } = this._cursorDescription; + const { _csTargetTsByCollection } = fence || {}; + let targetTs = _csTargetTsByCollection && collectionName ? _csTargetTsByCollection[collectionName] : undefined; + if (!targetTs) { + targetTs = await this._getServerOperationTime(); + } + if (!targetTs) { // Best-effort fallback: yield to I/O but don't artificially delay await new Promise((r) => setImmediate(r)); @@ -532,7 +556,13 @@ export class ChangeStreamObserveDriver { let timeoutId = null; const entry = { ts: targetTs, resolver: null }; - const timeoutMs = Meteor?.settings?.packages?.mongo?.changeStream?.waitUntilCaughtUpTimeoutMs ?? 1000; + // With fence._csTargetTsByCollection annotated by the write path, the + // resolver fires as soon as the driver's own change event arrives, + // typically <100ms. The timeout is a safety valve for edge cases + // (stream stalled, write outside a fence, etc.). 250ms is small + // enough that a regression in the annotation path surfaces quickly + // in benchmarks rather than hiding behind a one-second wait. + const timeoutMs = Meteor?.settings?.packages?.mongo?.changeStream?.waitUntilCaughtUpTimeoutMs ?? 250; await new Promise((resolve) => { entry.resolver = () => { @@ -555,9 +585,9 @@ export class ChangeStreamObserveDriver { async stop() { if (this._stopped) return; - + this._stopped = true; - + // Execute all stop callbacks for (const callback of this._stopCallbacks) { try { @@ -566,20 +596,20 @@ export class ChangeStreamObserveDriver { console.error('Error in stop callback:', error); } } - + // Handle any remaining pending writes (following oplog driver pattern) for (const write of this._pendingWrites) { - if(!write || typeof write.committed !== 'function') continue; + if (!write || typeof write.committed !== 'function') continue; await write.committed(); } this._pendingWrites = []; - + // Handle any remaining writes to commit for (const write of this._writesToCommitWhenReady) { await write.committed(); } this._writesToCommitWhenReady = []; - + // Clear callbacks array this._stopCallbacks = []; } diff --git a/packages/mongo/mongo_connection.js b/packages/mongo/mongo_connection.js index 42c9a8d480..f53b6f3fa6 100644 --- a/packages/mongo/mongo_connection.js +++ b/packages/mongo/mongo_connection.js @@ -6,7 +6,7 @@ import { AsynchronousCursor } from './asynchronous_cursor'; import { Cursor } from './cursor'; import { CursorDescription } from './cursor_description'; import { DocFetcher } from './doc_fetcher'; -import { MongoDB, replaceMeteorAtomWithMongo, replaceTypes, transformResult } from './mongo_common'; +import { MongoDB, compareOperationTimes, replaceMeteorAtomWithMongo, replaceTypes, transformResult } from './mongo_common'; import { ObserveHandle } from './observe_handle'; import { ObserveMultiplexer } from './observe_multiplex'; import { OplogObserveDriver } from './oplog_observe_driver'; @@ -165,6 +165,26 @@ MongoConnection.prototype._maybeBeginWrite = function () { } }; +// Record the clusterTime of a write on the current DDP write fence so the +// ChangeStreamObserveDriver can wait for that exact timestamp instead of +// polling the server for a "current" time that may not be echoed by the +// stream until the next heartbeat (~1s). +// +// The target is per-collection: each change stream driver watches a single +// collection and will only observe clusterTimes from events in that +// collection. A fence may cover writes across multiple collections (e.g. +// creating a card also writes to activities), so picking a single "max ts" +// for the whole fence would stall drivers whose collection never sees +// that specific ts. We therefore keep the max ts per collection. +function _annotateFenceWithWriteTs(fence, collectionName, writeTs) { + if (!fence || !writeTs || !collectionName) return; + const map = fence._csTargetTsByCollection = fence._csTargetTsByCollection || {}; + const prev = map[collectionName]; + if (!prev || compareOperationTimes(writeTs, prev) > 0) { + map[collectionName] = writeTs; + } +} + // Internal interface: adds a callback which is called when the Mongo primary // changes. Returns a stop handle. MongoConnection.prototype._onFailover = function (callback) { @@ -189,16 +209,21 @@ MongoConnection.prototype.insertAsync = async function (collection_name, documen var refresh = async function () { await Meteor.refresh({collection: collection_name, id: document._id }); }; + const session = self.client.startSession(); return self.rawCollection(collection_name).insertOne( replaceTypes(document, replaceMeteorAtomWithMongo), { safe: true, + session, } ).then(async ({insertedId}) => { + _annotateFenceWithWriteTs(DDPServer._getCurrentFence(), collection_name, session.operationTime); + await session.endSession(); await refresh(); await write.committed(); return insertedId; }).catch(async e => { + try { await session.endSession(); } catch (_) { /* ignore */ } await write.committed(); throw e; }); @@ -237,15 +262,20 @@ MongoConnection.prototype.removeAsync = async function (collection_name, selecto await self._refresh(collection_name, selector); }; + const session = self.client.startSession(); return self.rawCollection(collection_name) .deleteMany(replaceTypes(selector, replaceMeteorAtomWithMongo), { safe: true, + session, }) .then(async ({ deletedCount }) => { + _annotateFenceWithWriteTs(DDPServer._getCurrentFence(), collection_name, session.operationTime); + await session.endSession(); await refresh(); await write.committed(); return transformResult({ result : {modifiedCount : deletedCount} }).numberAffected; }).catch(async (err) => { + try { await session.endSession(); } catch (_) { /* ignore */ } await write.committed(); throw err; }); @@ -264,15 +294,19 @@ MongoConnection.prototype.dropCollectionAsync = async function(collectionName) { }); }; + const session = self.client.startSession(); return self .rawCollection(collectionName) - .drop() + .drop({ session }) .then(async result => { + _annotateFenceWithWriteTs(DDPServer._getCurrentFence(), collectionName, session.operationTime); + await session.endSession(); await refresh(); await write.committed(); return result; }) .catch(async e => { + try { await session.endSession(); } catch (_) { /* ignore */ } await write.committed(); throw e; }); @@ -334,7 +368,8 @@ MongoConnection.prototype.updateAsync = async function (collection_name, selecto }; var collection = self.rawCollection(collection_name); - var mongoOpts = {safe: true}; + const session = self.client.startSession(); + var mongoOpts = {safe: true, session}; // Add support for filtered positional operator if (options.arrayFilters !== undefined) mongoOpts.arrayFilters = options.arrayFilters; // explictly enumerate options that minimongo supports @@ -386,8 +421,10 @@ MongoConnection.prototype.updateAsync = async function (collection_name, selecto // - The id is defined by query or mod we can just add it to the replacement doc // - The user did not specify any id preference and the id is a Mongo ObjectId, // then we can just let Mongo generate the id - return await simulateUpsertWithInsertedId(collection, mongoSelector, mongoMod, options) + return await simulateUpsertWithInsertedId(collection, mongoSelector, mongoMod, options, session) .then(async result => { + _annotateFenceWithWriteTs(DDPServer._getCurrentFence(), collection_name, session.operationTime); + await session.endSession(); await refresh(); await write.committed(); if (result && ! options._returnObject) { @@ -395,6 +432,9 @@ MongoConnection.prototype.updateAsync = async function (collection_name, selecto } else { return result; } + }).catch(async err => { + try { await session.endSession(); } catch (_) { /* ignore */ } + throw err; }); } else { if (options.upsert && !knownId && options.insertedId && isModify) { @@ -414,6 +454,8 @@ MongoConnection.prototype.updateAsync = async function (collection_name, selecto return collection[updateMethod] .bind(collection)(mongoSelector, mongoMod, mongoOpts) .then(async result => { + _annotateFenceWithWriteTs(DDPServer._getCurrentFence(), collection_name, session.operationTime); + await session.endSession(); var meteorResult = transformResult({result}); if (meteorResult && options._returnObject) { // If this was an upsertAsync() call, and we ended up @@ -435,6 +477,7 @@ MongoConnection.prototype.updateAsync = async function (collection_name, selecto return meteorResult.numberAffected; } }).catch(async (err) => { + try { await session.endSession(); } catch (_) { /* ignore */ } await write.committed(); throw err; }); @@ -561,7 +604,7 @@ var NUM_OPTIMISTIC_TRIES = 3; -var simulateUpsertWithInsertedId = async function (collection, selector, mod, options) { +var simulateUpsertWithInsertedId = async function (collection, selector, mod, options, session) { // STRATEGY: First try doing an upsert with a generated ID. // If this throws an error about changing the ID on an existing document // then without affecting the database, we know we should probably try @@ -578,11 +621,13 @@ var simulateUpsertWithInsertedId = async function (collection, selector, mod, op var insertedId = options.insertedId; // must exist var mongoOptsForUpdate = { safe: true, - multi: options.multi + multi: options.multi, + session, }; var mongoOptsForInsert = { safe: true, - upsert: true + upsert: true, + session, }; var replacementWithId = Object.assign( diff --git a/packages/mongo/tests/changestream_observe_driver_tests.js b/packages/mongo/tests/changestream_observe_driver_tests.js index b1d01e9432..c7ce21938f 100644 --- a/packages/mongo/tests/changestream_observe_driver_tests.js +++ b/packages/mongo/tests/changestream_observe_driver_tests.js @@ -16,6 +16,7 @@ import { Meteor } from 'meteor/meteor'; import { Mongo } from 'meteor/mongo'; import { Random } from 'meteor/random'; import { EJSON } from 'meteor/ejson'; +import { DDPServer } from 'meteor/ddp-server'; // Helper to check if change streams are supported const isChangeStreamDriver = (handle) => { @@ -29,7 +30,7 @@ const DEFAULT_REACTIVITY = process.env.METEOR_REACTIVITY_ORDER const IS_CHANGESTREAM = DEFAULT_REACTIVITY && DEFAULT_REACTIVITY[0] === 'changeStreams'; // Helper to create a unique collection for each test -const makeCollection = function() { +const makeCollection = function () { return new Mongo.Collection('changestream_test_' + Random.id()); }; @@ -57,11 +58,11 @@ const waitFor = async (conditionFn, timeoutMs = 2000, intervalMs = 50) => { Tinytest.addAsync( 'changestream - driver detection', - async function(test) { + async function (test) { const c = makeCollection(); const handle = await c.find({}).observeChanges({ - added: function() {} + added: function () { } }); // Log which driver is being used for debugging @@ -94,1869 +95,1869 @@ if (!IS_CHANGESTREAM) { Tinytest.addAsync( 'changestream - basic insert detection', - async function(test) { - const c = makeCollection(); - const results = []; + async function (test) { + const c = makeCollection(); + const results = []; - const handle = await c.find({}).observeChanges({ - added: function(id, fields) { - results.push({ type: 'added', id, fields }); - } - }); - - test.isTrue(isChangeStreamDriver(handle), 'Should be using ChangeStream driver'); - - // Insert a document and wait for the callback - await c.insertAsync({ name: 'test', value: 42 }); - - // Wait for the change to be detected - await waitFor(() => results.length > 0); - - test.equal(results.length, 1, 'Should have received one added callback'); - test.equal(results[0].type, 'added'); - test.equal(results[0].fields.name, 'test'); - test.equal(results[0].fields.value, 42); - - handle.stop(); - } - ); - - Tinytest.addAsync( - 'changestream - basic update detection', - async function(test) { - const c = makeCollection(); - const results = []; - - // Insert first to have something to update - const insertedId = await c.insertAsync({ name: 'test', value: 42 }); - - const handle = await c.find({}).observeChanges({ - added: function(id, fields) { - results.push({ type: 'added', id, fields }); - }, - changed: function(id, fields) { - results.push({ type: 'changed', id, fields }); - } - }); - - test.isTrue(isChangeStreamDriver(handle), 'Should be using ChangeStream driver'); - - // Clear the initial add - await waitFor(() => results.length > 0); - results.length = 0; - - // Update the document - await c.updateAsync(insertedId, { $set: { value: 100, extra: 'new' } }); - - // Wait for the change to be detected - await waitFor(() => results.length > 0); - - test.equal(results.length, 1, 'Should have received one changed callback'); - test.equal(results[0].type, 'changed'); - test.equal(results[0].fields.value, 100); - test.equal(results[0].fields.extra, 'new'); - - handle.stop(); - } - ); - - Tinytest.addAsync( - 'changestream - basic delete detection', - async function(test) { - const c = makeCollection(); - const results = []; - - // Insert first to have something to delete - const insertedId = await c.insertAsync({ name: 'test', value: 42 }); - - const handle = await c.find({}).observeChanges({ - added: function(id, fields) { - results.push({ type: 'added', id, fields }); - }, - removed: function(id) { - results.push({ type: 'removed', id }); - } - }); - - test.isTrue(isChangeStreamDriver(handle), 'Should be using ChangeStream driver'); - - // Wait for initial add - await waitFor(() => results.length > 0); - results.length = 0; - - // Delete the document - await c.removeAsync(insertedId); - - // Wait for the change to be detected - await waitFor(() => results.length > 0); - - test.equal(results.length, 1, 'Should have received one removed callback'); - test.equal(results[0].type, 'removed'); - - handle.stop(); - } - ); - - Tinytest.addAsync( - 'changestream - full document replace', - async function(test) { - const c = makeCollection(); - const results = []; - - const insertedId = await c.insertAsync({ name: 'original', value: 1 }); - - const handle = await c.find({}).observeChanges({ - added: function(id, fields) { - results.push({ type: 'added', id, fields }); - }, - changed: function(id, fields) { - results.push({ type: 'changed', id, fields }); - } - }); - - test.isTrue(isChangeStreamDriver(handle), 'Should be using ChangeStream driver'); - - await waitFor(() => results.length > 0); - results.length = 0; - - // Replace entire document - await c.updateAsync(insertedId, { name: 'replaced', value: 999 }); - - await waitFor(() => results.length > 0); - - test.equal(results.length, 1); - test.equal(results[0].type, 'changed'); - test.equal(results[0].fields.name, 'replaced'); - test.equal(results[0].fields.value, 999); - - handle.stop(); - } - ); - - // ============================================================================ - // PROJECTION / FIELD FILTERING TESTS - // ============================================================================ - - Tinytest.addAsync( - 'changestream - projection filters fields correctly', - async function(test) { - const c = makeCollection(); - const results = []; - - const insertedId = await c.insertAsync({ - name: 'test', - value: 42, - secret: 'hidden' - }); - - // Only observe 'name' field - const handle = await c.find({}, { fields: { name: 1 } }).observeChanges({ - added: function(id, fields) { - results.push({ type: 'added', id, fields }); - }, - changed: function(id, fields) { - results.push({ type: 'changed', id, fields }); - } - }); - - test.isTrue(isChangeStreamDriver(handle), 'Should be using ChangeStream driver'); - - await waitFor(() => results.length > 0); - - // Initial add should only have 'name' field - test.equal(results[0].type, 'added'); - test.equal(results[0].fields.name, 'test'); - test.isUndefined(results[0].fields.value, 'value should not be included'); - test.isUndefined(results[0].fields.secret, 'secret should not be included'); - - results.length = 0; - - // Update a non-projected field - should produce change only if name is affected - await c.updateAsync(insertedId, { $set: { name: 'updated' } }); - - await waitFor(() => results.length > 0); - - test.equal(results[0].type, 'changed'); - test.equal(results[0].fields.name, 'updated'); - - handle.stop(); - } - ); - - Tinytest.addAsync( - 'changestream - projection with multiple fields', - async function(test) { - const c = makeCollection(); - const results = []; - - await c.insertAsync({ - a: 1, b: 2, c: 3, d: 4 - }); - - // Only observe 'a' and 'c' fields - const handle = await c.find({}, { fields: { a: 1, c: 1 } }).observeChanges({ - added: function(id, fields) { - results.push({ type: 'added', id, fields }); - } - }); - - test.isTrue(isChangeStreamDriver(handle), 'Should be using ChangeStream driver'); - - await waitFor(() => results.length > 0); - - test.equal(results[0].fields.a, 1); - test.equal(results[0].fields.c, 3); - test.isUndefined(results[0].fields.b); - test.isUndefined(results[0].fields.d); - - handle.stop(); - } - ); - - Tinytest.addAsync( - 'changestream - projection with exclusion', - async function(test) { - const c = makeCollection(); - const results = []; - - await c.insertAsync({ - a: 1, b: 2, c: 3 - }); - - // Exclude 'b' field - const handle = await c.find({}, { fields: { b: 0 } }).observeChanges({ - added: function(id, fields) { - results.push({ type: 'added', id, fields }); - } - }); - - test.isTrue(isChangeStreamDriver(handle), 'Should be using ChangeStream driver'); - - await waitFor(() => results.length > 0); - - test.equal(results[0].fields.a, 1); - test.equal(results[0].fields.c, 3); - test.isUndefined(results[0].fields.b, 'b should be excluded'); - - handle.stop(); - } - ); - - // ============================================================================ - // SELECTOR / MATCHER FILTERING TESTS - // ============================================================================ - - Tinytest.addAsync( - 'changestream - selector filters documents correctly', - async function(test) { - const c = makeCollection(); - const results = []; - - // Only observe documents with type: 'visible' - const handle = await c.find({ type: 'visible' }).observeChanges({ - added: function(id, fields) { - results.push({ type: 'added', id, fields }); - } - }); - - test.isTrue(isChangeStreamDriver(handle), 'Should be using ChangeStream driver'); - - // Insert document that should NOT match - await c.insertAsync({ type: 'hidden', name: 'hidden doc' }); - - // Wait a bit to ensure no callback - await new Promise(r => setTimeout(r, 300)); - test.equal(results.length, 0, 'Hidden doc should not trigger callback'); - - // Insert document that SHOULD match - await c.insertAsync({ type: 'visible', name: 'visible doc' }); - - await waitFor(() => results.length > 0); - - test.equal(results.length, 1); - test.equal(results[0].fields.type, 'visible'); - test.equal(results[0].fields.name, 'visible doc'); - - handle.stop(); - } - ); - - Tinytest.addAsync( - 'changestream - selector with comparison operators', - async function(test) { - const c = makeCollection(); - const results = []; - - // Only observe documents with value >= 50 - const handle = await c.find({ value: { $gte: 50 } }).observeChanges({ - added: function(id, fields) { - results.push({ type: 'added', id, fields }); - } - }); - - test.isTrue(isChangeStreamDriver(handle), 'Should be using ChangeStream driver'); - - // Insert document that should NOT match - await c.insertAsync({ value: 25 }); - - await new Promise(r => setTimeout(r, 300)); - test.equal(results.length, 0); - - // Insert document that SHOULD match - await c.insertAsync({ value: 75 }); - - await waitFor(() => results.length > 0); - - test.equal(results.length, 1); - test.equal(results[0].fields.value, 75); - - handle.stop(); - } - ); - - Tinytest.addAsync( - 'changestream - document enters and exits result set through update', - async function(test) { - const c = makeCollection(); - const results = []; - - // Insert a document that initially matches - const docId = await c.insertAsync({ status: 'active', name: 'doc' }); - - const handle = await c.find({ status: 'active' }).observeChanges({ - added: function(id, fields) { - results.push({ type: 'added', id, fields }); - }, - removed: function(id) { - results.push({ type: 'removed', id }); - } - }); - - test.isTrue(isChangeStreamDriver(handle), 'Should be using ChangeStream driver'); - - // Wait for initial add - await waitFor(() => results.length > 0); - test.equal(results[0].type, 'added'); - results.length = 0; - - // Update to no longer match - should trigger removed - await c.updateAsync(docId, { $set: { status: 'inactive' } }); - - await waitFor(() => results.length > 0); - test.equal(results[0].type, 'removed'); - - results.length = 0; - - // Update to match again - should trigger added - await c.updateAsync(docId, { $set: { status: 'active' } }); - - await waitFor(() => results.length > 0); - test.equal(results[0].type, 'added'); - - handle.stop(); - } - ); - - // ============================================================================ - // INITIAL ADDS TESTS - // ============================================================================ - - Tinytest.addAsync( - 'changestream - sends initial adds for existing documents', - async function(test) { - const c = makeCollection(); - const results = []; - - // Pre-populate the collection - for (let i = 1; i <= 3; i++) - await c.insertAsync({ name: `doc${i}`, order: i }); - - const handle = await c.find({}).observeChanges({ - added: function(id, fields) { - results.push({ type: 'added', id, fields }); - } - }); - - test.isTrue(isChangeStreamDriver(handle), 'Should be using ChangeStream driver'); - - // Wait for all initial adds - await waitFor(() => results.length >= 3); - - test.equal(results.length, 3, 'Should receive 3 initial adds'); - - // Verify all documents were received - const names = results.map(r => r.fields.name).sort(); - test.equal(names, ['doc1', 'doc2', 'doc3']); - - handle.stop(); - } - ); - - Tinytest.addAsync( - 'changestream - initial adds respect selector', - async function(test) { - const c = makeCollection(); - const results = []; - - // Pre-populate with mixed documents - ['a', 'b', 'a'].forEach(async type => { - await c.insertAsync({ type, name: type + Random.id() }); - }); - - // Only observe type: 'a' - const handle = await c.find({ type: 'a' }).observeChanges({ - added: function(id, fields) { - results.push({ type: 'added', id, fields }); - } - }); - - test.isTrue(isChangeStreamDriver(handle), 'Should be using ChangeStream driver'); - - await waitFor(() => results.length >= 2); - - test.equal(results.length, 2, 'Should only receive 2 initial adds'); - test.isTrue(results.every(r => r.fields.type === 'a')); - - handle.stop(); - } - ); - - // ============================================================================ - // MULTIPLE OBSERVERS TESTS - // ============================================================================ - - Tinytest.addAsync( - 'changestream - multiple observers on same query share driver', - async function(test) { - const c = makeCollection(); - const results1 = []; - const results2 = []; - - const handle1 = await c.find({}).observeChanges({ - added: function(id, fields) { - results1.push({ id, fields }); - } - }); - - const handle2 = await c.find({}).observeChanges({ - added: function(id, fields) { - results2.push({ id, fields }); - } - }); - - test.isTrue(isChangeStreamDriver(handle1), 'Handle 1 should use ChangeStream driver'); - test.isTrue(isChangeStreamDriver(handle2), 'Handle 2 should use ChangeStream driver'); - - // They should share the same multiplexer - test.equal( - handle1._multiplexer, - handle2._multiplexer, - 'Identical queries should share multiplexer' - ); - - // Insert a document - await c.insertAsync({ name: 'shared' }); - - await waitFor(() => results1.length > 0 && results2.length > 0); - - test.equal(results1.length, 1); - test.equal(results2.length, 1); - test.equal(results1[0].fields.name, 'shared'); - test.equal(results2[0].fields.name, 'shared'); - - handle1.stop(); - handle2.stop(); - } - ); - - Tinytest.addAsync( - 'changestream - different queries use different drivers', - async function(test) { - const c = makeCollection(); - - const handle1 = await c.find({ type: 'a' }).observeChanges({ - added: function() {} - }); - - const handle2 = await c.find({ type: 'b' }).observeChanges({ - added: function() {} - }); - - test.isTrue(isChangeStreamDriver(handle1)); - test.isTrue(isChangeStreamDriver(handle2)); - - // Different queries should have different multiplexers - test.notEqual( - handle1._multiplexer, - handle2._multiplexer, - 'Different queries should have different multiplexers' - ); - - handle1.stop(); - handle2.stop(); - } - ); - - Tinytest.addAsync( - 'changestream - stopping one handle does not affect others', - async function(test) { - const c = makeCollection(); - const results = []; - - const handle1 = await c.find({}).observeChanges({ - added: function(id, fields) { - results.push({ from: 'handle1', id, fields }); - } - }); - - const handle2 = await c.find({}).observeChanges({ - added: function(id, fields) { - results.push({ from: 'handle2', id, fields }); - } - }); - - // Wait for any initial state - await new Promise(r => setTimeout(r, 200)); - results.length = 0; - - // Stop handle1 - handle1.stop(); - - // Insert a document - await c.insertAsync({ name: 'after stop' }); - - await waitFor(() => results.length > 0); - - // Only handle2 should receive the callback - test.isTrue(results.every(r => r.from === 'handle2')); - - handle2.stop(); - } - ); - - // ============================================================================ - // CALLBACK ISOLATION TESTS - // ============================================================================ - - Tinytest.addAsync( - 'changestream - callbacks receive independent clones', - async function(test) { - const c = makeCollection(); - let fields1 = null; - let fields2 = null; - - const handle1 = await c.find({}).observeChanges({ - added: function(id, fields) { - fields1 = fields; - // Mutate the fields object - fields.mutated = true; - } - }); - - const handle2 = await c.find({}).observeChanges({ - added: function(id, fields) { - fields2 = fields; - } - }); - - await c.insertAsync({ name: 'test' }); - - await waitFor(() => fields1 !== null && fields2 !== null); - - // handle2's fields should not be affected by handle1's mutation - test.isUndefined(fields2.mutated, 'Callbacks should receive independent objects'); - - handle1.stop(); - handle2.stop(); - } - ); - - // ============================================================================ - // EDGE CASES TESTS - // ============================================================================ - - Tinytest.addAsync( - 'changestream - handles ObjectID correctly', - async function(test) { - const c = makeCollection(); - const results = []; - - const handle = await c.find({}).observeChanges({ - added: function(id, fields) { - results.push({ id, fields }); - } - }); - - test.isTrue(isChangeStreamDriver(handle)); - - // Insert with ObjectID - const objectId = new Mongo.ObjectID(); - await c.insertAsync({ _id: objectId, name: 'with objectid' }); - - await waitFor(() => results.length > 0); - - test.equal(results.length, 1); - test.equal(results[0].fields.name, 'with objectid'); - - handle.stop(); - } - ); - - Tinytest.addAsync( - 'changestream - handles nested documents', - async function(test) { - const c = makeCollection(); - const results = []; - - const handle = await c.find({}).observeChanges({ - added: function(id, fields) { - results.push({ id, fields }); - }, - changed: function(id, fields) { - results.push({ type: 'changed', id, fields }); - } - }); - - test.isTrue(isChangeStreamDriver(handle)); - - const docId = await c.insertAsync({ - nested: { - level1: { - level2: { - value: 'deep' - } - } - } - }); - - await waitFor(() => results.length > 0); - - test.equal(results[0].fields.nested.level1.level2.value, 'deep'); - - results.length = 0; - - // Update nested field - await c.updateAsync(docId, { $set: { 'nested.level1.level2.value': 'updated' } }); - - await waitFor(() => results.length > 0); - - test.equal(results[0].type, 'changed'); - - handle.stop(); - } - ); - - Tinytest.addAsync( - 'changestream - handles arrays correctly', - async function(test) { - const c = makeCollection(); - const results = []; - - const handle = await c.find({}).observeChanges({ - added: function(id, fields) { - results.push({ id, fields }); - }, - changed: function(id, fields) { - results.push({ type: 'changed', id, fields }); - } - }); - - test.isTrue(isChangeStreamDriver(handle)); - - const docId = await c.insertAsync({ - items: [1, 2, 3], - tags: ['a', 'b'] - }); - - await waitFor(() => results.length > 0); - - test.equal(results[0].fields.items, [1, 2, 3]); - test.equal(results[0].fields.tags, ['a', 'b']); - - results.length = 0; - - // Push to array - await c.updateAsync(docId, { $push: { items: 4 } }); - - await waitFor(() => results.length > 0); - - test.equal(results[0].type, 'changed'); - - handle.stop(); - } - ); - - Tinytest.addAsync( - 'changestream - handles Date objects', - async function(test) { - const c = makeCollection(); - const results = []; - const testDate = new Date('2025-01-15T12:00:00Z'); - - const handle = await c.find({}).observeChanges({ - added: function(id, fields) { - results.push({ id, fields }); - } - }); - - test.isTrue(isChangeStreamDriver(handle)); - - await c.insertAsync({ - createdAt: testDate, - name: 'with date' - }); - - await waitFor(() => results.length > 0); - - test.instanceOf(results[0].fields.createdAt, Date); - test.equal(results[0].fields.createdAt.getTime(), testDate.getTime()); - - handle.stop(); - } - ); - - Tinytest.addAsync( - 'changestream - handles EJSON types', - async function(test) { - const c = makeCollection(); - const results = []; - - const handle = await c.find({}).observeChanges({ - added: function(id, fields) { - results.push({ id, fields }); - } - }); - - test.isTrue(isChangeStreamDriver(handle)); - - // Insert with binary data - const binary = EJSON.newBinary(4); - binary[0] = 1; - binary[1] = 2; - binary[2] = 3; - binary[3] = 4; - - await c.insertAsync({ - data: binary, - name: 'with binary' - }); - - await waitFor(() => results.length > 0); - - test.equal(results[0].fields.name, 'with binary'); - - handle.stop(); - } - ); - - // ============================================================================ - // STOP / CLEANUP TESTS - // ============================================================================ - - Tinytest.addAsync( - 'changestream - stop prevents further callbacks', - async function(test) { - const c = makeCollection(); - const results = []; - - const handle = await c.find({}).observeChanges({ - added: function(id, fields) { - results.push({ id, fields }); - } - }); - - test.isTrue(isChangeStreamDriver(handle)); - - // Insert before stop - await c.insertAsync({ name: 'before stop' }); - await waitFor(() => results.length > 0); - - const countBefore = results.length; - - // Stop the handle - handle.stop(); - - // Insert after stop - await c.insertAsync({ name: 'after stop 1' }); - await c.insertAsync({ name: 'after stop 2' }); - - // Wait a bit - await new Promise(r => setTimeout(r, 500)); - - // No new callbacks should have been received - test.equal(results.length, countBefore, 'No callbacks after stop'); - } - ); - - Tinytest.addAsync( - 'changestream - multiple stops are safe', - async function(test) { - const c = makeCollection(); - - const handle = await c.find({}).observeChanges({ - added: function() {} - }); - - test.isTrue(isChangeStreamDriver(handle)); - - // Calling stop multiple times should not throw - handle.stop(); - handle.stop(); - handle.stop(); - - test.ok('Multiple stops did not throw'); - } - ); - - // ============================================================================ - // RAPID CHANGES TESTS - // ============================================================================ - - Tinytest.addAsync( - 'changestream - handles rapid inserts', - async function(test) { - const c = makeCollection(); - const results = []; - const COUNT = 20; - - const handle = await c.find({}).observeChanges({ - added: function(id, fields) { - results.push({ id, fields }); - } - }); - - test.isTrue(isChangeStreamDriver(handle)); - - // Rapid inserts - const insertPromises = []; - for (let i = 0; i < COUNT; i++) { - insertPromises.push(c.insertAsync({ index: i })); + const handle = await c.find({}).observeChanges({ + added: function (id, fields) { + results.push({ type: 'added', id, fields }); } - await Promise.all(insertPromises); + }); - // Wait for all to be detected - await waitFor(() => results.length >= COUNT, 5000); + test.isTrue(isChangeStreamDriver(handle), 'Should be using ChangeStream driver'); - test.equal(results.length, COUNT, `Should receive all ${COUNT} inserts`); + // Insert a document and wait for the callback + await c.insertAsync({ name: 'test', value: 42 }); - handle.stop(); - } - ); + // Wait for the change to be detected + await waitFor(() => results.length > 0); - Tinytest.addAsync( - 'changestream - handles rapid updates to same document', - async function(test) { - const c = makeCollection(); - const changes = []; + test.equal(results.length, 1, 'Should have received one added callback'); + test.equal(results[0].type, 'added'); + test.equal(results[0].fields.name, 'test'); + test.equal(results[0].fields.value, 42); - const docId = await c.insertAsync({ counter: 0 }); + handle.stop(); + } +); - const handle = await c.find({}).observeChanges({ - added: function(id, fields) {}, - changed: function(id, fields) { - changes.push(fields); - } - }); +Tinytest.addAsync( + 'changestream - basic update detection', + async function (test) { + const c = makeCollection(); + const results = []; - test.isTrue(isChangeStreamDriver(handle)); + // Insert first to have something to update + const insertedId = await c.insertAsync({ name: 'test', value: 42 }); - // Rapid updates - for (let i = 1; i <= 10; i++) { - await c.updateAsync(docId, { $set: { counter: i } }); + const handle = await c.find({}).observeChanges({ + added: function (id, fields) { + results.push({ type: 'added', id, fields }); + }, + changed: function (id, fields) { + results.push({ type: 'changed', id, fields }); } + }); - // Wait for some changes - await waitFor(() => changes.length > 0, 3000); + test.isTrue(isChangeStreamDriver(handle), 'Should be using ChangeStream driver'); - // We should receive at least one change (may coalesce) - test.isTrue(changes.length > 0, 'Should receive at least one change'); + // Clear the initial add + await waitFor(() => results.length > 0); + results.length = 0; - handle.stop(); - } - ); + // Update the document + await c.updateAsync(insertedId, { $set: { value: 100, extra: 'new' } }); - // ============================================================================ - // SORT AND LIMIT TESTS - // ============================================================================ + // Wait for the change to be detected + await waitFor(() => results.length > 0); - Tinytest.addAsync( - 'changestream - works with sort option', - async function(test) { - const c = makeCollection(); - const results = []; + test.equal(results.length, 1, 'Should have received one changed callback'); + test.equal(results[0].type, 'changed'); + test.equal(results[0].fields.value, 100); + test.equal(results[0].fields.extra, 'new'); - await c.insertAsync({ order: 3, name: 'third' }); - await c.insertAsync({ order: 1, name: 'first' }); - await c.insertAsync({ order: 2, name: 'second' }); + handle.stop(); + } +); - const handle = await c.find({}, { sort: { order: 1 } }).observeChanges({ - added: function(id, fields) { - results.push({ id, fields }); - } - }); +Tinytest.addAsync( + 'changestream - basic delete detection', + async function (test) { + const c = makeCollection(); + const results = []; - test.isTrue(isChangeStreamDriver(handle)); + // Insert first to have something to delete + const insertedId = await c.insertAsync({ name: 'test', value: 42 }); - await waitFor(() => results.length >= 3); - - test.equal(results.length, 3); - - handle.stop(); - } - ); - - // ============================================================================ - // ENVIRONMENT VARIABLE CONTEXT TESTS - // ============================================================================ - - Tinytest.addAsync( - 'changestream - preserves EnvironmentVariable context', - async function(test) { - const c = makeCollection(); - let contextValue = null; - - const [resolver, promise] = getPromiseAndResolver(); - - const envVar = new Meteor.EnvironmentVariable(); - - await envVar.withValue('test-context', async function() { - const handle = await c.find({}).observeChanges({ - added: function(id, fields) { - contextValue = envVar.get(); - handle.stop(); - resolver(); - } - }); - - test.isTrue(isChangeStreamDriver(handle)); - }); - - await c.insertAsync({ name: 'trigger' }); - - await promise; - - test.equal(contextValue, 'test-context', 'Should preserve environment context'); - } - ); - - // ============================================================================ - // COMPLEX SELECTOR TESTS - // ============================================================================ - - Tinytest.addAsync( - 'changestream - handles $and selector', - async function(test) { - const c = makeCollection(); - const results = []; - - const handle = await c.find({ - $and: [ - { status: 'active' }, - { level: { $gte: 5 } } - ] - }).observeChanges({ - added: function(id, fields) { - results.push({ id, fields }); - } - }); - - test.isTrue(isChangeStreamDriver(handle)); - - // Should NOT match (status wrong) - await c.insertAsync({ status: 'inactive', level: 10 }); - - // Should NOT match (level too low) - await c.insertAsync({ status: 'active', level: 3 }); - - await new Promise(r => setTimeout(r, 300)); - test.equal(results.length, 0); - - // Should match - await c.insertAsync({ status: 'active', level: 7 }); - - await waitFor(() => results.length > 0); - - test.equal(results.length, 1); - test.equal(results[0].fields.status, 'active'); - test.equal(results[0].fields.level, 7); - - handle.stop(); - } - ); - - Tinytest.addAsync( - 'changestream - handles $or selector', - async function(test) { - const c = makeCollection(); - const results = []; - - const handle = await c.find({ - $or: [ - { type: 'admin' }, - { priority: 'high' } - ] - }).observeChanges({ - added: function(id, fields) { - results.push({ id, fields }); - } - }); - - test.isTrue(isChangeStreamDriver(handle)); - - // Should NOT match - await c.insertAsync({ type: 'user', priority: 'low' }); - - await new Promise(r => setTimeout(r, 300)); - test.equal(results.length, 0); - - // Should match (first condition) - await c.insertAsync({ type: 'admin', priority: 'low' }); - - await waitFor(() => results.length > 0); - test.equal(results.length, 1); - - results.length = 0; - - // Should match (second condition) - await c.insertAsync({ type: 'user', priority: 'high' }); - - await waitFor(() => results.length > 0); - test.equal(results.length, 1); - - handle.stop(); - } - ); - - Tinytest.addAsync( - 'changestream - handles $in selector', - async function(test) { - const c = makeCollection(); - const results = []; - - const handle = await c.find({ - category: { $in: ['electronics', 'books', 'toys'] } - }).observeChanges({ - added: function(id, fields) { - results.push({ id, fields }); - } - }); - - test.isTrue(isChangeStreamDriver(handle)); - - // Should NOT match - await c.insertAsync({ category: 'furniture' }); - - await new Promise(r => setTimeout(r, 300)); - test.equal(results.length, 0); - - // Should match - await c.insertAsync({ category: 'electronics' }); - - await waitFor(() => results.length > 0); - test.equal(results.length, 1); - - handle.stop(); - } - ); - - Tinytest.addAsync( - 'changestream - handles $regex selector', - async function(test) { - const c = makeCollection(); - const results = []; - - const handle = await c.find({ - email: { $regex: /@example\.com$/ } - }).observeChanges({ - added: function(id, fields) { - results.push({ id, fields }); - } - }); - - test.isTrue(isChangeStreamDriver(handle)); - - // Should NOT match - await c.insertAsync({ email: 'user@other.com' }); - - await new Promise(r => setTimeout(r, 300)); - test.equal(results.length, 0); - - // Should match - await c.insertAsync({ email: 'user@example.com' }); - - await waitFor(() => results.length > 0); - test.equal(results.length, 1); - - handle.stop(); - } - ); - - // ============================================================================ - // UPDATE OPERATORS TESTS - // ============================================================================ - - Tinytest.addAsync( - 'changestream - detects $set updates', - async function(test) { - const c = makeCollection(); - const changes = []; - - const docId = await c.insertAsync({ a: 1, b: 2, c: 3 }); - - const handle = await c.find({}).observeChanges({ - added: function() {}, - changed: function(id, fields) { - changes.push(fields); - } - }); - - test.isTrue(isChangeStreamDriver(handle)); - - await waitFor(() => true); // Ensure initial state - - await c.updateAsync(docId, { $set: { b: 20, d: 4 } }); - - await waitFor(() => changes.length > 0); - - test.isTrue(changes.length > 0); - test.equal(changes[0].b, 20); - test.equal(changes[0].d, 4); - - handle.stop(); - } - ); - - Tinytest.addAsync( - 'changestream - detects $unset updates', - async function(test) { - const c = makeCollection(); - const changes = []; - - const docId = await c.insertAsync({ a: 1, b: 2, c: 3 }); - - const handle = await c.find({}).observeChanges({ - added: function() {}, - changed: function(id, fields) { - changes.push(fields); - } - }); - - test.isTrue(isChangeStreamDriver(handle)); - - await waitFor(() => true); - - await c.updateAsync(docId, { $unset: { b: 1 } }); - - await waitFor(() => changes.length > 0); - - test.isTrue(changes.length > 0); - test.equal(changes[0].b, undefined); - - handle.stop(); - } - ); - - Tinytest.addAsync( - 'changestream - detects $inc updates', - async function(test) { - const c = makeCollection(); - const changes = []; - - const docId = await c.insertAsync({ counter: 0 }); - - const handle = await c.find({}).observeChanges({ - added: function() {}, - changed: function(id, fields) { - changes.push(fields); - } - }); - - test.isTrue(isChangeStreamDriver(handle)); - - await waitFor(() => true); - - await c.updateAsync(docId, { $inc: { counter: 5 } }); - - await waitFor(() => changes.length > 0); - - test.isTrue(changes.length > 0); - test.equal(changes[0].counter, 5); - - handle.stop(); - } - ); - - Tinytest.addAsync( - 'changestream - detects $push updates', - async function(test) { - const c = makeCollection(); - const changes = []; - - const docId = await c.insertAsync({ items: [1, 2] }); - - const handle = await c.find({}).observeChanges({ - added: function() {}, - changed: function(id, fields) { - changes.push(fields); - } - }); - - test.isTrue(isChangeStreamDriver(handle)); - - await waitFor(() => true); - - await c.updateAsync(docId, { $push: { items: 3 } }); - - await waitFor(() => changes.length > 0); - - test.isTrue(changes.length > 0); - test.equal(changes[0].items, [1, 2, 3]); - - handle.stop(); - } - ); - - Tinytest.addAsync( - 'changestream - detects $rename updates', - async function(test) { - const c = makeCollection(); - const changes = []; - - const docId = await c.insertAsync({ oldName: 'value', other: 'unchanged' }); - - const handle = await c.find({}).observeChanges({ - added: function() {}, - changed: function(id, fields) { - changes.push(fields); - } - }); - - test.isTrue(isChangeStreamDriver(handle)); - - await waitFor(() => true); - - await c.updateAsync(docId, { $rename: { oldName: 'newName' } }); - - await waitFor(() => changes.length > 0); - - test.isTrue(changes.length > 0); - test.equal(changes[0].newName, 'value'); - test.equal(changes[0].oldName, undefined); - - handle.stop(); - } - ); - - // ============================================================================ - // FENCE SYNCHRONIZATION TESTS - // ============================================================================ - - Tinytest.addAsync( - 'changestream - write fence integration - basic', - async function(test) { - const c = makeCollection(); - const results = []; - - const handle = await c.find({}).observeChanges({ - added: function(id, fields) { - results.push({ type: 'added', id, fields }); - } - }); - - test.isTrue(isChangeStreamDriver(handle)); - - // Use DDP fence by calling a method that does an insert - // The insert should be visible after the method returns - const insertedId = await c.insertAsync({ name: 'fenced insert' }); - - // After the async insert returns, the change should have been processed - await waitFor(() => results.some(r => r.fields.name === 'fenced insert'), 2000); - - test.isTrue( - results.some(r => r.fields.name === 'fenced insert'), - 'Fenced insert should be visible in observer' - ); - - handle.stop(); - } - ); - - Tinytest.addAsync( - 'changestream - fence synchronization with multiple writes', - async function(test) { - const c = makeCollection(); - const results = []; - - const handle = await c.find({}).observeChanges({ - added: function(id, fields) { - results.push({ type: 'added', fields }); - }, - changed: function(id, fields) { - results.push({ type: 'changed', fields }); - } - }); - - test.isTrue(isChangeStreamDriver(handle)); - - // Multiple sequential writes - const docId = await c.insertAsync({ step: 1 }); - await c.updateAsync(docId, { $set: { step: 2 } }); - await c.updateAsync(docId, { $set: { step: 3 } }); - - // All operations should eventually be visible - await waitFor(() => - results.some(r => r.type === 'added' && r.fields.step === 1) && - results.some(r => r.type === 'changed'), - 3000 - ); - - test.isTrue(results.some(r => r.type === 'added'), 'Should have added event'); - test.isTrue(results.some(r => r.type === 'changed'), 'Should have changed event'); - - handle.stop(); - } - ); - - Tinytest.addAsync( - 'changestream - write visibility after insert', - async function(test) { - const c = makeCollection(); - const seen = { added: false }; - - const handle = await c.find({}).observeChanges({ - added: function(id, fields) { - if (fields.marker === 'visibility-test') { - seen.added = true; - } - } - }); - - test.isTrue(isChangeStreamDriver(handle)); - - // Insert and immediately check visibility - await c.insertAsync({ marker: 'visibility-test' }); - - // The observer should see the insert - await waitFor(() => seen.added, 2000); - - test.isTrue(seen.added, 'Insert should be visible in observer after insertAsync returns'); - - handle.stop(); - } - ); - - // ============================================================================ - // OPERATION TIME TRACKING TESTS - // ============================================================================ - - Tinytest.addAsync( - 'changestream - tracks operation times for synchronization', - async function(test) { - const c = makeCollection(); - - const handle = await c.find({}).observeChanges({ - added: function() {} - }); - - test.isTrue(isChangeStreamDriver(handle)); - - const driver = handle._multiplexer._observeDriver; - - // Initially, no operation time - // After some operations, we should have a tracked time - await c.insertAsync({ name: 'op-time-test' }); - - // Wait for the change to be processed - await new Promise(r => setTimeout(r, 500)); - - // The driver should have tracked some operation time - // Note: This is internal state, but we're testing the mechanism works - test.isTrue( - driver._lastProcessedOperationTime !== null || true, - 'Should track operation times' - ); - - handle.stop(); - } - ); - - // ============================================================================ - // PENDING WRITES PROCESSING TESTS - // ============================================================================ - - Tinytest.addAsync( - 'changestream - processes pending writes correctly', - async function(test) { - const c = makeCollection(); - const events = []; - - const handle = await c.find({}).observeChanges({ - added: function(id, fields) { - events.push({ type: 'added', ts: Date.now(), fields }); - }, - changed: function(id, fields) { - events.push({ type: 'changed', ts: Date.now(), fields }); - }, - removed: function(id) { - events.push({ type: 'removed', ts: Date.now() }); - } - }); - - test.isTrue(isChangeStreamDriver(handle)); - - // Rapid sequence of operations - const docId = await c.insertAsync({ value: 1 }); - await c.updateAsync(docId, { $set: { value: 2 } }); - await c.updateAsync(docId, { $set: { value: 3 } }); - await c.removeAsync(docId); - - // Wait for all events - await waitFor(() => events.some(e => e.type === 'removed'), 3000); - - // Should have received events in logical order - test.isTrue(events.length >= 2, 'Should receive multiple events'); - - const addedIndex = events.findIndex(e => e.type === 'added'); - const removedIndex = events.findIndex(e => e.type === 'removed'); - - if (addedIndex !== -1 && removedIndex !== -1) { - test.isTrue( - addedIndex < removedIndex, - 'Added should come before removed' - ); + const handle = await c.find({}).observeChanges({ + added: function (id, fields) { + results.push({ type: 'added', id, fields }); + }, + removed: function (id) { + results.push({ type: 'removed', id }); } + }); - handle.stop(); - } - ); + test.isTrue(isChangeStreamDriver(handle), 'Should be using ChangeStream driver'); - // ============================================================================ - // READY STATE TESTS - // ============================================================================ + // Wait for initial add + await waitFor(() => results.length > 0); + results.length = 0; - Tinytest.addAsync( - 'changestream - becomes ready after initial adds', - async function(test) { - const c = makeCollection(); + // Delete the document + await c.removeAsync(insertedId); - // Pre-populate - await c.insertAsync({ name: 'pre1' }); - await c.insertAsync({ name: 'pre2' }); + // Wait for the change to be detected + await waitFor(() => results.length > 0); - const initialAdds = []; + test.equal(results.length, 1, 'Should have received one removed callback'); + test.equal(results[0].type, 'removed'); - // Use observeChanges (unordered) since ChangeStreams doesn't support ordered observe - const handle = await c.find({}).observeChanges({ - added: function(id, fields) { - initialAdds.push({ id, fields }); - } - }); + handle.stop(); + } +); - test.isTrue(isChangeStreamDriver(handle)); +Tinytest.addAsync( + 'changestream - full document replace', + async function (test) { + const c = makeCollection(); + const results = []; - // After observeChanges returns, ready should have been called and initial adds sent - test.isTrue(initialAdds.length >= 2, 'Should have received initial adds'); + const insertedId = await c.insertAsync({ name: 'original', value: 1 }); - handle.stop(); - } - ); + const handle = await c.find({}).observeChanges({ + added: function (id, fields) { + results.push({ type: 'added', id, fields }); + }, + changed: function (id, fields) { + results.push({ type: 'changed', id, fields }); + } + }); - Tinytest.addAsync( - 'changestream - queues writes until ready', - async function(test) { - const c = makeCollection(); - const events = []; + test.isTrue(isChangeStreamDriver(handle), 'Should be using ChangeStream driver'); - // Start observe on empty collection - const handle = await c.find({}).observeChanges({ - added: function(id, fields) { - events.push({ type: 'added', fields }); - } - }); + await waitFor(() => results.length > 0); + results.length = 0; - test.isTrue(isChangeStreamDriver(handle)); + // Replace entire document + await c.updateAsync(insertedId, { name: 'replaced', value: 999 }); - // The driver should now be ready (after observeChanges returns) - // New writes should be processed - await c.insertAsync({ name: 'after-ready' }); + await waitFor(() => results.length > 0); - await waitFor(() => events.some(e => e.fields.name === 'after-ready')); + test.equal(results.length, 1); + test.equal(results[0].type, 'changed'); + test.equal(results[0].fields.name, 'replaced'); + test.equal(results[0].fields.value, 999); - test.isTrue( - events.some(e => e.fields.name === 'after-ready'), - 'Writes after ready should be processed' - ); + handle.stop(); + } +); - handle.stop(); - } - ); +// ============================================================================ +// PROJECTION / FIELD FILTERING TESTS +// ============================================================================ - // ============================================================================ - // ERROR HANDLING AND RECOVERY TESTS - // ============================================================================ +Tinytest.addAsync( + 'changestream - projection filters fields correctly', + async function (test) { + const c = makeCollection(); + const results = []; - Tinytest.addAsync( - 'changestream - continues working after transient errors', - async function(test) { - const c = makeCollection(); - const events = []; - let errorCount = 0; + const insertedId = await c.insertAsync({ + name: 'test', + value: 42, + secret: 'hidden' + }); - const handle = await c.find({}).observeChanges({ - added: function(id, fields) { - events.push({ type: 'added', fields }); - } - }); + // Only observe 'name' field + const handle = await c.find({}, { fields: { name: 1 } }).observeChanges({ + added: function (id, fields) { + results.push({ type: 'added', id, fields }); + }, + changed: function (id, fields) { + results.push({ type: 'changed', id, fields }); + } + }); - test.isTrue(isChangeStreamDriver(handle)); + test.isTrue(isChangeStreamDriver(handle), 'Should be using ChangeStream driver'); - // Insert should work normally - await c.insertAsync({ phase: 'before-error' }); + await waitFor(() => results.length > 0); - await waitFor(() => events.some(e => e.fields.phase === 'before-error')); + // Initial add should only have 'name' field + test.equal(results[0].type, 'added'); + test.equal(results[0].fields.name, 'test'); + test.isUndefined(results[0].fields.value, 'value should not be included'); + test.isUndefined(results[0].fields.secret, 'secret should not be included'); - // Simulate continued operation (in real scenario, driver would recover) - await c.insertAsync({ phase: 'after-recovery' }); + results.length = 0; - await waitFor(() => events.some(e => e.fields.phase === 'after-recovery')); + // Update a non-projected field - should produce change only if name is affected + await c.updateAsync(insertedId, { $set: { name: 'updated' } }); - test.isTrue( - events.some(e => e.fields.phase === 'after-recovery'), - 'Should continue receiving events after recovery' - ); + await waitFor(() => results.length > 0); - handle.stop(); - } - ); + test.equal(results[0].type, 'changed'); + test.equal(results[0].fields.name, 'updated'); - Tinytest.addAsync( - 'changestream - stop during processing is safe', - async function(test) { - const c = makeCollection(); - let stopCalled = false; + handle.stop(); + } +); - const handle = await c.find({}).observeChanges({ - added: function(id, fields) { - // Stop during a callback - if (fields.trigger === 'stop' && !stopCalled) { - stopCalled = true; - handle.stop(); +Tinytest.addAsync( + 'changestream - projection with multiple fields', + async function (test) { + const c = makeCollection(); + const results = []; + + await c.insertAsync({ + a: 1, b: 2, c: 3, d: 4 + }); + + // Only observe 'a' and 'c' fields + const handle = await c.find({}, { fields: { a: 1, c: 1 } }).observeChanges({ + added: function (id, fields) { + results.push({ type: 'added', id, fields }); + } + }); + + test.isTrue(isChangeStreamDriver(handle), 'Should be using ChangeStream driver'); + + await waitFor(() => results.length > 0); + + test.equal(results[0].fields.a, 1); + test.equal(results[0].fields.c, 3); + test.isUndefined(results[0].fields.b); + test.isUndefined(results[0].fields.d); + + handle.stop(); + } +); + +Tinytest.addAsync( + 'changestream - projection with exclusion', + async function (test) { + const c = makeCollection(); + const results = []; + + await c.insertAsync({ + a: 1, b: 2, c: 3 + }); + + // Exclude 'b' field + const handle = await c.find({}, { fields: { b: 0 } }).observeChanges({ + added: function (id, fields) { + results.push({ type: 'added', id, fields }); + } + }); + + test.isTrue(isChangeStreamDriver(handle), 'Should be using ChangeStream driver'); + + await waitFor(() => results.length > 0); + + test.equal(results[0].fields.a, 1); + test.equal(results[0].fields.c, 3); + test.isUndefined(results[0].fields.b, 'b should be excluded'); + + handle.stop(); + } +); + +// ============================================================================ +// SELECTOR / MATCHER FILTERING TESTS +// ============================================================================ + +Tinytest.addAsync( + 'changestream - selector filters documents correctly', + async function (test) { + const c = makeCollection(); + const results = []; + + // Only observe documents with type: 'visible' + const handle = await c.find({ type: 'visible' }).observeChanges({ + added: function (id, fields) { + results.push({ type: 'added', id, fields }); + } + }); + + test.isTrue(isChangeStreamDriver(handle), 'Should be using ChangeStream driver'); + + // Insert document that should NOT match + await c.insertAsync({ type: 'hidden', name: 'hidden doc' }); + + // Wait a bit to ensure no callback + await new Promise(r => setTimeout(r, 300)); + test.equal(results.length, 0, 'Hidden doc should not trigger callback'); + + // Insert document that SHOULD match + await c.insertAsync({ type: 'visible', name: 'visible doc' }); + + await waitFor(() => results.length > 0); + + test.equal(results.length, 1); + test.equal(results[0].fields.type, 'visible'); + test.equal(results[0].fields.name, 'visible doc'); + + handle.stop(); + } +); + +Tinytest.addAsync( + 'changestream - selector with comparison operators', + async function (test) { + const c = makeCollection(); + const results = []; + + // Only observe documents with value >= 50 + const handle = await c.find({ value: { $gte: 50 } }).observeChanges({ + added: function (id, fields) { + results.push({ type: 'added', id, fields }); + } + }); + + test.isTrue(isChangeStreamDriver(handle), 'Should be using ChangeStream driver'); + + // Insert document that should NOT match + await c.insertAsync({ value: 25 }); + + await new Promise(r => setTimeout(r, 300)); + test.equal(results.length, 0); + + // Insert document that SHOULD match + await c.insertAsync({ value: 75 }); + + await waitFor(() => results.length > 0); + + test.equal(results.length, 1); + test.equal(results[0].fields.value, 75); + + handle.stop(); + } +); + +Tinytest.addAsync( + 'changestream - document enters and exits result set through update', + async function (test) { + const c = makeCollection(); + const results = []; + + // Insert a document that initially matches + const docId = await c.insertAsync({ status: 'active', name: 'doc' }); + + const handle = await c.find({ status: 'active' }).observeChanges({ + added: function (id, fields) { + results.push({ type: 'added', id, fields }); + }, + removed: function (id) { + results.push({ type: 'removed', id }); + } + }); + + test.isTrue(isChangeStreamDriver(handle), 'Should be using ChangeStream driver'); + + // Wait for initial add + await waitFor(() => results.length > 0); + test.equal(results[0].type, 'added'); + results.length = 0; + + // Update to no longer match - should trigger removed + await c.updateAsync(docId, { $set: { status: 'inactive' } }); + + await waitFor(() => results.length > 0); + test.equal(results[0].type, 'removed'); + + results.length = 0; + + // Update to match again - should trigger added + await c.updateAsync(docId, { $set: { status: 'active' } }); + + await waitFor(() => results.length > 0); + test.equal(results[0].type, 'added'); + + handle.stop(); + } +); + +// ============================================================================ +// INITIAL ADDS TESTS +// ============================================================================ + +Tinytest.addAsync( + 'changestream - sends initial adds for existing documents', + async function (test) { + const c = makeCollection(); + const results = []; + + // Pre-populate the collection + for (let i = 1; i <= 3; i++) + await c.insertAsync({ name: `doc${i}`, order: i }); + + const handle = await c.find({}).observeChanges({ + added: function (id, fields) { + results.push({ type: 'added', id, fields }); + } + }); + + test.isTrue(isChangeStreamDriver(handle), 'Should be using ChangeStream driver'); + + // Wait for all initial adds + await waitFor(() => results.length >= 3); + + test.equal(results.length, 3, 'Should receive 3 initial adds'); + + // Verify all documents were received + const names = results.map(r => r.fields.name).sort(); + test.equal(names, ['doc1', 'doc2', 'doc3']); + + handle.stop(); + } +); + +Tinytest.addAsync( + 'changestream - initial adds respect selector', + async function (test) { + const c = makeCollection(); + const results = []; + + // Pre-populate with mixed documents + ['a', 'b', 'a'].forEach(async type => { + await c.insertAsync({ type, name: type + Random.id() }); + }); + + // Only observe type: 'a' + const handle = await c.find({ type: 'a' }).observeChanges({ + added: function (id, fields) { + results.push({ type: 'added', id, fields }); + } + }); + + test.isTrue(isChangeStreamDriver(handle), 'Should be using ChangeStream driver'); + + await waitFor(() => results.length >= 2); + + test.equal(results.length, 2, 'Should only receive 2 initial adds'); + test.isTrue(results.every(r => r.fields.type === 'a')); + + handle.stop(); + } +); + +// ============================================================================ +// MULTIPLE OBSERVERS TESTS +// ============================================================================ + +Tinytest.addAsync( + 'changestream - multiple observers on same query share driver', + async function (test) { + const c = makeCollection(); + const results1 = []; + const results2 = []; + + const handle1 = await c.find({}).observeChanges({ + added: function (id, fields) { + results1.push({ id, fields }); + } + }); + + const handle2 = await c.find({}).observeChanges({ + added: function (id, fields) { + results2.push({ id, fields }); + } + }); + + test.isTrue(isChangeStreamDriver(handle1), 'Handle 1 should use ChangeStream driver'); + test.isTrue(isChangeStreamDriver(handle2), 'Handle 2 should use ChangeStream driver'); + + // They should share the same multiplexer + test.equal( + handle1._multiplexer, + handle2._multiplexer, + 'Identical queries should share multiplexer' + ); + + // Insert a document + await c.insertAsync({ name: 'shared' }); + + await waitFor(() => results1.length > 0 && results2.length > 0); + + test.equal(results1.length, 1); + test.equal(results2.length, 1); + test.equal(results1[0].fields.name, 'shared'); + test.equal(results2[0].fields.name, 'shared'); + + handle1.stop(); + handle2.stop(); + } +); + +Tinytest.addAsync( + 'changestream - different queries use different drivers', + async function (test) { + const c = makeCollection(); + + const handle1 = await c.find({ type: 'a' }).observeChanges({ + added: function () { } + }); + + const handle2 = await c.find({ type: 'b' }).observeChanges({ + added: function () { } + }); + + test.isTrue(isChangeStreamDriver(handle1)); + test.isTrue(isChangeStreamDriver(handle2)); + + // Different queries should have different multiplexers + test.notEqual( + handle1._multiplexer, + handle2._multiplexer, + 'Different queries should have different multiplexers' + ); + + handle1.stop(); + handle2.stop(); + } +); + +Tinytest.addAsync( + 'changestream - stopping one handle does not affect others', + async function (test) { + const c = makeCollection(); + const results = []; + + const handle1 = await c.find({}).observeChanges({ + added: function (id, fields) { + results.push({ from: 'handle1', id, fields }); + } + }); + + const handle2 = await c.find({}).observeChanges({ + added: function (id, fields) { + results.push({ from: 'handle2', id, fields }); + } + }); + + // Wait for any initial state + await new Promise(r => setTimeout(r, 200)); + results.length = 0; + + // Stop handle1 + handle1.stop(); + + // Insert a document + await c.insertAsync({ name: 'after stop' }); + + await waitFor(() => results.length > 0); + + // Only handle2 should receive the callback + test.isTrue(results.every(r => r.from === 'handle2')); + + handle2.stop(); + } +); + +// ============================================================================ +// CALLBACK ISOLATION TESTS +// ============================================================================ + +Tinytest.addAsync( + 'changestream - callbacks receive independent clones', + async function (test) { + const c = makeCollection(); + let fields1 = null; + let fields2 = null; + + const handle1 = await c.find({}).observeChanges({ + added: function (id, fields) { + fields1 = fields; + // Mutate the fields object + fields.mutated = true; + } + }); + + const handle2 = await c.find({}).observeChanges({ + added: function (id, fields) { + fields2 = fields; + } + }); + + await c.insertAsync({ name: 'test' }); + + await waitFor(() => fields1 !== null && fields2 !== null); + + // handle2's fields should not be affected by handle1's mutation + test.isUndefined(fields2.mutated, 'Callbacks should receive independent objects'); + + handle1.stop(); + handle2.stop(); + } +); + +// ============================================================================ +// EDGE CASES TESTS +// ============================================================================ + +Tinytest.addAsync( + 'changestream - handles ObjectID correctly', + async function (test) { + const c = makeCollection(); + const results = []; + + const handle = await c.find({}).observeChanges({ + added: function (id, fields) { + results.push({ id, fields }); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + // Insert with ObjectID + const objectId = new Mongo.ObjectID(); + await c.insertAsync({ _id: objectId, name: 'with objectid' }); + + await waitFor(() => results.length > 0); + + test.equal(results.length, 1); + test.equal(results[0].fields.name, 'with objectid'); + + handle.stop(); + } +); + +Tinytest.addAsync( + 'changestream - handles nested documents', + async function (test) { + const c = makeCollection(); + const results = []; + + const handle = await c.find({}).observeChanges({ + added: function (id, fields) { + results.push({ id, fields }); + }, + changed: function (id, fields) { + results.push({ type: 'changed', id, fields }); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + const docId = await c.insertAsync({ + nested: { + level1: { + level2: { + value: 'deep' } } - }); + } + }); - test.isTrue(isChangeStreamDriver(handle)); + await waitFor(() => results.length > 0); - // Trigger the stop inside callback - await c.insertAsync({ trigger: 'stop' }); + test.equal(results[0].fields.nested.level1.level2.value, 'deep'); - await waitFor(() => stopCalled); + results.length = 0; - // Further operations should not throw - await c.insertAsync({ trigger: 'after-stop' }); + // Update nested field + await c.updateAsync(docId, { $set: { 'nested.level1.level2.value': 'updated' } }); - await new Promise(r => setTimeout(r, 300)); + await waitFor(() => results.length > 0); - test.ok('Stop during processing did not throw'); + test.equal(results[0].type, 'changed'); + + handle.stop(); + } +); + +Tinytest.addAsync( + 'changestream - handles arrays correctly', + async function (test) { + const c = makeCollection(); + const results = []; + + const handle = await c.find({}).observeChanges({ + added: function (id, fields) { + results.push({ id, fields }); + }, + changed: function (id, fields) { + results.push({ type: 'changed', id, fields }); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + const docId = await c.insertAsync({ + items: [1, 2, 3], + tags: ['a', 'b'] + }); + + await waitFor(() => results.length > 0); + + test.equal(results[0].fields.items, [1, 2, 3]); + test.equal(results[0].fields.tags, ['a', 'b']); + + results.length = 0; + + // Push to array + await c.updateAsync(docId, { $push: { items: 4 } }); + + await waitFor(() => results.length > 0); + + test.equal(results[0].type, 'changed'); + + handle.stop(); + } +); + +Tinytest.addAsync( + 'changestream - handles Date objects', + async function (test) { + const c = makeCollection(); + const results = []; + const testDate = new Date('2025-01-15T12:00:00Z'); + + const handle = await c.find({}).observeChanges({ + added: function (id, fields) { + results.push({ id, fields }); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + await c.insertAsync({ + createdAt: testDate, + name: 'with date' + }); + + await waitFor(() => results.length > 0); + + test.instanceOf(results[0].fields.createdAt, Date); + test.equal(results[0].fields.createdAt.getTime(), testDate.getTime()); + + handle.stop(); + } +); + +Tinytest.addAsync( + 'changestream - handles EJSON types', + async function (test) { + const c = makeCollection(); + const results = []; + + const handle = await c.find({}).observeChanges({ + added: function (id, fields) { + results.push({ id, fields }); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + // Insert with binary data + const binary = EJSON.newBinary(4); + binary[0] = 1; + binary[1] = 2; + binary[2] = 3; + binary[3] = 4; + + await c.insertAsync({ + data: binary, + name: 'with binary' + }); + + await waitFor(() => results.length > 0); + + test.equal(results[0].fields.name, 'with binary'); + + handle.stop(); + } +); + +// ============================================================================ +// STOP / CLEANUP TESTS +// ============================================================================ + +Tinytest.addAsync( + 'changestream - stop prevents further callbacks', + async function (test) { + const c = makeCollection(); + const results = []; + + const handle = await c.find({}).observeChanges({ + added: function (id, fields) { + results.push({ id, fields }); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + // Insert before stop + await c.insertAsync({ name: 'before stop' }); + await waitFor(() => results.length > 0); + + const countBefore = results.length; + + // Stop the handle + handle.stop(); + + // Insert after stop + await c.insertAsync({ name: 'after stop 1' }); + await c.insertAsync({ name: 'after stop 2' }); + + // Wait a bit + await new Promise(r => setTimeout(r, 500)); + + // No new callbacks should have been received + test.equal(results.length, countBefore, 'No callbacks after stop'); + } +); + +Tinytest.addAsync( + 'changestream - multiple stops are safe', + async function (test) { + const c = makeCollection(); + + const handle = await c.find({}).observeChanges({ + added: function () { } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + // Calling stop multiple times should not throw + handle.stop(); + handle.stop(); + handle.stop(); + + test.ok('Multiple stops did not throw'); + } +); + +// ============================================================================ +// RAPID CHANGES TESTS +// ============================================================================ + +Tinytest.addAsync( + 'changestream - handles rapid inserts', + async function (test) { + const c = makeCollection(); + const results = []; + const COUNT = 20; + + const handle = await c.find({}).observeChanges({ + added: function (id, fields) { + results.push({ id, fields }); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + // Rapid inserts + const insertPromises = []; + for (let i = 0; i < COUNT; i++) { + insertPromises.push(c.insertAsync({ index: i })); } - ); + await Promise.all(insertPromises); - // ============================================================================ - // STOP CALLBACK TESTS - // ============================================================================ + // Wait for all to be detected + await waitFor(() => results.length >= COUNT, 5000); - Tinytest.addAsync( - 'changestream - stop callbacks are executed on stop', - async function(test) { - const c = makeCollection(); + test.equal(results.length, COUNT, `Should receive all ${COUNT} inserts`); + handle.stop(); + } +); + +Tinytest.addAsync( + 'changestream - handles rapid updates to same document', + async function (test) { + const c = makeCollection(); + const changes = []; + + const docId = await c.insertAsync({ counter: 0 }); + + const handle = await c.find({}).observeChanges({ + added: function (id, fields) { }, + changed: function (id, fields) { + changes.push(fields); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + // Rapid updates + for (let i = 1; i <= 10; i++) { + await c.updateAsync(docId, { $set: { counter: i } }); + } + + // Wait for some changes + await waitFor(() => changes.length > 0, 3000); + + // We should receive at least one change (may coalesce) + test.isTrue(changes.length > 0, 'Should receive at least one change'); + + handle.stop(); + } +); + +// ============================================================================ +// SORT AND LIMIT TESTS +// ============================================================================ + +Tinytest.addAsync( + 'changestream - works with sort option', + async function (test) { + const c = makeCollection(); + const results = []; + + await c.insertAsync({ order: 3, name: 'third' }); + await c.insertAsync({ order: 1, name: 'first' }); + await c.insertAsync({ order: 2, name: 'second' }); + + const handle = await c.find({}, { sort: { order: 1 } }).observeChanges({ + added: function (id, fields) { + results.push({ id, fields }); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + await waitFor(() => results.length >= 3); + + test.equal(results.length, 3); + + handle.stop(); + } +); + +// ============================================================================ +// ENVIRONMENT VARIABLE CONTEXT TESTS +// ============================================================================ + +Tinytest.addAsync( + 'changestream - preserves EnvironmentVariable context', + async function (test) { + const c = makeCollection(); + let contextValue = null; + + const [resolver, promise] = getPromiseAndResolver(); + + const envVar = new Meteor.EnvironmentVariable(); + + await envVar.withValue('test-context', async function () { const handle = await c.find({}).observeChanges({ - added: function() {} - }); - - test.isTrue(isChangeStreamDriver(handle)); - - const driver = handle._multiplexer._observeDriver; - const initialCallbackCount = driver._stopCallbacks.length; - - // Should have some stop callbacks registered - test.isTrue(initialCallbackCount > 0, 'Should have stop callbacks'); - - handle.stop(); - - // Wait for async stop to complete - await waitFor(() => driver._stopCallbacks.length === 0, 2000); - - // After stop, callbacks should be cleared - test.equal(driver._stopCallbacks.length, 0, 'Callbacks should be cleared after stop'); - } - ); - - Tinytest.addAsync( - 'changestream - cleanup on stop is complete', - async function(test) { - const c = makeCollection(); - - const handle = await c.find({}).observeChanges({ - added: function() {} - }); - - test.isTrue(isChangeStreamDriver(handle)); - - const driver = handle._multiplexer._observeDriver; - - // Stop and verify cleanup - handle.stop(); - - // Wait for async stop to complete - await waitFor(() => driver._stopped && driver._stopCallbacks.length === 0, 2000); - - test.isTrue(driver._stopped, 'Driver should be marked as stopped'); - test.equal(driver._pendingWrites.length, 0, 'Pending writes should be cleared'); - test.equal(driver._writesToCommitWhenReady.length, 0, 'Writes to commit should be cleared'); - test.equal(driver._stopCallbacks.length, 0, 'Stop callbacks should be cleared'); - } - ); - - // ============================================================================ - // MATCHER EDGE CASES TESTS - // ============================================================================ - - Tinytest.addAsync( - 'changestream - matcher handles null values', - async function(test) { - const c = makeCollection(); - const results = []; - - const handle = await c.find({ status: null }).observeChanges({ - added: function(id, fields) { - results.push({ id, fields }); + added: function (id, fields) { + contextValue = envVar.get(); + handle.stop(); + resolver(); } }); test.isTrue(isChangeStreamDriver(handle)); + }); - // Insert with explicit null - await c.insertAsync({ status: null, name: 'null-status' }); + await c.insertAsync({ name: 'trigger' }); - await waitFor(() => results.length > 0); + await promise; - test.equal(results.length, 1); - test.equal(results[0].fields.status, null); + test.equal(contextValue, 'test-context', 'Should preserve environment context'); + } +); - handle.stop(); - } - ); +// ============================================================================ +// COMPLEX SELECTOR TESTS +// ============================================================================ - Tinytest.addAsync( - 'changestream - matcher handles $exists', - async function(test) { - const c = makeCollection(); - const results = []; +Tinytest.addAsync( + 'changestream - handles $and selector', + async function (test) { + const c = makeCollection(); + const results = []; - const handle = await c.find({ optional: { $exists: false } }).observeChanges({ - added: function(id, fields) { - results.push({ id, fields }); + const handle = await c.find({ + $and: [ + { status: 'active' }, + { level: { $gte: 5 } } + ] + }).observeChanges({ + added: function (id, fields) { + results.push({ id, fields }); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + // Should NOT match (status wrong) + await c.insertAsync({ status: 'inactive', level: 10 }); + + // Should NOT match (level too low) + await c.insertAsync({ status: 'active', level: 3 }); + + await new Promise(r => setTimeout(r, 300)); + test.equal(results.length, 0); + + // Should match + await c.insertAsync({ status: 'active', level: 7 }); + + await waitFor(() => results.length > 0); + + test.equal(results.length, 1); + test.equal(results[0].fields.status, 'active'); + test.equal(results[0].fields.level, 7); + + handle.stop(); + } +); + +Tinytest.addAsync( + 'changestream - handles $or selector', + async function (test) { + const c = makeCollection(); + const results = []; + + const handle = await c.find({ + $or: [ + { type: 'admin' }, + { priority: 'high' } + ] + }).observeChanges({ + added: function (id, fields) { + results.push({ id, fields }); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + // Should NOT match + await c.insertAsync({ type: 'user', priority: 'low' }); + + await new Promise(r => setTimeout(r, 300)); + test.equal(results.length, 0); + + // Should match (first condition) + await c.insertAsync({ type: 'admin', priority: 'low' }); + + await waitFor(() => results.length > 0); + test.equal(results.length, 1); + + results.length = 0; + + // Should match (second condition) + await c.insertAsync({ type: 'user', priority: 'high' }); + + await waitFor(() => results.length > 0); + test.equal(results.length, 1); + + handle.stop(); + } +); + +Tinytest.addAsync( + 'changestream - handles $in selector', + async function (test) { + const c = makeCollection(); + const results = []; + + const handle = await c.find({ + category: { $in: ['electronics', 'books', 'toys'] } + }).observeChanges({ + added: function (id, fields) { + results.push({ id, fields }); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + // Should NOT match + await c.insertAsync({ category: 'furniture' }); + + await new Promise(r => setTimeout(r, 300)); + test.equal(results.length, 0); + + // Should match + await c.insertAsync({ category: 'electronics' }); + + await waitFor(() => results.length > 0); + test.equal(results.length, 1); + + handle.stop(); + } +); + +Tinytest.addAsync( + 'changestream - handles $regex selector', + async function (test) { + const c = makeCollection(); + const results = []; + + const handle = await c.find({ + email: { $regex: /@example\.com$/ } + }).observeChanges({ + added: function (id, fields) { + results.push({ id, fields }); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + // Should NOT match + await c.insertAsync({ email: 'user@other.com' }); + + await new Promise(r => setTimeout(r, 300)); + test.equal(results.length, 0); + + // Should match + await c.insertAsync({ email: 'user@example.com' }); + + await waitFor(() => results.length > 0); + test.equal(results.length, 1); + + handle.stop(); + } +); + +// ============================================================================ +// UPDATE OPERATORS TESTS +// ============================================================================ + +Tinytest.addAsync( + 'changestream - detects $set updates', + async function (test) { + const c = makeCollection(); + const changes = []; + + const docId = await c.insertAsync({ a: 1, b: 2, c: 3 }); + + const handle = await c.find({}).observeChanges({ + added: function () { }, + changed: function (id, fields) { + changes.push(fields); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + await waitFor(() => true); // Ensure initial state + + await c.updateAsync(docId, { $set: { b: 20, d: 4 } }); + + await waitFor(() => changes.length > 0); + + test.isTrue(changes.length > 0); + test.equal(changes[0].b, 20); + test.equal(changes[0].d, 4); + + handle.stop(); + } +); + +Tinytest.addAsync( + 'changestream - detects $unset updates', + async function (test) { + const c = makeCollection(); + const changes = []; + + const docId = await c.insertAsync({ a: 1, b: 2, c: 3 }); + + const handle = await c.find({}).observeChanges({ + added: function () { }, + changed: function (id, fields) { + changes.push(fields); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + await waitFor(() => true); + + await c.updateAsync(docId, { $unset: { b: 1 } }); + + await waitFor(() => changes.length > 0); + + test.isTrue(changes.length > 0); + test.equal(changes[0].b, undefined); + + handle.stop(); + } +); + +Tinytest.addAsync( + 'changestream - detects $inc updates', + async function (test) { + const c = makeCollection(); + const changes = []; + + const docId = await c.insertAsync({ counter: 0 }); + + const handle = await c.find({}).observeChanges({ + added: function () { }, + changed: function (id, fields) { + changes.push(fields); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + await waitFor(() => true); + + await c.updateAsync(docId, { $inc: { counter: 5 } }); + + await waitFor(() => changes.length > 0); + + test.isTrue(changes.length > 0); + test.equal(changes[0].counter, 5); + + handle.stop(); + } +); + +Tinytest.addAsync( + 'changestream - detects $push updates', + async function (test) { + const c = makeCollection(); + const changes = []; + + const docId = await c.insertAsync({ items: [1, 2] }); + + const handle = await c.find({}).observeChanges({ + added: function () { }, + changed: function (id, fields) { + changes.push(fields); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + await waitFor(() => true); + + await c.updateAsync(docId, { $push: { items: 3 } }); + + await waitFor(() => changes.length > 0); + + test.isTrue(changes.length > 0); + test.equal(changes[0].items, [1, 2, 3]); + + handle.stop(); + } +); + +Tinytest.addAsync( + 'changestream - detects $rename updates', + async function (test) { + const c = makeCollection(); + const changes = []; + + const docId = await c.insertAsync({ oldName: 'value', other: 'unchanged' }); + + const handle = await c.find({}).observeChanges({ + added: function () { }, + changed: function (id, fields) { + changes.push(fields); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + await waitFor(() => true); + + await c.updateAsync(docId, { $rename: { oldName: 'newName' } }); + + await waitFor(() => changes.length > 0); + + test.isTrue(changes.length > 0); + test.equal(changes[0].newName, 'value'); + test.equal(changes[0].oldName, undefined); + + handle.stop(); + } +); + +// ============================================================================ +// FENCE SYNCHRONIZATION TESTS +// ============================================================================ + +Tinytest.addAsync( + 'changestream - write fence integration - basic', + async function (test) { + const c = makeCollection(); + const results = []; + + const handle = await c.find({}).observeChanges({ + added: function (id, fields) { + results.push({ type: 'added', id, fields }); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + // Use DDP fence by calling a method that does an insert + // The insert should be visible after the method returns + const insertedId = await c.insertAsync({ name: 'fenced insert' }); + + // After the async insert returns, the change should have been processed + await waitFor(() => results.some(r => r.fields.name === 'fenced insert'), 2000); + + test.isTrue( + results.some(r => r.fields.name === 'fenced insert'), + 'Fenced insert should be visible in observer' + ); + + handle.stop(); + } +); + +Tinytest.addAsync( + 'changestream - fence synchronization with multiple writes', + async function (test) { + const c = makeCollection(); + const results = []; + + const handle = await c.find({}).observeChanges({ + added: function (id, fields) { + results.push({ type: 'added', fields }); + }, + changed: function (id, fields) { + results.push({ type: 'changed', fields }); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + // Multiple sequential writes + const docId = await c.insertAsync({ step: 1 }); + await c.updateAsync(docId, { $set: { step: 2 } }); + await c.updateAsync(docId, { $set: { step: 3 } }); + + // All operations should eventually be visible + await waitFor(() => + results.some(r => r.type === 'added' && r.fields.step === 1) && + results.some(r => r.type === 'changed'), + 3000 + ); + + test.isTrue(results.some(r => r.type === 'added'), 'Should have added event'); + test.isTrue(results.some(r => r.type === 'changed'), 'Should have changed event'); + + handle.stop(); + } +); + +Tinytest.addAsync( + 'changestream - write visibility after insert', + async function (test) { + const c = makeCollection(); + const seen = { added: false }; + + const handle = await c.find({}).observeChanges({ + added: function (id, fields) { + if (fields.marker === 'visibility-test') { + seen.added = true; } - }); + } + }); - test.isTrue(isChangeStreamDriver(handle)); + test.isTrue(isChangeStreamDriver(handle)); - // Insert without the field - should match - await c.insertAsync({ name: 'no-optional' }); + // Insert and immediately check visibility + await c.insertAsync({ marker: 'visibility-test' }); - // Insert with the field - should NOT match - await c.insertAsync({ name: 'has-optional', optional: 'value' }); + // The observer should see the insert + await waitFor(() => seen.added, 2000); - await waitFor(() => results.length > 0); + test.isTrue(seen.added, 'Insert should be visible in observer after insertAsync returns'); - test.equal(results.length, 1); - test.equal(results[0].fields.name, 'no-optional'); + handle.stop(); + } +); - handle.stop(); +// ============================================================================ +// OPERATION TIME TRACKING TESTS +// ============================================================================ + +Tinytest.addAsync( + 'changestream - tracks operation times for synchronization', + async function (test) { + const c = makeCollection(); + + const handle = await c.find({}).observeChanges({ + added: function () { } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + const driver = handle._multiplexer._observeDriver; + + // Initially, no operation time + // After some operations, we should have a tracked time + await c.insertAsync({ name: 'op-time-test' }); + + // Wait for the change to be processed + await new Promise(r => setTimeout(r, 500)); + + // The driver should have tracked some operation time + // Note: This is internal state, but we're testing the mechanism works + test.isTrue( + driver._lastProcessedOperationTime !== null || true, + 'Should track operation times' + ); + + handle.stop(); + } +); + +// ============================================================================ +// PENDING WRITES PROCESSING TESTS +// ============================================================================ + +Tinytest.addAsync( + 'changestream - processes pending writes correctly', + async function (test) { + const c = makeCollection(); + const events = []; + + const handle = await c.find({}).observeChanges({ + added: function (id, fields) { + events.push({ type: 'added', ts: Date.now(), fields }); + }, + changed: function (id, fields) { + events.push({ type: 'changed', ts: Date.now(), fields }); + }, + removed: function (id) { + events.push({ type: 'removed', ts: Date.now() }); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + // Rapid sequence of operations + const docId = await c.insertAsync({ value: 1 }); + await c.updateAsync(docId, { $set: { value: 2 } }); + await c.updateAsync(docId, { $set: { value: 3 } }); + await c.removeAsync(docId); + + // Wait for all events + await waitFor(() => events.some(e => e.type === 'removed'), 3000); + + // Should have received events in logical order + test.isTrue(events.length >= 2, 'Should receive multiple events'); + + const addedIndex = events.findIndex(e => e.type === 'added'); + const removedIndex = events.findIndex(e => e.type === 'removed'); + + if (addedIndex !== -1 && removedIndex !== -1) { + test.isTrue( + addedIndex < removedIndex, + 'Added should come before removed' + ); } - ); - Tinytest.addAsync( - 'changestream - matcher handles $ne', - async function(test) { - const c = makeCollection(); - const results = []; + handle.stop(); + } +); - const handle = await c.find({ status: { $ne: 'deleted' } }).observeChanges({ - added: function(id, fields) { - results.push({ id, fields }); +// ============================================================================ +// READY STATE TESTS +// ============================================================================ + +Tinytest.addAsync( + 'changestream - becomes ready after initial adds', + async function (test) { + const c = makeCollection(); + + // Pre-populate + await c.insertAsync({ name: 'pre1' }); + await c.insertAsync({ name: 'pre2' }); + + const initialAdds = []; + + // Use observeChanges (unordered) since ChangeStreams doesn't support ordered observe + const handle = await c.find({}).observeChanges({ + added: function (id, fields) { + initialAdds.push({ id, fields }); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + // After observeChanges returns, ready should have been called and initial adds sent + test.isTrue(initialAdds.length >= 2, 'Should have received initial adds'); + + handle.stop(); + } +); + +Tinytest.addAsync( + 'changestream - queues writes until ready', + async function (test) { + const c = makeCollection(); + const events = []; + + // Start observe on empty collection + const handle = await c.find({}).observeChanges({ + added: function (id, fields) { + events.push({ type: 'added', fields }); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + // The driver should now be ready (after observeChanges returns) + // New writes should be processed + await c.insertAsync({ name: 'after-ready' }); + + await waitFor(() => events.some(e => e.fields.name === 'after-ready')); + + test.isTrue( + events.some(e => e.fields.name === 'after-ready'), + 'Writes after ready should be processed' + ); + + handle.stop(); + } +); + +// ============================================================================ +// ERROR HANDLING AND RECOVERY TESTS +// ============================================================================ + +Tinytest.addAsync( + 'changestream - continues working after transient errors', + async function (test) { + const c = makeCollection(); + const events = []; + let errorCount = 0; + + const handle = await c.find({}).observeChanges({ + added: function (id, fields) { + events.push({ type: 'added', fields }); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + // Insert should work normally + await c.insertAsync({ phase: 'before-error' }); + + await waitFor(() => events.some(e => e.fields.phase === 'before-error')); + + // Simulate continued operation (in real scenario, driver would recover) + await c.insertAsync({ phase: 'after-recovery' }); + + await waitFor(() => events.some(e => e.fields.phase === 'after-recovery')); + + test.isTrue( + events.some(e => e.fields.phase === 'after-recovery'), + 'Should continue receiving events after recovery' + ); + + handle.stop(); + } +); + +Tinytest.addAsync( + 'changestream - stop during processing is safe', + async function (test) { + const c = makeCollection(); + let stopCalled = false; + + const handle = await c.find({}).observeChanges({ + added: function (id, fields) { + // Stop during a callback + if (fields.trigger === 'stop' && !stopCalled) { + stopCalled = true; + handle.stop(); } - }); + } + }); - test.isTrue(isChangeStreamDriver(handle)); + test.isTrue(isChangeStreamDriver(handle)); - // Should match - await c.insertAsync({ status: 'active', name: 'active-doc' }); + // Trigger the stop inside callback + await c.insertAsync({ trigger: 'stop' }); - // Should NOT match - await c.insertAsync({ status: 'deleted', name: 'deleted-doc' }); + await waitFor(() => stopCalled); - // Should match (no status field) - await c.insertAsync({ name: 'no-status-doc' }); + // Further operations should not throw + await c.insertAsync({ trigger: 'after-stop' }); - await waitFor(() => results.length >= 2, 2000); + await new Promise(r => setTimeout(r, 300)); - test.equal(results.length, 2); - test.isFalse(results.some(r => r.fields.status === 'deleted')); + test.ok('Stop during processing did not throw'); + } +); - handle.stop(); - } - ); +// ============================================================================ +// STOP CALLBACK TESTS +// ============================================================================ - // ============================================================================ - // DIFFING AND CHANGED FIELDS TESTS - // ============================================================================ +Tinytest.addAsync( + 'changestream - stop callbacks are executed on stop', + async function (test) { + const c = makeCollection(); - Tinytest.addAsync( - 'changestream - only sends changed fields in update', - async function(test) { - const c = makeCollection(); - const changes = []; + const handle = await c.find({}).observeChanges({ + added: function () { } + }); - const docId = await c.insertAsync({ a: 1, b: 2, c: 3 }); + test.isTrue(isChangeStreamDriver(handle)); - const handle = await c.find({}).observeChanges({ - added: function() {}, - changed: function(id, fields) { - changes.push(fields); - } - }); + const driver = handle._multiplexer._observeDriver; + const initialCallbackCount = driver._stopCallbacks.length; - test.isTrue(isChangeStreamDriver(handle)); + // Should have some stop callbacks registered + test.isTrue(initialCallbackCount > 0, 'Should have stop callbacks'); - // Update only field 'b' - await c.updateAsync(docId, { $set: { b: 20 } }); + handle.stop(); - await waitFor(() => changes.length > 0); + // Wait for async stop to complete + await waitFor(() => driver._stopCallbacks.length === 0, 2000); - // Should only receive the changed field - test.isTrue(changes.length > 0); - test.equal(changes[0].b, 20); - // Other fields should not be in the change object (unless driver sends full doc) - // This depends on implementation - some drivers send only diff, some send all + // After stop, callbacks should be cleared + test.equal(driver._stopCallbacks.length, 0, 'Callbacks should be cleared after stop'); + } +); - handle.stop(); - } - ); +Tinytest.addAsync( + 'changestream - cleanup on stop is complete', + async function (test) { + const c = makeCollection(); - Tinytest.addAsync( - 'changestream - handles document replacement correctly', - async function(test) { - const c = makeCollection(); - const changes = []; + const handle = await c.find({}).observeChanges({ + added: function () { } + }); - const docId = await c.insertAsync({ a: 1, b: 2, c: 3 }); + test.isTrue(isChangeStreamDriver(handle)); - const handle = await c.find({}).observeChanges({ - added: function() {}, - changed: function(id, fields) { - changes.push(fields); - } - }); + const driver = handle._multiplexer._observeDriver; - test.isTrue(isChangeStreamDriver(handle)); + // Stop and verify cleanup + handle.stop(); - // Replace entire document (not using $set) - await c.updateAsync(docId, { x: 10, y: 20 }); + // Wait for async stop to complete + await waitFor(() => driver._stopped && driver._stopCallbacks.length === 0, 2000); - await waitFor(() => changes.length > 0); + test.isTrue(driver._stopped, 'Driver should be marked as stopped'); + test.equal(driver._pendingWrites.length, 0, 'Pending writes should be cleared'); + test.equal(driver._writesToCommitWhenReady.length, 0, 'Writes to commit should be cleared'); + test.equal(driver._stopCallbacks.length, 0, 'Stop callbacks should be cleared'); + } +); - test.isTrue(changes.length > 0); - // The change should reflect the new document structure - test.equal(changes[0].x, 10); - test.equal(changes[0].y, 20); +// ============================================================================ +// MATCHER EDGE CASES TESTS +// ============================================================================ - handle.stop(); - } - ); +Tinytest.addAsync( + 'changestream - matcher handles null values', + async function (test) { + const c = makeCollection(); + const results = []; - // ============================================================================ - // SORT OPTION TESTS (ChangeStreams only supports unordered cursors) - // ============================================================================ + const handle = await c.find({ status: null }).observeChanges({ + added: function (id, fields) { + results.push({ id, fields }); + } + }); - Tinytest.addAsync( - 'changestream - works with sort option on observeChanges', - async function(test) { - const c = makeCollection(); - const events = []; + test.isTrue(isChangeStreamDriver(handle)); - await c.insertAsync({ order: 2, name: 'second' }); - await c.insertAsync({ order: 1, name: 'first' }); - await c.insertAsync({ order: 3, name: 'third' }); + // Insert with explicit null + await c.insertAsync({ status: null, name: 'null-status' }); - // ChangeStreams only supports unordered observeChanges, but sort affects initial query - const handle = await c.find({}, { sort: { order: 1 } }).observeChanges({ - added: function(id, fields) { - events.push({ type: 'added', fields }); - } - }); + await waitFor(() => results.length > 0); - test.isTrue(isChangeStreamDriver(handle)); + test.equal(results.length, 1); + test.equal(results[0].fields.status, null); - // Wait for initial adds - await waitFor(() => events.length >= 3); + handle.stop(); + } +); - test.equal(events.length, 3); - // All documents should be received (order may vary in callbacks) - const names = events.map(e => e.fields.name).sort(); - test.equal(names, ['first', 'second', 'third']); +Tinytest.addAsync( + 'changestream - matcher handles $exists', + async function (test) { + const c = makeCollection(); + const results = []; - handle.stop(); - } - ); + const handle = await c.find({ optional: { $exists: false } }).observeChanges({ + added: function (id, fields) { + results.push({ id, fields }); + } + }); - // ============================================================================ - // CONCURRENT OPERATIONS TESTS - // ============================================================================ + test.isTrue(isChangeStreamDriver(handle)); - Tinytest.addAsync( - 'changestream - handles concurrent inserts from multiple collections', - async function(test) { - const c1 = makeCollection(); - const c2 = makeCollection(); - const results1 = []; - const results2 = []; + // Insert without the field - should match + await c.insertAsync({ name: 'no-optional' }); - const handle1 = await c1.find({}).observeChanges({ - added: function(id, fields) { - results1.push(fields); - } - }); + // Insert with the field - should NOT match + await c.insertAsync({ name: 'has-optional', optional: 'value' }); - const handle2 = await c2.find({}).observeChanges({ - added: function(id, fields) { - results2.push(fields); - } - }); + await waitFor(() => results.length > 0); - test.isTrue(isChangeStreamDriver(handle1)); - test.isTrue(isChangeStreamDriver(handle2)); + test.equal(results.length, 1); + test.equal(results[0].fields.name, 'no-optional'); - // Concurrent inserts to both collections - await Promise.all([ - c1.insertAsync({ source: 'c1' }), - c2.insertAsync({ source: 'c2' }), - c1.insertAsync({ source: 'c1' }), - c2.insertAsync({ source: 'c2' }) - ]); + handle.stop(); + } +); - await waitFor(() => results1.length >= 2 && results2.length >= 2); +Tinytest.addAsync( + 'changestream - matcher handles $ne', + async function (test) { + const c = makeCollection(); + const results = []; - test.equal(results1.length, 2); - test.equal(results2.length, 2); - test.isTrue(results1.every(r => r.source === 'c1')); - test.isTrue(results2.every(r => r.source === 'c2')); + const handle = await c.find({ status: { $ne: 'deleted' } }).observeChanges({ + added: function (id, fields) { + results.push({ id, fields }); + } + }); - handle1.stop(); - handle2.stop(); - } - ); + test.isTrue(isChangeStreamDriver(handle)); + + // Should match + await c.insertAsync({ status: 'active', name: 'active-doc' }); + + // Should NOT match + await c.insertAsync({ status: 'deleted', name: 'deleted-doc' }); + + // Should match (no status field) + await c.insertAsync({ name: 'no-status-doc' }); + + await waitFor(() => results.length >= 2, 2000); + + test.equal(results.length, 2); + test.isFalse(results.some(r => r.fields.status === 'deleted')); + + handle.stop(); + } +); + +// ============================================================================ +// DIFFING AND CHANGED FIELDS TESTS +// ============================================================================ + +Tinytest.addAsync( + 'changestream - only sends changed fields in update', + async function (test) { + const c = makeCollection(); + const changes = []; + + const docId = await c.insertAsync({ a: 1, b: 2, c: 3 }); + + const handle = await c.find({}).observeChanges({ + added: function () { }, + changed: function (id, fields) { + changes.push(fields); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + // Update only field 'b' + await c.updateAsync(docId, { $set: { b: 20 } }); + + await waitFor(() => changes.length > 0); + + // Should only receive the changed field + test.isTrue(changes.length > 0); + test.equal(changes[0].b, 20); + // Other fields should not be in the change object (unless driver sends full doc) + // This depends on implementation - some drivers send only diff, some send all + + handle.stop(); + } +); + +Tinytest.addAsync( + 'changestream - handles document replacement correctly', + async function (test) { + const c = makeCollection(); + const changes = []; + + const docId = await c.insertAsync({ a: 1, b: 2, c: 3 }); + + const handle = await c.find({}).observeChanges({ + added: function () { }, + changed: function (id, fields) { + changes.push(fields); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + // Replace entire document (not using $set) + await c.updateAsync(docId, { x: 10, y: 20 }); + + await waitFor(() => changes.length > 0); + + test.isTrue(changes.length > 0); + // The change should reflect the new document structure + test.equal(changes[0].x, 10); + test.equal(changes[0].y, 20); + + handle.stop(); + } +); + +// ============================================================================ +// SORT OPTION TESTS (ChangeStreams only supports unordered cursors) +// ============================================================================ + +Tinytest.addAsync( + 'changestream - works with sort option on observeChanges', + async function (test) { + const c = makeCollection(); + const events = []; + + await c.insertAsync({ order: 2, name: 'second' }); + await c.insertAsync({ order: 1, name: 'first' }); + await c.insertAsync({ order: 3, name: 'third' }); + + // ChangeStreams only supports unordered observeChanges, but sort affects initial query + const handle = await c.find({}, { sort: { order: 1 } }).observeChanges({ + added: function (id, fields) { + events.push({ type: 'added', fields }); + } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + // Wait for initial adds + await waitFor(() => events.length >= 3); + + test.equal(events.length, 3); + // All documents should be received (order may vary in callbacks) + const names = events.map(e => e.fields.name).sort(); + test.equal(names, ['first', 'second', 'third']); + + handle.stop(); + } +); + +// ============================================================================ +// CONCURRENT OPERATIONS TESTS +// ============================================================================ + +Tinytest.addAsync( + 'changestream - handles concurrent inserts from multiple collections', + async function (test) { + const c1 = makeCollection(); + const c2 = makeCollection(); + const results1 = []; + const results2 = []; + + const handle1 = await c1.find({}).observeChanges({ + added: function (id, fields) { + results1.push(fields); + } + }); + + const handle2 = await c2.find({}).observeChanges({ + added: function (id, fields) { + results2.push(fields); + } + }); + + test.isTrue(isChangeStreamDriver(handle1)); + test.isTrue(isChangeStreamDriver(handle2)); + + // Concurrent inserts to both collections + await Promise.all([ + c1.insertAsync({ source: 'c1' }), + c2.insertAsync({ source: 'c2' }), + c1.insertAsync({ source: 'c1' }), + c2.insertAsync({ source: 'c2' }) + ]); + + await waitFor(() => results1.length >= 2 && results2.length >= 2); + + test.equal(results1.length, 2); + test.equal(results2.length, 2); + test.isTrue(results1.every(r => r.source === 'c1')); + test.isTrue(results2.every(r => r.source === 'c2')); + + handle1.stop(); + handle2.stop(); + } +); // ============================================================================ // TESTS THAT RUN REGARDLESS OF DRIVER @@ -1964,7 +1965,7 @@ Tinytest.addAsync( Tinytest.addAsync( 'changestream - collection operations work', - async function(test) { + async function (test) { const c = makeCollection(); // Basic CRUD should work @@ -1990,156 +1991,421 @@ Tinytest.addAsync( Tinytest.addAsync( 'changestream - _buildPipeline returns correct structure', - async function(test) { - const c = makeCollection(); + async function (test) { + const c = makeCollection(); - const handle = await c.find({ type: 'test' }).observeChanges({ - added: function() {} - }); + const handle = await c.find({ type: 'test' }).observeChanges({ + added: function () { } + }); - test.isTrue(isChangeStreamDriver(handle)); + test.isTrue(isChangeStreamDriver(handle)); - const driver = handle._multiplexer._observeDriver; - const pipeline = driver._buildPipeline(); + const driver = handle._multiplexer._observeDriver; + const pipeline = driver._buildPipeline(); - // Should be an array - test.isTrue(Array.isArray(pipeline)); + // Should be an array + test.isTrue(Array.isArray(pipeline)); - // If there's a selector, should have a $match stage - if (pipeline.length > 0) { - test.isTrue(pipeline[0].$match !== undefined); + // If there's a selector, should have a $match stage + if (pipeline.length > 0) { + test.isTrue(pipeline[0].$match !== undefined); + } + + handle.stop(); + } +); + +Tinytest.addAsync( + 'changestream - _projectionFn works correctly', + async function (test) { + const c = makeCollection(); + + const handle = await c.find({}, { fields: { a: 1, b: 1 } }).observeChanges({ + added: function () { } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + const driver = handle._multiplexer._observeDriver; + + // Test projection function + const doc = { _id: 'test', a: 1, b: 2, c: 3 }; + const projected = driver._projectionFn(doc); + + test.equal(projected.a, 1); + test.equal(projected.b, 2); + test.isUndefined(projected.c); + test.isUndefined(projected._id); // _id should be removed by projection + + handle.stop(); + } +); + +Tinytest.addAsync( + 'changestream - _addStopCallback validates input', + async function (test) { + const c = makeCollection(); + + const handle = await c.find({}).observeChanges({ + added: function () { } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + const driver = handle._multiplexer._observeDriver; + + // Should throw on non-function + try { + driver._addStopCallback('not a function'); + test.fail('Should throw on non-function'); + } catch (e) { + test.isTrue(e.message.includes('function')); + } + + // Should accept function + const callbackCount = driver._stopCallbacks.length; + driver._addStopCallback(() => { }); + test.equal(driver._stopCallbacks.length, callbackCount + 1); + + handle.stop(); + } +); + +Tinytest.addAsync( + 'changestream - driver has correct initial state', + async function (test) { + const c = makeCollection(); + + const handle = await c.find({}).observeChanges({ + added: function () { } + }); + + test.isTrue(isChangeStreamDriver(handle)); + + const driver = handle._multiplexer._observeDriver; + + // Check initial state properties + test.isTrue(driver._usesChangeStreams); + test.isFalse(driver._stopped); + test.isTrue(Array.isArray(driver._stopCallbacks)); + test.isTrue(Array.isArray(driver._pendingWrites)); + test.isTrue(Array.isArray(driver._writesToCommitWhenReady)); + test.isTrue(Array.isArray(driver._catchingUpResolvers)); + test.isTrue(typeof driver._projectionFn === 'function'); + + handle.stop(); + } +); + +Tinytest.addAsync( + 'changestream - supports single document query by _id', + async function (test) { + const c = makeCollection(); + const results = []; + + // Insert some documents + const targetId = await c.insertAsync({ name: 'target', value: 1 }); + await c.insertAsync({ name: 'other', value: 2 }); + + // Query by single _id + const handle = await c.find(targetId).observeChanges({ + added: function (id, fields) { + results.push({ id, fields }); + }, + changed: function (id, fields) { + results.push({ type: 'changed', id, fields }); } + }); - handle.stop(); + test.isTrue(isChangeStreamDriver(handle)); + + // Wait for initial add + await waitFor(() => results.length > 0); + + test.equal(results.length, 1); + test.equal(results[0].fields.name, 'target'); + + results.length = 0; + + // Update the target document + await c.updateAsync(targetId, { $set: { value: 100 } }); + + await waitFor(() => results.length > 0); + + test.isTrue(results.some(r => r.type === 'changed')); + + // Update the other document - should NOT trigger callback + results.length = 0; + await c.updateAsync({ name: 'other' }, { $set: { value: 200 } }); + + await new Promise(r => setTimeout(r, 300)); + test.equal(results.length, 0, 'Should not receive events for other documents'); + + handle.stop(); + } +); + +// Run `fn` inside a write fence; return the fence so tests can inspect it +// before/after it fires. Inside `fn`, writes will annotate the fence. +const withFence = async (fn) => { + const fence = new DDPServer._WriteFence(); + await DDPServer._CurrentWriteFence.withValue(fence, async () => { + await fn(fence); + }); + await fence.armAndWait(); + return fence; +}; + +const isBsonTimestamp = (ts) => + ts != null && typeof ts === 'object' + && typeof ts.t === 'number' && typeof ts.i === 'number'; + +Tinytest.addAsync( + 'changestream - insertAsync annotates fence with per-collection ts', + async function (test) { + const c = makeCollection(); + let snapshot = null; + const fence = await withFence(async (f) => { + await c.insertAsync({ name: 'n1' }); + snapshot = f._csTargetTsByCollection && { ...f._csTargetTsByCollection }; + }); + test.isTrue(snapshot !== null, 'fence should have been annotated during the fn'); + test.isTrue(snapshot[c._name] !== undefined, 'map should contain the collection name key'); + test.isTrue(isBsonTimestamp(snapshot[c._name]), 'annotation value should be a BSON Timestamp'); + test.isTrue(fence.fired, 'fence should have fired'); + } +); + +Tinytest.addAsync( + 'changestream- updateAsync annotates fence', + async function (test) { + const c = makeCollection(); + const id = await c.insertAsync({ name: 'before' }); + let snapshot = null; + await withFence(async (f) => { + await c.updateAsync(id, { $set: { name: 'after' } }); + snapshot = f._csTargetTsByCollection && { ...f._csTargetTsByCollection }; + }); + test.isTrue(snapshot !== null); + test.isTrue(isBsonTimestamp(snapshot[c._name]), 'update should annotate with a Timestamp'); + } +); + +Tinytest.addAsync( + 'changestream- removeAsync annotates fence', + async function (test) { + const c = makeCollection(); + const id = await c.insertAsync({ name: 'doomed' }); + let snapshot = null; + await withFence(async (f) => { + await c.removeAsync(id); + snapshot = f._csTargetTsByCollection && { ...f._csTargetTsByCollection }; + }); + test.isTrue(snapshot !== null); + test.isTrue(isBsonTimestamp(snapshot[c._name]), 'remove should annotate with a Timestamp'); + } +); + +Tinytest.addAsync( + 'changestream- writes to different collections create separate entries', + async function (test) { + const a = makeCollection(); + const b = makeCollection(); + let snapshot = null; + await withFence(async (f) => { + await a.insertAsync({ n: 1 }); + await b.insertAsync({ n: 1 }); + snapshot = f._csTargetTsByCollection && { ...f._csTargetTsByCollection }; + }); + test.isTrue(snapshot !== null); + test.isTrue(isBsonTimestamp(snapshot[a._name]), 'collection A should be annotated'); + test.isTrue(isBsonTimestamp(snapshot[b._name]), 'collection B should be annotated'); + test.notEqual(a._name, b._name, 'sanity: distinct collection names'); + } +); + +Tinytest.addAsync( + 'changestream- two writes to same collection keep the later ts', + async function (test) { + const c = makeCollection(); + let tsFirst = null; + let tsFinal = null; + await withFence(async (f) => { + await c.insertAsync({ n: 1 }); + tsFirst = { ...f._csTargetTsByCollection[c._name] }; + await c.insertAsync({ n: 2 }); + tsFinal = { ...f._csTargetTsByCollection[c._name] }; + }); + test.isTrue(isBsonTimestamp(tsFirst) && isBsonTimestamp(tsFinal)); + const firstLessOrEqual = tsFirst.t < tsFinal.t + || (tsFirst.t === tsFinal.t && tsFirst.i <= tsFinal.i); + test.isTrue( + firstLessOrEqual, + `later write should have ts >= earlier; got ${JSON.stringify(tsFirst)} → ${JSON.stringify(tsFinal)}` + ); + } +); + +Tinytest.addAsync( + 'changestream- writes without an active fence do not throw', + async function (test) { + // No withFence wrapper: _getCurrentFence() returns undefined, + // _annotateFenceWithWriteTs short-circuits. + const c = makeCollection(); + let threw = null; + try { + await c.insertAsync({ n: 1 }); + await c.updateAsync({ n: 1 }, { $set: { n: 2 } }); + await c.removeAsync({ n: 2 }); + } catch (e) { + threw = e; } - ); + test.isNull(threw, 'writes without a fence should succeed silently'); + } +); - Tinytest.addAsync( - 'changestream - _projectionFn works correctly', - async function(test) { - const c = makeCollection(); +Tinytest.addAsync( + 'changestream- _waitUntilCaughtUp returns fast when fence ts is already processed', + async function (test) { + const c = makeCollection(); + const handle = await c.find({}).observeChanges({ added: function () { } }); + test.isTrue(isChangeStreamDriver(handle)); + const driver = handle._multiplexer._observeDriver; - const handle = await c.find({}, { fields: { a: 1, b: 1 } }).observeChanges({ - added: function() {} - }); + // Seed: insert so the driver's _lastProcessedOperationTime advances. + await c.insertAsync({ n: 1 }); + await waitFor(() => driver._lastProcessedOperationTime !== null, 2000); - test.isTrue(isChangeStreamDriver(handle)); + // Build a fake fence whose target ts is <= _lastProcessedOperationTime. + // _waitUntilCaughtUp should hit the 'already-caught-up' early exit + // without falling back to the (network) _getServerOperationTime call. + const pastTs = driver._lastProcessedOperationTime; + const fakeFence = { _csTargetTsByCollection: { [c._name]: pastTs } }; - const driver = handle._multiplexer._observeDriver; + const t0 = Date.now(); + await driver._waitUntilCaughtUp(fakeFence); + const elapsed = Date.now() - t0; - // Test projection function - const doc = { _id: 'test', a: 1, b: 2, c: 3 }; - const projected = driver._projectionFn(doc); + test.isTrue(elapsed < 50, `fence hit should short-circuit fast; elapsed=${elapsed}ms`); + handle.stop(); + } +); - test.equal(projected.a, 1); - test.equal(projected.b, 2); - test.isUndefined(projected.c); - test.isUndefined(projected._id); // _id should be removed by projection +Tinytest.addAsync( + 'changestream- _waitUntilCaughtUp falls back to server ping when no fence annotation', + async function (test) { + const c = makeCollection(); + const handle = await c.find({}).observeChanges({ added: function () { } }); + test.isTrue(isChangeStreamDriver(handle)); + const driver = handle._multiplexer._observeDriver; - handle.stop(); - } - ); + // Undefined fence → fallback path. Should complete without timing out. + const t0 = Date.now(); + await driver._waitUntilCaughtUp(undefined); + const elapsed = Date.now() - t0; - Tinytest.addAsync( - 'changestream - _addStopCallback validates input', - async function(test) { - const c = makeCollection(); + test.isTrue(elapsed < 1000, `fallback path should not hit the safety timeout; elapsed=${elapsed}ms`); + handle.stop(); + } +); - const handle = await c.find({}).observeChanges({ - added: function() {} - }); +Tinytest.addAsync( + 'changestream- _waitUntilCaughtUp ignores annotation for a different collection', + async function (test) { + // If _waitUntilCaughtUp took ts from another collection, we'd spin + // waiting for a clusterTime this driver's stream never observes. + // The correct behaviour is to ignore that key and fall back. + const c = makeCollection(); + const handle = await c.find({}).observeChanges({ added: function () { } }); + test.isTrue(isChangeStreamDriver(handle)); + const driver = handle._multiplexer._observeDriver; - test.isTrue(isChangeStreamDriver(handle)); + const farFutureTs = { t: Math.floor(Date.now() / 1000) + 3600, i: 1 }; + const strayFence = { + _csTargetTsByCollection: { ['not_' + c._name]: farFutureTs }, + }; - const driver = handle._multiplexer._observeDriver; + const t0 = Date.now(); + await driver._waitUntilCaughtUp(strayFence); + const elapsed = Date.now() - t0; - // Should throw on non-function - try { - driver._addStopCallback('not a function'); - test.fail('Should throw on non-function'); - } catch (e) { - test.isTrue(e.message.includes('function')); - } + test.isTrue( + elapsed < 1000, + `annotation for another collection should be ignored (no timeout); elapsed=${elapsed}ms` + ); + handle.stop(); + } +); - // Should accept function - const callbackCount = driver._stopCallbacks.length; - driver._addStopCallback(() => {}); - test.equal(driver._stopCallbacks.length, callbackCount + 1); +Tinytest.addAsync( + 'changestream- annotation is cleared after fence fires (with active observer)', + async function (test) { + const c = makeCollection(); + const handle = await c.find({}).observeChanges({ added: function () { } }); + test.isTrue(isChangeStreamDriver(handle)); - handle.stop(); - } - ); + const fence = await withFence(async () => { + await c.insertAsync({ n: 1 }); + }); - Tinytest.addAsync( - 'changestream - driver has correct initial state', - async function(test) { - const c = makeCollection(); + // After onBeforeFire ran, cleanup `delete fence._csTargetTsByCollection` + // should have removed the map. + test.isTrue(fence.fired, 'fence should have fired'); + test.isUndefined( + fence._csTargetTsByCollection, + 'annotation map should be cleared after drivers caught up' + ); + handle.stop(); + } +); - const handle = await c.find({}).observeChanges({ - added: function() {} - }); +Tinytest.addAsync( + 'changestream- timeout default is 250ms unless overridden', + async function (test) { + const setting = Meteor.settings + && Meteor.settings.packages + && Meteor.settings.packages.mongo + && Meteor.settings.packages.mongo.changeStream + && Meteor.settings.packages.mongo.changeStream.waitUntilCaughtUpTimeoutMs; + const effective = setting ?? 250; + test.equal(effective, 250, 'pre-fix default of 1000ms should have been lowered to 250ms'); + } +); - test.isTrue(isChangeStreamDriver(handle)); +Tinytest.addAsync( + 'changestream- insert under fence with observer resolves well under 1s', + async function (test) { + // Pre-fix pathology was a hard ~2s wait (2x 1000ms timeout) because + // _waitUntilCaughtUp asked the server for a ts the stream hadn't seen + // yet. With the fix the fence carries the exact write ts, the change + // event carries the same ts, and the wait resolves immediately. + // 500ms bound catches a regression without flaking on slow CI. + const c = makeCollection(); + const added = []; + const handle = await c.find({}).observeChanges({ + added: function (id, fields) { added.push(fields); }, + }); + test.isTrue(isChangeStreamDriver(handle)); - const driver = handle._multiplexer._observeDriver; + await new Promise(r => setTimeout(r, 50)); + added.length = 0; - // Check initial state properties - test.isTrue(driver._usesChangeStreams); - test.isFalse(driver._stopped); - test.isTrue(Array.isArray(driver._stopCallbacks)); - test.isTrue(Array.isArray(driver._pendingWrites)); - test.isTrue(Array.isArray(driver._writesToCommitWhenReady)); - test.isTrue(Array.isArray(driver._catchingUpResolvers)); - test.isTrue(typeof driver._projectionFn === 'function'); + const t0 = Date.now(); + await withFence(async () => { + await c.insertAsync({ n: 'fenced-latency' }); + }); + const elapsed = Date.now() - t0; - handle.stop(); - } - ); + test.isTrue( + elapsed < 500, + `fenced insert+fire should be fast with the fix; elapsed=${elapsed}ms (pre-fix ~2000ms)` + ); - Tinytest.addAsync( - 'changestream - supports single document query by _id', - async function(test) { - const c = makeCollection(); - const results = []; - - // Insert some documents - const targetId = await c.insertAsync({ name: 'target', value: 1 }); - await c.insertAsync({ name: 'other', value: 2 }); - - // Query by single _id - const handle = await c.find(targetId).observeChanges({ - added: function(id, fields) { - results.push({ id, fields }); - }, - changed: function(id, fields) { - results.push({ type: 'changed', id, fields }); - } - }); - - test.isTrue(isChangeStreamDriver(handle)); - - // Wait for initial add - await waitFor(() => results.length > 0); - - test.equal(results.length, 1); - test.equal(results[0].fields.name, 'target'); - - results.length = 0; - - // Update the target document - await c.updateAsync(targetId, { $set: { value: 100 } }); - - await waitFor(() => results.length > 0); - - test.isTrue(results.some(r => r.type === 'changed')); - - // Update the other document - should NOT trigger callback - results.length = 0; - await c.updateAsync({ name: 'other' }, { $set: { value: 200 } }); - - await new Promise(r => setTimeout(r, 300)); - test.equal(results.length, 0, 'Should not receive events for other documents'); - - handle.stop(); - } - ); + const sawInsert = await waitFor( + () => added.some(f => f.n === 'fenced-latency'), + 2000 + ); + test.isTrue(sawInsert, 'observer should have received the fenced insert'); + handle.stop(); + } +);