mirror of
https://github.com/meteor/meteor.git
synced 2026-05-02 03:01:46 -04:00
break down into smaller files and refactor some to typescript
This commit is contained in:
155
packages/mongo/asynchronous_cursor.js
Normal file
155
packages/mongo/asynchronous_cursor.js
Normal file
@@ -0,0 +1,155 @@
|
||||
import LocalCollection from 'meteor/minimongo/local_collection';
|
||||
import { replaceMongoAtomWithMeteor, replaceTypes } from './mongo_common';
|
||||
|
||||
/**
 * This is just a light wrapper for the cursor. The goal here is to ensure compatibility even if
 * there are breaking changes on the MongoDB driver.
 *
 * @constructor
 */
export class AsynchronousCursor {
  constructor(dbCursor, cursorDescription, options) {
    this._dbCursor = dbCursor;
    this._cursorDescription = cursorDescription;

    // Object passed as the "cursor" argument to forEach/map callbacks; lets
    // the public Cursor present itself instead of this wrapper.
    this._selfForIteration = options.selfForIteration || this;
    if (options.useTransform && cursorDescription.options.transform) {
      this._transform = LocalCollection.wrapTransform(
        cursorDescription.options.transform);
    } else {
      this._transform = null;
    }

    // Tracks _ids already yielded so duplicate documents are skipped.
    this._visitedIds = new LocalCollection._IdMap;
  }

  [Symbol.asyncIterator]() {
    var cursor = this;
    return {
      async next() {
        const value = await cursor._nextObjectPromise();
        return { done: !value, value };
      },
    };
  }

  // Returns a Promise for the next object from the underlying cursor (before
  // the Mongo->Meteor type replacement).
  async _rawNextObjectPromise() {
    try {
      // BUGFIX: the promise must be awaited *inside* the try; returning it
      // un-awaited meant a rejection escaped this catch entirely, making the
      // handler dead code for async errors.
      return await this._dbCursor.next();
    } catch (e) {
      // BUGFIX: log but rethrow -- swallowing the error and returning
      // undefined made a driver failure indistinguishable from end-of-cursor,
      // silently truncating iteration.
      console.error(e);
      throw e;
    }
  }

  // Returns a Promise for the next object from the cursor, skipping those whose
  // IDs we've already seen and replacing Mongo atoms with Meteor atoms.
  async _nextObjectPromise () {
    while (true) {
      var doc = await this._rawNextObjectPromise();

      if (!doc) return null;
      doc = replaceTypes(doc, replaceMongoAtomWithMeteor);

      if (!this._cursorDescription.options.tailable && _.has(doc, '_id')) {
        // Did Mongo give us duplicate documents in the same cursor? If so,
        // ignore this one. (Do this before the transform, since transform might
        // return some unrelated value.) We don't do this for tailable cursors,
        // because we want to maintain O(1) memory usage. And if there isn't _id
        // for some reason (maybe it's the oplog), then we don't do this either.
        // (Be careful to do this for falsey but existing _id, though.)
        if (this._visitedIds.has(doc._id)) continue;
        this._visitedIds.set(doc._id, true);
      }

      if (this._transform)
        doc = this._transform(doc);

      return doc;
    }
  }

  // Returns a promise which is resolved with the next object (like with
  // _nextObjectPromise) or rejected if the cursor doesn't return within
  // timeoutMS ms.
  _nextObjectPromiseWithTimeout(timeoutMS) {
    if (!timeoutMS) {
      return this._nextObjectPromise();
    }
    const nextObjectPromise = this._nextObjectPromise();
    const timeoutErr = new Error('Client-side timeout waiting for next object');
    let timer;
    const timeoutPromise = new Promise((resolve, reject) => {
      timer = setTimeout(() => {
        reject(timeoutErr);
      }, timeoutMS);
    });
    return Promise.race([nextObjectPromise, timeoutPromise])
      .catch((err) => {
        if (err === timeoutErr) {
          this.close();
        }
        throw err;
      })
      .finally(() => {
        // BUGFIX: clear the timer so it does not keep firing machinery alive
        // for timeoutMS after every successful fetch.
        clearTimeout(timer);
      });
  }

  async forEach(callback, thisArg) {
    // Get back to the beginning.
    this._rewind();

    let idx = 0;
    while (true) {
      const doc = await this._nextObjectPromise();
      if (!doc) return;
      await callback.call(thisArg, doc, idx++, this._selfForIteration);
    }
  }

  async map(callback, thisArg) {
    const results = [];
    await this.forEach(async (doc, index) => {
      results.push(await callback.call(thisArg, doc, index, this._selfForIteration));
    });

    return results;
  }

  _rewind() {
    // known to be synchronous
    this._dbCursor.rewind();

    // Forget previously-seen ids; iteration starts over.
    this._visitedIds = new LocalCollection._IdMap;
  }

  // Mostly usable for tailable cursors.
  close() {
    this._dbCursor.close();
  }

  fetch() {
    return this.map(_.identity);
  }

  /**
   * FIXME: (node:34680) [MONGODB DRIVER] Warning: cursor.count is deprecated and will be
   * removed in the next major version, please use `collection.estimatedDocumentCount` or
   * `collection.countDocuments` instead.
   */
  count() {
    return this._dbCursor.count();
  }

  // This method is NOT wrapped in Cursor.
  async getRawObjects(ordered) {
    var self = this;
    if (ordered) {
      return self.fetch();
    } else {
      var results = new LocalCollection._IdMap;
      await self.forEach(function (doc) {
        results.set(doc._id, doc);
      });
      return results;
    }
  }
}
|
||||
147
packages/mongo/cursor.js
Normal file
147
packages/mongo/cursor.js
Normal file
@@ -0,0 +1,147 @@
|
||||
// CURSORS
|
||||
|
||||
// There are several classes which relate to cursors:
|
||||
//
|
||||
// CursorDescription represents the arguments used to construct a cursor:
|
||||
// collectionName, selector, and (find) options. Because it is used as a key
|
||||
// for cursor de-dup, everything in it should either be JSON-stringifiable or
|
||||
// not affect observeChanges output (eg, options.transform functions are not
|
||||
// stringifiable but do not affect observeChanges).
|
||||
//
|
||||
// AsynchronousCursor is a wrapper around a MongoDB cursor
// which includes promise-based versions of forEach, etc.
//
// Cursor is the cursor object returned from find(), which implements the
// documented Mongo.Collection cursor API. It wraps a CursorDescription and an
// AsynchronousCursor (lazily: it doesn't contact Mongo until you call a method
// like fetch or forEach on it).
|
||||
//
|
||||
// ObserveHandle is the "observe handle" returned from observeChanges. It has a
|
||||
// reference to an ObserveMultiplexer.
|
||||
//
|
||||
// ObserveMultiplexer allows multiple identical ObserveHandles to be driven by a
|
||||
// single observe driver.
|
||||
//
|
||||
// There are two "observe drivers" which drive ObserveMultiplexers:
|
||||
// - PollingObserveDriver caches the results of a query and reruns it when
|
||||
// necessary.
|
||||
// - OplogObserveDriver follows the Mongo operation log to directly observe
|
||||
// database changes.
|
||||
// Both implementations follow the same simple interface: when you create them,
|
||||
// they start sending observeChanges callbacks (and a ready() invocation) to
|
||||
// their ObserveMultiplexer, and you stop them by calling their stop() method.
|
||||
|
||||
import { ASYNC_CURSOR_METHODS, getAsyncMethodName } from 'meteor/minimongo/constants';
|
||||
import { replaceMeteorAtomWithMongo, replaceTypes } from './mongo_common';
|
||||
import LocalCollection from 'meteor/minimongo/local_collection';
|
||||
|
||||
// Public cursor object returned from find(). The backing AsynchronousCursor
// is created lazily, on first use (see setupAsynchronousCursor below).
export const Cursor = function (mongo, cursorDescription) {
  this._mongo = mongo;
  this._cursorDescription = cursorDescription;
  this._synchronousCursor = null;
};
|
||||
|
||||
// Server-side count, implemented with collection.countDocuments (the
// driver's replacement for the deprecated cursor.count).
Cursor.prototype.countAsync = async function () {
  const { collectionName, selector, options } = this._cursorDescription;
  const collection = this._mongo.rawCollection(collectionName);
  return collection.countDocuments(
    replaceTypes(selector, replaceMeteorAtomWithMongo),
    replaceTypes(options, replaceMeteorAtomWithMongo),
  );
};
|
||||
|
||||
// The synchronous count is intentionally unsupported on the server.
Cursor.prototype.count = function () {
  const message =
    "count() is not available on the server. Please use countAsync() instead.";
  throw new Error(message);
};
|
||||
|
||||
// Generate the cursor iteration methods (forEach, map, fetch, ... plus the
// two iterator symbols) by delegating to the lazily-created
// AsynchronousCursor. For each string-named method an extra "*Async" alias is
// generated that always returns a promise.
[...ASYNC_CURSOR_METHODS, Symbol.iterator, Symbol.asyncIterator].forEach(methodName => {
  // count is handled specially since we don't want to create a cursor.
  // it is still included in ASYNC_CURSOR_METHODS because we still want an async version of it to exist.
  if (methodName === 'count') {
    return
  }
  Cursor.prototype[methodName] = function (...args) {
    const cursor = setupAsynchronousCursor(this, methodName);
    return cursor[methodName](...args);
  };

  // These methods are handled separately.
  // (The Symbols get the delegating method above, but must bail out before
  // getAsyncMethodName, which only accepts string names.)
  if (methodName === Symbol.iterator || methodName === Symbol.asyncIterator) {
    return;
  }

  const methodNameAsync = getAsyncMethodName(methodName);
  Cursor.prototype[methodNameAsync] = function (...args) {
    // Wrap so that a synchronous throw surfaces as a rejected promise.
    try {
      return Promise.resolve(this[methodName](...args));
    } catch (error) {
      return Promise.reject(error);
    }
  };
});
|
||||
|
||||
// Returns the transform function declared in this cursor's find options
// (undefined when none was given).
Cursor.prototype.getTransform = function () {
  const { options } = this._cursorDescription;
  return options.transform;
};
|
||||
|
||||
// When you call Meteor.publish() with a function that returns a Cursor, we
// need to transmute it into the equivalent subscription. This is the function
// that does that.
Cursor.prototype._publishCursor = function (sub) {
  const collectionName = this._cursorDescription.collectionName;
  return Mongo.Collection._publishCursor(this, sub, collectionName);
};
|
||||
|
||||
// Used to guarantee that publish functions return at most one cursor per
// collection. Private, because we might later have cursors that include
// documents from multiple collections somehow.
Cursor.prototype._getCollectionName = function () {
  return this._cursorDescription.collectionName;
};
|
||||
|
||||
// Drive an observe() from observeChanges via minimongo's shared adapter.
Cursor.prototype.observe = function (callbacks) {
  return LocalCollection._observeFromObserveChanges(this, callbacks);
};
|
||||
|
||||
// Promise-returning variant of observe(). observe() still runs synchronously;
// a synchronous throw is surfaced as a rejected promise -- the same contract
// as the generated *Async wrappers above. (Rewritten from the
// `new Promise(resolve => resolve(...))` anti-pattern; behavior unchanged.)
Cursor.prototype.observeAsync = function (callbacks) {
  try {
    return Promise.resolve(this.observe(callbacks));
  } catch (error) {
    return Promise.reject(error);
  }
};
|
||||
|
||||
// Starts an observeChanges driven by this connection. Whether the callbacks
// want ordered semantics is derived from which callbacks are present.
Cursor.prototype.observeChanges = function (callbacks, options = {}) {
  const ordered = LocalCollection._observeChangesCallbacksAreOrdered(callbacks);
  return this._mongo._observeChanges(
    this._cursorDescription,
    ordered,
    callbacks,
    options.nonMutatingCallbacks
  );
};
|
||||
|
||||
// Async alias for observeChanges (resolves with its return value).
Cursor.prototype.observeChangesAsync = async function (callbacks, options = {}) {
  return this.observeChanges(callbacks, options);
};
|
||||
|
||||
// Lazily creates (and caches on the Cursor) the AsynchronousCursor that backs
// iteration. Tailable cursors only support observe, so any other method
// throws.
function setupAsynchronousCursor(cursor, method) {
  if (cursor._cursorDescription.options.tailable) {
    throw new Error('Cannot call ' + method + ' on a tailable cursor');
  }

  if (!cursor._synchronousCursor) {
    cursor._synchronousCursor = cursor._mongo._createAsynchronousCursor(
      cursor._cursorDescription,
      {
        // Make sure that the "cursor" argument to forEach/map callbacks is
        // the public Cursor, not the wrapper.
        selfForIteration: cursor,
        useTransform: true,
      }
    );
  }

  return cursor._synchronousCursor;
}
|
||||
23
packages/mongo/cursor_description.ts
Normal file
23
packages/mongo/cursor_description.ts
Normal file
@@ -0,0 +1,23 @@
|
||||
/** find() options that are relevant to cursor construction. */
interface CursorOptions {
  limit?: number;
  skip?: number;
  sort?: Record<string, 1 | -1>;
  fields?: Record<string, 1 | 0>;
  projection?: Record<string, 1 | 0>;
  disableOplog?: boolean;
  _disableOplog?: boolean;
  tailable?: boolean;
}

/**
 * Value object describing how a cursor was constructed: collection name,
 * (rewritten) selector, and find options. Used elsewhere as a de-dup key for
 * observes, so contents should be JSON-stringifiable or irrelevant to
 * observeChanges output.
 */
export class CursorDescription {
  collectionName: string;
  selector: Record<string, any>;
  options: CursorOptions;

  constructor(collectionName: string, selector: any, options?: CursorOptions) {
    this.collectionName = collectionName;
    // @ts-ignore
    // Mongo is a package-scope global here; _rewriteSelector normalizes
    // shorthand selectors (e.g. a bare id) into a full selector object.
    this.selector = Mongo.Collection._rewriteSelector(selector);
    this.options = options || {};
  }
}
|
||||
149
packages/mongo/mongo_common.js
Normal file
149
packages/mongo/mongo_common.js
Normal file
@@ -0,0 +1,149 @@
|
||||
/** @type {import('mongodb')} */
// Re-export of the raw MongoDB npm driver module (NpmModuleMongodb is a
// package-scope global supplied by the npm-mongo package).
export const MongoDB = NpmModuleMongodb;
|
||||
|
||||
// The write methods block until the database has confirmed the write (it may
|
||||
// not be replicated or stable on disk, but one server has confirmed it) if no
|
||||
// callback is provided. If a callback is provided, then they call the callback
|
||||
// when the write is confirmed. They return nothing on success, and raise an
|
||||
// exception on failure.
|
||||
//
|
||||
// After making a write (with insert, update, remove), observers are
|
||||
// notified asynchronously. If you want to receive a callback once all
|
||||
// of the observer notifications have landed for your write, do the
|
||||
// writes inside a write fence (set DDPServer._CurrentWriteFence to a new
|
||||
// _WriteFence, and then set a callback on the write fence.)
|
||||
//
|
||||
// Since our execution environment is single-threaded, this is
|
||||
// well-defined -- a write "has been made" if it's returned, and an
|
||||
// observer "has been notified" if its callback has returned.
|
||||
|
||||
// Builds the completion handler for a write operation: on success it runs
// `refresh` (poking observers), then marks the write-fence entry `write` as
// committed, then reports the outcome via `callback` -- or rethrows when no
// callback was supplied.
export const writeCallback = function (write, refresh, callback) {
  return function (err, result) {
    if (! err) {
      // XXX We don't have to run this on error, right?
      try {
        refresh();
      } catch (refreshErr) {
        // A failing refresh replaces the write's result; note the write is
        // NOT committed in this case (matches long-standing behavior).
        if (!callback) throw refreshErr;
        callback(refreshErr);
        return;
      }
    }
    write.committed();
    if (callback) {
      callback(err, result);
    } else if (err) {
      throw err;
    }
  };
};
|
||||
|
||||
|
||||
// Converts a raw driver write result into Meteor's result shape:
// { numberAffected, insertedId? }.
export const transformResult = function (driverResult) {
  const meteorResult = { numberAffected: 0 };
  if (!driverResult) {
    return meteorResult;
  }

  const raw = driverResult.result;
  if (raw.upsertedCount) {
    // On updates with upsert:true, the inserted values come as a list of
    // upserted values -- even with options.multi, when the upsert does
    // insert, it only inserts one element.
    meteorResult.numberAffected = raw.upsertedCount;
    if (raw.upsertedId) {
      meteorResult.insertedId = raw.upsertedId;
    }
  } else {
    // n was used before Mongo 5.0; in Mongo 5.0 we no longer receive n, so
    // fall back to matchedCount / modifiedCount.
    meteorResult.numberAffected = raw.n || raw.matchedCount || raw.modifiedCount;
  }
  return meteorResult;
};
|
||||
|
||||
// Maps a single Meteor-side "atom" to its Mongo driver representation.
// Returns undefined when `document` is not an atom this function handles
// (replaceTypes then recurses into it). The order of checks matters:
// EJSON binaries must be tested before generic custom types.
export const replaceMeteorAtomWithMongo = function (document) {
  if (EJSON.isBinary(document)) {
    // This does more copies than we'd like, but is necessary because
    // MongoDB.BSON only looks like it takes a Uint8Array (and doesn't actually
    // serialize it correctly).
    return new MongoDB.Binary(Buffer.from(document));
  }
  if (document instanceof MongoDB.Binary) {
    return document;
  }
  if (document instanceof Mongo.ObjectID) {
    return new MongoDB.ObjectID(document.toHexString());
  }
  if (document instanceof MongoDB.Timestamp) {
    // For now, the Meteor representation of a Mongo timestamp type (not a date!
    // this is a weird internal thing used in the oplog!) is the same as the
    // Mongo representation. We need to do this explicitly or else we would do a
    // structural clone and lose the prototype.
    return document;
  }
  if (document instanceof Decimal) {
    return MongoDB.Decimal128.fromString(document.toString());
  }
  if (EJSON._isCustomType(document)) {
    // NOTE(review): replaceNames is presumably a sibling helper that renames
    // the "$type"/"$value" keys via makeMongoLegal -- not visible in this
    // chunk; confirm it is defined in this module.
    return replaceNames(makeMongoLegal, EJSON.toJSONValue(document));
  }
  // It is not ordinarily possible to stick dollar-sign keys into mongo
  // so we don't bother checking for things that need escaping at this time.
  return undefined;
};
|
||||
|
||||
// Walks a document top-down, giving `atomTransformer` a chance to replace
// each value. Untouched subtrees are shared with the input; a shallow copy of
// a container is made lazily, only once something inside it changes.
export const replaceTypes = function (document, atomTransformer) {
  if (typeof document !== 'object' || document === null) {
    return document;
  }

  const replacedTopLevelAtom = atomTransformer(document);
  if (replacedTopLevelAtom !== undefined) {
    return replacedTopLevelAtom;
  }

  let result = document;
  for (const [key, value] of Object.entries(document)) {
    const replacedValue = replaceTypes(value, atomTransformer);
    if (replacedValue !== value) {
      // Lazy clone. Shallow copy, made at most once per container.
      if (result === document) {
        result = clone(document);
      }
      result[key] = replacedValue;
    }
  }
  return result;
};
|
||||
|
||||
// Inverse of replaceMeteorAtomWithMongo: maps a Mongo driver atom back to its
// Meteor representation. Returns undefined for non-atoms so replaceTypes
// recurses into them.
export const replaceMongoAtomWithMeteor = function (document) {
  if (document instanceof MongoDB.Binary) {
    // for backwards compatibility
    // Only generic (sub_type 0) binaries become Uint8Array; other subtypes
    // (UUIDs etc.) are passed through untouched.
    if (document.sub_type !== 0) {
      return document;
    }
    var buffer = document.value(true);
    return new Uint8Array(buffer);
  }
  if (document instanceof MongoDB.ObjectID) {
    return new Mongo.ObjectID(document.toHexString());
  }
  if (document instanceof MongoDB.Decimal128) {
    return Decimal(document.toString());
  }
  // A two-key {EJSON$type, EJSON$value} object is a stored EJSON custom type;
  // undo the key-mangling and revive it.
  if (document["EJSON$type"] && document["EJSON$value"] && Object.keys(document).length === 2) {
    return EJSON.fromJSONValue(replaceNames(unmakeMongoLegal, document));
  }
  if (document instanceof MongoDB.Timestamp) {
    // For now, the Meteor representation of a Mongo timestamp type (not a date!
    // this is a weird internal thing used in the oplog!) is the same as the
    // Mongo representation. We need to do this explicitly or else we would do a
    // structural clone and lose the prototype.
    return document;
  }
  return undefined;
};
|
||||
|
||||
// Mongo historically could not store keys beginning with '$', so EJSON
// custom-type keys ("$type"/"$value") are prefixed with "EJSON" on the way in
// and the 5-character prefix is stripped on the way out.
const makeMongoLegal = name => "EJSON" + name;
// Uses slice(5) instead of the deprecated String.prototype.substr.
const unmakeMongoLegal = name => name.slice(5);
|
||||
936
packages/mongo/mongo_connection.js
Normal file
936
packages/mongo/mongo_connection.js
Normal file
@@ -0,0 +1,936 @@
|
||||
import { Meteor } from 'meteor/meteor';
|
||||
import path from 'path';
|
||||
import { DocFetcher } from './doc_fetcher';
|
||||
import { ObserveMultiplexer } from './observe_multiplex';
|
||||
import { ObserveHandle } from './observe_handle';
|
||||
import { OPLOG_COLLECTION, OplogHandle } from './oplog_tailing';
|
||||
import { CLIENT_ONLY_METHODS, getAsyncMethodName } from 'meteor/minimongo/constants';
|
||||
import { OplogObserveDriver } from './oplog_observe_driver';
|
||||
import { PollingObserveDriver } from './polling_observe_driver';
|
||||
import { replaceMeteorAtomWithMongo, replaceTypes, transformResult } from './mongo_common';
|
||||
import { AsynchronousCursor } from './asynchronous_cursor';
|
||||
import { MongoDB } from './mongo_common';
|
||||
import { Cursor } from './cursor';
|
||||
import { CursorDescription } from './cursor_description';
|
||||
|
||||
// Connection options ending in "Asset" (e.g. tlsCAFileAsset) get resolved to
// absolute paths under the bundled server assets folder; see the constructor.
const FILE_ASSET_SUFFIX = 'Asset';
const ASSETS_FOLDER = 'assets';
const APP_FOLDER = 'app';

// NOTE(review): declared but not referenced in this chunk -- presumably used
// by oplog-related code later in the file; confirm.
const oplogCollectionWarnings = [];
|
||||
|
||||
// Wraps a MongoClient plus Meteor-specific machinery: observe multiplexers,
// failover hooks, and (optionally) an oplog tailer + doc fetcher.
export const MongoConnection = function (url, options) {
  var self = this;
  options = options || {};
  self._observeMultiplexers = {};
  self._onFailoverHook = new Hook;

  // User-supplied driver options: programmatic Mongo._connectionOptions first,
  // then settings.packages.mongo.options (settings win on conflicts).
  const userOptions = {
    ...(Mongo._connectionOptions || {}),
    ...(Meteor.settings?.packages?.mongo?.options || {})
  };

  var mongoOptions = Object.assign({
    ignoreUndefined: true,
  }, userOptions);


  // Internally the oplog connections specify their own maxPoolSize
  // which we don't want to overwrite with any user defined value
  if ('maxPoolSize' in options) {
    // If we just set this for "server", replSet will override it. If we just
    // set it for replSet, it will be ignored if we're not using a replSet.
    mongoOptions.maxPoolSize = options.maxPoolSize;
  }
  if ('minPoolSize' in options) {
    mongoOptions.minPoolSize = options.minPoolSize;
  }

  // Transform options like "tlsCAFileAsset": "filename.pem" into
  // "tlsCAFile": "/<fullpath>/filename.pem"
  Object.entries(mongoOptions || {})
    .filter(([key]) => key && key.endsWith(FILE_ASSET_SUFFIX))
    .forEach(([key, value]) => {
      const optionName = key.replace(FILE_ASSET_SUFFIX, '');
      mongoOptions[optionName] = path.join(Assets.getServerDir(),
        ASSETS_FOLDER, APP_FOLDER, value);
      delete mongoOptions[key];
    });

  self.db = null;
  self._oplogHandle = null;
  self._docFetcher = null;

  // Identify Meteor to the server in connection metadata.
  mongoOptions.driverInfo = {
    name: 'Meteor',
    version: Meteor.release
  }

  self.client = new MongoDB.MongoClient(url, mongoOptions);
  self.db = self.client.db();

  self.client.on('serverDescriptionChanged', Meteor.bindEnvironment(event => {
    // When the connection is no longer against the primary node, execute all
    // failover hooks. This is important for the driver as it has to re-pool the
    // query when it happens.
    if (
      event.previousDescription.type !== 'RSPrimary' &&
      event.newDescription.type === 'RSPrimary'
    ) {
      self._onFailoverHook.each(callback => {
        callback();
        return true;
      });
    }
  }));

  // Oplog tailing is enabled only when an oplog URL is configured and the
  // disable-oplog package is absent.
  if (options.oplogUrl && ! Package['disable-oplog']) {
    self._oplogHandle = new OplogHandle(options.oplogUrl, self.db.databaseName);
    self._docFetcher = new DocFetcher(self);
  }

};
|
||||
|
||||
// Tears down the connection: stops the oplog tailer (if any), then closes the
// underlying MongoClient.
MongoConnection.prototype._close = async function() {
  var self = this;

  if (! self.db)
    throw Error("close called before Connection created?");

  // XXX probably untested
  // Detach the oplog handle before stopping it so nothing observes a
  // half-stopped tailer through self._oplogHandle.
  var oplogHandle = self._oplogHandle;
  self._oplogHandle = null;
  if (oplogHandle)
    await oplogHandle.stop();

  // Close the driver client; awaiting propagates any close error.
  // (A stale comment here used to reference Future.wrap from the old fibers
  // implementation.)
  await self.client.close();
};
|
||||
|
||||
// Public close(); returns the promise from the internal async teardown.
MongoConnection.prototype.close = function () {
  return this._close();
};
|
||||
|
||||
// Swaps in an externally-created oplog handle; chainable (returns this).
MongoConnection.prototype._setOplogHandle = function(oplogHandle) {
  this._oplogHandle = oplogHandle;
  return this;
};
|
||||
|
||||
// Returns the raw driver Collection object for collectionName. (The old
// "may yield" note predates the removal of fibers; this is now synchronous.)
MongoConnection.prototype.rawCollection = function (collectionName) {
  var self = this;

  if (! self.db)
    throw Error("rawCollection called before Connection created?");

  return self.db.collection(collectionName);
};
|
||||
|
||||
// Creates a capped collection of `byteSize` bytes holding at most
// `maxDocuments` documents. Rejects if the connection isn't set up yet.
MongoConnection.prototype.createCappedCollectionAsync = async function (
    collectionName, byteSize, maxDocuments) {
  if (!this.db) {
    throw Error("createCappedCollectionAsync called before Connection created?");
  }

  await this.db.createCollection(collectionName, {
    capped: true,
    size: byteSize,
    max: maxDocuments,
  });
};
|
||||
|
||||
// This should be called synchronously with a write, to create a
// transaction on the current write fence, if any. After we can read
// the write, and after observers have been notified (or at least,
// after the observer notifiers have added themselves to the write
// fence), you should call 'committed()' on the object returned.
MongoConnection.prototype._maybeBeginWrite = function () {
  const fence = DDPServer._getCurrentFence();
  // No active fence: hand back a no-op stand-in so callers can
  // unconditionally call committed().
  if (!fence) {
    return {committed: function () {}};
  }
  return fence.beginWrite();
};
|
||||
|
||||
// Internal interface: adds a callback which is called when the Mongo primary
// changes. Returns a stop handle.
MongoConnection.prototype._onFailover = function (callback) {
  return this._onFailoverHook.register(callback);
};
|
||||
|
||||
// Inserts one plain document into collection_name; resolves with the inserted
// _id. Participates in the current write fence: observers are refreshed and
// the fence entry committed before the promise settles (committed on failure
// too, so the fence never hangs).
MongoConnection.prototype.insertAsync = async function (collection_name, document) {
  const self = this;

  // Magic collection name used by the test suite to force a write failure.
  if (collection_name === "___meteor_failure_test_collection") {
    const e = new Error("Failure test");
    e._expectedByTest = true;
    throw e;
  }

  if (!(LocalCollection._isPlainObject(document) &&
        !EJSON._isCustomType(document))) {
    throw new Error("Only plain objects may be inserted into MongoDB");
  }

  var write = self._maybeBeginWrite();
  var refresh = async function () {
    // Poke any observers watching this collection/id.
    await Meteor.refresh({collection: collection_name, id: document._id });
  };
  return self.rawCollection(collection_name).insertOne(
    replaceTypes(document, replaceMeteorAtomWithMongo),
    {
      safe: true,
    }
  ).then(async ({insertedId}) => {
    await refresh();
    await write.committed();
    return insertedId;
  }).catch(async e => {
    // Still commit the fence entry so the fence can complete, then rethrow.
    await write.committed();
    throw e;
  });
};
|
||||
|
||||
|
||||
// Cause queries that may be affected by the selector to poll in this write
// fence.
MongoConnection.prototype._refresh = async function (collectionName, selector) {
  var refreshKey = {collection: collectionName};
  // If we know which documents we're removing, don't poll queries that are
  // specific to other documents. (Note that multiple notifications here should
  // not cause multiple polls, since all our listener is doing is enqueueing a
  // poll.)
  var specificIds = LocalCollection._idsMatchedBySelector(selector);
  if (specificIds) {
    // One notification per matched id, sequentially.
    for (const id of specificIds) {
      await Meteor.refresh(Object.assign({id: id}, refreshKey));
    };
  } else {
    // Selector doesn't pin down ids: notify the whole collection.
    await Meteor.refresh(refreshKey);
  }
};
|
||||
|
||||
// Removes all documents matching selector; resolves with the number removed.
// Fence handling mirrors insertAsync: refresh + commit on success, commit
// (then rethrow) on failure.
MongoConnection.prototype.removeAsync = async function (collection_name, selector) {
  var self = this;

  // Magic collection name used by the test suite to force a write failure.
  if (collection_name === "___meteor_failure_test_collection") {
    var e = new Error("Failure test");
    e._expectedByTest = true;
    throw e;
  }

  var write = self._maybeBeginWrite();
  var refresh = async function () {
    await self._refresh(collection_name, selector);
  };

  return self.rawCollection(collection_name)
    .deleteMany(replaceTypes(selector, replaceMeteorAtomWithMongo), {
      safe: true,
    })
    .then(async ({ deletedCount }) => {
      await refresh();
      await write.committed();
      // Route deletedCount through transformResult (as modifiedCount) to get
      // Meteor's numberAffected shape.
      return transformResult({ result : {modifiedCount : deletedCount} }).numberAffected;
    }).catch(async (err) => {
      await write.committed();
      throw err;
    });
};
|
||||
|
||||
// Drops a collection; notifies observers with a dropCollection refresh so
// every query on the collection repolls, with the usual fence bookkeeping.
MongoConnection.prototype.dropCollectionAsync = async function(collectionName) {
  var self = this;


  var write = self._maybeBeginWrite();
  var refresh = function() {
    return Meteor.refresh({
      collection: collectionName,
      id: null,
      dropCollection: true,
    });
  };

  return self
    .rawCollection(collectionName)
    .drop()
    .then(async result => {
      await refresh();
      await write.committed();
      return result;
    })
    .catch(async e => {
      // Commit the fence entry even on failure, then rethrow.
      await write.committed();
      throw e;
    });
};
|
||||
|
||||
// For testing only. Slightly better than `c.rawDatabase().dropDatabase()`
// because it lets the test's fence wait for it to be complete.
MongoConnection.prototype.dropDatabaseAsync = async function () {
  var self = this;

  var write = self._maybeBeginWrite();
  var refresh = async function () {
    await Meteor.refresh({ dropDatabase: true });
  };

  try {
    // NOTE(review): _dropDatabase is presumably an internal alias for the
    // driver's dropDatabase -- confirm against the Db wrapper in use.
    await self.db._dropDatabase();
    await refresh();
    await write.committed();
  } catch (e) {
    // Commit the fence entry even on failure, then rethrow.
    await write.committed();
    throw e;
  }
};
|
||||
|
||||
MongoConnection.prototype.updateAsync = async function (collection_name, selector, mod, options) {
|
||||
var self = this;
|
||||
|
||||
if (collection_name === "___meteor_failure_test_collection") {
|
||||
var e = new Error("Failure test");
|
||||
e._expectedByTest = true;
|
||||
throw e;
|
||||
}
|
||||
|
||||
// explicit safety check. null and undefined can crash the mongo
|
||||
// driver. Although the node driver and minimongo do 'support'
|
||||
// non-object modifier in that they don't crash, they are not
|
||||
// meaningful operations and do not do anything. Defensively throw an
|
||||
// error here.
|
||||
if (!mod || typeof mod !== 'object') {
|
||||
const error = new Error("Invalid modifier. Modifier must be an object.");
|
||||
|
||||
throw error;
|
||||
}
|
||||
|
||||
if (!(LocalCollection._isPlainObject(mod) && !EJSON._isCustomType(mod))) {
|
||||
const error = new Error(
|
||||
"Only plain objects may be used as replacement" +
|
||||
" documents in MongoDB");
|
||||
|
||||
throw error;
|
||||
}
|
||||
|
||||
if (!options) options = {};
|
||||
|
||||
var write = self._maybeBeginWrite();
|
||||
var refresh = async function () {
|
||||
await self._refresh(collection_name, selector);
|
||||
};
|
||||
|
||||
var collection = self.rawCollection(collection_name);
|
||||
var mongoOpts = {safe: true};
|
||||
// Add support for filtered positional operator
|
||||
if (options.arrayFilters !== undefined) mongoOpts.arrayFilters = options.arrayFilters;
|
||||
// explictly enumerate options that minimongo supports
|
||||
if (options.upsert) mongoOpts.upsert = true;
|
||||
if (options.multi) mongoOpts.multi = true;
|
||||
// Lets you get a more more full result from MongoDB. Use with caution:
|
||||
// might not work with C.upsert (as opposed to C.update({upsert:true}) or
|
||||
// with simulated upsert.
|
||||
if (options.fullResult) mongoOpts.fullResult = true;
|
||||
|
||||
var mongoSelector = replaceTypes(selector, replaceMeteorAtomWithMongo);
|
||||
var mongoMod = replaceTypes(mod, replaceMeteorAtomWithMongo);
|
||||
|
||||
var isModify = LocalCollection._isModificationMod(mongoMod);
|
||||
|
||||
if (options._forbidReplace && !isModify) {
|
||||
var err = new Error("Invalid modifier. Replacements are forbidden.");
|
||||
throw err;
|
||||
}
|
||||
|
||||
// We've already run replaceTypes/replaceMeteorAtomWithMongo on
|
||||
// selector and mod. We assume it doesn't matter, as far as
|
||||
// the behavior of modifiers is concerned, whether `_modify`
|
||||
// is run on EJSON or on mongo-converted EJSON.
|
||||
|
||||
// Run this code up front so that it fails fast if someone uses
|
||||
// a Mongo update operator we don't support.
|
||||
let knownId;
|
||||
if (options.upsert) {
|
||||
try {
|
||||
let newDoc = LocalCollection._createUpsertDocument(selector, mod);
|
||||
knownId = newDoc._id;
|
||||
} catch (err) {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
if (options.upsert &&
|
||||
! isModify &&
|
||||
! knownId &&
|
||||
options.insertedId &&
|
||||
! (options.insertedId instanceof Mongo.ObjectID &&
|
||||
options.generatedId)) {
|
||||
// In case of an upsert with a replacement, where there is no _id defined
|
||||
// in either the query or the replacement doc, mongo will generate an id itself.
|
||||
// Therefore we need this special strategy if we want to control the id ourselves.
|
||||
|
||||
// We don't need to do this when:
|
||||
// - This is not a replacement, so we can add an _id to $setOnInsert
|
||||
// - The id is defined by query or mod we can just add it to the replacement doc
|
||||
// - The user did not specify any id preference and the id is a Mongo ObjectId,
|
||||
// then we can just let Mongo generate the id
|
||||
return await simulateUpsertWithInsertedId(collection, mongoSelector, mongoMod, options)
|
||||
.then(async result => {
|
||||
await refresh();
|
||||
await write.committed();
|
||||
if (result && ! options._returnObject) {
|
||||
return result.numberAffected;
|
||||
} else {
|
||||
return result;
|
||||
}
|
||||
});
|
||||
} else {
|
||||
if (options.upsert && !knownId && options.insertedId && isModify) {
|
||||
if (!mongoMod.hasOwnProperty('$setOnInsert')) {
|
||||
mongoMod.$setOnInsert = {};
|
||||
}
|
||||
knownId = options.insertedId;
|
||||
Object.assign(mongoMod.$setOnInsert, replaceTypes({_id: options.insertedId}, replaceMeteorAtomWithMongo));
|
||||
}
|
||||
|
||||
const strings = Object.keys(mongoMod).filter((key) => !key.startsWith("$"));
|
||||
let updateMethod = strings.length > 0 ? 'replaceOne' : 'updateMany';
|
||||
updateMethod =
|
||||
updateMethod === 'updateMany' && !mongoOpts.multi
|
||||
? 'updateOne'
|
||||
: updateMethod;
|
||||
return collection[updateMethod]
|
||||
.bind(collection)(mongoSelector, mongoMod, mongoOpts)
|
||||
.then(async result => {
|
||||
var meteorResult = transformResult({result});
|
||||
if (meteorResult && options._returnObject) {
|
||||
// If this was an upsertAsync() call, and we ended up
|
||||
// inserting a new doc and we know its id, then
|
||||
// return that id as well.
|
||||
if (options.upsert && meteorResult.insertedId) {
|
||||
if (knownId) {
|
||||
meteorResult.insertedId = knownId;
|
||||
} else if (meteorResult.insertedId instanceof MongoDB.ObjectID) {
|
||||
meteorResult.insertedId = new Mongo.ObjectID(meteorResult.insertedId.toHexString());
|
||||
}
|
||||
}
|
||||
await refresh();
|
||||
await write.committed();
|
||||
return meteorResult;
|
||||
} else {
|
||||
await refresh();
|
||||
await write.committed();
|
||||
return meteorResult.numberAffected;
|
||||
}
|
||||
}).catch(async (err) => {
|
||||
await write.committed();
|
||||
throw err;
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
// exposed for testing
MongoConnection._isCannotChangeIdError = function (err) {
  // Mongo 3.2.* returns the error as {name, code, errmsg}; older versions
  // used {name, code, err}. Normalize to whichever message field is present.
  const message = err.errmsg || err.err;

  // We match on the message text rather than the error code, because the
  // code we observed it producing (16837) appears to be a far more generic
  // error code based on examining the source.
  return (
    message.indexOf('The _id field cannot be changed') === 0 ||
    message.indexOf("the (immutable) field '_id' was found to have been altered to _id") !== -1
  );
};
|
||||
|
||||
// XXX MongoConnection.upsertAsync() does not return the id of the inserted document
// unless you set it explicitly in the selector or modifier (as a replacement
// doc).
MongoConnection.prototype.upsertAsync = async function (collectionName, selector, mod, options) {
  var self = this;

  // Legacy callers passed a callback in the options slot; this async API has
  // no callback, so just treat that as "no options". (The previous code
  // compared against an undeclared `callback` variable, which threw a
  // ReferenceError whenever `options` actually was a function.)
  if (typeof options === "function") {
    options = {};
  }

  // Delegate to updateAsync, forcing upsert semantics and requesting the
  // full result object (numberAffected / insertedId) rather than a count.
  return self.updateAsync(collectionName, selector, mod,
                          Object.assign({}, options, {
                            upsert: true,
                            _returnObject: true
                          }));
};
|
||||
|
||||
// Build a Meteor Cursor over `collectionName`. Calling find(name) with no
// selector means "all documents".
MongoConnection.prototype.find = function (collectionName, selector, options) {
  if (arguments.length === 1) {
    selector = {};
  }

  return new Cursor(
    this, new CursorDescription(collectionName, selector, options));
};
|
||||
|
||||
// Fetch a single matching document (or undefined if none matches).
MongoConnection.prototype.findOneAsync = async function (collection_name, selector, options) {
  var self = this;
  if (arguments.length === 1) {
    selector = {};
  }

  // Copy the options instead of mutating the caller's object: forcing
  // limit: 1 is an internal detail of findOne and must not leak back into
  // an options object the caller may reuse.
  options = Object.assign({}, options, { limit: 1 });

  const results = await self.find(collection_name, selector, options).fetch();

  return results[0];
};
|
||||
|
||||
// We'll actually design an index API later. For now, we just pass through to
// Mongo's, but make it synchronous.
MongoConnection.prototype.createIndexAsync = async function (collectionName, index, options) {
  // We expect this function to be called at startup, not from within a
  // method, so we don't interact with the write fence.
  const collection = this.rawCollection(collectionName);
  await collection.createIndex(index, options);
};
|
||||
|
||||
// just to be consistent with the other methods
// (an alias: the underlying driver call is asynchronous either way)
MongoConnection.prototype.createIndex =
  MongoConnection.prototype.createIndexAsync;
|
||||
|
||||
// Pass-through to the driver's countDocuments, converting Meteor-style atoms
// (e.g. Mongo.ObjectID) in the arguments to their raw Mongo equivalents.
MongoConnection.prototype.countDocuments = function (collectionName, ...args) {
  const collection = this.rawCollection(collectionName);
  const mongoArgs = args.map((a) => replaceTypes(a, replaceMeteorAtomWithMongo));
  return collection.countDocuments(...mongoArgs);
};
|
||||
|
||||
// Pass-through to the driver's estimatedDocumentCount, converting Meteor-style
// atoms in the arguments to their raw Mongo equivalents.
MongoConnection.prototype.estimatedDocumentCount = function (collectionName, ...args) {
  const collection = this.rawCollection(collectionName);
  const mongoArgs = args.map((a) => replaceTypes(a, replaceMeteorAtomWithMongo));
  return collection.estimatedDocumentCount(...mongoArgs);
};
|
||||
|
||||
// Deprecated alias: MongoDB renamed ensureIndex to createIndex.
MongoConnection.prototype.ensureIndexAsync = MongoConnection.prototype.createIndexAsync;
|
||||
|
||||
// Drop the named index from `collectionName`.
MongoConnection.prototype.dropIndexAsync = async function (collectionName, index) {
  var self = this;

  // This function is only used by test code, not within a method, so we don't
  // interact with the write fence.
  var collection = self.rawCollection(collectionName);
  // The driver resolves with the dropped index's name, but no caller uses it,
  // so don't keep a dead local around.
  await collection.dropIndex(index);
};
|
||||
|
||||
|
||||
// Client-only (callback-style) methods must not be used on the server; throw
// an error pointing callers at the async variant instead.
CLIENT_ONLY_METHODS.forEach(function (m) {
  MongoConnection.prototype[m] = function () {
    throw new Error(
      // (fixed: the message previously contained a stray " + " left over
      // from a string-concatenation refactor)
      `${m} is not available on the server. Please use ${getAsyncMethodName(
        m
      )}() instead.`
    );
  };
});
|
||||
|
||||
|
||||
// Maximum number of optimistic-write attempts made by
// simulateUpsertWithInsertedId before giving up.
var NUM_OPTIMISTIC_TRIES = 3;
|
||||
|
||||
|
||||
|
||||
// STRATEGY: First try doing an update (and, failing that, an upsert-style
// replaceOne with our generated ID). If the insert path throws an error about
// changing the ID on an existing document, some other writer created a
// matching document concurrently, so we retry the plain update. If the update
// then affects 0 documents, the document that first gave the error was
// probably removed and we need to try the insert again. We go back to step
// one and repeat.
// Like all "optimistic write" schemes, we rely on the fact that it's
// unlikely our writes will continue to be interfered with under normal
// circumstances (though sufficiently heavy contention with writers
// disagreeing on the existence of an object will cause writes to fail
// in theory).
var simulateUpsertWithInsertedId = async function (collection, selector, mod, options) {
  var insertedId = options.insertedId; // must exist
  var mongoOptsForUpdate = {
    safe: true,
    multi: options.multi
  };
  var mongoOptsForInsert = {
    safe: true,
    upsert: true
  };

  // Replacement document for the insert path: the caller-chosen _id merged
  // with the (replacement-style) modifier.
  var replacementWithId = Object.assign(
    replaceTypes({_id: insertedId}, replaceMeteorAtomWithMongo),
    mod);

  var tries = NUM_OPTIMISTIC_TRIES;

  var doUpdate = async function () {
    tries--;
    if (! tries) {
      throw new Error("Upsert failed after " + NUM_OPTIMISTIC_TRIES + " tries.");
    } else {
      // Replacement docs (no $-operators) must go through replaceOne;
      // modifier docs go through updateMany. Bind both so the driver method
      // keeps its `this` — the previous code left updateMany unbound.
      let method = collection.updateMany.bind(collection);
      if (!Object.keys(mod).some(key => key.startsWith("$"))) {
        method = collection.replaceOne.bind(collection);
      }
      return method(
        selector,
        mod,
        mongoOptsForUpdate).then(result => {
          if (result && (result.modifiedCount || result.upsertedCount)) {
            return {
              numberAffected: result.modifiedCount || result.upsertedCount,
              insertedId: result.upsertedId || undefined,
            };
          } else {
            // Nothing matched: fall through to trying the insert path.
            return doConditionalInsert();
          }
        });
    }
  };

  var doConditionalInsert = function () {
    return collection.replaceOne(selector, replacementWithId, mongoOptsForInsert)
      .then(result => ({
        numberAffected: result.upsertedCount,
        insertedId: result.upsertedId,
      })).catch(err => {
        if (MongoConnection._isCannotChangeIdError(err)) {
          // A matching document appeared concurrently; retry the update.
          return doUpdate();
        } else {
          throw err;
        }
      });
  };

  return doUpdate();
};
|
||||
|
||||
// observeChanges for tailable cursors on capped collections.
//
// Some differences from normal cursors:
//   - Will never produce anything other than 'added' or 'addedBefore'. If you
//     do update a document that has already been produced, this will not
//     notice it.
//   - If you disconnect and reconnect from Mongo, it will essentially restart
//     the query, which will lead to duplicate results. This is pretty bad,
//     but if you include a field called 'ts' which is inserted as
//     new MongoInternals.MongoTimestamp(0, 0) (which is initialized to the
//     current Mongo-style timestamp), we'll be able to find the place to
//     restart properly. (This field is specifically understood by Mongo with
//     an optimization which allows it to find the right place to start
//     without an index on ts. It's how the oplog works.)
//   - No callbacks are triggered synchronously with the call (there's no
//     differentiation between "initial data" and "later changes"; everything
//     that matches the query gets sent asynchronously).
//   - De-duplication is not implemented.
//   - Does not yet interact with the write fence. Probably, this should work
//     by ignoring removes (which don't work on capped collections) and
//     updates (which don't affect tailable cursors), and just keeping track
//     of the ID of the inserted object, and closing the write fence once you
//     get to that ID (or timestamp?). This doesn't work well if the document
//     doesn't match the query, though. On the other hand, the write fence can
//     close immediately if it does not match the query. So if we trust
//     minimongo enough to accurately evaluate the query against the write
//     fence, we should be able to do this... Of course, minimongo doesn't
//     even support Mongo Timestamps yet.
MongoConnection.prototype._observeChangesTailable = function (
    cursorDescription, ordered, callbacks) {
  const self = this;

  // Tailable cursors only ever call added/addedBefore callbacks, so it's an
  // error if you didn't provide the one matching the requested ordering.
  const requiredCallback = ordered ? "addedBefore" : "added";
  if (!callbacks[requiredCallback]) {
    throw new Error(
      "Can't observe an " + (ordered ? "ordered" : "unordered") +
      " tailable cursor without a " + requiredCallback + " callback");
  }

  return self.tail(cursorDescription, function (doc) {
    const id = doc._id;
    delete doc._id;
    // The ts is an implementation detail. Hide it.
    delete doc.ts;
    if (ordered) {
      callbacks.addedBefore(id, doc, null);
    } else {
      callbacks.added(id, doc);
    }
  });
};
|
||||
|
||||
// Build an AsynchronousCursor (a thin wrapper over the raw driver cursor) for
// the given CursorDescription. `options` may carry selfForIteration (the
// value bound as `this` in forEach/map callbacks) and useTransform.
MongoConnection.prototype._createAsynchronousCursor = function(
  cursorDescription, options = {}) {
  var self = this;
  // Whitelist the options we pass along so stray keys can't leak through.
  const { selfForIteration, useTransform } = options;
  options = { selfForIteration, useTransform };

  var collection = self.rawCollection(cursorDescription.collectionName);
  var cursorOptions = cursorDescription.options;
  // Translate Meteor cursor options into the driver's find options.
  // `fields` is the legacy spelling of `projection`.
  var mongoOptions = {
    sort: cursorOptions.sort,
    limit: cursorOptions.limit,
    skip: cursorOptions.skip,
    projection: cursorOptions.fields || cursorOptions.projection,
    readPreference: cursorOptions.readPreference,
  };

  // Do we want a tailable cursor (which only works on capped collections)?
  if (cursorOptions.tailable) {
    // Keep retrying forever rather than giving up after N attempts.
    mongoOptions.numberOfRetries = -1;
  }

  var dbCursor = collection.find(
    replaceTypes(cursorDescription.selector, replaceMeteorAtomWithMongo),
    mongoOptions);

  // Do we want a tailable cursor (which only works on capped collections)?
  if (cursorOptions.tailable) {
    // We want a tailable cursor...
    dbCursor.addCursorFlag("tailable", true)
    // ... and for the server to wait a bit if any getMore has no data (rather
    // than making us put the relevant sleeps in the client)...
    dbCursor.addCursorFlag("awaitData", true)

    // And if this is on the oplog collection and the cursor specifies a 'ts',
    // then set the undocumented oplog replay flag, which does a special scan to
    // find the first document (instead of creating an index on ts). This is a
    // very hard-coded Mongo flag which only works on the oplog collection and
    // only works with the ts field.
    if (cursorDescription.collectionName === OPLOG_COLLECTION &&
        cursorDescription.selector.ts) {
      dbCursor.addCursorFlag("oplogReplay", true)
    }
  }

  // Optional per-query knobs; each returns the (possibly same) cursor.
  if (typeof cursorOptions.maxTimeMs !== 'undefined') {
    dbCursor = dbCursor.maxTimeMS(cursorOptions.maxTimeMs);
  }
  if (typeof cursorOptions.hint !== 'undefined') {
    dbCursor = dbCursor.hint(cursorOptions.hint);
  }

  // NOTE(review): AsynchronousCursor's constructor takes three parameters
  // (dbCursor, cursorDescription, options), so the trailing `collection`
  // argument appears to be ignored — confirm against asynchronous_cursor.js.
  return new AsynchronousCursor(dbCursor, cursorDescription, options, collection);
};
|
||||
|
||||
// Tails the cursor described by cursorDescription, most likely on the
// oplog. Calls docCallback with each document found. Ignores errors and just
// restarts the tail on error.
//
// If timeoutMS is set, then if we don't get a new document every timeoutMS,
// kill and restart the cursor. This is primarily a workaround for #8598.
//
// Returns a handle with a stop() method that terminates the tail.
MongoConnection.prototype.tail = function (cursorDescription, docCallback, timeoutMS) {
  var self = this;
  if (!cursorDescription.options.tailable)
    throw new Error("Can only tail a tailable cursor");

  var cursor = self._createAsynchronousCursor(cursorDescription);

  // Shared state between the background loop and the stop() handle.
  var stopped = false;
  var lastTS;

  Meteor.defer(async function loop() {
    var doc = null;
    while (true) {
      if (stopped)
        return;
      try {
        doc = await cursor._nextObjectPromiseWithTimeout(timeoutMS);
      } catch (err) {
        // There's no good way to figure out if this was actually an error from
        // Mongo, or just client-side (including our own timeout error). Ah
        // well. But either way, we need to retry the cursor (unless the failure
        // was because the observe got stopped).
        doc = null;
      }
      // Since we awaited a promise above, we need to check again to see if
      // we've been stopped before calling the callback.
      if (stopped)
        return;
      if (doc) {
        // If a tailable cursor contains a "ts" field, use it to recreate the
        // cursor on error. ("ts" is a standard that Mongo uses internally for
        // the oplog, and there's a special flag that lets you do binary search
        // on it instead of needing to use an index.)
        lastTS = doc.ts;
        docCallback(doc);
      } else {
        // The cursor died (error or timeout): rebuild it, resuming after the
        // last timestamp we saw if we have one.
        var newSelector = Object.assign({}, cursorDescription.selector);
        if (lastTS) {
          newSelector.ts = {$gt: lastTS};
        }
        cursor = self._createAsynchronousCursor(new CursorDescription(
          cursorDescription.collectionName,
          newSelector,
          cursorDescription.options));
        // Mongo failover takes many seconds. Retry in a bit. (Without this
        // setTimeout, we peg the CPU at 100% and never notice the actual
        // failover.
        setTimeout(loop, 100);
        break;
      }
    }
  });

  return {
    stop: function () {
      stopped = true;
      cursor.close();
    }
  };
};
|
||||
|
||||
Object.assign(MongoConnection.prototype, {
  // Core implementation of cursor.observeChanges on the server. Reuses an
  // existing ObserveMultiplexer for identical queries; the first handle for a
  // query picks a driver (oplog tailing when eligible, otherwise polling).
  _observeChanges: async function (
    cursorDescription, ordered, callbacks, nonMutatingCallbacks) {
    var self = this;
    const collectionName = cursorDescription.collectionName;

    // Tailable cursors are handled by a completely separate code path.
    if (cursorDescription.options.tailable) {
      return self._observeChangesTailable(cursorDescription, ordered, callbacks);
    }

    // You may not filter out _id when observing changes, because the id is a core
    // part of the observeChanges API.
    const fieldsOptions = cursorDescription.options.projection || cursorDescription.options.fields;
    if (fieldsOptions &&
        (fieldsOptions._id === 0 ||
         fieldsOptions._id === false)) {
      throw Error("You may not observe a cursor with {fields: {_id: 0}}");
    }

    // Key used to share one multiplexer between identical observes (same
    // query description and the same ordering flag).
    var observeKey = EJSON.stringify(
      Object.assign({ordered: ordered}, cursorDescription));

    var multiplexer, observeDriver;
    var firstHandle = false;

    // Find a matching ObserveMultiplexer, or create a new one. This next block is
    // guaranteed to not yield (and it doesn't call anything that can observe a
    // new query), so no other calls to this function can interleave with it.
    if (observeKey in self._observeMultiplexers) {
      multiplexer = self._observeMultiplexers[observeKey];
    } else {
      firstHandle = true;
      // Create a new ObserveMultiplexer.
      multiplexer = new ObserveMultiplexer({
        ordered: ordered,
        onStop: function () {
          delete self._observeMultiplexers[observeKey];
          return observeDriver.stop();
        }
      });
    }

    var observeHandle = new ObserveHandle(multiplexer,
      callbacks,
      nonMutatingCallbacks,
    );

    // Oplog include/exclude filters configured on the oplog handle (if any).
    const oplogOptions = self?._oplogHandle?._oplogOptions || {};
    const { includeCollections, excludeCollections } = oplogOptions;
    if (firstHandle) {
      var matcher, sorter;
      // Run a series of eligibility checks; the oplog driver is used only if
      // every one of them passes.
      var canUseOplog = [
        function () {
          // At a bare minimum, using the oplog requires us to have an oplog, to
          // want unordered callbacks, and to not want a callback on the polls
          // that won't happen.
          return self._oplogHandle && !ordered &&
            !callbacks._testOnlyPollCallback;
        },
        function () {
          // We also need to check, if the collection of this Cursor is actually being "watched" by the Oplog handle
          // if not, we have to fallback to long polling
          if (excludeCollections?.length && excludeCollections.includes(collectionName)) {
            if (!oplogCollectionWarnings.includes(collectionName)) {
              console.warn(`Meteor.settings.packages.mongo.oplogExcludeCollections includes the collection ${collectionName} - your subscriptions will only use long polling!`);
              oplogCollectionWarnings.push(collectionName); // we only want to show the warnings once per collection!
            }
            return false;
          }
          if (includeCollections?.length && !includeCollections.includes(collectionName)) {
            if (!oplogCollectionWarnings.includes(collectionName)) {
              console.warn(`Meteor.settings.packages.mongo.oplogIncludeCollections does not include the collection ${collectionName} - your subscriptions will only use long polling!`);
              oplogCollectionWarnings.push(collectionName); // we only want to show the warnings once per collection!
            }
            return false;
          }
          return true;
        },
        function () {
          // We need to be able to compile the selector. Fall back to polling for
          // some newfangled $selector that minimongo doesn't support yet.
          try {
            matcher = new Minimongo.Matcher(cursorDescription.selector);
            return true;
          } catch (e) {
            // XXX make all compilation errors MinimongoError or something
            //     so that this doesn't ignore unrelated exceptions
            return false;
          }
        },
        function () {
          // ... and the selector itself needs to support oplog.
          return OplogObserveDriver.cursorSupported(cursorDescription, matcher);
        },
        function () {
          // And we need to be able to compile the sort, if any. eg, can't be
          // {$natural: 1}.
          if (!cursorDescription.options.sort)
            return true;
          try {
            sorter = new Minimongo.Sorter(cursorDescription.options.sort);
            return true;
          } catch (e) {
            // XXX make all compilation errors MinimongoError or something
            //     so that this doesn't ignore unrelated exceptions
            return false;
          }
        }
      ].every(f => f()); // invoke each function and check if all return true

      var driverClass = canUseOplog ? OplogObserveDriver : PollingObserveDriver;
      observeDriver = new driverClass({
        cursorDescription: cursorDescription,
        mongoHandle: self,
        multiplexer: multiplexer,
        ordered: ordered,
        matcher: matcher, // ignored by polling
        sorter: sorter, // ignored by polling
        _testOnlyPollCallback: callbacks._testOnlyPollCallback
      });

      // Drivers with async setup expose _init; await it before registering.
      if (observeDriver._init) {
        await observeDriver._init();
      }

      // This field is only set for use in tests.
      multiplexer._observeDriver = observeDriver;
    }
    self._observeMultiplexers[observeKey] = multiplexer;
    // Blocks until the initial adds have been sent.
    await multiplexer.addHandleAndSendInitialAdds(observeHandle);

    return observeHandle;
  },

});
|
||||
File diff suppressed because it is too large
Load Diff
@@ -2,6 +2,11 @@ import has from 'lodash.has';
|
||||
import isEmpty from 'lodash.isempty';
|
||||
import { oplogV2V1Converter } from "./oplog_v2_converter";
|
||||
import { check, Match } from 'meteor/check';
|
||||
import { CursorDescription } from './cursor_description';
|
||||
import { forEachTrigger, listenAll } from './mongo_driver';
|
||||
import { Cursor } from './cursor';
|
||||
import LocalCollection from 'meteor/minimongo/local_collection';
|
||||
import { idForOp } from './oplog_tailing';
|
||||
|
||||
var PHASE = {
|
||||
QUERYING: "QUERYING",
|
||||
@@ -30,7 +35,7 @@ var currentId = 0;
|
||||
// same simple interface: constructing it starts sending observeChanges
|
||||
// callbacks (and a ready() invocation) to the ObserveMultiplexer, and you stop
|
||||
// it by calling the stop() method.
|
||||
OplogObserveDriver = function (options) {
|
||||
export const OplogObserveDriver = function (options) {
|
||||
const self = this;
|
||||
self._usesOplog = true; // tests look at this
|
||||
|
||||
@@ -113,9 +118,6 @@ OplogObserveDriver = function (options) {
|
||||
|
||||
self._requeryWhenDoneThisQuery = false;
|
||||
self._writesToCommitWhenWeReachSteady = [];
|
||||
|
||||
|
||||
|
||||
};
|
||||
|
||||
_.extend(OplogObserveDriver.prototype, {
|
||||
@@ -1051,6 +1053,4 @@ var modifierCanBeDirectlyApplied = function (modifier) {
|
||||
return !/EJSON\$/.test(field);
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
MongoInternals.OplogObserveDriver = OplogObserveDriver;
|
||||
};
|
||||
@@ -1,432 +0,0 @@
|
||||
import isEmpty from 'lodash.isempty';
|
||||
import has from 'lodash.has';
|
||||
|
||||
import { NpmModuleMongodb } from "meteor/npm-mongo";
|
||||
const { Long } = NpmModuleMongodb;
|
||||
|
||||
// Name of the replica-set oplog capped collection in the `local` database.
OPLOG_COLLECTION = 'oplog.rs';

// How many queued oplog entries count as "too far behind" (overridable via
// the METEOR_OPLOG_TOO_FAR_BEHIND environment variable).
var TOO_FAR_BEHIND = process.env.METEOR_OPLOG_TOO_FAR_BEHIND || 2000;
// Per-attempt tail timeout in milliseconds (overridable via env var).
var TAIL_TIMEOUT = +process.env.METEOR_OPLOG_TAIL_TIMEOUT || 30000;
|
||||
|
||||
// Extract the _id of the document affected by an oplog entry. Inserts ('i')
// and deletes ('d') carry it in `o`; updates ('u') carry it in `o2`. Command
// ops ('c') have no single affected document, and anything else is unknown.
idForOp = function (op) {
  switch (op.op) {
    case 'd':
    case 'i':
      return op.o._id;
    case 'u':
      return op.o2._id;
    case 'c':
      throw Error("Operator 'c' doesn't supply an object with id: " +
                  EJSON.stringify(op));
    default:
      throw Error("Unknown op: " + EJSON.stringify(op));
  }
};
|
||||
|
||||
// Handle that tails the MongoDB replica-set oplog and fans entries out to
// interested observers via a DDPServer crossbar. Construction kicks off
// tailing immediately (see _startTailing).
OplogHandle = function (oplogUrl, dbName) {
  var self = this;
  self._oplogUrl = oplogUrl;
  self._dbName = dbName;

  // Two separate Mongo connections, created in _startTailing: one dedicated
  // to the blocking tail query, one for "what's the last entry?" reads.
  self._oplogLastEntryConnection = null;
  self._oplogTailConnection = null;
  self._oplogOptions = null;
  self._stopped = false;
  self._tailHandle = null;
  // _readyPromise resolves once tailing is up; callers of onOplogEntry and
  // waitUntilCaughtUp await it.
  self._readyPromiseResolver = null;
  self._readyPromise = new Promise(r => self._readyPromiseResolver = r);
  self._crossbar = new DDPServer._Crossbar({
    factPackage: "mongo-livedata", factName: "oplog-watchers"
  });
  // Selector matching oplog entries for our database (plus admin.$cmd):
  // document ops and the handful of 'c' command ops we care about.
  self._baseOplogSelector = {
    ns: new RegExp("^(?:" + [
      Meteor._escapeRegExp(self._dbName + "."),
      Meteor._escapeRegExp("admin.$cmd"),
    ].join("|") + ")"),

    $or: [
      { op: { $in: ['i', 'u', 'd'] } },
      // drop collection
      { op: 'c', 'o.drop': { $exists: true } },
      { op: 'c', 'o.dropDatabase': 1 },
      { op: 'c', 'o.applyOps': { $exists: true } },
    ]
  };

  // Data structures to support waitUntilCaughtUp(). Each oplog entry has a
  // MongoTimestamp object on it (which is not the same as a Date --- it's a
  // combination of time and an incrementing counter; see
  // http://docs.mongodb.org/manual/reference/bson-types/#timestamps).
  //
  // _catchingUpFutures is an array of {ts: MongoTimestamp, future: Future}
  // objects, sorted by ascending timestamp. _lastProcessedTS is the
  // MongoTimestamp of the last oplog entry we've processed.
  //
  // Each time we call waitUntilCaughtUp, we take a peek at the final oplog
  // entry in the db. If we've already processed it (ie, it is not greater than
  // _lastProcessedTS), waitUntilCaughtUp immediately returns. Otherwise,
  // waitUntilCaughtUp makes a new Future and inserts it along with the final
  // timestamp entry that it read, into _catchingUpFutures. waitUntilCaughtUp
  // then waits on that future, which is resolved once _lastProcessedTS is
  // incremented to be past its timestamp by the worker fiber.
  //
  // XXX use a priority queue or something else that's faster than an array
  self._catchingUpResolvers = [];
  self._lastProcessedTS = null;

  self._onSkippedEntriesHook = new Hook({
    debugPrintExceptions: "onSkippedEntries callback"
  });

  // Queue of raw oplog entries awaiting processing by the worker.
  self._entryQueue = new Meteor._DoubleEndedQueue();
  self._workerActive = false;

  self._startTrailingPromise = self._startTailing();
  //TODO[fibers] Why wait?
};
|
||||
|
||||
// Expose the handle class on MongoInternals for tests and advanced users.
MongoInternals.OplogHandle = OplogHandle;
|
||||
|
||||
Object.assign(OplogHandle.prototype, {
|
||||
stop: async function () {
|
||||
var self = this;
|
||||
if (self._stopped)
|
||||
return;
|
||||
self._stopped = true;
|
||||
if (self._tailHandle)
|
||||
await self._tailHandle.stop();
|
||||
// XXX should close connections too
|
||||
},
|
||||
_onOplogEntry: async function(trigger, callback) {
|
||||
var self = this;
|
||||
if (self._stopped)
|
||||
throw new Error("Called onOplogEntry on stopped handle!");
|
||||
|
||||
// Calling onOplogEntry requires us to wait for the tailing to be ready.
|
||||
await self._readyPromise;
|
||||
|
||||
var listenHandle = self._crossbar.listen(trigger, callback);
|
||||
return {
|
||||
stop: async function () {
|
||||
await listenHandle.stop();
|
||||
}
|
||||
};
|
||||
},
|
||||
  // Public wrapper around _onOplogEntry; returns its promise.
  onOplogEntry: function (trigger, callback) {
    return this._onOplogEntry(trigger, callback);
  },
|
||||
// Register a callback to be invoked any time we skip oplog entries (eg,
|
||||
// because we are too far behind).
|
||||
onSkippedEntries: function (callback) {
|
||||
var self = this;
|
||||
if (self._stopped)
|
||||
throw new Error("Called onSkippedEntries on stopped handle!");
|
||||
return self._onSkippedEntriesHook.register(callback);
|
||||
},
|
||||
|
||||
  // Resolve once the oplog worker has processed every entry currently visible
  // in the database. See the _catchingUpResolvers comment in the constructor
  // for the mechanism.
  async _waitUntilCaughtUp() {
    var self = this;
    if (self._stopped)
      throw new Error("Called waitUntilCaughtUp on stopped handle!");

    // Calling waitUntilCaughtUp requires us to wait for the oplog connection to
    // be ready.
    await self._readyPromise;
    var lastEntry;

    while (!self._stopped) {
      // We need to make the selector at least as restrictive as the actual
      // tailing selector (ie, we need to specify the DB name) or else we might
      // find a TS that won't show up in the actual tail stream.
      try {
        lastEntry = await self._oplogLastEntryConnection.findOneAsync(
          OPLOG_COLLECTION,
          self._baseOplogSelector,
          { projection: { ts: 1 }, sort: { $natural: -1 } }
        );
        break;
      } catch (e) {
        // During failover (eg) if we get an exception we should log and retry
        // instead of crashing.
        Meteor._debug("Got exception while reading last entry", e);
        await Meteor._sleepForMs(100);
      }
    }

    if (self._stopped)
      return;

    if (!lastEntry) {
      // Really, nothing in the oplog? Well, we've processed everything.
      return;
    }

    var ts = lastEntry.ts;
    if (!ts)
      throw Error("oplog entry without ts: " + EJSON.stringify(lastEntry));

    if (self._lastProcessedTS && ts.lessThanOrEqual(self._lastProcessedTS)) {
      // We've already caught up to here.
      return;
    }


    // Insert the future into our list. Almost always, this will be at the end,
    // but it's conceivable that if we fail over from one primary to another,
    // the oplog entries we see will go backwards.
    // NOTE(review): the loop condition `insertAfter - 1 > 0` never compares
    // against the element at index 0 — presumably intentional, but worth
    // confirming.
    var insertAfter = self._catchingUpResolvers.length;
    while (insertAfter - 1 > 0 && self._catchingUpResolvers[insertAfter - 1].ts.greaterThan(ts)) {
      insertAfter--;
    }
    let promiseResolver = null;
    const promiseToAwait = new Promise(r => promiseResolver = r);
    self._catchingUpResolvers.splice(insertAfter, 0, {ts: ts, resolver: promiseResolver});
    await promiseToAwait;
  },
|
||||
|
||||
  // Calls `callback` once the oplog has been processed up to a point that is
  // roughly "now": specifically, once we've processed all ops that are
  // currently visible.
  // XXX become convinced that this is actually safe even if oplogConnection
  // is some kind of pool
  waitUntilCaughtUp: async function () {
    return this._waitUntilCaughtUp();
  },
|
||||
|
||||
_startTailing: async function () {
|
||||
var self = this;
|
||||
// First, make sure that we're talking to the local database.
|
||||
var mongodbUri = Npm.require('mongodb-uri');
|
||||
if (mongodbUri.parse(self._oplogUrl).database !== 'local') {
|
||||
throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of " +
|
||||
"a Mongo replica set");
|
||||
}
|
||||
|
||||
// We make two separate connections to Mongo. The Node Mongo driver
|
||||
// implements a naive round-robin connection pool: each "connection" is a
|
||||
// pool of several (5 by default) TCP connections, and each request is
|
||||
// rotated through the pools. Tailable cursor queries block on the server
|
||||
// until there is some data to return (or until a few seconds have
|
||||
// passed). So if the connection pool used for tailing cursors is the same
|
||||
// pool used for other queries, the other queries will be delayed by seconds
|
||||
// 1/5 of the time.
|
||||
//
|
||||
// The tail connection will only ever be running a single tail command, so
|
||||
// it only needs to make one underlying TCP connection.
|
||||
self._oplogTailConnection = new MongoConnection(
|
||||
self._oplogUrl, {maxPoolSize: 1, minPoolSize: 1});
|
||||
// XXX better docs, but: it's to get monotonic results
|
||||
// XXX is it safe to say "if there's an in flight query, just use its
|
||||
// results"? I don't think so but should consider that
|
||||
self._oplogLastEntryConnection = new MongoConnection(
|
||||
self._oplogUrl, {maxPoolSize: 1, minPoolSize: 1});
|
||||
|
||||
|
||||
// Now, make sure that there actually is a repl set here. If not, oplog
|
||||
// tailing won't ever find anything!
|
||||
// More on the isMasterDoc
|
||||
// https://docs.mongodb.com/manual/reference/command/isMaster/
|
||||
const isMasterDoc = await new Promise(function (resolve, reject) {
|
||||
self._oplogLastEntryConnection.db
|
||||
.admin()
|
||||
.command({ ismaster: 1 }, function (err, result) {
|
||||
if (err) reject(err);
|
||||
else resolve(result);
|
||||
});
|
||||
});
|
||||
|
||||
if (!(isMasterDoc && isMasterDoc.setName)) {
|
||||
throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of " +
|
||||
"a Mongo replica set");
|
||||
}
|
||||
|
||||
// Find the last oplog entry.
|
||||
var lastOplogEntry = await self._oplogLastEntryConnection.findOneAsync(
|
||||
OPLOG_COLLECTION,
|
||||
{},
|
||||
{ sort: { $natural: -1 }, projection: { ts: 1 } }
|
||||
);
|
||||
|
||||
var oplogSelector = Object.assign({}, self._baseOplogSelector);
|
||||
if (lastOplogEntry) {
|
||||
// Start after the last entry that currently exists.
|
||||
oplogSelector.ts = {$gt: lastOplogEntry.ts};
|
||||
// If there are any calls to callWhenProcessedLatest before any other
|
||||
// oplog entries show up, allow callWhenProcessedLatest to call its
|
||||
// callback immediately.
|
||||
self._lastProcessedTS = lastOplogEntry.ts;
|
||||
}
|
||||
|
||||
// These 2 settings allow you to either only watch certain collections (oplogIncludeCollections), or exclude some collections you don't want to watch for oplog updates (oplogExcludeCollections)
|
||||
// Usage:
|
||||
// settings.json = {
|
||||
// "packages": {
|
||||
// "mongo": {
|
||||
// "oplogExcludeCollections": ["products", "prices"] // This would exclude both collections "products" and "prices" from any oplog tailing.
|
||||
// Beware! This means, that no subscriptions on these 2 collections will update anymore!
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
const includeCollections = Meteor.settings?.packages?.mongo?.oplogIncludeCollections;
|
||||
const excludeCollections = Meteor.settings?.packages?.mongo?.oplogExcludeCollections;
|
||||
if (includeCollections?.length && excludeCollections?.length) {
|
||||
throw new Error("Can't use both mongo oplog settings oplogIncludeCollections and oplogExcludeCollections at the same time.");
|
||||
}
|
||||
if (excludeCollections?.length) {
|
||||
oplogSelector.ns = {
|
||||
$regex: oplogSelector.ns,
|
||||
$nin: excludeCollections.map((collName) => `${self._dbName}.${collName}`)
|
||||
}
|
||||
self._oplogOptions = { excludeCollections };
|
||||
}
|
||||
else if (includeCollections?.length) {
|
||||
oplogSelector = { $and: [
|
||||
{ $or: [
|
||||
{ ns: /^admin\.\$cmd/ },
|
||||
{ ns: { $in: includeCollections.map((collName) => `${self._dbName}.${collName}`) } }
|
||||
] },
|
||||
{ $or: oplogSelector.$or }, // the initial $or to select only certain operations (op)
|
||||
{ ts: oplogSelector.ts }
|
||||
] };
|
||||
self._oplogOptions = { includeCollections };
|
||||
}
|
||||
|
||||
var cursorDescription = new CursorDescription(
|
||||
OPLOG_COLLECTION, oplogSelector, {tailable: true});
|
||||
|
||||
// Start tailing the oplog.
|
||||
//
|
||||
// We restart the low-level oplog query every 30 seconds if we didn't get a
|
||||
// doc. This is a workaround for #8598: the Node Mongo driver has at least
|
||||
// one bug that can lead to query callbacks never getting called (even with
|
||||
// an error) when leadership failover occur.
|
||||
self._tailHandle = self._oplogTailConnection.tail(
|
||||
cursorDescription,
|
||||
function (doc) {
|
||||
self._entryQueue.push(doc);
|
||||
self._maybeStartWorker();
|
||||
},
|
||||
TAIL_TIMEOUT
|
||||
);
|
||||
|
||||
self._readyPromiseResolver();
|
||||
},
|
||||
|
||||
_maybeStartWorker: function () {
|
||||
var self = this;
|
||||
if (self._workerActive) return;
|
||||
self._workerActive = true;
|
||||
|
||||
Meteor.defer(async function () {
|
||||
// May be called recursively in case of transactions.
|
||||
async function handleDoc(doc) {
|
||||
if (doc.ns === "admin.$cmd") {
|
||||
if (doc.o.applyOps) {
|
||||
// This was a successful transaction, so we need to apply the
|
||||
// operations that were involved.
|
||||
let nextTimestamp = doc.ts;
|
||||
for (const op of doc.o.applyOps) {
|
||||
// See https://github.com/meteor/meteor/issues/10420.
|
||||
if (!op.ts) {
|
||||
op.ts = nextTimestamp;
|
||||
nextTimestamp = nextTimestamp.add(Long.ONE);
|
||||
}
|
||||
await handleDoc(op);
|
||||
}
|
||||
return;
|
||||
}
|
||||
throw new Error("Unknown command " + EJSON.stringify(doc));
|
||||
}
|
||||
|
||||
const trigger = {
|
||||
dropCollection: false,
|
||||
dropDatabase: false,
|
||||
op: doc,
|
||||
};
|
||||
|
||||
if (typeof doc.ns === "string" &&
|
||||
doc.ns.startsWith(self._dbName + ".")) {
|
||||
trigger.collection = doc.ns.slice(self._dbName.length + 1);
|
||||
}
|
||||
|
||||
// Is it a special command and the collection name is hidden
|
||||
// somewhere in operator?
|
||||
if (trigger.collection === "$cmd") {
|
||||
if (doc.o.dropDatabase) {
|
||||
delete trigger.collection;
|
||||
trigger.dropDatabase = true;
|
||||
} else if (has(doc.o, "drop")) {
|
||||
trigger.collection = doc.o.drop;
|
||||
trigger.dropCollection = true;
|
||||
trigger.id = null;
|
||||
} else if ("create" in doc.o && "idIndex" in doc.o) {
|
||||
// A collection got implicitly created within a transaction. There's
|
||||
// no need to do anything about it.
|
||||
} else {
|
||||
throw Error("Unknown command " + EJSON.stringify(doc));
|
||||
}
|
||||
|
||||
} else {
|
||||
// All other ops have an id.
|
||||
trigger.id = idForOp(doc);
|
||||
}
|
||||
|
||||
await self._crossbar.fire(trigger);
|
||||
}
|
||||
|
||||
try {
|
||||
while (! self._stopped &&
|
||||
! self._entryQueue.isEmpty()) {
|
||||
// Are we too far behind? Just tell our observers that they need to
|
||||
// repoll, and drop our queue.
|
||||
if (self._entryQueue.length > TOO_FAR_BEHIND) {
|
||||
var lastEntry = self._entryQueue.pop();
|
||||
self._entryQueue.clear();
|
||||
|
||||
self._onSkippedEntriesHook.each(function (callback) {
|
||||
callback();
|
||||
return true;
|
||||
});
|
||||
|
||||
// Free any waitUntilCaughtUp() calls that were waiting for us to
|
||||
// pass something that we just skipped.
|
||||
self._setLastProcessedTS(lastEntry.ts);
|
||||
continue;
|
||||
}
|
||||
|
||||
const doc = self._entryQueue.shift();
|
||||
|
||||
// Fire trigger(s) for this doc.
|
||||
await handleDoc(doc);
|
||||
|
||||
// Now that we've processed this operation, process pending
|
||||
// sequencers.
|
||||
if (doc.ts) {
|
||||
self._setLastProcessedTS(doc.ts);
|
||||
} else {
|
||||
throw Error("oplog entry without ts: " + EJSON.stringify(doc));
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
self._workerActive = false;
|
||||
}
|
||||
});
|
||||
},
|
||||
|
||||
_setLastProcessedTS: function (ts) {
|
||||
var self = this;
|
||||
self._lastProcessedTS = ts;
|
||||
while (!isEmpty(self._catchingUpResolvers) && self._catchingUpResolvers[0].ts.lessThanOrEqual(self._lastProcessedTS)) {
|
||||
var sequencer = self._catchingUpResolvers.shift();
|
||||
sequencer.resolver();
|
||||
}
|
||||
},
|
||||
|
||||
//Methods used on tests to dinamically change TOO_FAR_BEHIND
|
||||
_defineTooFarBehind: function(value) {
|
||||
TOO_FAR_BEHIND = value;
|
||||
},
|
||||
_resetTooFarBehind: function() {
|
||||
TOO_FAR_BEHIND = process.env.METEOR_OPLOG_TOO_FAR_BEHIND || 2000;
|
||||
}
|
||||
});
|
||||
394
packages/mongo/oplog_tailing.ts
Normal file
394
packages/mongo/oplog_tailing.ts
Normal file
@@ -0,0 +1,394 @@
|
||||
import isEmpty from 'lodash.isempty';
|
||||
import { Meteor } from 'meteor/meteor';
|
||||
import { CursorDescription } from './cursor_description';
|
||||
import { MongoConnection } from './mongo_connection';
|
||||
|
||||
export const OPLOG_COLLECTION = 'oplog.rs';
|
||||
|
||||
let TOO_FAR_BEHIND = +(process.env.METEOR_OPLOG_TOO_FAR_BEHIND || 2000);
|
||||
const TAIL_TIMEOUT = +(process.env.METEOR_OPLOG_TAIL_TIMEOUT || 30000);
|
||||
|
||||
interface OplogEntry {
|
||||
op: string;
|
||||
o: any;
|
||||
o2?: any;
|
||||
ts: any;
|
||||
ns: string;
|
||||
}
|
||||
|
||||
interface CatchingUpResolver {
|
||||
ts: any;
|
||||
resolver: () => void;
|
||||
}
|
||||
|
||||
interface OplogTrigger {
|
||||
dropCollection: boolean;
|
||||
dropDatabase: boolean;
|
||||
op: OplogEntry;
|
||||
collection?: string;
|
||||
id?: string | null;
|
||||
}
|
||||
|
||||
export class OplogHandle {
|
||||
private _oplogUrl: string;
|
||||
private _dbName: string;
|
||||
private _oplogLastEntryConnection: MongoConnection | null;
|
||||
private _oplogTailConnection: MongoConnection | null;
|
||||
private _oplogOptions: { excludeCollections?: string[]; includeCollections?: string[] } | null;
|
||||
private _stopped: boolean;
|
||||
private _tailHandle: any;
|
||||
private _readyPromiseResolver: (() => void) | null;
|
||||
private _readyPromise: Promise<void>;
|
||||
private _crossbar: any;
|
||||
private _baseOplogSelector: any;
|
||||
private _catchingUpResolvers: CatchingUpResolver[];
|
||||
private _lastProcessedTS: any;
|
||||
private _onSkippedEntriesHook: any;
|
||||
private _entryQueue: any;
|
||||
private _workerActive: boolean;
|
||||
private _startTrailingPromise: Promise<void>;
|
||||
|
||||
constructor(oplogUrl: string, dbName: string) {
|
||||
this._oplogUrl = oplogUrl;
|
||||
this._dbName = dbName;
|
||||
|
||||
this._oplogLastEntryConnection = null;
|
||||
this._oplogTailConnection = null;
|
||||
this._oplogOptions = null;
|
||||
this._stopped = false;
|
||||
this._tailHandle = null;
|
||||
this._readyPromiseResolver = null;
|
||||
this._readyPromise = new Promise(r => this._readyPromiseResolver = r);
|
||||
this._crossbar = new DDPServer._Crossbar({
|
||||
factPackage: "mongo-livedata", factName: "oplog-watchers"
|
||||
});
|
||||
this._baseOplogSelector = {
|
||||
ns: new RegExp("^(?:" + [
|
||||
// @ts-ignore
|
||||
Meteor._escapeRegExp(this._dbName + "."),
|
||||
// @ts-ignore
|
||||
Meteor._escapeRegExp("admin.$cmd"),
|
||||
].join("|") + ")"),
|
||||
|
||||
$or: [
|
||||
{ op: { $in: ['i', 'u', 'd'] } },
|
||||
{ op: 'c', 'o.drop': { $exists: true } },
|
||||
{ op: 'c', 'o.dropDatabase': 1 },
|
||||
{ op: 'c', 'o.applyOps': { $exists: true } },
|
||||
]
|
||||
};
|
||||
|
||||
this._catchingUpResolvers = [];
|
||||
this._lastProcessedTS = null;
|
||||
|
||||
this._onSkippedEntriesHook = new Hook({
|
||||
debugPrintExceptions: "onSkippedEntries callback"
|
||||
});
|
||||
|
||||
// @ts-ignore
|
||||
this._entryQueue = new Meteor._DoubleEndedQueue();
|
||||
this._workerActive = false;
|
||||
|
||||
this._startTrailingPromise = this._startTailing();
|
||||
}
|
||||
|
||||
async stop(): Promise<void> {
|
||||
if (this._stopped) return;
|
||||
this._stopped = true;
|
||||
if (this._tailHandle) {
|
||||
await this._tailHandle.stop();
|
||||
}
|
||||
}
|
||||
|
||||
async _onOplogEntry(trigger: OplogTrigger, callback: Function): Promise<{ stop: () => Promise<void> }> {
|
||||
if (this._stopped) {
|
||||
throw new Error("Called onOplogEntry on stopped handle!");
|
||||
}
|
||||
|
||||
await this._readyPromise;
|
||||
|
||||
const listenHandle = this._crossbar.listen(trigger, callback);
|
||||
return {
|
||||
stop: async function () {
|
||||
await listenHandle.stop();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
onOplogEntry(trigger: OplogTrigger, callback: Function): Promise<{ stop: () => Promise<void> }> {
|
||||
return this._onOplogEntry(trigger, callback);
|
||||
}
|
||||
|
||||
onSkippedEntries(callback: Function): { stop: () => void } {
|
||||
if (this._stopped) {
|
||||
throw new Error("Called onSkippedEntries on stopped handle!");
|
||||
}
|
||||
return this._onSkippedEntriesHook.register(callback);
|
||||
}
|
||||
|
||||
async _waitUntilCaughtUp(): Promise<void> {
|
||||
if (this._stopped) {
|
||||
throw new Error("Called waitUntilCaughtUp on stopped handle!");
|
||||
}
|
||||
|
||||
await this._readyPromise;
|
||||
|
||||
let lastEntry: OplogEntry | null = null;
|
||||
|
||||
while (!this._stopped) {
|
||||
try {
|
||||
lastEntry = await this._oplogLastEntryConnection!.findOneAsync(
|
||||
OPLOG_COLLECTION,
|
||||
this._baseOplogSelector,
|
||||
{ projection: { ts: 1 }, sort: { $natural: -1 } }
|
||||
);
|
||||
break;
|
||||
} catch (e) {
|
||||
Meteor._debug("Got exception while reading last entry", e);
|
||||
// @ts-ignore
|
||||
await Meteor._sleepForMs(100);
|
||||
}
|
||||
}
|
||||
|
||||
if (this._stopped) return;
|
||||
|
||||
if (!lastEntry) return;
|
||||
|
||||
const ts = lastEntry.ts;
|
||||
if (!ts) {
|
||||
throw Error("oplog entry without ts: " + JSON.stringify(lastEntry));
|
||||
}
|
||||
|
||||
if (this._lastProcessedTS && ts.lessThanOrEqual(this._lastProcessedTS)) {
|
||||
return;
|
||||
}
|
||||
|
||||
let insertAfter = this._catchingUpResolvers.length;
|
||||
while (insertAfter - 1 > 0 && this._catchingUpResolvers[insertAfter - 1].ts.greaterThan(ts)) {
|
||||
insertAfter--;
|
||||
}
|
||||
let promiseResolver: (() => void) | null = null;
|
||||
const promiseToAwait = new Promise<void>(r => promiseResolver = r);
|
||||
this._catchingUpResolvers.splice(insertAfter, 0, { ts, resolver: promiseResolver! });
|
||||
await promiseToAwait;
|
||||
}
|
||||
|
||||
async waitUntilCaughtUp(): Promise<void> {
|
||||
return this._waitUntilCaughtUp();
|
||||
}
|
||||
|
||||
async _startTailing(): Promise<void> {
|
||||
const mongodbUri = require('mongodb-uri');
|
||||
if (mongodbUri.parse(this._oplogUrl).database !== 'local') {
|
||||
throw new Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set");
|
||||
}
|
||||
|
||||
this._oplogTailConnection = new MongoConnection(
|
||||
this._oplogUrl, { maxPoolSize: 1, minPoolSize: 1 }
|
||||
);
|
||||
this._oplogLastEntryConnection = new MongoConnection(
|
||||
this._oplogUrl, { maxPoolSize: 1, minPoolSize: 1 }
|
||||
);
|
||||
|
||||
try {
|
||||
const isMasterDoc = await new Promise<any>((resolve, reject) => {
|
||||
this._oplogLastEntryConnection!.db
|
||||
.admin()
|
||||
.command({ ismaster: 1 }, (err: Error | null, result: any) => {
|
||||
if (err) reject(err);
|
||||
else resolve(result);
|
||||
});
|
||||
});
|
||||
|
||||
if (!(isMasterDoc && isMasterDoc.setName)) {
|
||||
throw new Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set");
|
||||
}
|
||||
|
||||
const lastOplogEntry = await this._oplogLastEntryConnection.findOneAsync(
|
||||
OPLOG_COLLECTION,
|
||||
{},
|
||||
{ sort: { $natural: -1 }, projection: { ts: 1 } }
|
||||
);
|
||||
|
||||
let oplogSelector: any = { ...this._baseOplogSelector };
|
||||
if (lastOplogEntry) {
|
||||
oplogSelector.ts = { $gt: lastOplogEntry.ts };
|
||||
this._lastProcessedTS = lastOplogEntry.ts;
|
||||
}
|
||||
|
||||
const includeCollections = Meteor.settings?.packages?.mongo?.oplogIncludeCollections;
|
||||
const excludeCollections = Meteor.settings?.packages?.mongo?.oplogExcludeCollections;
|
||||
|
||||
if (includeCollections?.length && excludeCollections?.length) {
|
||||
throw new Error("Can't use both mongo oplog settings oplogIncludeCollections and oplogExcludeCollections at the same time.");
|
||||
}
|
||||
|
||||
if (excludeCollections?.length) {
|
||||
oplogSelector.ns = {
|
||||
$regex: oplogSelector.ns,
|
||||
$nin: excludeCollections.map((collName: string) => `${this._dbName}.${collName}`)
|
||||
};
|
||||
this._oplogOptions = { excludeCollections };
|
||||
} else if (includeCollections?.length) {
|
||||
oplogSelector = {
|
||||
$and: [
|
||||
{
|
||||
$or: [
|
||||
{ ns: /^admin\.\$cmd/ },
|
||||
{ ns: { $in: includeCollections.map((collName: string) => `${this._dbName}.${collName}`) } }
|
||||
]
|
||||
},
|
||||
{ $or: oplogSelector.$or },
|
||||
{ ts: oplogSelector.ts }
|
||||
]
|
||||
};
|
||||
this._oplogOptions = { includeCollections };
|
||||
}
|
||||
|
||||
const cursorDescription = new CursorDescription(
|
||||
OPLOG_COLLECTION,
|
||||
oplogSelector,
|
||||
{ tailable: true }
|
||||
);
|
||||
|
||||
this._tailHandle = this._oplogTailConnection.tail(
|
||||
cursorDescription,
|
||||
(doc: any) => {
|
||||
this._entryQueue.push(doc);
|
||||
this._maybeStartWorker();
|
||||
},
|
||||
TAIL_TIMEOUT
|
||||
);
|
||||
|
||||
this._readyPromiseResolver!();
|
||||
} catch (error) {
|
||||
console.error('Error in _startTailing:', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
private _maybeStartWorker(): void {
|
||||
if (this._workerActive) return;
|
||||
this._workerActive = true;
|
||||
|
||||
Meteor.defer(async () => {
|
||||
// May be called recursively in case of transactions.
|
||||
const handleDoc = async (doc: OplogEntry): Promise<void> => {
|
||||
if (doc.ns === "admin.$cmd") {
|
||||
if (doc.o.applyOps) {
|
||||
// This was a successful transaction, so we need to apply the
|
||||
// operations that were involved.
|
||||
let nextTimestamp = doc.ts;
|
||||
for (const op of doc.o.applyOps) {
|
||||
// See https://github.com/meteor/meteor/issues/10420.
|
||||
if (!op.ts) {
|
||||
op.ts = nextTimestamp;
|
||||
nextTimestamp = nextTimestamp.add(Long.ONE);
|
||||
}
|
||||
await handleDoc(op);
|
||||
}
|
||||
return;
|
||||
}
|
||||
throw new Error("Unknown command " + JSON.stringify(doc));
|
||||
}
|
||||
|
||||
const trigger: OplogTrigger = {
|
||||
dropCollection: false,
|
||||
dropDatabase: false,
|
||||
op: doc,
|
||||
};
|
||||
|
||||
if (typeof doc.ns === "string" && doc.ns.startsWith(this._dbName + ".")) {
|
||||
trigger.collection = doc.ns.slice(this._dbName.length + 1);
|
||||
}
|
||||
|
||||
// Is it a special command and the collection name is hidden
|
||||
// somewhere in operator?
|
||||
if (trigger.collection === "$cmd") {
|
||||
if (doc.o.dropDatabase) {
|
||||
delete trigger.collection;
|
||||
trigger.dropDatabase = true;
|
||||
} else if ("drop" in doc.o) {
|
||||
trigger.collection = doc.o.drop;
|
||||
trigger.dropCollection = true;
|
||||
trigger.id = null;
|
||||
} else if ("create" in doc.o && "idIndex" in doc.o) {
|
||||
// A collection got implicitly created within a transaction. There's
|
||||
// no need to do anything about it.
|
||||
} else {
|
||||
throw Error("Unknown command " + JSON.stringify(doc));
|
||||
}
|
||||
} else {
|
||||
// All other ops have an id.
|
||||
trigger.id = idForOp(doc);
|
||||
}
|
||||
|
||||
await this._crossbar.fire(trigger);
|
||||
};
|
||||
|
||||
try {
|
||||
while (!this._stopped && !this._entryQueue.isEmpty()) {
|
||||
// Are we too far behind? Just tell our observers that they need to
|
||||
// repoll, and drop our queue.
|
||||
if (this._entryQueue.length > TOO_FAR_BEHIND) {
|
||||
const lastEntry = this._entryQueue.pop();
|
||||
this._entryQueue.clear();
|
||||
|
||||
this._onSkippedEntriesHook.each((callback: Function) => {
|
||||
callback();
|
||||
return true;
|
||||
});
|
||||
|
||||
// Free any waitUntilCaughtUp() calls that were waiting for us to
|
||||
// pass something that we just skipped.
|
||||
this._setLastProcessedTS(lastEntry.ts);
|
||||
continue;
|
||||
}
|
||||
|
||||
const doc = this._entryQueue.shift();
|
||||
|
||||
// Fire trigger(s) for this doc.
|
||||
await handleDoc(doc);
|
||||
|
||||
// Now that we've processed this operation, process pending
|
||||
// sequencers.
|
||||
if (doc.ts) {
|
||||
this._setLastProcessedTS(doc.ts);
|
||||
} else {
|
||||
throw Error("oplog entry without ts: " + JSON.stringify(doc));
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
this._workerActive = false;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
_setLastProcessedTS(ts: any): void {
|
||||
this._lastProcessedTS = ts;
|
||||
while (!isEmpty(this._catchingUpResolvers) && this._catchingUpResolvers[0].ts.lessThanOrEqual(this._lastProcessedTS)) {
|
||||
const sequencer = this._catchingUpResolvers.shift()!;
|
||||
sequencer.resolver();
|
||||
}
|
||||
}
|
||||
|
||||
_defineTooFarBehind(value: number): void {
|
||||
TOO_FAR_BEHIND = value;
|
||||
}
|
||||
|
||||
_resetTooFarBehind(): void {
|
||||
TOO_FAR_BEHIND = +(process.env.METEOR_OPLOG_TOO_FAR_BEHIND || 2000);
|
||||
}
|
||||
}
|
||||
|
||||
export function idForOp(op: OplogEntry): string {
|
||||
if (op.op === 'd' || op.op === 'i') {
|
||||
return op.o._id;
|
||||
} else if (op.op === 'u') {
|
||||
return op.o2._id;
|
||||
} else if (op.op === 'c') {
|
||||
throw Error("Operator 'c' doesn't supply an object with id: " + JSON.stringify(op));
|
||||
} else {
|
||||
throw Error("Unknown op: " + JSON.stringify(op));
|
||||
}
|
||||
}
|
||||
@@ -87,7 +87,7 @@ Package.onUse(function (api) {
|
||||
api.addFiles(
|
||||
[
|
||||
"mongo_driver.js",
|
||||
"oplog_tailing.js",
|
||||
"oplog_tailing.ts",
|
||||
"observe_multiplex.ts",
|
||||
"doc_fetcher.js",
|
||||
"polling_observe_driver.js",
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
import throttle from 'lodash.throttle';
|
||||
import { listenAll } from './mongo_driver';
|
||||
|
||||
var POLLING_THROTTLE_MS = +process.env.METEOR_POLLING_THROTTLE_MS || 50;
|
||||
var POLLING_INTERVAL_MS = +process.env.METEOR_POLLING_INTERVAL_MS || 10 * 1000;
|
||||
|
||||
PollingObserveDriver = function (options) {
|
||||
export const PollingObserveDriver = function (options) {
|
||||
const self = this;
|
||||
self._options = options;
|
||||
|
||||
@@ -14,7 +15,7 @@ PollingObserveDriver = function (options) {
|
||||
self._stopCallbacks = [];
|
||||
self._stopped = false;
|
||||
|
||||
self._cursor = self._mongoHandle._createSynchronousCursor(
|
||||
self._cursor = self._mongoHandle._createAsynchronousCursor(
|
||||
self._cursorDescription);
|
||||
|
||||
// previous results snapshot. on each poll cycle, diffs against
|
||||
|
||||
@@ -4,11 +4,10 @@ import {
|
||||
getAsyncMethodName,
|
||||
CLIENT_ONLY_METHODS
|
||||
} from "meteor/minimongo/constants";
|
||||
import { MongoConnection } from './mongo_connection';
|
||||
|
||||
MongoInternals.RemoteCollectionDriver = function (
|
||||
mongo_url, options) {
|
||||
var self = this;
|
||||
self.mongo = new MongoConnection(mongo_url, options);
|
||||
MongoInternals.RemoteCollectionDriver = function (mongo_url, options) {
|
||||
this.mongo = new MongoConnection(mongo_url, options);
|
||||
};
|
||||
|
||||
const REMOTE_COLLECTION_METHODS = [
|
||||
|
||||
11
packages/webapp/.npm/package/npm-shrinkwrap.json
generated
11
packages/webapp/.npm/package/npm-shrinkwrap.json
generated
@@ -32,9 +32,9 @@
|
||||
"integrity": "sha512-/pyBZWSLD2n0dcHE3hq8s8ZvcETHtEuF+3E7XVt0Ig2nvsVQXdghHVcEkIWjy9A0wKfTn97a/PSDYohKIlnP/w=="
|
||||
},
|
||||
"@types/node": {
|
||||
"version": "22.7.5",
|
||||
"resolved": "https://registry.npmjs.org/@types/node/-/node-22.7.5.tgz",
|
||||
"integrity": "sha512-jML7s2NAzMWc//QSJ1a3prpk78cOPchGvXJsC3C6R6PSMoooztvRVQEz89gmBTBY1SPMaqo5teB4uNHPdetShQ=="
|
||||
"version": "22.7.6",
|
||||
"resolved": "https://registry.npmjs.org/@types/node/-/node-22.7.6.tgz",
|
||||
"integrity": "sha512-/d7Rnj0/ExXDMcioS78/kf1lMzYk4BZV8MZGTBKzTGZ6/406ukkbYlIsZmMPhcR5KlkunDHQLrtAVmSq7r+mSw=="
|
||||
},
|
||||
"@types/qs": {
|
||||
"version": "6.9.16",
|
||||
@@ -514,11 +514,6 @@
|
||||
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.19.8.tgz",
|
||||
"integrity": "sha512-ve2KP6f/JnbPBFyobGHuerC9g1FYGn/F8n1LWTwNxCEzd6IfqTwUQcNXgEtmmQ6DlRrC1hrSrBnCZPokRrDHjw=="
|
||||
},
|
||||
"undici-types": {
|
||||
"version": "5.26.5",
|
||||
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-5.26.5.tgz",
|
||||
"integrity": "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA=="
|
||||
},
|
||||
"unpipe": {
|
||||
"version": "1.0.0",
|
||||
"resolved": "https://registry.npmjs.org/unpipe/-/unpipe-1.0.0.tgz",
|
||||
|
||||
Reference in New Issue
Block a user