From d0ed35ab8c1ebcdcb66ebe6d7f64199bf414f2e4 Mon Sep 17 00:00:00 2001 From: Leonardo Venturini Date: Fri, 20 Dec 2024 17:59:25 -0400 Subject: [PATCH] attempt to trigger race condition --- packages/ddp-server/livedata_server_tests.js | 36 +++++++++++++++----- packages/mongo/mongo_connection.js | 21 +++++++----- packages/mongo/observe_handle.ts | 7 +++- 3 files changed, 46 insertions(+), 18 deletions(-) diff --git a/packages/ddp-server/livedata_server_tests.js b/packages/ddp-server/livedata_server_tests.js index 4f3f9b99dd..d6315848c3 100644 --- a/packages/ddp-server/livedata_server_tests.js +++ b/packages/ddp-server/livedata_server_tests.js @@ -537,23 +537,43 @@ Tinytest.addAsync('livedata server - publication stop should not throw error', a this.added('issueUnreadCount', user._id, {count}); this.onStop(() => { console.log('onStop'); - return handle.stop(); + handle.stop(); }); this.ready(); } }); - sub = conn.subscribe(publicationName); - - await sleep(100); + // Create multiple competing subscriptions + const sub1 = conn.subscribe(publicationName); + const sub2 = conn.subscribe(publicationName); + const sub3 = conn.subscribe(publicationName); + // Make changes that will affect all subs await coll.insertAsync({ _id: 'item_10', title: 'Item #10' }); - await sleep(100); + + // Stop middle subscription during changes + sub2.stop(); + + await coll.insertAsync({ _id: 'item_11', title: 'Item #11' }); + + // Create new subscription while changes happening + const sub4 = conn.subscribe(publicationName); + + await coll.removeAsync({ _id: 'item_10' }); + + sub1.stop(); + + await coll.insertAsync({ _id: 'item_12', title: 'Item #12' }); + + // Final subscription during teardown of others + const sub5 = conn.subscribe(publicationName); + + sub3.stop(); + sub4.stop(); - await coll.removeAsync({ _id: 'item_0' }); - await sleep(100); + await sleep(50); - sub.stop(); + sub5.stop(); console.log(messages); diff --git a/packages/mongo/mongo_connection.js b/packages/mongo/mongo_connection.js index 47e0f91f75..58760b605a 100644 --- a/packages/mongo/mongo_connection.js +++ b/packages/mongo/mongo_connection.js @@ -1,17 +1,16 @@ import { Meteor } from 'meteor/meteor'; -import path from 'path'; -import { DocFetcher } from './doc_fetcher'; -import { ObserveMultiplexer } from './observe_multiplex'; -import { ObserveHandle } from './observe_handle'; -import { OPLOG_COLLECTION, OplogHandle } from './oplog_tailing'; import { CLIENT_ONLY_METHODS, getAsyncMethodName } from 'meteor/minimongo/constants'; -import { OplogObserveDriver } from './oplog_observe_driver'; -import { PollingObserveDriver } from './polling_observe_driver'; -import { replaceMeteorAtomWithMongo, replaceTypes, transformResult } from './mongo_common'; +import path from 'path'; import { AsynchronousCursor } from './asynchronous_cursor'; -import { MongoDB } from './mongo_common'; import { Cursor } from './cursor'; import { CursorDescription } from './cursor_description'; +import { DocFetcher } from './doc_fetcher'; +import { MongoDB, replaceMeteorAtomWithMongo, replaceTypes, transformResult } from './mongo_common'; +import { ObserveHandle } from './observe_handle'; +import { ObserveMultiplexer } from './observe_multiplex'; +import { OplogObserveDriver } from './oplog_observe_driver'; +import { OPLOG_COLLECTION, OplogHandle } from './oplog_tailing'; +import { PollingObserveDriver } from './polling_observe_driver'; const FILE_ASSET_SUFFIX = 'Asset'; const ASSETS_FOLDER = 'assets'; @@ -841,6 +840,10 @@ Object.assign(MongoConnection.prototype, { } }); } + + if (!multiplexer) { + throw new Error("multiplexer is required"); + } var observeHandle = new ObserveHandle(multiplexer, callbacks, diff --git a/packages/mongo/observe_handle.ts b/packages/mongo/observe_handle.ts index 66235fa920..7154ba1395 100644 --- a/packages/mongo/observe_handle.ts +++ b/packages/mongo/observe_handle.ts @@ -27,7 +27,12 @@ export class ObserveHandle { _movedBefore?: Callback; _removed?: Callback; - constructor(multiplexer: any, callbacks: Record>, nonMutatingCallbacks: boolean) { + constructor(multiplexer: ObserveMultiplexer, callbacks: Record>, nonMutatingCallbacks: boolean) { + + if (!multiplexer) { + throw new Error("Multiplexer is required"); + } + this._multiplexer = multiplexer; multiplexer.callbackNames().forEach((name: ObserveHandleCallback) => {