fix race condition in oplog tailing

This commit is contained in:
Leonardo Venturini
2024-11-04 18:14:29 -04:00
parent c56361f112
commit a9f2656e22

View File

@@ -11,7 +11,7 @@ export const OPLOG_COLLECTION = 'oplog.rs';
let TOO_FAR_BEHIND = +(process.env.METEOR_OPLOG_TOO_FAR_BEHIND || 2000);
const TAIL_TIMEOUT = +(process.env.METEOR_OPLOG_TAIL_TIMEOUT || 30000);
interface OplogEntry {
export interface OplogEntry {
op: string;
o: any;
o2?: any;
@@ -19,12 +19,12 @@ interface OplogEntry {
ns: string;
}
interface CatchingUpResolver {
export interface CatchingUpResolver {
ts: any;
resolver: () => void;
}
interface OplogTrigger {
export interface OplogTrigger {
dropCollection: boolean;
dropDatabase: boolean;
op: OplogEntry;
@@ -34,7 +34,7 @@ interface OplogTrigger {
export class OplogHandle {
private _oplogUrl: string;
private _dbName: string;
public _dbName: string;
private _oplogLastEntryConnection: MongoConnection | null;
private _oplogTailConnection: MongoConnection | null;
private _oplogOptions: { excludeCollections?: string[]; includeCollections?: string[] } | null;
@@ -42,16 +42,18 @@ export class OplogHandle {
private _tailHandle: any;
private _readyPromiseResolver: (() => void) | null;
private _readyPromise: Promise<void>;
private _crossbar: any;
public _crossbar: any;
private _baseOplogSelector: any;
private _catchingUpResolvers: CatchingUpResolver[];
private _lastProcessedTS: any;
private _onSkippedEntriesHook: any;
private _entryQueue: any;
private _workerActive: boolean;
private _startTrailingPromise: Promise<void>;
private _resolveTimeout: any;
private _entryQueue = new Meteor._DoubleEndedQueue();
private _workerActive = false;
private _workerPromise: Promise<void> | null = null;
constructor(oplogUrl: string, dbName: string) {
this._oplogUrl = oplogUrl;
this._dbName = dbName;
@@ -90,10 +92,6 @@ export class OplogHandle {
debugPrintExceptions: "onSkippedEntries callback"
});
// @ts-ignore
this._entryQueue = new Meteor._DoubleEndedQueue();
this._workerActive = false;
this._startTrailingPromise = this._startTailing();
}
@@ -298,64 +296,11 @@ export class OplogHandle {
}
private _maybeStartWorker(): void {
if (this._workerActive) return;
if (this._workerPromise) return;
this._workerActive = true;
Meteor.defer(async () => {
// May be called recursively in case of transactions.
const handleDoc = async (doc: OplogEntry): Promise<void> => {
if (doc.ns === "admin.$cmd") {
if (doc.o.applyOps) {
// This was a successful transaction, so we need to apply the
// operations that were involved.
let nextTimestamp = doc.ts;
for (const op of doc.o.applyOps) {
// See https://github.com/meteor/meteor/issues/10420.
if (!op.ts) {
op.ts = nextTimestamp;
nextTimestamp = nextTimestamp.add(Long.ONE);
}
await handleDoc(op);
}
return;
}
throw new Error("Unknown command " + JSON.stringify(doc));
}
const trigger: OplogTrigger = {
dropCollection: false,
dropDatabase: false,
op: doc,
};
if (typeof doc.ns === "string" && doc.ns.startsWith(this._dbName + ".")) {
trigger.collection = doc.ns.slice(this._dbName.length + 1);
}
// Is it a special command and the collection name is hidden
// somewhere in operator?
if (trigger.collection === "$cmd") {
if (doc.o.dropDatabase) {
delete trigger.collection;
trigger.dropDatabase = true;
} else if ("drop" in doc.o) {
trigger.collection = doc.o.drop;
trigger.dropCollection = true;
trigger.id = null;
} else if ("create" in doc.o && "idIndex" in doc.o) {
// A collection got implicitly created within a transaction. There's
// no need to do anything about it.
} else {
throw Error("Unknown command " + JSON.stringify(doc));
}
} else {
// All other ops have an id.
trigger.id = idForOp(doc);
}
await this._crossbar.fire(trigger);
};
// Convert to a proper promise-based queue processor
this._workerPromise = (async () => {
try {
while (!this._stopped && !this._entryQueue.isEmpty()) {
// Are we too far behind? Just tell our observers that they need to
@@ -375,23 +320,25 @@ export class OplogHandle {
continue;
}
// Process next batch from the queue
const doc = this._entryQueue.shift();
// Fire trigger(s) for this doc.
await handleDoc(doc);
// Now that we've processed this operation, process pending
// sequencers.
if (doc.ts) {
this._setLastProcessedTS(doc.ts);
} else {
throw Error("oplog entry without ts: " + JSON.stringify(doc));
try {
await handleDoc(this, doc);
// Process any waiting fence callbacks
if (doc.ts) {
this._setLastProcessedTS(doc.ts);
}
} catch (e) {
// Keep processing queue even if one entry fails
console.error('Error processing oplog entry:', e);
}
}
} finally {
this._workerPromise = null;
this._workerActive = false;
}
});
})();
}
_setLastProcessedTS(ts: any): void {
@@ -422,3 +369,58 @@ export function idForOp(op: OplogEntry): string {
throw Error("Unknown op: " + JSON.stringify(op));
}
}
/**
 * Translates a single oplog entry into a crossbar trigger and fires it on
 * `handle._crossbar`.
 *
 * An entry in the "admin.$cmd" namespace that carries `o.applyOps` is
 * unpacked: every inner operation is passed back through this function, in
 * order, and nothing is fired for the wrapper entry itself. Any other entry
 * produces exactly one trigger.
 *
 * @param handle - Supplies `_dbName` (used to strip the database prefix from
 *   the entry's namespace) and `_crossbar` (whose `fire` receives the trigger).
 * @param doc - The oplog entry to dispatch. Inner `applyOps` operations that
 *   lack a `ts` get one assigned in place.
 * @throws Error when an "admin.$cmd" entry has no `applyOps`, or when a
 *   "$cmd" entry is neither a dropDatabase, a drop, nor a create with idIndex.
 */
async function handleDoc(handle: OplogHandle, doc: OplogEntry): Promise<void> {
  if (doc.ns === "admin.$cmd") {
    if (!doc.o.applyOps) {
      throw new Error("Unknown command " + JSON.stringify(doc));
    }

    // A committed transaction: replay each operation it contained.
    let fallbackTs = doc.ts;
    for (const innerOp of doc.o.applyOps) {
      // Inner operations may arrive without their own timestamp; hand out
      // increasing ones starting from the wrapper entry's timestamp.
      // See https://github.com/meteor/meteor/issues/10420.
      if (!innerOp.ts) {
        innerOp.ts = fallbackTs;
        fallbackTs = fallbackTs.add(Long.ONE);
      }
      await handleDoc(handle, innerOp);
    }
    return;
  }

  const trigger: OplogTrigger = {
    dropCollection: false,
    dropDatabase: false,
    op: doc,
  };

  // Namespaces inside our database look like "<dbName>.<collection>".
  const namespace = doc.ns;
  if (typeof namespace === "string") {
    const dbPrefix = handle._dbName + ".";
    if (namespace.startsWith(dbPrefix)) {
      trigger.collection = namespace.slice(dbPrefix.length);
    }
  }

  if (trigger.collection !== "$cmd") {
    // Every ordinary operation carries a document id.
    trigger.id = idForOp(doc);
  } else if (doc.o.dropDatabase) {
    // Special command: the affected target is described by the operator.
    delete trigger.collection;
    trigger.dropDatabase = true;
  } else if ("drop" in doc.o) {
    trigger.collection = doc.o.drop;
    trigger.dropCollection = true;
    trigger.id = null;
  } else if (!("create" in doc.o && "idIndex" in doc.o)) {
    // A create with idIndex is a collection implicitly created inside a
    // transaction and needs no special handling; anything else is unexpected.
    throw new Error("Unknown command " + JSON.stringify(doc));
  }

  await handle._crossbar.fire(trigger);

  // Wait for a setImmediate callback before returning to the caller.
  await new Promise<void>((done) => {
    setImmediate(done);
  });
}