From a9f2656e22ca36f293c6d0b74867ddffb3faa306 Mon Sep 17 00:00:00 2001 From: Leonardo Venturini Date: Mon, 4 Nov 2024 18:14:29 -0400 Subject: [PATCH] fix race condition in oplog tailing --- packages/mongo/oplog_tailing.ts | 156 ++++++++++++++++---------------- 1 file changed, 79 insertions(+), 77 deletions(-) diff --git a/packages/mongo/oplog_tailing.ts b/packages/mongo/oplog_tailing.ts index 440339fa6f..d56bf8e8eb 100644 --- a/packages/mongo/oplog_tailing.ts +++ b/packages/mongo/oplog_tailing.ts @@ -11,7 +11,7 @@ export const OPLOG_COLLECTION = 'oplog.rs'; let TOO_FAR_BEHIND = +(process.env.METEOR_OPLOG_TOO_FAR_BEHIND || 2000); const TAIL_TIMEOUT = +(process.env.METEOR_OPLOG_TAIL_TIMEOUT || 30000); -interface OplogEntry { +export interface OplogEntry { op: string; o: any; o2?: any; @@ -19,12 +19,12 @@ interface OplogEntry { ns: string; } -interface CatchingUpResolver { +export interface CatchingUpResolver { ts: any; resolver: () => void; } -interface OplogTrigger { +export interface OplogTrigger { dropCollection: boolean; dropDatabase: boolean; op: OplogEntry; @@ -34,7 +34,7 @@ interface OplogTrigger { export class OplogHandle { private _oplogUrl: string; - private _dbName: string; + public _dbName: string; private _oplogLastEntryConnection: MongoConnection | null; private _oplogTailConnection: MongoConnection | null; private _oplogOptions: { excludeCollections?: string[]; includeCollections?: string[] } | null; @@ -42,16 +42,18 @@ export class OplogHandle { private _tailHandle: any; private _readyPromiseResolver: (() => void) | null; private _readyPromise: Promise; - private _crossbar: any; + public _crossbar: any; private _baseOplogSelector: any; private _catchingUpResolvers: CatchingUpResolver[]; private _lastProcessedTS: any; private _onSkippedEntriesHook: any; - private _entryQueue: any; - private _workerActive: boolean; private _startTrailingPromise: Promise; private _resolveTimeout: any; + private _entryQueue = new Meteor._DoubleEndedQueue(); + private _workerActive = false; + private _workerPromise: Promise | null = null; + constructor(oplogUrl: string, dbName: string) { this._oplogUrl = oplogUrl; this._dbName = dbName; @@ -90,10 +92,6 @@ export class OplogHandle { debugPrintExceptions: "onSkippedEntries callback" }); - // @ts-ignore - this._entryQueue = new Meteor._DoubleEndedQueue(); - this._workerActive = false; - this._startTrailingPromise = this._startTailing(); } @@ -298,64 +296,11 @@ export class OplogHandle { } private _maybeStartWorker(): void { - if (this._workerActive) return; + if (this._workerPromise) return; this._workerActive = true; - Meteor.defer(async () => { - // May be called recursively in case of transactions. - const handleDoc = async (doc: OplogEntry): Promise => { - if (doc.ns === "admin.$cmd") { - if (doc.o.applyOps) { - // This was a successful transaction, so we need to apply the - // operations that were involved. - let nextTimestamp = doc.ts; - for (const op of doc.o.applyOps) { - // See https://github.com/meteor/meteor/issues/10420. - if (!op.ts) { - op.ts = nextTimestamp; - nextTimestamp = nextTimestamp.add(Long.ONE); - } - await handleDoc(op); - } - return; - } - throw new Error("Unknown command " + JSON.stringify(doc)); - } - - const trigger: OplogTrigger = { - dropCollection: false, - dropDatabase: false, - op: doc, - }; - - if (typeof doc.ns === "string" && doc.ns.startsWith(this._dbName + ".")) { - trigger.collection = doc.ns.slice(this._dbName.length + 1); - } - - // Is it a special command and the collection name is hidden - // somewhere in operator? - if (trigger.collection === "$cmd") { - if (doc.o.dropDatabase) { - delete trigger.collection; - trigger.dropDatabase = true; - } else if ("drop" in doc.o) { - trigger.collection = doc.o.drop; - trigger.dropCollection = true; - trigger.id = null; - } else if ("create" in doc.o && "idIndex" in doc.o) { - // A collection got implicitly created within a transaction. There's - // no need to do anything about it. - } else { - throw Error("Unknown command " + JSON.stringify(doc)); - } - } else { - // All other ops have an id. - trigger.id = idForOp(doc); - } - - await this._crossbar.fire(trigger); - }; - + // Convert to a proper promise-based queue processor + this._workerPromise = (async () => { try { while (!this._stopped && !this._entryQueue.isEmpty()) { // Are we too far behind? Just tell our observers that they need to @@ -375,23 +320,25 @@ export class OplogHandle { continue; } + // Process next batch from the queue const doc = this._entryQueue.shift(); - // Fire trigger(s) for this doc. - await handleDoc(doc); - - // Now that we've processed this operation, process pending - // sequencers. - if (doc.ts) { - this._setLastProcessedTS(doc.ts); - } else { - throw Error("oplog entry without ts: " + JSON.stringify(doc)); + try { + await handleDoc(this, doc); + // Process any waiting fence callbacks + if (doc.ts) { + this._setLastProcessedTS(doc.ts); + } + } catch (e) { + // Keep processing queue even if one entry fails + console.error('Error processing oplog entry:', e); } } } finally { + this._workerPromise = null; this._workerActive = false; } - }); + })(); } _setLastProcessedTS(ts: any): void { @@ -422,3 +369,58 @@ export function idForOp(op: OplogEntry): string { throw Error("Unknown op: " + JSON.stringify(op)); } } + +async function handleDoc(handle: OplogHandle, doc: OplogEntry): Promise { + if (doc.ns === "admin.$cmd") { + if (doc.o.applyOps) { + // This was a successful transaction, so we need to apply the + // operations that were involved. + let nextTimestamp = doc.ts; + for (const op of doc.o.applyOps) { + // See https://github.com/meteor/meteor/issues/10420. + if (!op.ts) { + op.ts = nextTimestamp; + nextTimestamp = nextTimestamp.add(Long.ONE); + } + await handleDoc(handle, op); + } + return; + } + throw new Error("Unknown command " + JSON.stringify(doc)); + } + + const trigger: OplogTrigger = { + dropCollection: false, + dropDatabase: false, + op: doc, + }; + + if (typeof doc.ns === "string" && doc.ns.startsWith(handle._dbName + ".")) { + trigger.collection = doc.ns.slice(handle._dbName.length + 1); + } + + // Is it a special command and the collection name is hidden + // somewhere in operator? + if (trigger.collection === "$cmd") { + if (doc.o.dropDatabase) { + delete trigger.collection; + trigger.dropDatabase = true; + } else if ("drop" in doc.o) { + trigger.collection = doc.o.drop; + trigger.dropCollection = true; + trigger.id = null; + } else if ("create" in doc.o && "idIndex" in doc.o) { + // A collection got implicitly created within a transaction. There's + // no need to do anything about it. + } else { + throw Error("Unknown command " + JSON.stringify(doc)); + } + } else { + // All other ops have an id. + trigger.id = idForOp(doc); + } + + await handle._crossbar.fire(trigger); + + await new Promise(resolve => setImmediate(resolve)); +} \ No newline at end of file