attempt to trigger race condition

This commit is contained in:
Leonardo Venturini
2024-12-20 17:59:25 -04:00
parent 73c7979993
commit d0ed35ab8c
3 changed files with 46 additions and 18 deletions

View File

@@ -537,23 +537,43 @@ Tinytest.addAsync('livedata server - publication stop should not throw error', a
this.added('issueUnreadCount', user._id, {count});
this.onStop(() => {
console.log('onStop');
return handle.stop();
handle.stop();
});
this.ready();
}
});
sub = conn.subscribe(publicationName);
await sleep(100);
// Create multiple competing subscriptions
const sub1 = conn.subscribe(publicationName);
const sub2 = conn.subscribe(publicationName);
const sub3 = conn.subscribe(publicationName);
// Make changes that will affect all subs
await coll.insertAsync({ _id: 'item_10', title: 'Item #10' });
await sleep(100);
// Stop middle subscription during changes
sub2.stop();
await coll.insertAsync({ _id: 'item_11', title: 'Item #11' });
// Create new subscription while changes happening
const sub4 = conn.subscribe(publicationName);
await coll.removeAsync({ _id: 'item_10' });
sub1.stop();
await coll.insertAsync({ _id: 'item_12', title: 'Item #12' });
// Final subscription during teardown of others
const sub5 = conn.subscribe(publicationName);
sub3.stop();
sub4.stop();
await coll.removeAsync({ _id: 'item_0' });
await sleep(100);
await sleep(50);
sub.stop();
sub5.stop();
console.log(messages);

View File

@@ -1,17 +1,16 @@
import { Meteor } from 'meteor/meteor';
import path from 'path';
import { DocFetcher } from './doc_fetcher';
import { ObserveMultiplexer } from './observe_multiplex';
import { ObserveHandle } from './observe_handle';
import { OPLOG_COLLECTION, OplogHandle } from './oplog_tailing';
import { CLIENT_ONLY_METHODS, getAsyncMethodName } from 'meteor/minimongo/constants';
import { OplogObserveDriver } from './oplog_observe_driver';
import { PollingObserveDriver } from './polling_observe_driver';
import { replaceMeteorAtomWithMongo, replaceTypes, transformResult } from './mongo_common';
import path from 'path';
import { AsynchronousCursor } from './asynchronous_cursor';
import { MongoDB } from './mongo_common';
import { Cursor } from './cursor';
import { CursorDescription } from './cursor_description';
import { DocFetcher } from './doc_fetcher';
import { MongoDB, replaceMeteorAtomWithMongo, replaceTypes, transformResult } from './mongo_common';
import { ObserveHandle } from './observe_handle';
import { ObserveMultiplexer } from './observe_multiplex';
import { OplogObserveDriver } from './oplog_observe_driver';
import { OPLOG_COLLECTION, OplogHandle } from './oplog_tailing';
import { PollingObserveDriver } from './polling_observe_driver';
const FILE_ASSET_SUFFIX = 'Asset';
const ASSETS_FOLDER = 'assets';
@@ -841,6 +840,10 @@ Object.assign(MongoConnection.prototype, {
}
});
}
if (!multiplexer) {
throw new Error("multiplexer is required");
}
var observeHandle = new ObserveHandle(multiplexer,
callbacks,

View File

@@ -27,7 +27,12 @@ export class ObserveHandle<T = any> {
_movedBefore?: Callback<T>;
_removed?: Callback<T>;
constructor(multiplexer: any, callbacks: Record<ObserveHandleCallback, Callback<T>>, nonMutatingCallbacks: boolean) {
constructor(multiplexer: ObserveMultiplexer, callbacks: Record<ObserveHandleCallback, Callback<T>>, nonMutatingCallbacks: boolean) {
if (!multiplexer) {
throw new Error("Multiplexer is required");
}
this._multiplexer = multiplexer;
multiplexer.callbackNames().forEach((name: ObserveHandleCallback) => {