diff --git a/packages/mongo/asynchronous_cursor.js b/packages/mongo/asynchronous_cursor.js new file mode 100644 index 0000000000..d18dba5faf --- /dev/null +++ b/packages/mongo/asynchronous_cursor.js @@ -0,0 +1,155 @@ +import LocalCollection from 'meteor/minimongo/local_collection'; +import { replaceMongoAtomWithMeteor, replaceTypes } from './mongo_common'; + +/** + * This is just a light wrapper for the cursor. The goal here is to ensure compatibility even if + * there are breaking changes on the MongoDB driver. + * + * @constructor + */ +export class AsynchronousCursor { + constructor(dbCursor, cursorDescription, options) { + this._dbCursor = dbCursor; + this._cursorDescription = cursorDescription; + + this._selfForIteration = options.selfForIteration || this; + if (options.useTransform && cursorDescription.options.transform) { + this._transform = LocalCollection.wrapTransform( + cursorDescription.options.transform); + } else { + this._transform = null; + } + + this._visitedIds = new LocalCollection._IdMap; + } + + [Symbol.asyncIterator]() { + var cursor = this; + return { + async next() { + const value = await cursor._nextObjectPromise(); + return { done: !value, value }; + }, + }; + } + + // Returns a Promise for the next object from the underlying cursor (before + // the Mongo->Meteor type replacement). + async _rawNextObjectPromise() { + try { + return this._dbCursor.next(); + } catch (e) { + console.error(e); + } + } + + // Returns a Promise for the next object from the cursor, skipping those whose + // IDs we've already seen and replacing Mongo atoms with Meteor atoms. + async _nextObjectPromise () { + while (true) { + var doc = await this._rawNextObjectPromise(); + + if (!doc) return null; + doc = replaceTypes(doc, replaceMongoAtomWithMeteor); + + if (!this._cursorDescription.options.tailable && _.has(doc, '_id')) { + // Did Mongo give us duplicate documents in the same cursor? If so, + // ignore this one. (Do this before the transform, since transform might + // return some unrelated value.) We don't do this for tailable cursors, + // because we want to maintain O(1) memory usage. And if there isn't _id + // for some reason (maybe it's the oplog), then we don't do this either. + // (Be careful to do this for falsey but existing _id, though.) + if (this._visitedIds.has(doc._id)) continue; + this._visitedIds.set(doc._id, true); + } + + if (this._transform) + doc = this._transform(doc); + + return doc; + } + } + + // Returns a promise which is resolved with the next object (like with + // _nextObjectPromise) or rejected if the cursor doesn't return within + // timeoutMS ms. + _nextObjectPromiseWithTimeout(timeoutMS) { + if (!timeoutMS) { + return this._nextObjectPromise(); + } + const nextObjectPromise = this._nextObjectPromise(); + const timeoutErr = new Error('Client-side timeout waiting for next object'); + const timeoutPromise = new Promise((resolve, reject) => { + setTimeout(() => { + reject(timeoutErr); + }, timeoutMS); + }); + return Promise.race([nextObjectPromise, timeoutPromise]) + .catch((err) => { + if (err === timeoutErr) { + this.close(); + } + throw err; + }); + } + + async forEach(callback, thisArg) { + // Get back to the beginning. + this._rewind(); + + let idx = 0; + while (true) { + const doc = await this._nextObjectPromise(); + if (!doc) return; + await callback.call(thisArg, doc, idx++, this._selfForIteration); + } + } + + async map(callback, thisArg) { + const results = []; + await this.forEach(async (doc, index) => { + results.push(await callback.call(thisArg, doc, index, this._selfForIteration)); + }); + + return results; + } + + _rewind() { + // known to be synchronous + this._dbCursor.rewind(); + + this._visitedIds = new LocalCollection._IdMap; + } + + // Mostly usable for tailable cursors. + close() { + this._dbCursor.close(); + } + + fetch() { + return this.map(_.identity); + } + + /** + * FIXME: (node:34680) [MONGODB DRIVER] Warning: cursor.count is deprecated and will be + * removed in the next major version, please use `collection.estimatedDocumentCount` or + * `collection.countDocuments` instead. + */ + count() { + return this._dbCursor.count(); + } + + // This method is NOT wrapped in Cursor. + async getRawObjects(ordered) { + var self = this; + if (ordered) { + return self.fetch(); + } else { + var results = new LocalCollection._IdMap; + await self.forEach(function (doc) { + results.set(doc._id, doc); + }); + return results; + } + } +} \ No newline at end of file diff --git a/packages/mongo/cursor.js b/packages/mongo/cursor.js new file mode 100644 index 0000000000..cc6826c616 --- /dev/null +++ b/packages/mongo/cursor.js @@ -0,0 +1,147 @@ +// CURSORS + +// There are several classes which relate to cursors: +// +// CursorDescription represents the arguments used to construct a cursor: +// collectionName, selector, and (find) options. Because it is used as a key +// for cursor de-dup, everything in it should either be JSON-stringifiable or +// not affect observeChanges output (eg, options.transform functions are not +// stringifiable but do not affect observeChanges). +// +// SynchronousCursor is a wrapper around a MongoDB cursor +// which includes fully-synchronous versions of forEach, etc. +// +// Cursor is the cursor object returned from find(), which implements the +// documented Mongo.Collection cursor API. It wraps a CursorDescription and a +// SynchronousCursor (lazily: it doesn't contact Mongo until you call a method +// like fetch or forEach on it). +// +// ObserveHandle is the "observe handle" returned from observeChanges. It has a +// reference to an ObserveMultiplexer. +// +// ObserveMultiplexer allows multiple identical ObserveHandles to be driven by a +// single observe driver. +// +// There are two "observe drivers" which drive ObserveMultiplexers: +// - PollingObserveDriver caches the results of a query and reruns it when +// necessary. +// - OplogObserveDriver follows the Mongo operation log to directly observe +// database changes. +// Both implementations follow the same simple interface: when you create them, +// they start sending observeChanges callbacks (and a ready() invocation) to +// their ObserveMultiplexer, and you stop them by calling their stop() method. + +import { ASYNC_CURSOR_METHODS, getAsyncMethodName } from 'meteor/minimongo/constants'; +import { replaceMeteorAtomWithMongo, replaceTypes } from './mongo_common'; +import LocalCollection from 'meteor/minimongo/local_collection'; + +export const Cursor = function (mongo, cursorDescription) { + var self = this; + + self._mongo = mongo; + self._cursorDescription = cursorDescription; + self._synchronousCursor = null; +}; + +Cursor.prototype.countAsync = async function () { + const collection = this._mongo.rawCollection(this._cursorDescription.collectionName); + return await collection.countDocuments( + replaceTypes(this._cursorDescription.selector, replaceMeteorAtomWithMongo), + replaceTypes(this._cursorDescription.options, replaceMeteorAtomWithMongo), + ); +}; + +Cursor.prototype.count = function () { + throw new Error( + "count() is not available on the server. Please use countAsync() instead." + ); +}; + +[...ASYNC_CURSOR_METHODS, Symbol.iterator, Symbol.asyncIterator].forEach(methodName => { + // count is handled specially since we don't want to create a cursor. + // it is still included in ASYNC_CURSOR_METHODS because we still want an async version of it to exist. + if (methodName === 'count') { + return + } + Cursor.prototype[methodName] = function (...args) { + const cursor = setupAsynchronousCursor(this, methodName); + return cursor[methodName](...args); + }; + + // These methods are handled separately. + if (methodName === Symbol.iterator || methodName === Symbol.asyncIterator) { + return; + } + + const methodNameAsync = getAsyncMethodName(methodName); + Cursor.prototype[methodNameAsync] = function (...args) { + try { + return Promise.resolve(this[methodName](...args)); + } catch (error) { + return Promise.reject(error); + } + }; +}); + +Cursor.prototype.getTransform = function () { + return this._cursorDescription.options.transform; +}; + +// When you call Meteor.publish() with a function that returns a Cursor, we need +// to transmute it into the equivalent subscription. This is the function that +// does that. +Cursor.prototype._publishCursor = function (sub) { + var self = this; + var collection = self._cursorDescription.collectionName; + return Mongo.Collection._publishCursor(self, sub, collection); +}; + +// Used to guarantee that publish functions return at most one cursor per +// collection. Private, because we might later have cursors that include +// documents from multiple collections somehow. +Cursor.prototype._getCollectionName = function () { + var self = this; + return self._cursorDescription.collectionName; +}; + +Cursor.prototype.observe = function (callbacks) { + var self = this; + return LocalCollection._observeFromObserveChanges(self, callbacks); +}; + +Cursor.prototype.observeAsync = function (callbacks) { + return new Promise(resolve => resolve(this.observe(callbacks))); +}; + +Cursor.prototype.observeChanges = function (callbacks, options = {}) { + var self = this; + + var ordered = LocalCollection._observeChangesCallbacksAreOrdered(callbacks); + + return self._mongo._observeChanges( + self._cursorDescription, ordered, callbacks, options.nonMutatingCallbacks); +}; + +Cursor.prototype.observeChangesAsync = async function (callbacks, options = {}) { + return this.observeChanges(callbacks, options); +}; + +function setupAsynchronousCursor(cursor, method) { + // You can only observe a tailable cursor. + if (cursor._cursorDescription.options.tailable) + throw new Error('Cannot call ' + method + ' on a tailable cursor'); + + if (!cursor._synchronousCursor) { + cursor._synchronousCursor = cursor._mongo._createAsynchronousCursor( + cursor._cursorDescription, + { + // Make sure that the "cursor" argument to forEach/map callbacks is the + // Cursor, not the SynchronousCursor. + selfForIteration: cursor, + useTransform: true, + } + ); + } + + return cursor._synchronousCursor; +} \ No newline at end of file diff --git a/packages/mongo/cursor_description.ts b/packages/mongo/cursor_description.ts new file mode 100644 index 0000000000..3efa18b56f --- /dev/null +++ b/packages/mongo/cursor_description.ts @@ -0,0 +1,23 @@ +interface CursorOptions { + limit?: number; + skip?: number; + sort?: Record; + fields?: Record; + projection?: Record; + disableOplog?: boolean; + _disableOplog?: boolean; + tailable?: boolean; +} + +export class CursorDescription { + collectionName: string; + selector: Record; + options: CursorOptions; + + constructor(collectionName: string, selector: any, options?: CursorOptions) { + this.collectionName = collectionName; + // @ts-ignore + this.selector = Mongo.Collection._rewriteSelector(selector); + this.options = options || {}; + } +} \ No newline at end of file diff --git a/packages/mongo/mongo_common.js b/packages/mongo/mongo_common.js new file mode 100644 index 0000000000..53e037e86e --- /dev/null +++ b/packages/mongo/mongo_common.js @@ -0,0 +1,149 @@ +/** @type {import('mongodb')} */ +export const MongoDB = NpmModuleMongodb; + +// The write methods block until the database has confirmed the write (it may +// not be replicated or stable on disk, but one server has confirmed it) if no +// callback is provided. If a callback is provided, then they call the callback +// when the write is confirmed. They return nothing on success, and raise an +// exception on failure. +// +// After making a write (with insert, update, remove), observers are +// notified asynchronously. If you want to receive a callback once all +// of the observer notifications have landed for your write, do the +// writes inside a write fence (set DDPServer._CurrentWriteFence to a new +// _WriteFence, and then set a callback on the write fence.) +// +// Since our execution environment is single-threaded, this is +// well-defined -- a write "has been made" if it's returned, and an +// observer "has been notified" if its callback has returned. + +export const writeCallback = function (write, refresh, callback) { + return function (err, result) { + if (! err) { + // XXX We don't have to run this on error, right? + try { + refresh(); + } catch (refreshErr) { + if (callback) { + callback(refreshErr); + return; + } else { + throw refreshErr; + } + } + } + write.committed(); + if (callback) { + callback(err, result); + } else if (err) { + throw err; + } + }; +}; + + +export const transformResult = function (driverResult) { + var meteorResult = { numberAffected: 0 }; + if (driverResult) { + var mongoResult = driverResult.result; + // On updates with upsert:true, the inserted values come as a list of + // upserted values -- even with options.multi, when the upsert does insert, + // it only inserts one element. + if (mongoResult.upsertedCount) { + meteorResult.numberAffected = mongoResult.upsertedCount; + + if (mongoResult.upsertedId) { + meteorResult.insertedId = mongoResult.upsertedId; + } + } else { + // n was used before Mongo 5.0, in Mongo 5.0 we are not receiving this n + // field and so we are using modifiedCount instead + meteorResult.numberAffected = mongoResult.n || mongoResult.matchedCount || mongoResult.modifiedCount; + } + } + + return meteorResult; +}; + +export const replaceMeteorAtomWithMongo = function (document) { + if (EJSON.isBinary(document)) { + // This does more copies than we'd like, but is necessary because + // MongoDB.BSON only looks like it takes a Uint8Array (and doesn't actually + // serialize it correctly). + return new MongoDB.Binary(Buffer.from(document)); + } + if (document instanceof MongoDB.Binary) { + return document; + } + if (document instanceof Mongo.ObjectID) { + return new MongoDB.ObjectID(document.toHexString()); + } + if (document instanceof MongoDB.Timestamp) { + // For now, the Meteor representation of a Mongo timestamp type (not a date! + // this is a weird internal thing used in the oplog!) is the same as the + // Mongo representation. We need to do this explicitly or else we would do a + // structural clone and lose the prototype. + return document; + } + if (document instanceof Decimal) { + return MongoDB.Decimal128.fromString(document.toString()); + } + if (EJSON._isCustomType(document)) { + return replaceNames(makeMongoLegal, EJSON.toJSONValue(document)); + } + // It is not ordinarily possible to stick dollar-sign keys into mongo + // so we don't bother checking for things that need escaping at this time. + return undefined; +}; + +export const replaceTypes = function (document, atomTransformer) { + if (typeof document !== 'object' || document === null) + return document; + + var replacedTopLevelAtom = atomTransformer(document); + if (replacedTopLevelAtom !== undefined) + return replacedTopLevelAtom; + + var ret = document; + Object.entries(document).forEach(function ([key, val]) { + var valReplaced = replaceTypes(val, atomTransformer); + if (val !== valReplaced) { + // Lazy clone. Shallow copy. + if (ret === document) + ret = clone(document); + ret[key] = valReplaced; + } + }); + return ret; +}; + +export const replaceMongoAtomWithMeteor = function (document) { + if (document instanceof MongoDB.Binary) { + // for backwards compatibility + if (document.sub_type !== 0) { + return document; + } + var buffer = document.value(true); + return new Uint8Array(buffer); + } + if (document instanceof MongoDB.ObjectID) { + return new Mongo.ObjectID(document.toHexString()); + } + if (document instanceof MongoDB.Decimal128) { + return Decimal(document.toString()); + } + if (document["EJSON$type"] && document["EJSON$value"] && Object.keys(document).length === 2) { + return EJSON.fromJSONValue(replaceNames(unmakeMongoLegal, document)); + } + if (document instanceof MongoDB.Timestamp) { + // For now, the Meteor representation of a Mongo timestamp type (not a date! + // this is a weird internal thing used in the oplog!) is the same as the + // Mongo representation. We need to do this explicitly or else we would do a + // structural clone and lose the prototype. + return document; + } + return undefined; +}; + +const makeMongoLegal = name => "EJSON" + name; +const unmakeMongoLegal = name => name.substr(5); \ No newline at end of file diff --git a/packages/mongo/mongo_connection.js b/packages/mongo/mongo_connection.js new file mode 100644 index 0000000000..5db1b2f458 --- /dev/null +++ b/packages/mongo/mongo_connection.js @@ -0,0 +1,936 @@ +import { Meteor } from 'meteor/meteor'; +import path from 'path'; +import { DocFetcher } from './doc_fetcher'; +import { ObserveMultiplexer } from './observe_multiplex'; +import { ObserveHandle } from './observe_handle'; +import { OPLOG_COLLECTION, OplogHandle } from './oplog_tailing'; +import { CLIENT_ONLY_METHODS, getAsyncMethodName } from 'meteor/minimongo/constants'; +import { OplogObserveDriver } from './oplog_observe_driver'; +import { PollingObserveDriver } from './polling_observe_driver'; +import { replaceMeteorAtomWithMongo, replaceTypes, transformResult } from './mongo_common'; +import { AsynchronousCursor } from './asynchronous_cursor'; +import { MongoDB } from './mongo_common'; +import { Cursor } from './cursor'; +import { CursorDescription } from './cursor_description'; + +const FILE_ASSET_SUFFIX = 'Asset'; +const ASSETS_FOLDER = 'assets'; +const APP_FOLDER = 'app'; + +const oplogCollectionWarnings = []; + +export const MongoConnection = function (url, options) { + var self = this; + options = options || {}; + self._observeMultiplexers = {}; + self._onFailoverHook = new Hook; + + const userOptions = { + ...(Mongo._connectionOptions || {}), + ...(Meteor.settings?.packages?.mongo?.options || {}) + }; + + var mongoOptions = Object.assign({ + ignoreUndefined: true, + }, userOptions); + + + + // Internally the oplog connections specify their own maxPoolSize + // which we don't want to overwrite with any user defined value + if ('maxPoolSize' in options) { + // If we just set this for "server", replSet will override it. If we just + // set it for replSet, it will be ignored if we're not using a replSet. + mongoOptions.maxPoolSize = options.maxPoolSize; + } + if ('minPoolSize' in options) { + mongoOptions.minPoolSize = options.minPoolSize; + } + + // Transform options like "tlsCAFileAsset": "filename.pem" into + // "tlsCAFile": "//filename.pem" + Object.entries(mongoOptions || {}) + .filter(([key]) => key && key.endsWith(FILE_ASSET_SUFFIX)) + .forEach(([key, value]) => { + const optionName = key.replace(FILE_ASSET_SUFFIX, ''); + mongoOptions[optionName] = path.join(Assets.getServerDir(), + ASSETS_FOLDER, APP_FOLDER, value); + delete mongoOptions[key]; + }); + + self.db = null; + self._oplogHandle = null; + self._docFetcher = null; + + mongoOptions.driverInfo = { + name: 'Meteor', + version: Meteor.release + } + + self.client = new MongoDB.MongoClient(url, mongoOptions); + self.db = self.client.db(); + + self.client.on('serverDescriptionChanged', Meteor.bindEnvironment(event => { + // When the connection is no longer against the primary node, execute all + // failover hooks. This is important for the driver as it has to re-pool the + // query when it happens. + if ( + event.previousDescription.type !== 'RSPrimary' && + event.newDescription.type === 'RSPrimary' + ) { + self._onFailoverHook.each(callback => { + callback(); + return true; + }); + } + })); + + if (options.oplogUrl && ! Package['disable-oplog']) { + self._oplogHandle = new OplogHandle(options.oplogUrl, self.db.databaseName); + self._docFetcher = new DocFetcher(self); + } + +}; + +MongoConnection.prototype._close = async function() { + var self = this; + + if (! self.db) + throw Error("close called before Connection created?"); + + // XXX probably untested + var oplogHandle = self._oplogHandle; + self._oplogHandle = null; + if (oplogHandle) + await oplogHandle.stop(); + + // Use Future.wrap so that errors get thrown. This happens to + // work even outside a fiber since the 'close' method is not + // actually asynchronous. + await self.client.close(); +}; + +MongoConnection.prototype.close = function () { + return this._close(); +}; + +MongoConnection.prototype._setOplogHandle = function(oplogHandle) { + this._oplogHandle = oplogHandle; + return this; +}; + +// Returns the Mongo Collection object; may yield. +MongoConnection.prototype.rawCollection = function (collectionName) { + var self = this; + + if (! self.db) + throw Error("rawCollection called before Connection created?"); + + return self.db.collection(collectionName); +}; + +MongoConnection.prototype.createCappedCollectionAsync = async function ( + collectionName, byteSize, maxDocuments) { + var self = this; + + if (! self.db) + throw Error("createCappedCollectionAsync called before Connection created?"); + + + await self.db.createCollection(collectionName, + { capped: true, size: byteSize, max: maxDocuments }); +}; + +// This should be called synchronously with a write, to create a +// transaction on the current write fence, if any. After we can read +// the write, and after observers have been notified (or at least, +// after the observer notifiers have added themselves to the write +// fence), you should call 'committed()' on the object returned. +MongoConnection.prototype._maybeBeginWrite = function () { + const fence = DDPServer._getCurrentFence(); + if (fence) { + return fence.beginWrite(); + } else { + return {committed: function () {}}; + } +}; + +// Internal interface: adds a callback which is called when the Mongo primary +// changes. Returns a stop handle. +MongoConnection.prototype._onFailover = function (callback) { + return this._onFailoverHook.register(callback); +}; + +MongoConnection.prototype.insertAsync = async function (collection_name, document) { + const self = this; + + if (collection_name === "___meteor_failure_test_collection") { + const e = new Error("Failure test"); + e._expectedByTest = true; + throw e; + } + + if (!(LocalCollection._isPlainObject(document) && + !EJSON._isCustomType(document))) { + throw new Error("Only plain objects may be inserted into MongoDB"); + } + + var write = self._maybeBeginWrite(); + var refresh = async function () { + await Meteor.refresh({collection: collection_name, id: document._id }); + }; + return self.rawCollection(collection_name).insertOne( + replaceTypes(document, replaceMeteorAtomWithMongo), + { + safe: true, + } + ).then(async ({insertedId}) => { + await refresh(); + await write.committed(); + return insertedId; + }).catch(async e => { + await write.committed(); + throw e; + }); +}; + + +// Cause queries that may be affected by the selector to poll in this write +// fence. +MongoConnection.prototype._refresh = async function (collectionName, selector) { + var refreshKey = {collection: collectionName}; + // If we know which documents we're removing, don't poll queries that are + // specific to other documents. (Note that multiple notifications here should + // not cause multiple polls, since all our listener is doing is enqueueing a + // poll.) + var specificIds = LocalCollection._idsMatchedBySelector(selector); + if (specificIds) { + for (const id of specificIds) { + await Meteor.refresh(Object.assign({id: id}, refreshKey)); + }; + } else { + await Meteor.refresh(refreshKey); + } +}; + +MongoConnection.prototype.removeAsync = async function (collection_name, selector) { + var self = this; + + if (collection_name === "___meteor_failure_test_collection") { + var e = new Error("Failure test"); + e._expectedByTest = true; + throw e; + } + + var write = self._maybeBeginWrite(); + var refresh = async function () { + await self._refresh(collection_name, selector); + }; + + return self.rawCollection(collection_name) + .deleteMany(replaceTypes(selector, replaceMeteorAtomWithMongo), { + safe: true, + }) + .then(async ({ deletedCount }) => { + await refresh(); + await write.committed(); + return transformResult({ result : {modifiedCount : deletedCount} }).numberAffected; + }).catch(async (err) => { + await write.committed(); + throw err; + }); +}; + +MongoConnection.prototype.dropCollectionAsync = async function(collectionName) { + var self = this; + + + var write = self._maybeBeginWrite(); + var refresh = function() { + return Meteor.refresh({ + collection: collectionName, + id: null, + dropCollection: true, + }); + }; + + return self + .rawCollection(collectionName) + .drop() + .then(async result => { + await refresh(); + await write.committed(); + return result; + }) + .catch(async e => { + await write.committed(); + throw e; + }); +}; + +// For testing only. Slightly better than `c.rawDatabase().dropDatabase()` +// because it lets the test's fence wait for it to be complete. +MongoConnection.prototype.dropDatabaseAsync = async function () { + var self = this; + + var write = self._maybeBeginWrite(); + var refresh = async function () { + await Meteor.refresh({ dropDatabase: true }); + }; + + try { + await self.db._dropDatabase(); + await refresh(); + await write.committed(); + } catch (e) { + await write.committed(); + throw e; + } +}; + +MongoConnection.prototype.updateAsync = async function (collection_name, selector, mod, options) { + var self = this; + + if (collection_name === "___meteor_failure_test_collection") { + var e = new Error("Failure test"); + e._expectedByTest = true; + throw e; + } + + // explicit safety check. null and undefined can crash the mongo + // driver. Although the node driver and minimongo do 'support' + // non-object modifier in that they don't crash, they are not + // meaningful operations and do not do anything. Defensively throw an + // error here. + if (!mod || typeof mod !== 'object') { + const error = new Error("Invalid modifier. Modifier must be an object."); + + throw error; + } + + if (!(LocalCollection._isPlainObject(mod) && !EJSON._isCustomType(mod))) { + const error = new Error( + "Only plain objects may be used as replacement" + + " documents in MongoDB"); + + throw error; + } + + if (!options) options = {}; + + var write = self._maybeBeginWrite(); + var refresh = async function () { + await self._refresh(collection_name, selector); + }; + + var collection = self.rawCollection(collection_name); + var mongoOpts = {safe: true}; + // Add support for filtered positional operator + if (options.arrayFilters !== undefined) mongoOpts.arrayFilters = options.arrayFilters; + // explictly enumerate options that minimongo supports + if (options.upsert) mongoOpts.upsert = true; + if (options.multi) mongoOpts.multi = true; + // Lets you get a more more full result from MongoDB. Use with caution: + // might not work with C.upsert (as opposed to C.update({upsert:true}) or + // with simulated upsert. + if (options.fullResult) mongoOpts.fullResult = true; + + var mongoSelector = replaceTypes(selector, replaceMeteorAtomWithMongo); + var mongoMod = replaceTypes(mod, replaceMeteorAtomWithMongo); + + var isModify = LocalCollection._isModificationMod(mongoMod); + + if (options._forbidReplace && !isModify) { + var err = new Error("Invalid modifier. Replacements are forbidden."); + throw err; + } + + // We've already run replaceTypes/replaceMeteorAtomWithMongo on + // selector and mod. We assume it doesn't matter, as far as + // the behavior of modifiers is concerned, whether `_modify` + // is run on EJSON or on mongo-converted EJSON. + + // Run this code up front so that it fails fast if someone uses + // a Mongo update operator we don't support. + let knownId; + if (options.upsert) { + try { + let newDoc = LocalCollection._createUpsertDocument(selector, mod); + knownId = newDoc._id; + } catch (err) { + throw err; + } + } + if (options.upsert && + ! isModify && + ! knownId && + options.insertedId && + ! (options.insertedId instanceof Mongo.ObjectID && + options.generatedId)) { + // In case of an upsert with a replacement, where there is no _id defined + // in either the query or the replacement doc, mongo will generate an id itself. + // Therefore we need this special strategy if we want to control the id ourselves. + + // We don't need to do this when: + // - This is not a replacement, so we can add an _id to $setOnInsert + // - The id is defined by query or mod we can just add it to the replacement doc + // - The user did not specify any id preference and the id is a Mongo ObjectId, + // then we can just let Mongo generate the id + return await simulateUpsertWithInsertedId(collection, mongoSelector, mongoMod, options) + .then(async result => { + await refresh(); + await write.committed(); + if (result && ! options._returnObject) { + return result.numberAffected; + } else { + return result; + } + }); + } else { + if (options.upsert && !knownId && options.insertedId && isModify) { + if (!mongoMod.hasOwnProperty('$setOnInsert')) { + mongoMod.$setOnInsert = {}; + } + knownId = options.insertedId; + Object.assign(mongoMod.$setOnInsert, replaceTypes({_id: options.insertedId}, replaceMeteorAtomWithMongo)); + } + + const strings = Object.keys(mongoMod).filter((key) => !key.startsWith("$")); + let updateMethod = strings.length > 0 ? 'replaceOne' : 'updateMany'; + updateMethod = + updateMethod === 'updateMany' && !mongoOpts.multi + ? 'updateOne' + : updateMethod; + return collection[updateMethod] + .bind(collection)(mongoSelector, mongoMod, mongoOpts) + .then(async result => { + var meteorResult = transformResult({result}); + if (meteorResult && options._returnObject) { + // If this was an upsertAsync() call, and we ended up + // inserting a new doc and we know its id, then + // return that id as well. + if (options.upsert && meteorResult.insertedId) { + if (knownId) { + meteorResult.insertedId = knownId; + } else if (meteorResult.insertedId instanceof MongoDB.ObjectID) { + meteorResult.insertedId = new Mongo.ObjectID(meteorResult.insertedId.toHexString()); + } + } + await refresh(); + await write.committed(); + return meteorResult; + } else { + await refresh(); + await write.committed(); + return meteorResult.numberAffected; + } + }).catch(async (err) => { + await write.committed(); + throw err; + }); + } +}; + +// exposed for testing +MongoConnection._isCannotChangeIdError = function (err) { + + // Mongo 3.2.* returns error as next Object: + // {name: String, code: Number, errmsg: String} + // Older Mongo returns: + // {name: String, code: Number, err: String} + var error = err.errmsg || err.err; + + // We don't use the error code here + // because the error code we observed it producing (16837) appears to be + // a far more generic error code based on examining the source. + if (error.indexOf('The _id field cannot be changed') === 0 + || error.indexOf("the (immutable) field '_id' was found to have been altered to _id") !== -1) { + return true; + } + + return false; +}; + +// XXX MongoConnection.upsertAsync() does not return the id of the inserted document +// unless you set it explicitly in the selector or modifier (as a replacement +// doc). +MongoConnection.prototype.upsertAsync = async function (collectionName, selector, mod, options) { + var self = this; + + + + if (typeof options === "function" && ! callback) { + callback = options; + options = {}; + } + + return self.updateAsync(collectionName, selector, mod, + Object.assign({}, options, { + upsert: true, + _returnObject: true + })); +}; + +MongoConnection.prototype.find = function (collectionName, selector, options) { + var self = this; + + if (arguments.length === 1) + selector = {}; + + return new Cursor( + self, new CursorDescription(collectionName, selector, options)); +}; + +MongoConnection.prototype.findOneAsync = async function (collection_name, selector, options) { + var self = this; + if (arguments.length === 1) { + selector = {}; + } + + options = options || {}; + options.limit = 1; + + const results = await self.find(collection_name, selector, options).fetch(); + + return results[0]; +}; + +// We'll actually design an index API later. For now, we just pass through to +// Mongo's, but make it synchronous. +MongoConnection.prototype.createIndexAsync = async function (collectionName, index, + options) { + var self = this; + + // We expect this function to be called at startup, not from within a method, + // so we don't interact with the write fence. + var collection = self.rawCollection(collectionName); + await collection.createIndex(index, options); +}; + +// just to be consistent with the other methods +MongoConnection.prototype.createIndex = + MongoConnection.prototype.createIndexAsync; + +MongoConnection.prototype.countDocuments = function (collectionName, ...args) { + args = args.map(arg => replaceTypes(arg, replaceMeteorAtomWithMongo)); + const collection = this.rawCollection(collectionName); + return collection.countDocuments(...args); +}; + +MongoConnection.prototype.estimatedDocumentCount = function (collectionName, ...args) { + args = args.map(arg => replaceTypes(arg, replaceMeteorAtomWithMongo)); + const collection = this.rawCollection(collectionName); + return collection.estimatedDocumentCount(...args); +}; + +MongoConnection.prototype.ensureIndexAsync = MongoConnection.prototype.createIndexAsync; + +MongoConnection.prototype.dropIndexAsync = async function (collectionName, index) { + var self = this; + + + // This function is only used by test code, not within a method, so we don't + // interact with the write fence. + var collection = self.rawCollection(collectionName); + var indexName = await collection.dropIndex(index); +}; + + +CLIENT_ONLY_METHODS.forEach(function (m) { + MongoConnection.prototype[m] = function () { + throw new Error( + `${m} + is not available on the server. Please use ${getAsyncMethodName( + m + )}() instead.` + ); + }; +}); + + +var NUM_OPTIMISTIC_TRIES = 3; + + + +var simulateUpsertWithInsertedId = async function (collection, selector, mod, options) { + // STRATEGY: First try doing an upsert with a generated ID. + // If this throws an error about changing the ID on an existing document + // then without affecting the database, we know we should probably try + // an update without the generated ID. If it affected 0 documents, + // then without affecting the database, we the document that first + // gave the error is probably removed and we need to try an insert again + // We go back to step one and repeat. + // Like all "optimistic write" schemes, we rely on the fact that it's + // unlikely our writes will continue to be interfered with under normal + // circumstances (though sufficiently heavy contention with writers + // disagreeing on the existence of an object will cause writes to fail + // in theory). + + var insertedId = options.insertedId; // must exist + var mongoOptsForUpdate = { + safe: true, + multi: options.multi + }; + var mongoOptsForInsert = { + safe: true, + upsert: true + }; + + var replacementWithId = Object.assign( + replaceTypes({_id: insertedId}, replaceMeteorAtomWithMongo), + mod); + + var tries = NUM_OPTIMISTIC_TRIES; + + var doUpdate = async function () { + tries--; + if (! tries) { + throw new Error("Upsert failed after " + NUM_OPTIMISTIC_TRIES + " tries."); + } else { + let method = collection.updateMany; + if(!Object.keys(mod).some(key => key.startsWith("$"))){ + method = collection.replaceOne.bind(collection); + } + return method( + selector, + mod, + mongoOptsForUpdate).then(result => { + if (result && (result.modifiedCount || result.upsertedCount)) { + return { + numberAffected: result.modifiedCount || result.upsertedCount, + insertedId: result.upsertedId || undefined, + }; + } else { + return doConditionalInsert(); + } + }); + } + }; + + var doConditionalInsert = function() { + return collection.replaceOne(selector, replacementWithId, mongoOptsForInsert) + .then(result => ({ + numberAffected: result.upsertedCount, + insertedId: result.upsertedId, + })).catch(err => { + if (MongoConnection._isCannotChangeIdError(err)) { + return doUpdate(); + } else { + throw err; + } + }); + + }; + return doUpdate(); +}; + +// observeChanges for tailable cursors on capped collections. +// +// Some differences from normal cursors: +// - Will never produce anything other than 'added' or 'addedBefore'. If you +// do update a document that has already been produced, this will not notice +// it. +// - If you disconnect and reconnect from Mongo, it will essentially restart +// the query, which will lead to duplicate results. This is pretty bad, +// but if you include a field called 'ts' which is inserted as +// new MongoInternals.MongoTimestamp(0, 0) (which is initialized to the +// current Mongo-style timestamp), we'll be able to find the place to +// restart properly. (This field is specifically understood by Mongo with an +// optimization which allows it to find the right place to start without +// an index on ts. It's how the oplog works.) +// - No callbacks are triggered synchronously with the call (there's no +// differentiation between "initial data" and "later changes"; everything +// that matches the query gets sent asynchronously). +// - De-duplication is not implemented. +// - Does not yet interact with the write fence. Probably, this should work by +// ignoring removes (which don't work on capped collections) and updates +// (which don't affect tailable cursors), and just keeping track of the ID +// of the inserted object, and closing the write fence once you get to that +// ID (or timestamp?). This doesn't work well if the document doesn't match +// the query, though. On the other hand, the write fence can close +// immediately if it does not match the query. So if we trust minimongo +// enough to accurately evaluate the query against the write fence, we +// should be able to do this... Of course, minimongo doesn't even support +// Mongo Timestamps yet. +MongoConnection.prototype._observeChangesTailable = function ( + cursorDescription, ordered, callbacks) { + var self = this; + + // Tailable cursors only ever call added/addedBefore callbacks, so it's an + // error if you didn't provide them. + if ((ordered && !callbacks.addedBefore) || + (!ordered && !callbacks.added)) { + throw new Error("Can't observe an " + (ordered ? "ordered" : "unordered") + + " tailable cursor without a " + + (ordered ? "addedBefore" : "added") + " callback"); + } + + return self.tail(cursorDescription, function (doc) { + var id = doc._id; + delete doc._id; + // The ts is an implementation detail. Hide it. + delete doc.ts; + if (ordered) { + callbacks.addedBefore(id, doc, null); + } else { + callbacks.added(id, doc); + } + }); +}; + +MongoConnection.prototype._createAsynchronousCursor = function( + cursorDescription, options = {}) { + var self = this; + const { selfForIteration, useTransform } = options; + options = { selfForIteration, useTransform }; + + var collection = self.rawCollection(cursorDescription.collectionName); + var cursorOptions = cursorDescription.options; + var mongoOptions = { + sort: cursorOptions.sort, + limit: cursorOptions.limit, + skip: cursorOptions.skip, + projection: cursorOptions.fields || cursorOptions.projection, + readPreference: cursorOptions.readPreference, + }; + + // Do we want a tailable cursor (which only works on capped collections)? + if (cursorOptions.tailable) { + mongoOptions.numberOfRetries = -1; + } + + var dbCursor = collection.find( + replaceTypes(cursorDescription.selector, replaceMeteorAtomWithMongo), + mongoOptions); + + // Do we want a tailable cursor (which only works on capped collections)? + if (cursorOptions.tailable) { + // We want a tailable cursor... + dbCursor.addCursorFlag("tailable", true) + // ... and for the server to wait a bit if any getMore has no data (rather + // than making us put the relevant sleeps in the client)... + dbCursor.addCursorFlag("awaitData", true) + + // And if this is on the oplog collection and the cursor specifies a 'ts', + // then set the undocumented oplog replay flag, which does a special scan to + // find the first document (instead of creating an index on ts). This is a + // very hard-coded Mongo flag which only works on the oplog collection and + // only works with the ts field. + if (cursorDescription.collectionName === OPLOG_COLLECTION && + cursorDescription.selector.ts) { + dbCursor.addCursorFlag("oplogReplay", true) + } + } + + if (typeof cursorOptions.maxTimeMs !== 'undefined') { + dbCursor = dbCursor.maxTimeMS(cursorOptions.maxTimeMs); + } + if (typeof cursorOptions.hint !== 'undefined') { + dbCursor = dbCursor.hint(cursorOptions.hint); + } + + return new AsynchronousCursor(dbCursor, cursorDescription, options, collection); +}; + +// Tails the cursor described by cursorDescription, most likely on the +// oplog. Calls docCallback with each document found. Ignores errors and just +// restarts the tail on error. +// +// If timeoutMS is set, then if we don't get a new document every timeoutMS, +// kill and restart the cursor. This is primarily a workaround for #8598. +MongoConnection.prototype.tail = function (cursorDescription, docCallback, timeoutMS) { + var self = this; + if (!cursorDescription.options.tailable) + throw new Error("Can only tail a tailable cursor"); + + var cursor = self._createAsynchronousCursor(cursorDescription); + + var stopped = false; + var lastTS; + + Meteor.defer(async function loop() { + var doc = null; + while (true) { + if (stopped) + return; + try { + doc = await cursor._nextObjectPromiseWithTimeout(timeoutMS); + } catch (err) { + // There's no good way to figure out if this was actually an error from + // Mongo, or just client-side (including our own timeout error). Ah + // well. But either way, we need to retry the cursor (unless the failure + // was because the observe got stopped). + doc = null; + } + // Since we awaited a promise above, we need to check again to see if + // we've been stopped before calling the callback. + if (stopped) + return; + if (doc) { + // If a tailable cursor contains a "ts" field, use it to recreate the + // cursor on error. ("ts" is a standard that Mongo uses internally for + // the oplog, and there's a special flag that lets you do binary search + // on it instead of needing to use an index.) + lastTS = doc.ts; + docCallback(doc); + } else { + var newSelector = Object.assign({}, cursorDescription.selector); + if (lastTS) { + newSelector.ts = {$gt: lastTS}; + } + cursor = self._createAsynchronousCursor(new CursorDescription( + cursorDescription.collectionName, + newSelector, + cursorDescription.options)); + // Mongo failover takes many seconds. Retry in a bit. (Without this + // setTimeout, we peg the CPU at 100% and never notice the actual + // failover. + setTimeout(loop, 100); + break; + } + } + }); + + return { + stop: function () { + stopped = true; + cursor.close(); + } + }; +}; + +Object.assign(MongoConnection.prototype, { + _observeChanges: async function ( + cursorDescription, ordered, callbacks, nonMutatingCallbacks) { + var self = this; + const collectionName = cursorDescription.collectionName; + + if (cursorDescription.options.tailable) { + return self._observeChangesTailable(cursorDescription, ordered, callbacks); + } + + // You may not filter out _id when observing changes, because the id is a core + // part of the observeChanges API. + const fieldsOptions = cursorDescription.options.projection || cursorDescription.options.fields; + if (fieldsOptions && + (fieldsOptions._id === 0 || + fieldsOptions._id === false)) { + throw Error("You may not observe a cursor with {fields: {_id: 0}}"); + } + + var observeKey = EJSON.stringify( + Object.assign({ordered: ordered}, cursorDescription)); + + var multiplexer, observeDriver; + var firstHandle = false; + + // Find a matching ObserveMultiplexer, or create a new one. This next block is + // guaranteed to not yield (and it doesn't call anything that can observe a + // new query), so no other calls to this function can interleave with it. + if (observeKey in self._observeMultiplexers) { + multiplexer = self._observeMultiplexers[observeKey]; + } else { + firstHandle = true; + // Create a new ObserveMultiplexer. + multiplexer = new ObserveMultiplexer({ + ordered: ordered, + onStop: function () { + delete self._observeMultiplexers[observeKey]; + return observeDriver.stop(); + } + }); + } + + var observeHandle = new ObserveHandle(multiplexer, + callbacks, + nonMutatingCallbacks, + ); + + const oplogOptions = self?._oplogHandle?._oplogOptions || {}; + const { includeCollections, excludeCollections } = oplogOptions; + if (firstHandle) { + var matcher, sorter; + var canUseOplog = [ + function () { + // At a bare minimum, using the oplog requires us to have an oplog, to + // want unordered callbacks, and to not want a callback on the polls + // that won't happen. + return self._oplogHandle && !ordered && + !callbacks._testOnlyPollCallback; + }, + function () { + // We also need to check, if the collection of this Cursor is actually being "watched" by the Oplog handle + // if not, we have to fallback to long polling + if (excludeCollections?.length && excludeCollections.includes(collectionName)) { + if (!oplogCollectionWarnings.includes(collectionName)) { + console.warn(`Meteor.settings.packages.mongo.oplogExcludeCollections includes the collection ${collectionName} - your subscriptions will only use long polling!`); + oplogCollectionWarnings.push(collectionName); // we only want to show the warnings once per collection! + } + return false; + } + if (includeCollections?.length && !includeCollections.includes(collectionName)) { + if (!oplogCollectionWarnings.includes(collectionName)) { + console.warn(`Meteor.settings.packages.mongo.oplogIncludeCollections does not include the collection ${collectionName} - your subscriptions will only use long polling!`); + oplogCollectionWarnings.push(collectionName); // we only want to show the warnings once per collection! + } + return false; + } + return true; + }, + function () { + // We need to be able to compile the selector. Fall back to polling for + // some newfangled $selector that minimongo doesn't support yet. + try { + matcher = new Minimongo.Matcher(cursorDescription.selector); + return true; + } catch (e) { + // XXX make all compilation errors MinimongoError or something + // so that this doesn't ignore unrelated exceptions + return false; + } + }, + function () { + // ... and the selector itself needs to support oplog. + return OplogObserveDriver.cursorSupported(cursorDescription, matcher); + }, + function () { + // And we need to be able to compile the sort, if any. eg, can't be + // {$natural: 1}. + if (!cursorDescription.options.sort) + return true; + try { + sorter = new Minimongo.Sorter(cursorDescription.options.sort); + return true; + } catch (e) { + // XXX make all compilation errors MinimongoError or something + // so that this doesn't ignore unrelated exceptions + return false; + } + } + ].every(f => f()); // invoke each function and check if all return true + + var driverClass = canUseOplog ? OplogObserveDriver : PollingObserveDriver; + observeDriver = new driverClass({ + cursorDescription: cursorDescription, + mongoHandle: self, + multiplexer: multiplexer, + ordered: ordered, + matcher: matcher, // ignored by polling + sorter: sorter, // ignored by polling + _testOnlyPollCallback: callbacks._testOnlyPollCallback + }); + + if (observeDriver._init) { + await observeDriver._init(); + } + + // This field is only set for use in tests. + multiplexer._observeDriver = observeDriver; + } + self._observeMultiplexers[observeKey] = multiplexer; + // Blocks until the initial adds have been sent. + await multiplexer.addHandleAndSendInitialAdds(observeHandle); + + return observeHandle; + }, + +}); diff --git a/packages/mongo/mongo_driver.js b/packages/mongo/mongo_driver.js index 60ee67b857..bb60eaee7b 100644 --- a/packages/mongo/mongo_driver.js +++ b/packages/mongo/mongo_driver.js @@ -1,32 +1,9 @@ -import has from 'lodash.has'; -import identity from 'lodash.identity'; -import clone from 'lodash.clone'; +import { OplogHandle } from './oplog_tailing'; +import { MongoConnection } from './mongo_connection'; +import { OplogObserveDriver } from './oplog_observe_driver'; +import { MongoDB } from './mongo_common'; -/** - * Provide a synchronous Collection API using fibers, backed by - * MongoDB. This is only for use on the server, and mostly identical - * to the client API. - * - * NOTE: the public API methods must be run within a fiber. If you call - * these outside of a fiber they will explode! - */ - -const path = require("path"); -const util = require("util"); - -/** @type {import('mongodb')} */ -var MongoDB = NpmModuleMongodb; -import { DocFetcher } from "./doc_fetcher.js"; -import { - ASYNC_CURSOR_METHODS, - CLIENT_ONLY_METHODS, - getAsyncMethodName -} from "meteor/minimongo/constants"; -import { Meteor } from "meteor/meteor"; -import { ObserveHandle } from './observe_handle'; -import { ObserveMultiplexer } from './observe_multiplex'; - -MongoInternals = {}; +MongoInternals = global.MongoInternals = {}; MongoInternals.__packageName = 'mongo'; @@ -43,9 +20,11 @@ MongoInternals.NpmModules = { // XXX COMPAT WITH 1.0.3.2 MongoInternals.NpmModule = MongoDB; -const FILE_ASSET_SUFFIX = 'Asset'; -const ASSETS_FOLDER = 'assets'; -const APP_FOLDER = 'app'; +MongoInternals.OplogHandle = OplogHandle; + +MongoInternals.Connection = MongoConnection; + +MongoInternals.OplogObserveDriver = OplogObserveDriver; // This is used to add or remove EJSON from the beginning of everything nested // inside an EJSON custom type. It should only be called on pure JSON! @@ -71,1530 +50,13 @@ MongoDB.Timestamp.prototype.clone = function () { return this; }; -var makeMongoLegal = function (name) { return "EJSON" + name; }; -var unmakeMongoLegal = function (name) { return name.substr(5); }; - -var replaceMongoAtomWithMeteor = function (document) { - if (document instanceof MongoDB.Binary) { - // for backwards compatibility - if (document.sub_type !== 0) { - return document; - } - var buffer = document.value(true); - return new Uint8Array(buffer); - } - if (document instanceof MongoDB.ObjectID) { - return new Mongo.ObjectID(document.toHexString()); - } - if (document instanceof MongoDB.Decimal128) { - return Decimal(document.toString()); - } - if (document["EJSON$type"] && document["EJSON$value"] && Object.keys(document).length === 2) { - return EJSON.fromJSONValue(replaceNames(unmakeMongoLegal, document)); - } - if (document instanceof MongoDB.Timestamp) { - // For now, the Meteor representation of a Mongo timestamp type (not a date! - // this is a weird internal thing used in the oplog!) is the same as the - // Mongo representation. We need to do this explicitly or else we would do a - // structural clone and lose the prototype. - return document; - } - return undefined; -}; - -var replaceMeteorAtomWithMongo = function (document) { - if (EJSON.isBinary(document)) { - // This does more copies than we'd like, but is necessary because - // MongoDB.BSON only looks like it takes a Uint8Array (and doesn't actually - // serialize it correctly). - return new MongoDB.Binary(Buffer.from(document)); - } - if (document instanceof MongoDB.Binary) { - return document; - } - if (document instanceof Mongo.ObjectID) { - return new MongoDB.ObjectID(document.toHexString()); - } - if (document instanceof MongoDB.Timestamp) { - // For now, the Meteor representation of a Mongo timestamp type (not a date! - // this is a weird internal thing used in the oplog!) is the same as the - // Mongo representation. We need to do this explicitly or else we would do a - // structural clone and lose the prototype. - return document; - } - if (document instanceof Decimal) { - return MongoDB.Decimal128.fromString(document.toString()); - } - if (EJSON._isCustomType(document)) { - return replaceNames(makeMongoLegal, EJSON.toJSONValue(document)); - } - // It is not ordinarily possible to stick dollar-sign keys into mongo - // so we don't bother checking for things that need escaping at this time. - return undefined; -}; - -var replaceTypes = function (document, atomTransformer) { - if (typeof document !== 'object' || document === null) - return document; - - var replacedTopLevelAtom = atomTransformer(document); - if (replacedTopLevelAtom !== undefined) - return replacedTopLevelAtom; - - var ret = document; - Object.entries(document).forEach(function ([key, val]) { - var valReplaced = replaceTypes(val, atomTransformer); - if (val !== valReplaced) { - // Lazy clone. Shallow copy. - if (ret === document) - ret = clone(document); - ret[key] = valReplaced; - } - }); - return ret; -}; - - -MongoConnection = function (url, options) { - var self = this; - options = options || {}; - self._observeMultiplexers = {}; - self._onFailoverHook = new Hook; - - const userOptions = { - ...(Mongo._connectionOptions || {}), - ...(Meteor.settings?.packages?.mongo?.options || {}) - }; - - var mongoOptions = Object.assign({ - ignoreUndefined: true, - }, userOptions); - - - - // Internally the oplog connections specify their own maxPoolSize - // which we don't want to overwrite with any user defined value - if (has(options, 'maxPoolSize')) { - // If we just set this for "server", replSet will override it. If we just - // set it for replSet, it will be ignored if we're not using a replSet. - mongoOptions.maxPoolSize = options.maxPoolSize; - } - if (has(options, 'minPoolSize')) { - mongoOptions.minPoolSize = options.minPoolSize; - } - - // Transform options like "tlsCAFileAsset": "filename.pem" into - // "tlsCAFile": "//filename.pem" - Object.entries(mongoOptions || {}) - .filter(([key]) => key && key.endsWith(FILE_ASSET_SUFFIX)) - .forEach(([key, value]) => { - const optionName = key.replace(FILE_ASSET_SUFFIX, ''); - mongoOptions[optionName] = path.join(Assets.getServerDir(), - ASSETS_FOLDER, APP_FOLDER, value); - delete mongoOptions[key]; - }); - - self.db = null; - self._oplogHandle = null; - self._docFetcher = null; - - mongoOptions.driverInfo = { - name: 'Meteor', - version: Meteor.release - } - - self.client = new MongoDB.MongoClient(url, mongoOptions); - self.db = self.client.db(); - - self.client.on('serverDescriptionChanged', Meteor.bindEnvironment(event => { - // When the connection is no longer against the primary node, execute all - // failover hooks. This is important for the driver as it has to re-pool the - // query when it happens. - if ( - event.previousDescription.type !== 'RSPrimary' && - event.newDescription.type === 'RSPrimary' - ) { - self._onFailoverHook.each(callback => { - callback(); - return true; - }); - } - })); - - if (options.oplogUrl && ! Package['disable-oplog']) { - self._oplogHandle = new OplogHandle(options.oplogUrl, self.db.databaseName); - self._docFetcher = new DocFetcher(self); - } - -}; - -MongoConnection.prototype._close = async function() { - var self = this; - - if (! self.db) - throw Error("close called before Connection created?"); - - // XXX probably untested - var oplogHandle = self._oplogHandle; - self._oplogHandle = null; - if (oplogHandle) - await oplogHandle.stop(); - - // Use Future.wrap so that errors get thrown. This happens to - // work even outside a fiber since the 'close' method is not - // actually asynchronous. - await self.client.close(); -}; - -MongoConnection.prototype.close = function () { - return this._close(); -}; - -MongoConnection.prototype._setOplogHandle = function(oplogHandle) { - this._oplogHandle = oplogHandle; - return this; -}; - -// Returns the Mongo Collection object; may yield. -MongoConnection.prototype.rawCollection = function (collectionName) { - var self = this; - - if (! self.db) - throw Error("rawCollection called before Connection created?"); - - return self.db.collection(collectionName); -}; - -MongoConnection.prototype.createCappedCollectionAsync = async function ( - collectionName, byteSize, maxDocuments) { - var self = this; - - if (! self.db) - throw Error("createCappedCollectionAsync called before Connection created?"); - - - await self.db.createCollection(collectionName, - { capped: true, size: byteSize, max: maxDocuments }); -}; - -// This should be called synchronously with a write, to create a -// transaction on the current write fence, if any. After we can read -// the write, and after observers have been notified (or at least, -// after the observer notifiers have added themselves to the write -// fence), you should call 'committed()' on the object returned. -MongoConnection.prototype._maybeBeginWrite = function () { - const fence = DDPServer._getCurrentFence(); - if (fence) { - return fence.beginWrite(); - } else { - return {committed: function () {}}; - } -}; - -// Internal interface: adds a callback which is called when the Mongo primary -// changes. Returns a stop handle. -MongoConnection.prototype._onFailover = function (callback) { - return this._onFailoverHook.register(callback); -}; - - -//////////// Public API ////////// - -// The write methods block until the database has confirmed the write (it may -// not be replicated or stable on disk, but one server has confirmed it) if no -// callback is provided. If a callback is provided, then they call the callback -// when the write is confirmed. They return nothing on success, and raise an -// exception on failure. -// -// After making a write (with insert, update, remove), observers are -// notified asynchronously. If you want to receive a callback once all -// of the observer notifications have landed for your write, do the -// writes inside a write fence (set DDPServer._CurrentWriteFence to a new -// _WriteFence, and then set a callback on the write fence.) -// -// Since our execution environment is single-threaded, this is -// well-defined -- a write "has been made" if it's returned, and an -// observer "has been notified" if its callback has returned. - -var writeCallback = function (write, refresh, callback) { - return function (err, result) { - if (! err) { - // XXX We don't have to run this on error, right? - try { - refresh(); - } catch (refreshErr) { - if (callback) { - callback(refreshErr); - return; - } else { - throw refreshErr; - } - } - } - write.committed(); - if (callback) { - callback(err, result); - } else if (err) { - throw err; - } - }; -}; - -MongoConnection.prototype.insertAsync = async function (collection_name, document) { - const self = this; - - if (collection_name === "___meteor_failure_test_collection") { - const e = new Error("Failure test"); - e._expectedByTest = true; - throw e; - } - - if (!(LocalCollection._isPlainObject(document) && - !EJSON._isCustomType(document))) { - throw new Error("Only plain objects may be inserted into MongoDB"); - } - - var write = self._maybeBeginWrite(); - var refresh = async function () { - await Meteor.refresh({collection: collection_name, id: document._id }); - }; - return self.rawCollection(collection_name).insertOne( - replaceTypes(document, replaceMeteorAtomWithMongo), - { - safe: true, - } - ).then(async ({insertedId}) => { - await refresh(); - await write.committed(); - return insertedId; - }).catch(async e => { - await write.committed(); - throw e; - }); -}; - - -// Cause queries that may be affected by the selector to poll in this write -// fence. -MongoConnection.prototype._refresh = async function (collectionName, selector) { - var refreshKey = {collection: collectionName}; - // If we know which documents we're removing, don't poll queries that are - // specific to other documents. (Note that multiple notifications here should - // not cause multiple polls, since all our listener is doing is enqueueing a - // poll.) - var specificIds = LocalCollection._idsMatchedBySelector(selector); - if (specificIds) { - for (const id of specificIds) { - await Meteor.refresh(Object.assign({id: id}, refreshKey)); - }; - } else { - await Meteor.refresh(refreshKey); - } -}; - -MongoConnection.prototype.removeAsync = async function (collection_name, selector) { - var self = this; - - if (collection_name === "___meteor_failure_test_collection") { - var e = new Error("Failure test"); - e._expectedByTest = true; - throw e; - } - - var write = self._maybeBeginWrite(); - var refresh = async function () { - await self._refresh(collection_name, selector); - }; - - return self.rawCollection(collection_name) - .deleteMany(replaceTypes(selector, replaceMeteorAtomWithMongo), { - safe: true, - }) - .then(async ({ deletedCount }) => { - await refresh(); - await write.committed(); - return transformResult({ result : {modifiedCount : deletedCount} }).numberAffected; - }).catch(async (err) => { - await write.committed(); - throw err; - }); -}; - -MongoConnection.prototype.dropCollectionAsync = async function(collectionName) { - var self = this; - - - var write = self._maybeBeginWrite(); - var refresh = function() { - return Meteor.refresh({ - collection: collectionName, - id: null, - dropCollection: true, - }); - }; - - return self - .rawCollection(collectionName) - .drop() - .then(async result => { - await refresh(); - await write.committed(); - return result; - }) - .catch(async e => { - await write.committed(); - throw e; - }); -}; - -// For testing only. Slightly better than `c.rawDatabase().dropDatabase()` -// because it lets the test's fence wait for it to be complete. -MongoConnection.prototype.dropDatabaseAsync = async function () { - var self = this; - - var write = self._maybeBeginWrite(); - var refresh = async function () { - await Meteor.refresh({ dropDatabase: true }); - }; - - try { - await self.db._dropDatabase(); - await refresh(); - await write.committed(); - } catch (e) { - await write.committed(); - throw e; - } -}; - -MongoConnection.prototype.updateAsync = async function (collection_name, selector, mod, options) { - var self = this; - - if (collection_name === "___meteor_failure_test_collection") { - var e = new Error("Failure test"); - e._expectedByTest = true; - throw e; - } - - // explicit safety check. null and undefined can crash the mongo - // driver. Although the node driver and minimongo do 'support' - // non-object modifier in that they don't crash, they are not - // meaningful operations and do not do anything. Defensively throw an - // error here. - if (!mod || typeof mod !== 'object') { - const error = new Error("Invalid modifier. Modifier must be an object."); - - throw error; - } - - if (!(LocalCollection._isPlainObject(mod) && !EJSON._isCustomType(mod))) { - const error = new Error( - "Only plain objects may be used as replacement" + - " documents in MongoDB"); - - throw error; - } - - if (!options) options = {}; - - var write = self._maybeBeginWrite(); - var refresh = async function () { - await self._refresh(collection_name, selector); - }; - - var collection = self.rawCollection(collection_name); - var mongoOpts = {safe: true}; - // Add support for filtered positional operator - if (options.arrayFilters !== undefined) mongoOpts.arrayFilters = options.arrayFilters; - // explictly enumerate options that minimongo supports - if (options.upsert) mongoOpts.upsert = true; - if (options.multi) mongoOpts.multi = true; - // Lets you get a more more full result from MongoDB. Use with caution: - // might not work with C.upsert (as opposed to C.update({upsert:true}) or - // with simulated upsert. - if (options.fullResult) mongoOpts.fullResult = true; - - var mongoSelector = replaceTypes(selector, replaceMeteorAtomWithMongo); - var mongoMod = replaceTypes(mod, replaceMeteorAtomWithMongo); - - var isModify = LocalCollection._isModificationMod(mongoMod); - - if (options._forbidReplace && !isModify) { - var err = new Error("Invalid modifier. Replacements are forbidden."); - throw err; - } - - // We've already run replaceTypes/replaceMeteorAtomWithMongo on - // selector and mod. We assume it doesn't matter, as far as - // the behavior of modifiers is concerned, whether `_modify` - // is run on EJSON or on mongo-converted EJSON. - - // Run this code up front so that it fails fast if someone uses - // a Mongo update operator we don't support. - let knownId; - if (options.upsert) { - try { - let newDoc = LocalCollection._createUpsertDocument(selector, mod); - knownId = newDoc._id; - } catch (err) { - throw err; - } - } - if (options.upsert && - ! isModify && - ! knownId && - options.insertedId && - ! (options.insertedId instanceof Mongo.ObjectID && - options.generatedId)) { - // In case of an upsert with a replacement, where there is no _id defined - // in either the query or the replacement doc, mongo will generate an id itself. - // Therefore we need this special strategy if we want to control the id ourselves. - - // We don't need to do this when: - // - This is not a replacement, so we can add an _id to $setOnInsert - // - The id is defined by query or mod we can just add it to the replacement doc - // - The user did not specify any id preference and the id is a Mongo ObjectId, - // then we can just let Mongo generate the id - return await simulateUpsertWithInsertedId(collection, mongoSelector, mongoMod, options) - .then(async result => { - await refresh(); - await write.committed(); - if (result && ! options._returnObject) { - return result.numberAffected; - } else { - return result; - } - }); - } else { - if (options.upsert && !knownId && options.insertedId && isModify) { - if (!mongoMod.hasOwnProperty('$setOnInsert')) { - mongoMod.$setOnInsert = {}; - } - knownId = options.insertedId; - Object.assign(mongoMod.$setOnInsert, replaceTypes({_id: options.insertedId}, replaceMeteorAtomWithMongo)); - } - - const strings = Object.keys(mongoMod).filter((key) => !key.startsWith("$")); - let updateMethod = strings.length > 0 ? 'replaceOne' : 'updateMany'; - updateMethod = - updateMethod === 'updateMany' && !mongoOpts.multi - ? 'updateOne' - : updateMethod; - return collection[updateMethod] - .bind(collection)(mongoSelector, mongoMod, mongoOpts) - .then(async result => { - var meteorResult = transformResult({result}); - if (meteorResult && options._returnObject) { - // If this was an upsertAsync() call, and we ended up - // inserting a new doc and we know its id, then - // return that id as well. - if (options.upsert && meteorResult.insertedId) { - if (knownId) { - meteorResult.insertedId = knownId; - } else if (meteorResult.insertedId instanceof MongoDB.ObjectID) { - meteorResult.insertedId = new Mongo.ObjectID(meteorResult.insertedId.toHexString()); - } - } - await refresh(); - await write.committed(); - return meteorResult; - } else { - await refresh(); - await write.committed(); - return meteorResult.numberAffected; - } - }).catch(async (err) => { - await write.committed(); - throw err; - }); - } -}; - -var transformResult = function (driverResult) { - var meteorResult = { numberAffected: 0 }; - if (driverResult) { - var mongoResult = driverResult.result; - // On updates with upsert:true, the inserted values come as a list of - // upserted values -- even with options.multi, when the upsert does insert, - // it only inserts one element. - if (mongoResult.upsertedCount) { - meteorResult.numberAffected = mongoResult.upsertedCount; - - if (mongoResult.upsertedId) { - meteorResult.insertedId = mongoResult.upsertedId; - } - } else { - // n was used before Mongo 5.0, in Mongo 5.0 we are not receiving this n - // field and so we are using modifiedCount instead - meteorResult.numberAffected = mongoResult.n || mongoResult.matchedCount || mongoResult.modifiedCount; - } - } - - return meteorResult; -}; - - -var NUM_OPTIMISTIC_TRIES = 3; - -// exposed for testing -MongoConnection._isCannotChangeIdError = function (err) { - - // Mongo 3.2.* returns error as next Object: - // {name: String, code: Number, errmsg: String} - // Older Mongo returns: - // {name: String, code: Number, err: String} - var error = err.errmsg || err.err; - - // We don't use the error code here - // because the error code we observed it producing (16837) appears to be - // a far more generic error code based on examining the source. - if (error.indexOf('The _id field cannot be changed') === 0 - || error.indexOf("the (immutable) field '_id' was found to have been altered to _id") !== -1) { - return true; - } - - return false; -}; - -var simulateUpsertWithInsertedId = async function (collection, selector, mod, options) { - // STRATEGY: First try doing an upsert with a generated ID. - // If this throws an error about changing the ID on an existing document - // then without affecting the database, we know we should probably try - // an update without the generated ID. If it affected 0 documents, - // then without affecting the database, we the document that first - // gave the error is probably removed and we need to try an insert again - // We go back to step one and repeat. - // Like all "optimistic write" schemes, we rely on the fact that it's - // unlikely our writes will continue to be interfered with under normal - // circumstances (though sufficiently heavy contention with writers - // disagreeing on the existence of an object will cause writes to fail - // in theory). - - var insertedId = options.insertedId; // must exist - var mongoOptsForUpdate = { - safe: true, - multi: options.multi - }; - var mongoOptsForInsert = { - safe: true, - upsert: true - }; - - var replacementWithId = Object.assign( - replaceTypes({_id: insertedId}, replaceMeteorAtomWithMongo), - mod); - - var tries = NUM_OPTIMISTIC_TRIES; - - var doUpdate = async function () { - tries--; - if (! tries) { - throw new Error("Upsert failed after " + NUM_OPTIMISTIC_TRIES + " tries."); - } else { - let method = collection.updateMany; - if(!Object.keys(mod).some(key => key.startsWith("$"))){ - method = collection.replaceOne.bind(collection); - } - return method( - selector, - mod, - mongoOptsForUpdate).then(result => { - if (result && (result.modifiedCount || result.upsertedCount)) { - return { - numberAffected: result.modifiedCount || result.upsertedCount, - insertedId: result.upsertedId || undefined, - }; - } else { - return doConditionalInsert(); - } - }); - } - }; - - var doConditionalInsert = function() { - return collection.replaceOne(selector, replacementWithId, mongoOptsForInsert) - .then(result => ({ - numberAffected: result.upsertedCount, - insertedId: result.upsertedId, - })).catch(err => { - if (MongoConnection._isCannotChangeIdError(err)) { - return doUpdate(); - } else { - throw err; - } - }); - - }; - return doUpdate(); -}; - - -// XXX MongoConnection.upsertAsync() does not return the id of the inserted document -// unless you set it explicitly in the selector or modifier (as a replacement -// doc). -MongoConnection.prototype.upsertAsync = async function (collectionName, selector, mod, options) { - var self = this; - - - - if (typeof options === "function" && ! callback) { - callback = options; - options = {}; - } - - return self.updateAsync(collectionName, selector, mod, - Object.assign({}, options, { - upsert: true, - _returnObject: true - })); -}; - -MongoConnection.prototype.find = function (collectionName, selector, options) { - var self = this; - - if (arguments.length === 1) - selector = {}; - - return new Cursor( - self, new CursorDescription(collectionName, selector, options)); -}; - -MongoConnection.prototype.findOneAsync = async function (collection_name, selector, options) { - var self = this; - if (arguments.length === 1) { - selector = {}; - } - - options = options || {}; - options.limit = 1; - - const results = await self.find(collection_name, selector, options).fetch(); - - return results[0]; -}; - -// We'll actually design an index API later. For now, we just pass through to -// Mongo's, but make it synchronous. -MongoConnection.prototype.createIndexAsync = async function (collectionName, index, - options) { - var self = this; - - // We expect this function to be called at startup, not from within a method, - // so we don't interact with the write fence. - var collection = self.rawCollection(collectionName); - await collection.createIndex(index, options); -}; - -// just to be consistent with the other methods -MongoConnection.prototype.createIndex = - MongoConnection.prototype.createIndexAsync; - -MongoConnection.prototype.countDocuments = function (collectionName, ...args) { - args = args.map(arg => replaceTypes(arg, replaceMeteorAtomWithMongo)); - const collection = this.rawCollection(collectionName); - return collection.countDocuments(...args); -}; - -MongoConnection.prototype.estimatedDocumentCount = function (collectionName, ...args) { - args = args.map(arg => replaceTypes(arg, replaceMeteorAtomWithMongo)); - const collection = this.rawCollection(collectionName); - return collection.estimatedDocumentCount(...args); -}; - -MongoConnection.prototype.ensureIndexAsync = MongoConnection.prototype.createIndexAsync; - -MongoConnection.prototype.dropIndexAsync = async function (collectionName, index) { - var self = this; - - - // This function is only used by test code, not within a method, so we don't - // interact with the write fence. - var collection = self.rawCollection(collectionName); - var indexName = await collection.dropIndex(index); -}; - - -CLIENT_ONLY_METHODS.forEach(function (m) { - MongoConnection.prototype[m] = function () { - throw new Error( - `${m} + is not available on the server. Please use ${getAsyncMethodName( - m - )}() instead.` - ); - }; -}); - -// CURSORS - -// There are several classes which relate to cursors: -// -// CursorDescription represents the arguments used to construct a cursor: -// collectionName, selector, and (find) options. Because it is used as a key -// for cursor de-dup, everything in it should either be JSON-stringifiable or -// not affect observeChanges output (eg, options.transform functions are not -// stringifiable but do not affect observeChanges). -// -// SynchronousCursor is a wrapper around a MongoDB cursor -// which includes fully-synchronous versions of forEach, etc. -// -// Cursor is the cursor object returned from find(), which implements the -// documented Mongo.Collection cursor API. It wraps a CursorDescription and a -// SynchronousCursor (lazily: it doesn't contact Mongo until you call a method -// like fetch or forEach on it). -// -// ObserveHandle is the "observe handle" returned from observeChanges. It has a -// reference to an ObserveMultiplexer. -// -// ObserveMultiplexer allows multiple identical ObserveHandles to be driven by a -// single observe driver. -// -// There are two "observe drivers" which drive ObserveMultiplexers: -// - PollingObserveDriver caches the results of a query and reruns it when -// necessary. -// - OplogObserveDriver follows the Mongo operation log to directly observe -// database changes. -// Both implementations follow the same simple interface: when you create them, -// they start sending observeChanges callbacks (and a ready() invocation) to -// their ObserveMultiplexer, and you stop them by calling their stop() method. - -CursorDescription = function (collectionName, selector, options) { - var self = this; - self.collectionName = collectionName; - self.selector = Mongo.Collection._rewriteSelector(selector); - self.options = options || {}; -}; - -Cursor = function (mongo, cursorDescription) { - var self = this; - - self._mongo = mongo; - self._cursorDescription = cursorDescription; - self._synchronousCursor = null; -}; - -function setupSynchronousCursor(cursor, method) { - // You can only observe a tailable cursor. - if (cursor._cursorDescription.options.tailable) - throw new Error('Cannot call ' + method + ' on a tailable cursor'); - - if (!cursor._synchronousCursor) { - cursor._synchronousCursor = cursor._mongo._createSynchronousCursor( - cursor._cursorDescription, - { - // Make sure that the "cursor" argument to forEach/map callbacks is the - // Cursor, not the SynchronousCursor. - selfForIteration: cursor, - useTransform: true, - } - ); - } - - return cursor._synchronousCursor; -} - - -Cursor.prototype.countAsync = async function () { - const collection = this._mongo.rawCollection(this._cursorDescription.collectionName); - return await collection.countDocuments( - replaceTypes(this._cursorDescription.selector, replaceMeteorAtomWithMongo), - replaceTypes(this._cursorDescription.options, replaceMeteorAtomWithMongo), - ); -}; - -Cursor.prototype.count = function () { - throw new Error( - "count() is not available on the server. Please use countAsync() instead." - ); -}; - -[...ASYNC_CURSOR_METHODS, Symbol.iterator, Symbol.asyncIterator].forEach(methodName => { - // count is handled specially since we don't want to create a cursor. - // it is still included in ASYNC_CURSOR_METHODS because we still want an async version of it to exist. - if (methodName === 'count') { - return - } - Cursor.prototype[methodName] = function (...args) { - const cursor = setupSynchronousCursor(this, methodName); - return cursor[methodName](...args); - }; - - // These methods are handled separately. - if (methodName === Symbol.iterator || methodName === Symbol.asyncIterator) { - return; - } - - const methodNameAsync = getAsyncMethodName(methodName); - Cursor.prototype[methodNameAsync] = function (...args) { - try { - return Promise.resolve(this[methodName](...args)); - } catch (error) { - return Promise.reject(error); - } - }; -}); - -Cursor.prototype.getTransform = function () { - return this._cursorDescription.options.transform; -}; - -// When you call Meteor.publish() with a function that returns a Cursor, we need -// to transmute it into the equivalent subscription. This is the function that -// does that. -Cursor.prototype._publishCursor = function (sub) { - var self = this; - var collection = self._cursorDescription.collectionName; - return Mongo.Collection._publishCursor(self, sub, collection); -}; - -// Used to guarantee that publish functions return at most one cursor per -// collection. Private, because we might later have cursors that include -// documents from multiple collections somehow. -Cursor.prototype._getCollectionName = function () { - var self = this; - return self._cursorDescription.collectionName; -}; - -Cursor.prototype.observe = function (callbacks) { - var self = this; - return LocalCollection._observeFromObserveChanges(self, callbacks); -}; - -Cursor.prototype.observeAsync = function (callbacks) { - return new Promise(resolve => resolve(this.observe(callbacks))); -}; - -Cursor.prototype.observeChanges = function (callbacks, options = {}) { - var self = this; - - var ordered = LocalCollection._observeChangesCallbacksAreOrdered(callbacks); - - return self._mongo._observeChanges( - self._cursorDescription, ordered, callbacks, options.nonMutatingCallbacks); -}; - -Cursor.prototype.observeChangesAsync = async function (callbacks, options = {}) { - return this.observeChanges(callbacks, options); -}; - -MongoConnection.prototype._createSynchronousCursor = function( - cursorDescription, options = {}) { - var self = this; - const { selfForIteration, useTransform } = options; - options = { selfForIteration, useTransform }; - - var collection = self.rawCollection(cursorDescription.collectionName); - var cursorOptions = cursorDescription.options; - var mongoOptions = { - sort: cursorOptions.sort, - limit: cursorOptions.limit, - skip: cursorOptions.skip, - projection: cursorOptions.fields || cursorOptions.projection, - readPreference: cursorOptions.readPreference, - }; - - // Do we want a tailable cursor (which only works on capped collections)? - if (cursorOptions.tailable) { - mongoOptions.numberOfRetries = -1; - } - - var dbCursor = collection.find( - replaceTypes(cursorDescription.selector, replaceMeteorAtomWithMongo), - mongoOptions); - - // Do we want a tailable cursor (which only works on capped collections)? - if (cursorOptions.tailable) { - // We want a tailable cursor... - dbCursor.addCursorFlag("tailable", true) - // ... and for the server to wait a bit if any getMore has no data (rather - // than making us put the relevant sleeps in the client)... - dbCursor.addCursorFlag("awaitData", true) - - // And if this is on the oplog collection and the cursor specifies a 'ts', - // then set the undocumented oplog replay flag, which does a special scan to - // find the first document (instead of creating an index on ts). This is a - // very hard-coded Mongo flag which only works on the oplog collection and - // only works with the ts field. - if (cursorDescription.collectionName === OPLOG_COLLECTION && - cursorDescription.selector.ts) { - dbCursor.addCursorFlag("oplogReplay", true) - } - } - - if (typeof cursorOptions.maxTimeMs !== 'undefined') { - dbCursor = dbCursor.maxTimeMS(cursorOptions.maxTimeMs); - } - if (typeof cursorOptions.hint !== 'undefined') { - dbCursor = dbCursor.hint(cursorOptions.hint); - } - - return new AsynchronousCursor(dbCursor, cursorDescription, options, collection); -}; - -/** - * This is just a light wrapper for the cursor. The goal here is to ensure compatibility even if - * there are breaking changes on the MongoDB driver. - * - * @constructor - */ -class AsynchronousCursor { - constructor(dbCursor, cursorDescription, options) { - this._dbCursor = dbCursor; - this._cursorDescription = cursorDescription; - - this._selfForIteration = options.selfForIteration || this; - if (options.useTransform && cursorDescription.options.transform) { - this._transform = LocalCollection.wrapTransform( - cursorDescription.options.transform); - } else { - this._transform = null; - } - - this._visitedIds = new LocalCollection._IdMap; - } - - [Symbol.asyncIterator]() { - var cursor = this; - return { - async next() { - const value = await cursor._nextObjectPromise(); - return { done: !value, value }; - }, - }; - } - - // Returns a Promise for the next object from the underlying cursor (before - // the Mongo->Meteor type replacement). - async _rawNextObjectPromise() { - try { - return this._dbCursor.next(); - } catch (e) { - console.error(e); - } - } - - // Returns a Promise for the next object from the cursor, skipping those whose - // IDs we've already seen and replacing Mongo atoms with Meteor atoms. - async _nextObjectPromise () { - while (true) { - var doc = await this._rawNextObjectPromise(); - - if (!doc) return null; - doc = replaceTypes(doc, replaceMongoAtomWithMeteor); - - if (!this._cursorDescription.options.tailable && _.has(doc, '_id')) { - // Did Mongo give us duplicate documents in the same cursor? If so, - // ignore this one. (Do this before the transform, since transform might - // return some unrelated value.) We don't do this for tailable cursors, - // because we want to maintain O(1) memory usage. And if there isn't _id - // for some reason (maybe it's the oplog), then we don't do this either. - // (Be careful to do this for falsey but existing _id, though.) - if (this._visitedIds.has(doc._id)) continue; - this._visitedIds.set(doc._id, true); - } - - if (this._transform) - doc = this._transform(doc); - - return doc; - } - } - - // Returns a promise which is resolved with the next object (like with - // _nextObjectPromise) or rejected if the cursor doesn't return within - // timeoutMS ms. - _nextObjectPromiseWithTimeout(timeoutMS) { - if (!timeoutMS) { - return this._nextObjectPromise(); - } - const nextObjectPromise = this._nextObjectPromise(); - const timeoutErr = new Error('Client-side timeout waiting for next object'); - const timeoutPromise = new Promise((resolve, reject) => { - setTimeout(() => { - reject(timeoutErr); - }, timeoutMS); - }); - return Promise.race([nextObjectPromise, timeoutPromise]) - .catch((err) => { - if (err === timeoutErr) { - this.close(); - } - throw err; - }); - } - - async forEach(callback, thisArg) { - // Get back to the beginning. - this._rewind(); - - let idx = 0; - while (true) { - const doc = await this._nextObjectPromise(); - if (!doc) return; - await callback.call(thisArg, doc, idx++, this._selfForIteration); - } - } - - async map(callback, thisArg) { - const results = []; - await this.forEach(async (doc, index) => { - results.push(await callback.call(thisArg, doc, index, this._selfForIteration)); - }); - - return results; - } - - _rewind() { - // known to be synchronous - this._dbCursor.rewind(); - - this._visitedIds = new LocalCollection._IdMap; - } - - // Mostly usable for tailable cursors. - close() { - this._dbCursor.close(); - } - - fetch() { - return this.map(_.identity); - } - - /** - * FIXME: (node:34680) [MONGODB DRIVER] Warning: cursor.count is deprecated and will be - * removed in the next major version, please use `collection.estimatedDocumentCount` or - * `collection.countDocuments` instead. - */ - count() { - return this._dbCursor.count(); - } - - // This method is NOT wrapped in Cursor. - async getRawObjects(ordered) { - var self = this; - if (ordered) { - return self.fetch(); - } else { - var results = new LocalCollection._IdMap; - await self.forEach(function (doc) { - results.set(doc._id, doc); - }); - return results; - } - } -} - -var SynchronousCursor = function (dbCursor, cursorDescription, options, collection) { - var self = this; - const { selfForIteration, useTransform } = options; - options = { selfForIteration, useTransform }; - - self._dbCursor = dbCursor; - self._cursorDescription = cursorDescription; - // The "self" argument passed to forEach/map callbacks. If we're wrapped - // inside a user-visible Cursor, we want to provide the outer cursor! - self._selfForIteration = options.selfForIteration || self; - if (options.useTransform && cursorDescription.options.transform) { - self._transform = LocalCollection.wrapTransform( - cursorDescription.options.transform); - } else { - self._transform = null; - } - - self._synchronousCount = Future.wrap( - collection.countDocuments.bind( - collection, - replaceTypes(cursorDescription.selector, replaceMeteorAtomWithMongo), - replaceTypes(cursorDescription.options, replaceMeteorAtomWithMongo), - ) - ); - self._visitedIds = new LocalCollection._IdMap; -}; - -Object.assign(SynchronousCursor.prototype, { - // Returns a Promise for the next object from the underlying cursor (before - // the Mongo->Meteor type replacement). - _rawNextObjectPromise: function () { - const self = this; - return new Promise((resolve, reject) => { - self._dbCursor.next((err, doc) => { - if (err) { - reject(err); - } else { - resolve(doc); - } - }); - }); - }, - - // Returns a Promise for the next object from the cursor, skipping those whose - // IDs we've already seen and replacing Mongo atoms with Meteor atoms. - _nextObjectPromise: async function () { - var self = this; - - while (true) { - var doc = await self._rawNextObjectPromise(); - - if (!doc) return null; - doc = replaceTypes(doc, replaceMongoAtomWithMeteor); - - if (!self._cursorDescription.options.tailable && has(doc, '_id')) { - // Did Mongo give us duplicate documents in the same cursor? If so, - // ignore this one. (Do this before the transform, since transform might - // return some unrelated value.) We don't do this for tailable cursors, - // because we want to maintain O(1) memory usage. And if there isn't _id - // for some reason (maybe it's the oplog), then we don't do this either. - // (Be careful to do this for falsey but existing _id, though.) - if (self._visitedIds.has(doc._id)) continue; - self._visitedIds.set(doc._id, true); - } - - if (self._transform) - doc = self._transform(doc); - - return doc; - } - }, - - // Returns a promise which is resolved with the next object (like with - // _nextObjectPromise) or rejected if the cursor doesn't return within - // timeoutMS ms. - _nextObjectPromiseWithTimeout: function (timeoutMS) { - const self = this; - if (!timeoutMS) { - return self._nextObjectPromise(); - } - const nextObjectPromise = self._nextObjectPromise(); - const timeoutErr = new Error('Client-side timeout waiting for next object'); - const timeoutPromise = new Promise((resolve, reject) => { - const timer = setTimeout(() => { - reject(timeoutErr); - }, timeoutMS); - }); - return Promise.race([nextObjectPromise, timeoutPromise]) - .catch((err) => { - if (err === timeoutErr) { - self.close(); - } - throw err; - }); - }, - - _nextObject: function () { - var self = this; - return self._nextObjectPromise().await(); - }, - - forEach: function (callback, thisArg) { - var self = this; - const wrappedFn = Meteor.wrapFn(callback); - - // Get back to the beginning. - self._rewind(); - - // We implement the loop ourself instead of using self._dbCursor.each, - // because "each" will call its callback outside of a fiber which makes it - // much more complex to make this function synchronous. - var index = 0; - while (true) { - var doc = self._nextObject(); - if (!doc) return; - wrappedFn.call(thisArg, doc, index++, self._selfForIteration); - } - }, - - // XXX Allow overlapping callback executions if callback yields. - map: function (callback, thisArg) { - var self = this; - const wrappedFn = Meteor.wrapFn(callback); - var res = []; - self.forEach(function (doc, index) { - res.push(wrappedFn.call(thisArg, doc, index, self._selfForIteration)); - }); - return res; - }, - - _rewind: function () { - var self = this; - - // known to be synchronous - self._dbCursor.rewind(); - - self._visitedIds = new LocalCollection._IdMap; - }, - - // Mostly usable for tailable cursors. - close: function () { - var self = this; - - self._dbCursor.close(); - }, - - fetch: function () { - var self = this; - return self.map(identity); - }, - - count: function () { - var self = this; - return self._synchronousCount().wait(); - }, - - // This method is NOT wrapped in Cursor. - getRawObjects: function (ordered) { - var self = this; - if (ordered) { - return self.fetch(); - } else { - var results = new LocalCollection._IdMap; - self.forEach(function (doc) { - results.set(doc._id, doc); - }); - return results; - } - } -}); - -SynchronousCursor.prototype[Symbol.iterator] = function () { - var self = this; - - // Get back to the beginning. - self._rewind(); - - return { - next() { - const doc = self._nextObject(); - return doc ? { - value: doc - } : { - done: true - }; - } - }; -}; - -SynchronousCursor.prototype[Symbol.asyncIterator] = function () { - const syncResult = this[Symbol.iterator](); - return { - async next() { - return Promise.resolve(syncResult.next()); - } - }; -} - -// Tails the cursor described by cursorDescription, most likely on the -// oplog. Calls docCallback with each document found. Ignores errors and just -// restarts the tail on error. -// -// If timeoutMS is set, then if we don't get a new document every timeoutMS, -// kill and restart the cursor. This is primarily a workaround for #8598. -MongoConnection.prototype.tail = function (cursorDescription, docCallback, timeoutMS) { - var self = this; - if (!cursorDescription.options.tailable) - throw new Error("Can only tail a tailable cursor"); - - var cursor = self._createSynchronousCursor(cursorDescription); - - var stopped = false; - var lastTS; - - Meteor.defer(async function loop() { - var doc = null; - while (true) { - if (stopped) - return; - try { - doc = await cursor._nextObjectPromiseWithTimeout(timeoutMS); - } catch (err) { - // There's no good way to figure out if this was actually an error from - // Mongo, or just client-side (including our own timeout error). Ah - // well. But either way, we need to retry the cursor (unless the failure - // was because the observe got stopped). - doc = null; - } - // Since we awaited a promise above, we need to check again to see if - // we've been stopped before calling the callback. - if (stopped) - return; - if (doc) { - // If a tailable cursor contains a "ts" field, use it to recreate the - // cursor on error. ("ts" is a standard that Mongo uses internally for - // the oplog, and there's a special flag that lets you do binary search - // on it instead of needing to use an index.) - lastTS = doc.ts; - docCallback(doc); - } else { - var newSelector = Object.assign({}, cursorDescription.selector); - if (lastTS) { - newSelector.ts = {$gt: lastTS}; - } - cursor = self._createSynchronousCursor(new CursorDescription( - cursorDescription.collectionName, - newSelector, - cursorDescription.options)); - // Mongo failover takes many seconds. Retry in a bit. (Without this - // setTimeout, we peg the CPU at 100% and never notice the actual - // failover. - setTimeout(loop, 100); - break; - } - } - }); - - return { - stop: function () { - stopped = true; - cursor.close(); - } - }; -}; - -const oplogCollectionWarnings = []; - -Object.assign(MongoConnection.prototype, { - _observeChanges: async function ( - cursorDescription, ordered, callbacks, nonMutatingCallbacks) { - var self = this; - const collectionName = cursorDescription.collectionName; - - if (cursorDescription.options.tailable) { - return self._observeChangesTailable(cursorDescription, ordered, callbacks); - } - - // You may not filter out _id when observing changes, because the id is a core - // part of the observeChanges API. - const fieldsOptions = cursorDescription.options.projection || cursorDescription.options.fields; - if (fieldsOptions && - (fieldsOptions._id === 0 || - fieldsOptions._id === false)) { - throw Error("You may not observe a cursor with {fields: {_id: 0}}"); - } - - var observeKey = EJSON.stringify( - Object.assign({ordered: ordered}, cursorDescription)); - - var multiplexer, observeDriver; - var firstHandle = false; - - // Find a matching ObserveMultiplexer, or create a new one. This next block is - // guaranteed to not yield (and it doesn't call anything that can observe a - // new query), so no other calls to this function can interleave with it. - if (has(self._observeMultiplexers, observeKey)) { - multiplexer = self._observeMultiplexers[observeKey]; - } else { - firstHandle = true; - // Create a new ObserveMultiplexer. - multiplexer = new ObserveMultiplexer({ - ordered: ordered, - onStop: function () { - delete self._observeMultiplexers[observeKey]; - return observeDriver.stop(); - } - }); - } - - var observeHandle = new ObserveHandle(multiplexer, - callbacks, - nonMutatingCallbacks, - ); - - const oplogOptions = self?._oplogHandle?._oplogOptions || {}; - const { includeCollections, excludeCollections } = oplogOptions; - if (firstHandle) { - var matcher, sorter; - var canUseOplog = [ - function () { - // At a bare minimum, using the oplog requires us to have an oplog, to - // want unordered callbacks, and to not want a callback on the polls - // that won't happen. - return self._oplogHandle && !ordered && - !callbacks._testOnlyPollCallback; - }, - function () { - // We also need to check, if the collection of this Cursor is actually being "watched" by the Oplog handle - // if not, we have to fallback to long polling - if (excludeCollections?.length && excludeCollections.includes(collectionName)) { - if (!oplogCollectionWarnings.includes(collectionName)) { - console.warn(`Meteor.settings.packages.mongo.oplogExcludeCollections includes the collection ${collectionName} - your subscriptions will only use long polling!`); - oplogCollectionWarnings.push(collectionName); // we only want to show the warnings once per collection! - } - return false; - } - if (includeCollections?.length && !includeCollections.includes(collectionName)) { - if (!oplogCollectionWarnings.includes(collectionName)) { - console.warn(`Meteor.settings.packages.mongo.oplogIncludeCollections does not include the collection ${collectionName} - your subscriptions will only use long polling!`); - oplogCollectionWarnings.push(collectionName); // we only want to show the warnings once per collection! - } - return false; - } - return true; - }, - function () { - // We need to be able to compile the selector. Fall back to polling for - // some newfangled $selector that minimongo doesn't support yet. - try { - matcher = new Minimongo.Matcher(cursorDescription.selector); - return true; - } catch (e) { - // XXX make all compilation errors MinimongoError or something - // so that this doesn't ignore unrelated exceptions - return false; - } - }, - function () { - // ... and the selector itself needs to support oplog. - return OplogObserveDriver.cursorSupported(cursorDescription, matcher); - }, - function () { - // And we need to be able to compile the sort, if any. eg, can't be - // {$natural: 1}. - if (!cursorDescription.options.sort) - return true; - try { - sorter = new Minimongo.Sorter(cursorDescription.options.sort); - return true; - } catch (e) { - // XXX make all compilation errors MinimongoError or something - // so that this doesn't ignore unrelated exceptions - return false; - } - } - ].every(f => f()); // invoke each function and check if all return true - - var driverClass = canUseOplog ? OplogObserveDriver : PollingObserveDriver; - observeDriver = new driverClass({ - cursorDescription: cursorDescription, - mongoHandle: self, - multiplexer: multiplexer, - ordered: ordered, - matcher: matcher, // ignored by polling - sorter: sorter, // ignored by polling - _testOnlyPollCallback: callbacks._testOnlyPollCallback -}); - - if (observeDriver._init) { - await observeDriver._init(); - } - - // This field is only set for use in tests. - multiplexer._observeDriver = observeDriver; - } - self._observeMultiplexers[observeKey] = multiplexer; - // Blocks until the initial adds have been sent. - await multiplexer.addHandleAndSendInitialAdds(observeHandle); - - return observeHandle; -}, - -}); - - // Listen for the invalidation messages that will trigger us to poll the // database for changes. If this selector specifies specific IDs, specify them // here, so that updates to different specific IDs don't cause us to poll. // listenCallback is the same kind of (notification, complete) callback passed // to InvalidationCrossbar.listen. -listenAll = async function (cursorDescription, listenCallback) { +export const listenAll = async function (cursorDescription, listenCallback) { const listeners = []; await forEachTrigger(cursorDescription, function (trigger) { listeners.push(DDPServer._InvalidationCrossbar.listen( @@ -1610,7 +72,7 @@ listenAll = async function (cursorDescription, listenCallback) { }; }; -forEachTrigger = async function (cursorDescription, triggerCallback) { +export const forEachTrigger = async function (cursorDescription, triggerCallback) { const key = {collection: cursorDescription.collectionName}; const specificIds = LocalCollection._idsMatchedBySelector( cursorDescription.selector); @@ -1626,63 +88,9 @@ forEachTrigger = async function (cursorDescription, triggerCallback) { await triggerCallback({ dropDatabase: true }); }; -// observeChanges for tailable cursors on capped collections. -// -// Some differences from normal cursors: -// - Will never produce anything other than 'added' or 'addedBefore'. If you -// do update a document that has already been produced, this will not notice -// it. -// - If you disconnect and reconnect from Mongo, it will essentially restart -// the query, which will lead to duplicate results. This is pretty bad, -// but if you include a field called 'ts' which is inserted as -// new MongoInternals.MongoTimestamp(0, 0) (which is initialized to the -// current Mongo-style timestamp), we'll be able to find the place to -// restart properly. (This field is specifically understood by Mongo with an -// optimization which allows it to find the right place to start without -// an index on ts. It's how the oplog works.) -// - No callbacks are triggered synchronously with the call (there's no -// differentiation between "initial data" and "later changes"; everything -// that matches the query gets sent asynchronously). -// - De-duplication is not implemented. -// - Does not yet interact with the write fence. Probably, this should work by -// ignoring removes (which don't work on capped collections) and updates -// (which don't affect tailable cursors), and just keeping track of the ID -// of the inserted object, and closing the write fence once you get to that -// ID (or timestamp?). This doesn't work well if the document doesn't match -// the query, though. On the other hand, the write fence can close -// immediately if it does not match the query. So if we trust minimongo -// enough to accurately evaluate the query against the write fence, we -// should be able to do this... Of course, minimongo doesn't even support -// Mongo Timestamps yet. -MongoConnection.prototype._observeChangesTailable = function ( - cursorDescription, ordered, callbacks) { - var self = this; - // Tailable cursors only ever call added/addedBefore callbacks, so it's an - // error if you didn't provide them. - if ((ordered && !callbacks.addedBefore) || - (!ordered && !callbacks.added)) { - throw new Error("Can't observe an " + (ordered ? "ordered" : "unordered") - + " tailable cursor without a " - + (ordered ? "addedBefore" : "added") + " callback"); - } - - return self.tail(cursorDescription, function (doc) { - var id = doc._id; - delete doc._id; - // The ts is an implementation detail. Hide it. - delete doc.ts; - if (ordered) { - callbacks.addedBefore(id, doc, null); - } else { - callbacks.added(id, doc); - } - }); -}; // XXX We probably need to find a better way to expose this. Right now // it's only used by tests, but in fact you need it in normal // operation to interact with capped collections. MongoInternals.MongoTimestamp = MongoDB.Timestamp; - -MongoInternals.Connection = MongoConnection; \ No newline at end of file diff --git a/packages/mongo/oplog_observe_driver.js b/packages/mongo/oplog_observe_driver.js index 3e13047ba3..0edd862ebf 100644 --- a/packages/mongo/oplog_observe_driver.js +++ b/packages/mongo/oplog_observe_driver.js @@ -2,6 +2,11 @@ import has from 'lodash.has'; import isEmpty from 'lodash.isempty'; import { oplogV2V1Converter } from "./oplog_v2_converter"; import { check, Match } from 'meteor/check'; +import { CursorDescription } from './cursor_description'; +import { forEachTrigger, listenAll } from './mongo_driver'; +import { Cursor } from './cursor'; +import LocalCollection from 'meteor/minimongo/local_collection'; +import { idForOp } from './oplog_tailing'; var PHASE = { QUERYING: "QUERYING", @@ -30,7 +35,7 @@ var currentId = 0; // same simple interface: constructing it starts sending observeChanges // callbacks (and a ready() invocation) to the ObserveMultiplexer, and you stop // it by calling the stop() method. -OplogObserveDriver = function (options) { +export const OplogObserveDriver = function (options) { const self = this; self._usesOplog = true; // tests look at this @@ -113,9 +118,6 @@ OplogObserveDriver = function (options) { self._requeryWhenDoneThisQuery = false; self._writesToCommitWhenWeReachSteady = []; - - - }; _.extend(OplogObserveDriver.prototype, { @@ -1051,6 +1053,4 @@ var modifierCanBeDirectlyApplied = function (modifier) { return !/EJSON\$/.test(field); }); }); -}; - -MongoInternals.OplogObserveDriver = OplogObserveDriver; \ No newline at end of file +}; \ No newline at end of file diff --git a/packages/mongo/oplog_tailing.js b/packages/mongo/oplog_tailing.js deleted file mode 100644 index 3e29bdd322..0000000000 --- a/packages/mongo/oplog_tailing.js +++ /dev/null @@ -1,432 +0,0 @@ -import isEmpty from 'lodash.isempty'; -import has from 'lodash.has'; - -import { NpmModuleMongodb } from "meteor/npm-mongo"; -const { Long } = NpmModuleMongodb; - -OPLOG_COLLECTION = 'oplog.rs'; - -var TOO_FAR_BEHIND = process.env.METEOR_OPLOG_TOO_FAR_BEHIND || 2000; -var TAIL_TIMEOUT = +process.env.METEOR_OPLOG_TAIL_TIMEOUT || 30000; - -idForOp = function (op) { - if (op.op === 'd') - return op.o._id; - else if (op.op === 'i') - return op.o._id; - else if (op.op === 'u') - return op.o2._id; - else if (op.op === 'c') - throw Error("Operator 'c' doesn't supply an object with id: " + - EJSON.stringify(op)); - else - throw Error("Unknown op: " + EJSON.stringify(op)); -}; - -OplogHandle = function (oplogUrl, dbName) { - var self = this; - self._oplogUrl = oplogUrl; - self._dbName = dbName; - - self._oplogLastEntryConnection = null; - self._oplogTailConnection = null; - self._oplogOptions = null; - self._stopped = false; - self._tailHandle = null; - self._readyPromiseResolver = null; - self._readyPromise = new Promise(r => self._readyPromiseResolver = r); - self._crossbar = new DDPServer._Crossbar({ - factPackage: "mongo-livedata", factName: "oplog-watchers" - }); - self._baseOplogSelector = { - ns: new RegExp("^(?:" + [ - Meteor._escapeRegExp(self._dbName + "."), - Meteor._escapeRegExp("admin.$cmd"), - ].join("|") + ")"), - - $or: [ - { op: { $in: ['i', 'u', 'd'] } }, - // drop collection - { op: 'c', 'o.drop': { $exists: true } }, - { op: 'c', 'o.dropDatabase': 1 }, - { op: 'c', 'o.applyOps': { $exists: true } }, - ] - }; - - // Data structures to support waitUntilCaughtUp(). Each oplog entry has a - // MongoTimestamp object on it (which is not the same as a Date --- it's a - // combination of time and an incrementing counter; see - // http://docs.mongodb.org/manual/reference/bson-types/#timestamps). - // - // _catchingUpFutures is an array of {ts: MongoTimestamp, future: Future} - // objects, sorted by ascending timestamp. _lastProcessedTS is the - // MongoTimestamp of the last oplog entry we've processed. - // - // Each time we call waitUntilCaughtUp, we take a peek at the final oplog - // entry in the db. If we've already processed it (ie, it is not greater than - // _lastProcessedTS), waitUntilCaughtUp immediately returns. Otherwise, - // waitUntilCaughtUp makes a new Future and inserts it along with the final - // timestamp entry that it read, into _catchingUpFutures. waitUntilCaughtUp - // then waits on that future, which is resolved once _lastProcessedTS is - // incremented to be past its timestamp by the worker fiber. - // - // XXX use a priority queue or something else that's faster than an array - self._catchingUpResolvers = []; - self._lastProcessedTS = null; - - self._onSkippedEntriesHook = new Hook({ - debugPrintExceptions: "onSkippedEntries callback" - }); - - self._entryQueue = new Meteor._DoubleEndedQueue(); - self._workerActive = false; - - self._startTrailingPromise = self._startTailing(); - //TODO[fibers] Why wait? -}; - -MongoInternals.OplogHandle = OplogHandle; - -Object.assign(OplogHandle.prototype, { - stop: async function () { - var self = this; - if (self._stopped) - return; - self._stopped = true; - if (self._tailHandle) - await self._tailHandle.stop(); - // XXX should close connections too - }, - _onOplogEntry: async function(trigger, callback) { - var self = this; - if (self._stopped) - throw new Error("Called onOplogEntry on stopped handle!"); - - // Calling onOplogEntry requires us to wait for the tailing to be ready. - await self._readyPromise; - - var listenHandle = self._crossbar.listen(trigger, callback); - return { - stop: async function () { - await listenHandle.stop(); - } - }; - }, - onOplogEntry: function (trigger, callback) { - return this._onOplogEntry(trigger, callback); - }, - // Register a callback to be invoked any time we skip oplog entries (eg, - // because we are too far behind). - onSkippedEntries: function (callback) { - var self = this; - if (self._stopped) - throw new Error("Called onSkippedEntries on stopped handle!"); - return self._onSkippedEntriesHook.register(callback); - }, - - async _waitUntilCaughtUp() { - var self = this; - if (self._stopped) - throw new Error("Called waitUntilCaughtUp on stopped handle!"); - - // Calling waitUntilCaughtUp requries us to wait for the oplog connection to - // be ready. - await self._readyPromise; - var lastEntry; - - while (!self._stopped) { - // We need to make the selector at least as restrictive as the actual - // tailing selector (ie, we need to specify the DB name) or else we might - // find a TS that won't show up in the actual tail stream. - try { - lastEntry = await self._oplogLastEntryConnection.findOneAsync( - OPLOG_COLLECTION, - self._baseOplogSelector, - { projection: { ts: 1 }, sort: { $natural: -1 } } - ); - break; - } catch (e) { - // During failover (eg) if we get an exception we should log and retry - // instead of crashing. - Meteor._debug("Got exception while reading last entry", e); - await Meteor._sleepForMs(100); - } - } - - if (self._stopped) - return; - - if (!lastEntry) { - // Really, nothing in the oplog? Well, we've processed everything. - return; - } - - var ts = lastEntry.ts; - if (!ts) - throw Error("oplog entry without ts: " + EJSON.stringify(lastEntry)); - - if (self._lastProcessedTS && ts.lessThanOrEqual(self._lastProcessedTS)) { - // We've already caught up to here. - return; - } - - - // Insert the future into our list. Almost always, this will be at the end, - // but it's conceivable that if we fail over from one primary to another, - // the oplog entries we see will go backwards. - var insertAfter = self._catchingUpResolvers.length; - while (insertAfter - 1 > 0 && self._catchingUpResolvers[insertAfter - 1].ts.greaterThan(ts)) { - insertAfter--; - } - let promiseResolver = null; - const promiseToAwait = new Promise(r => promiseResolver = r); - self._catchingUpResolvers.splice(insertAfter, 0, {ts: ts, resolver: promiseResolver}); - await promiseToAwait; - }, - - // Calls `callback` once the oplog has been processed up to a point that is - // roughly "now": specifically, once we've processed all ops that are - // currently visible. - // XXX become convinced that this is actually safe even if oplogConnection - // is some kind of pool - waitUntilCaughtUp: async function () { - return this._waitUntilCaughtUp(); - }, - - _startTailing: async function () { - var self = this; - // First, make sure that we're talking to the local database. - var mongodbUri = Npm.require('mongodb-uri'); - if (mongodbUri.parse(self._oplogUrl).database !== 'local') { - throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of " + - "a Mongo replica set"); - } - - // We make two separate connections to Mongo. The Node Mongo driver - // implements a naive round-robin connection pool: each "connection" is a - // pool of several (5 by default) TCP connections, and each request is - // rotated through the pools. Tailable cursor queries block on the server - // until there is some data to return (or until a few seconds have - // passed). So if the connection pool used for tailing cursors is the same - // pool used for other queries, the other queries will be delayed by seconds - // 1/5 of the time. - // - // The tail connection will only ever be running a single tail command, so - // it only needs to make one underlying TCP connection. - self._oplogTailConnection = new MongoConnection( - self._oplogUrl, {maxPoolSize: 1, minPoolSize: 1}); - // XXX better docs, but: it's to get monotonic results - // XXX is it safe to say "if there's an in flight query, just use its - // results"? I don't think so but should consider that - self._oplogLastEntryConnection = new MongoConnection( - self._oplogUrl, {maxPoolSize: 1, minPoolSize: 1}); - - - // Now, make sure that there actually is a repl set here. If not, oplog - // tailing won't ever find anything! - // More on the isMasterDoc - // https://docs.mongodb.com/manual/reference/command/isMaster/ - const isMasterDoc = await new Promise(function (resolve, reject) { - self._oplogLastEntryConnection.db - .admin() - .command({ ismaster: 1 }, function (err, result) { - if (err) reject(err); - else resolve(result); - }); - }); - - if (!(isMasterDoc && isMasterDoc.setName)) { - throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of " + - "a Mongo replica set"); - } - - // Find the last oplog entry. - var lastOplogEntry = await self._oplogLastEntryConnection.findOneAsync( - OPLOG_COLLECTION, - {}, - { sort: { $natural: -1 }, projection: { ts: 1 } } - ); - - var oplogSelector = Object.assign({}, self._baseOplogSelector); - if (lastOplogEntry) { - // Start after the last entry that currently exists. - oplogSelector.ts = {$gt: lastOplogEntry.ts}; - // If there are any calls to callWhenProcessedLatest before any other - // oplog entries show up, allow callWhenProcessedLatest to call its - // callback immediately. - self._lastProcessedTS = lastOplogEntry.ts; - } - - // These 2 settings allow you to either only watch certain collections (oplogIncludeCollections), or exclude some collections you don't want to watch for oplog updates (oplogExcludeCollections) - // Usage: - // settings.json = { - // "packages": { - // "mongo": { - // "oplogExcludeCollections": ["products", "prices"] // This would exclude both collections "products" and "prices" from any oplog tailing. - // Beware! This means, that no subscriptions on these 2 collections will update anymore! - // } - // } - // } - const includeCollections = Meteor.settings?.packages?.mongo?.oplogIncludeCollections; - const excludeCollections = Meteor.settings?.packages?.mongo?.oplogExcludeCollections; - if (includeCollections?.length && excludeCollections?.length) { - throw new Error("Can't use both mongo oplog settings oplogIncludeCollections and oplogExcludeCollections at the same time."); - } - if (excludeCollections?.length) { - oplogSelector.ns = { - $regex: oplogSelector.ns, - $nin: excludeCollections.map((collName) => `${self._dbName}.${collName}`) - } - self._oplogOptions = { excludeCollections }; - } - else if (includeCollections?.length) { - oplogSelector = { $and: [ - { $or: [ - { ns: /^admin\.\$cmd/ }, - { ns: { $in: includeCollections.map((collName) => `${self._dbName}.${collName}`) } } - ] }, - { $or: oplogSelector.$or }, // the initial $or to select only certain operations (op) - { ts: oplogSelector.ts } - ] }; - self._oplogOptions = { includeCollections }; - } - - var cursorDescription = new CursorDescription( - OPLOG_COLLECTION, oplogSelector, {tailable: true}); - - // Start tailing the oplog. - // - // We restart the low-level oplog query every 30 seconds if we didn't get a - // doc. This is a workaround for #8598: the Node Mongo driver has at least - // one bug that can lead to query callbacks never getting called (even with - // an error) when leadership failover occur. - self._tailHandle = self._oplogTailConnection.tail( - cursorDescription, - function (doc) { - self._entryQueue.push(doc); - self._maybeStartWorker(); - }, - TAIL_TIMEOUT - ); - - self._readyPromiseResolver(); - }, - - _maybeStartWorker: function () { - var self = this; - if (self._workerActive) return; - self._workerActive = true; - - Meteor.defer(async function () { - // May be called recursively in case of transactions. - async function handleDoc(doc) { - if (doc.ns === "admin.$cmd") { - if (doc.o.applyOps) { - // This was a successful transaction, so we need to apply the - // operations that were involved. - let nextTimestamp = doc.ts; - for (const op of doc.o.applyOps) { - // See https://github.com/meteor/meteor/issues/10420. - if (!op.ts) { - op.ts = nextTimestamp; - nextTimestamp = nextTimestamp.add(Long.ONE); - } - await handleDoc(op); - } - return; - } - throw new Error("Unknown command " + EJSON.stringify(doc)); - } - - const trigger = { - dropCollection: false, - dropDatabase: false, - op: doc, - }; - - if (typeof doc.ns === "string" && - doc.ns.startsWith(self._dbName + ".")) { - trigger.collection = doc.ns.slice(self._dbName.length + 1); - } - - // Is it a special command and the collection name is hidden - // somewhere in operator? - if (trigger.collection === "$cmd") { - if (doc.o.dropDatabase) { - delete trigger.collection; - trigger.dropDatabase = true; - } else if (has(doc.o, "drop")) { - trigger.collection = doc.o.drop; - trigger.dropCollection = true; - trigger.id = null; - } else if ("create" in doc.o && "idIndex" in doc.o) { - // A collection got implicitly created within a transaction. There's - // no need to do anything about it. - } else { - throw Error("Unknown command " + EJSON.stringify(doc)); - } - - } else { - // All other ops have an id. - trigger.id = idForOp(doc); - } - - await self._crossbar.fire(trigger); - } - - try { - while (! self._stopped && - ! self._entryQueue.isEmpty()) { - // Are we too far behind? Just tell our observers that they need to - // repoll, and drop our queue. - if (self._entryQueue.length > TOO_FAR_BEHIND) { - var lastEntry = self._entryQueue.pop(); - self._entryQueue.clear(); - - self._onSkippedEntriesHook.each(function (callback) { - callback(); - return true; - }); - - // Free any waitUntilCaughtUp() calls that were waiting for us to - // pass something that we just skipped. - self._setLastProcessedTS(lastEntry.ts); - continue; - } - - const doc = self._entryQueue.shift(); - - // Fire trigger(s) for this doc. - await handleDoc(doc); - - // Now that we've processed this operation, process pending - // sequencers. - if (doc.ts) { - self._setLastProcessedTS(doc.ts); - } else { - throw Error("oplog entry without ts: " + EJSON.stringify(doc)); - } - } - } finally { - self._workerActive = false; - } - }); - }, - - _setLastProcessedTS: function (ts) { - var self = this; - self._lastProcessedTS = ts; - while (!isEmpty(self._catchingUpResolvers) && self._catchingUpResolvers[0].ts.lessThanOrEqual(self._lastProcessedTS)) { - var sequencer = self._catchingUpResolvers.shift(); - sequencer.resolver(); - } - }, - - //Methods used on tests to dinamically change TOO_FAR_BEHIND - _defineTooFarBehind: function(value) { - TOO_FAR_BEHIND = value; - }, - _resetTooFarBehind: function() { - TOO_FAR_BEHIND = process.env.METEOR_OPLOG_TOO_FAR_BEHIND || 2000; - } -}); \ No newline at end of file diff --git a/packages/mongo/oplog_tailing.ts b/packages/mongo/oplog_tailing.ts new file mode 100644 index 0000000000..293be2851e --- /dev/null +++ b/packages/mongo/oplog_tailing.ts @@ -0,0 +1,394 @@ +import isEmpty from 'lodash.isempty'; +import { Meteor } from 'meteor/meteor'; +import { CursorDescription } from './cursor_description'; +import { MongoConnection } from './mongo_connection'; + +export const OPLOG_COLLECTION = 'oplog.rs'; + +let TOO_FAR_BEHIND = +(process.env.METEOR_OPLOG_TOO_FAR_BEHIND || 2000); +const TAIL_TIMEOUT = +(process.env.METEOR_OPLOG_TAIL_TIMEOUT || 30000); + +interface OplogEntry { + op: string; + o: any; + o2?: any; + ts: any; + ns: string; +} + +interface CatchingUpResolver { + ts: any; + resolver: () => void; +} + +interface OplogTrigger { + dropCollection: boolean; + dropDatabase: boolean; + op: OplogEntry; + collection?: string; + id?: string | null; +} + +export class OplogHandle { + private _oplogUrl: string; + private _dbName: string; + private _oplogLastEntryConnection: MongoConnection | null; + private _oplogTailConnection: MongoConnection | null; + private _oplogOptions: { excludeCollections?: string[]; includeCollections?: string[] } | null; + private _stopped: boolean; + private _tailHandle: any; + private _readyPromiseResolver: (() => void) | null; + private _readyPromise: Promise; + private _crossbar: any; + private _baseOplogSelector: any; + private _catchingUpResolvers: CatchingUpResolver[]; + private _lastProcessedTS: any; + private _onSkippedEntriesHook: any; + private _entryQueue: any; + private _workerActive: boolean; + private _startTrailingPromise: Promise; + + constructor(oplogUrl: string, dbName: string) { + this._oplogUrl = oplogUrl; + this._dbName = dbName; + + this._oplogLastEntryConnection = null; + this._oplogTailConnection = null; + this._oplogOptions = null; + this._stopped = false; + this._tailHandle = null; + this._readyPromiseResolver = null; + this._readyPromise = new Promise(r => this._readyPromiseResolver = r); + this._crossbar = new DDPServer._Crossbar({ + factPackage: "mongo-livedata", factName: "oplog-watchers" + }); + this._baseOplogSelector = { + ns: new RegExp("^(?:" + [ + // @ts-ignore + Meteor._escapeRegExp(this._dbName + "."), + // @ts-ignore + Meteor._escapeRegExp("admin.$cmd"), + ].join("|") + ")"), + + $or: [ + { op: { $in: ['i', 'u', 'd'] } }, + { op: 'c', 'o.drop': { $exists: true } }, + { op: 'c', 'o.dropDatabase': 1 }, + { op: 'c', 'o.applyOps': { $exists: true } }, + ] + }; + + this._catchingUpResolvers = []; + this._lastProcessedTS = null; + + this._onSkippedEntriesHook = new Hook({ + debugPrintExceptions: "onSkippedEntries callback" + }); + + // @ts-ignore + this._entryQueue = new Meteor._DoubleEndedQueue(); + this._workerActive = false; + + this._startTrailingPromise = this._startTailing(); + } + + async stop(): Promise { + if (this._stopped) return; + this._stopped = true; + if (this._tailHandle) { + await this._tailHandle.stop(); + } + } + + async _onOplogEntry(trigger: OplogTrigger, callback: Function): Promise<{ stop: () => Promise }> { + if (this._stopped) { + throw new Error("Called onOplogEntry on stopped handle!"); + } + + await this._readyPromise; + + const listenHandle = this._crossbar.listen(trigger, callback); + return { + stop: async function () { + await listenHandle.stop(); + } + }; + } + + onOplogEntry(trigger: OplogTrigger, callback: Function): Promise<{ stop: () => Promise }> { + return this._onOplogEntry(trigger, callback); + } + + onSkippedEntries(callback: Function): { stop: () => void } { + if (this._stopped) { + throw new Error("Called onSkippedEntries on stopped handle!"); + } + return this._onSkippedEntriesHook.register(callback); + } + + async _waitUntilCaughtUp(): Promise { + if (this._stopped) { + throw new Error("Called waitUntilCaughtUp on stopped handle!"); + } + + await this._readyPromise; + + let lastEntry: OplogEntry | null = null; + + while (!this._stopped) { + try { + lastEntry = await this._oplogLastEntryConnection!.findOneAsync( + OPLOG_COLLECTION, + this._baseOplogSelector, + { projection: { ts: 1 }, sort: { $natural: -1 } } + ); + break; + } catch (e) { + Meteor._debug("Got exception while reading last entry", e); + // @ts-ignore + await Meteor._sleepForMs(100); + } + } + + if (this._stopped) return; + + if (!lastEntry) return; + + const ts = lastEntry.ts; + if (!ts) { + throw Error("oplog entry without ts: " + JSON.stringify(lastEntry)); + } + + if (this._lastProcessedTS && ts.lessThanOrEqual(this._lastProcessedTS)) { + return; + } + + let insertAfter = this._catchingUpResolvers.length; + while (insertAfter - 1 > 0 && this._catchingUpResolvers[insertAfter - 1].ts.greaterThan(ts)) { + insertAfter--; + } + let promiseResolver: (() => void) | null = null; + const promiseToAwait = new Promise(r => promiseResolver = r); + this._catchingUpResolvers.splice(insertAfter, 0, { ts, resolver: promiseResolver! }); + await promiseToAwait; + } + + async waitUntilCaughtUp(): Promise { + return this._waitUntilCaughtUp(); + } + + async _startTailing(): Promise { + const mongodbUri = require('mongodb-uri'); + if (mongodbUri.parse(this._oplogUrl).database !== 'local') { + throw new Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set"); + } + + this._oplogTailConnection = new MongoConnection( + this._oplogUrl, { maxPoolSize: 1, minPoolSize: 1 } + ); + this._oplogLastEntryConnection = new MongoConnection( + this._oplogUrl, { maxPoolSize: 1, minPoolSize: 1 } + ); + + try { + const isMasterDoc = await new Promise((resolve, reject) => { + this._oplogLastEntryConnection!.db + .admin() + .command({ ismaster: 1 }, (err: Error | null, result: any) => { + if (err) reject(err); + else resolve(result); + }); + }); + + if (!(isMasterDoc && isMasterDoc.setName)) { + throw new Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set"); + } + + const lastOplogEntry = await this._oplogLastEntryConnection.findOneAsync( + OPLOG_COLLECTION, + {}, + { sort: { $natural: -1 }, projection: { ts: 1 } } + ); + + let oplogSelector: any = { ...this._baseOplogSelector }; + if (lastOplogEntry) { + oplogSelector.ts = { $gt: lastOplogEntry.ts }; + this._lastProcessedTS = lastOplogEntry.ts; + } + + const includeCollections = Meteor.settings?.packages?.mongo?.oplogIncludeCollections; + const excludeCollections = Meteor.settings?.packages?.mongo?.oplogExcludeCollections; + + if (includeCollections?.length && excludeCollections?.length) { + throw new Error("Can't use both mongo oplog settings oplogIncludeCollections and oplogExcludeCollections at the same time."); + } + + if (excludeCollections?.length) { + oplogSelector.ns = { + $regex: oplogSelector.ns, + $nin: excludeCollections.map((collName: string) => `${this._dbName}.${collName}`) + }; + this._oplogOptions = { excludeCollections }; + } else if (includeCollections?.length) { + oplogSelector = { + $and: [ + { + $or: [ + { ns: /^admin\.\$cmd/ }, + { ns: { $in: includeCollections.map((collName: string) => `${this._dbName}.${collName}`) } } + ] + }, + { $or: oplogSelector.$or }, + { ts: oplogSelector.ts } + ] + }; + this._oplogOptions = { includeCollections }; + } + + const cursorDescription = new CursorDescription( + OPLOG_COLLECTION, + oplogSelector, + { tailable: true } + ); + + this._tailHandle = this._oplogTailConnection.tail( + cursorDescription, + (doc: any) => { + this._entryQueue.push(doc); + this._maybeStartWorker(); + }, + TAIL_TIMEOUT + ); + + this._readyPromiseResolver!(); + } catch (error) { + console.error('Error in _startTailing:', error); + throw error; + } + } + + private _maybeStartWorker(): void { + if (this._workerActive) return; + this._workerActive = true; + + Meteor.defer(async () => { + // May be called recursively in case of transactions. + const handleDoc = async (doc: OplogEntry): Promise => { + if (doc.ns === "admin.$cmd") { + if (doc.o.applyOps) { + // This was a successful transaction, so we need to apply the + // operations that were involved. + let nextTimestamp = doc.ts; + for (const op of doc.o.applyOps) { + // See https://github.com/meteor/meteor/issues/10420. + if (!op.ts) { + op.ts = nextTimestamp; + nextTimestamp = nextTimestamp.add(Long.ONE); + } + await handleDoc(op); + } + return; + } + throw new Error("Unknown command " + JSON.stringify(doc)); + } + + const trigger: OplogTrigger = { + dropCollection: false, + dropDatabase: false, + op: doc, + }; + + if (typeof doc.ns === "string" && doc.ns.startsWith(this._dbName + ".")) { + trigger.collection = doc.ns.slice(this._dbName.length + 1); + } + + // Is it a special command and the collection name is hidden + // somewhere in operator? + if (trigger.collection === "$cmd") { + if (doc.o.dropDatabase) { + delete trigger.collection; + trigger.dropDatabase = true; + } else if ("drop" in doc.o) { + trigger.collection = doc.o.drop; + trigger.dropCollection = true; + trigger.id = null; + } else if ("create" in doc.o && "idIndex" in doc.o) { + // A collection got implicitly created within a transaction. There's + // no need to do anything about it. + } else { + throw Error("Unknown command " + JSON.stringify(doc)); + } + } else { + // All other ops have an id. + trigger.id = idForOp(doc); + } + + await this._crossbar.fire(trigger); + }; + + try { + while (!this._stopped && !this._entryQueue.isEmpty()) { + // Are we too far behind? Just tell our observers that they need to + // repoll, and drop our queue. + if (this._entryQueue.length > TOO_FAR_BEHIND) { + const lastEntry = this._entryQueue.pop(); + this._entryQueue.clear(); + + this._onSkippedEntriesHook.each((callback: Function) => { + callback(); + return true; + }); + + // Free any waitUntilCaughtUp() calls that were waiting for us to + // pass something that we just skipped. + this._setLastProcessedTS(lastEntry.ts); + continue; + } + + const doc = this._entryQueue.shift(); + + // Fire trigger(s) for this doc. + await handleDoc(doc); + + // Now that we've processed this operation, process pending + // sequencers. + if (doc.ts) { + this._setLastProcessedTS(doc.ts); + } else { + throw Error("oplog entry without ts: " + JSON.stringify(doc)); + } + } + } finally { + this._workerActive = false; + } + }); + } + + _setLastProcessedTS(ts: any): void { + this._lastProcessedTS = ts; + while (!isEmpty(this._catchingUpResolvers) && this._catchingUpResolvers[0].ts.lessThanOrEqual(this._lastProcessedTS)) { + const sequencer = this._catchingUpResolvers.shift()!; + sequencer.resolver(); + } + } + + _defineTooFarBehind(value: number): void { + TOO_FAR_BEHIND = value; + } + + _resetTooFarBehind(): void { + TOO_FAR_BEHIND = +(process.env.METEOR_OPLOG_TOO_FAR_BEHIND || 2000); + } +} + +export function idForOp(op: OplogEntry): string { + if (op.op === 'd' || op.op === 'i') { + return op.o._id; + } else if (op.op === 'u') { + return op.o2._id; + } else if (op.op === 'c') { + throw Error("Operator 'c' doesn't supply an object with id: " + JSON.stringify(op)); + } else { + throw Error("Unknown op: " + JSON.stringify(op)); + } +} \ No newline at end of file diff --git a/packages/mongo/package.js b/packages/mongo/package.js index 2babd85ac7..bdedaa4c0c 100644 --- a/packages/mongo/package.js +++ b/packages/mongo/package.js @@ -87,7 +87,7 @@ Package.onUse(function (api) { api.addFiles( [ "mongo_driver.js", - "oplog_tailing.js", + "oplog_tailing.ts", "observe_multiplex.ts", "doc_fetcher.js", "polling_observe_driver.js", diff --git a/packages/mongo/polling_observe_driver.js b/packages/mongo/polling_observe_driver.js index 6b4172cd95..e7c3758c73 100644 --- a/packages/mongo/polling_observe_driver.js +++ b/packages/mongo/polling_observe_driver.js @@ -1,9 +1,10 @@ import throttle from 'lodash.throttle'; +import { listenAll } from './mongo_driver'; var POLLING_THROTTLE_MS = +process.env.METEOR_POLLING_THROTTLE_MS || 50; var POLLING_INTERVAL_MS = +process.env.METEOR_POLLING_INTERVAL_MS || 10 * 1000; -PollingObserveDriver = function (options) { +export const PollingObserveDriver = function (options) { const self = this; self._options = options; @@ -14,7 +15,7 @@ PollingObserveDriver = function (options) { self._stopCallbacks = []; self._stopped = false; - self._cursor = self._mongoHandle._createSynchronousCursor( + self._cursor = self._mongoHandle._createAsynchronousCursor( self._cursorDescription); // previous results snapshot. on each poll cycle, diffs against diff --git a/packages/mongo/remote_collection_driver.js b/packages/mongo/remote_collection_driver.js index d8c794abff..5537e80e2f 100644 --- a/packages/mongo/remote_collection_driver.js +++ b/packages/mongo/remote_collection_driver.js @@ -4,11 +4,10 @@ import { getAsyncMethodName, CLIENT_ONLY_METHODS } from "meteor/minimongo/constants"; +import { MongoConnection } from './mongo_connection'; -MongoInternals.RemoteCollectionDriver = function ( - mongo_url, options) { - var self = this; - self.mongo = new MongoConnection(mongo_url, options); +MongoInternals.RemoteCollectionDriver = function (mongo_url, options) { + this.mongo = new MongoConnection(mongo_url, options); }; const REMOTE_COLLECTION_METHODS = [ diff --git a/packages/webapp/.npm/package/npm-shrinkwrap.json b/packages/webapp/.npm/package/npm-shrinkwrap.json index 39ebcd33bb..e904107de5 100644 --- a/packages/webapp/.npm/package/npm-shrinkwrap.json +++ b/packages/webapp/.npm/package/npm-shrinkwrap.json @@ -32,9 +32,9 @@ "integrity": "sha512-/pyBZWSLD2n0dcHE3hq8s8ZvcETHtEuF+3E7XVt0Ig2nvsVQXdghHVcEkIWjy9A0wKfTn97a/PSDYohKIlnP/w==" }, "@types/node": { - "version": "22.7.5", - "resolved": "https://registry.npmjs.org/@types/node/-/node-22.7.5.tgz", - "integrity": "sha512-jML7s2NAzMWc//QSJ1a3prpk78cOPchGvXJsC3C6R6PSMoooztvRVQEz89gmBTBY1SPMaqo5teB4uNHPdetShQ==" + "version": "22.7.6", + "resolved": "https://registry.npmjs.org/@types/node/-/node-22.7.6.tgz", + "integrity": "sha512-/d7Rnj0/ExXDMcioS78/kf1lMzYk4BZV8MZGTBKzTGZ6/406ukkbYlIsZmMPhcR5KlkunDHQLrtAVmSq7r+mSw==" }, "@types/qs": { "version": "6.9.16", @@ -514,11 +514,6 @@ "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.19.8.tgz", "integrity": "sha512-ve2KP6f/JnbPBFyobGHuerC9g1FYGn/F8n1LWTwNxCEzd6IfqTwUQcNXgEtmmQ6DlRrC1hrSrBnCZPokRrDHjw==" }, - "undici-types": { - "version": "5.26.5", - "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-5.26.5.tgz", - "integrity": "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==" - }, "unpipe": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/unpipe/-/unpipe-1.0.0.tgz",