diff --git a/History.md b/History.md index b360b74088..69ebe569e1 100644 --- a/History.md +++ b/History.md @@ -81,6 +81,11 @@ N/A * Updated V8 to [release v7.8](https://v8.dev/blog/v8-release-78) which includes improvements in performance, for example, object destructuring now is as fast as the equivalent variable assignment. * [12.15.0](https://nodejs.org/en/blog/release/v12.15.0/) +* `cursor.observeChanges` now accepts a second options argument. + If your observer functions do not mutate the passed arguments, you can specify + `{ nonMutatingCallbacks: true }`, which improves performance by reducing + the amount of data copies. + ## v1.9, 2020-01-09 ### Breaking changes diff --git a/packages/accounts-base/accounts_server.js b/packages/accounts-base/accounts_server.js index fc0d35a89d..afed16aba8 100644 --- a/packages/accounts-base/accounts_server.js +++ b/packages/accounts-base/accounts_server.js @@ -895,7 +895,7 @@ export class AccountsServer extends AccountsCommon { // The onClose callback for the connection takes care of // cleaning up the observe handle and any other state we have // lying around. - }); + }, { nonMutatingCallbacks: true }); // If the user ran another login or logout command we were waiting for the // defer or added to fire (ie, another call to _setLoginToken occurred), diff --git a/packages/minimongo/local_collection.js b/packages/minimongo/local_collection.js index 5ea83843e0..e767137089 100644 --- a/packages/minimongo/local_collection.js +++ b/packages/minimongo/local_collection.js @@ -669,17 +669,18 @@ LocalCollection._CachingChangeObserver = class _CachingChangeObserver { this.docs = new OrderedDict(MongoID.idStringify); this.applyChange = { addedBefore: (id, fields, before) => { - const doc = EJSON.clone(fields); + // Take a shallow copy since the top-level properties can be changed + const doc = { ...fields }; doc._id = id; if (callbacks.addedBefore) { - callbacks.addedBefore.call(this, id, fields, before); + callbacks.addedBefore.call(this, id, EJSON.clone(fields), before); } // This line triggers if we provide added with movedBefore. if (callbacks.added) { - callbacks.added.call(this, id, fields); + callbacks.added.call(this, id, EJSON.clone(fields)); } // XXX could `before` be a falsy ID? Technically @@ -701,10 +702,11 @@ LocalCollection._CachingChangeObserver = class _CachingChangeObserver { this.docs = new LocalCollection._IdMap; this.applyChange = { added: (id, fields) => { - const doc = EJSON.clone(fields); + // Take a shallow copy since the top-level properties can be changed + const doc = { ...fields }; if (callbacks.added) { - callbacks.added.call(this, id, fields); + callbacks.added.call(this, id, EJSON.clone(fields)); } doc._id = id; @@ -1330,7 +1332,12 @@ LocalCollection._observeFromObserveChanges = (cursor, observeCallbacks) => { callbacks: observeChangesCallbacks }); - const handle = cursor.observeChanges(changeObserver.applyChange); + // CachingChangeObserver clones all received input on its callbacks + // So we can mark it as safe to reduce the ejson clones. + // This is tested by the `mongo-livedata - (extended) scribbling` tests + changeObserver.applyChange._fromObserve = true; + const handle = cursor.observeChanges(changeObserver.applyChange, + { nonMutatingCallbacks: true }); suppressed = false; diff --git a/packages/minimongo/package.js b/packages/minimongo/package.js index 047682d2a7..51b5fbc057 100644 --- a/packages/minimongo/package.js +++ b/packages/minimongo/package.js @@ -1,6 +1,6 @@ Package.describe({ summary: "Meteor's client-side datastore: a port of MongoDB to Javascript", - version: '1.4.5' + version: '1.5.0' }); Package.onUse(api => { diff --git a/packages/mongo/collection.js b/packages/mongo/collection.js index a610481ba6..30d6c82da7 100644 --- a/packages/mongo/collection.js +++ b/packages/mongo/collection.js @@ -372,7 +372,10 @@ Object.assign(Mongo.Collection, { removed: function (id) { sub.removed(collection, id); } - }); + }, + // Publications don't mutate the documents + // This is tested by the `livedata - publish callbacks clone` test + { nonMutatingCallbacks: true }); // We don't call sub.ready() here: it gets called in livedata_server, after // possibly calling _publishCursor on multiple returned cursors. diff --git a/packages/mongo/mongo_driver.js b/packages/mongo/mongo_driver.js index 4be68220e6..40a36b42bf 100644 --- a/packages/mongo/mongo_driver.js +++ b/packages/mongo/mongo_driver.js @@ -918,7 +918,7 @@ Cursor.prototype.observe = function (callbacks) { return LocalCollection._observeFromObserveChanges(self, callbacks); }; -Cursor.prototype.observeChanges = function (callbacks) { +Cursor.prototype.observeChanges = function (callbacks, options = {}) { var self = this; var methods = [ 'addedAt', @@ -931,8 +931,8 @@ Cursor.prototype.observeChanges = function (callbacks) { ]; var ordered = LocalCollection._observeChangesCallbacksAreOrdered(callbacks); - // XXX: Can we find out if callbacks are from observe? - var exceptionName = ' observe/observeChanges callback'; + let exceptionName = callbacks._fromObserve ? 'observe' : 'observeChanges'; + exceptionName += ' callback'; methods.forEach(function (method) { if (callbacks[method] && typeof callbacks[method] == "function") { callbacks[method] = Meteor.bindEnvironment(callbacks[method], method + exceptionName); @@ -940,7 +940,7 @@ Cursor.prototype.observeChanges = function (callbacks) { }); return self._mongo._observeChanges( - self._cursorDescription, ordered, callbacks); + self._cursorDescription, ordered, callbacks, options.nonMutatingCallbacks); }; MongoConnection.prototype._createSynchronousCursor = function( @@ -1241,7 +1241,7 @@ MongoConnection.prototype.tail = function (cursorDescription, docCallback, timeo }; MongoConnection.prototype._observeChanges = function ( - cursorDescription, ordered, callbacks) { + cursorDescription, ordered, callbacks, nonMutatingCallbacks) { var self = this; if (cursorDescription.options.tailable) { @@ -1282,7 +1282,10 @@ MongoConnection.prototype._observeChanges = function ( } }); - var observeHandle = new ObserveHandle(multiplexer, callbacks); + var observeHandle = new ObserveHandle(multiplexer, + callbacks, + nonMutatingCallbacks, + ); if (firstHandle) { var matcher, sorter; diff --git a/packages/mongo/mongo_livedata_tests.js b/packages/mongo/mongo_livedata_tests.js index 30ea3c10e1..8f77d1c822 100644 --- a/packages/mongo/mongo_livedata_tests.js +++ b/packages/mongo/mongo_livedata_tests.js @@ -560,6 +560,80 @@ Tinytest.addAsync("mongo-livedata - scribbling, " + idGeneration, function (test onComplete(); }); +if (Meteor.isServer) { + Tinytest.addAsync("mongo-livedata - extended scribbling, " + idGeneration, function (test, onComplete) { + function error() { + throw new Meteor.Error('unsafe object mutation'); + } + + const denyModifications = { + get(target, key) { + const type = Object.prototype.toString.call(target[key]); + if (type === '[object Object]' || type === '[object Array]') { + return freeze(target[key]); + } else { + return target[key]; + } + }, + set: error, + deleteProperty: error, + defineProperty: error, + }; + + // Object.freeze only throws in silent mode + // So we make our own version that always throws. + function freeze(obj) { + return new Proxy(obj, denyModifications); + } + + const origApplyCallback = ObserveMultiplexer.prototype._applyCallback; + ObserveMultiplexer.prototype._applyCallback = function(callback, args) { + // Make sure that if anything touches the original object, this will throw + return origApplyCallback.call(this, callback, freeze(args)); + } + + const run = test.runId(); + const coll = new Mongo.Collection(`livedata_test_scribble_collection_${run}`, collectionOptions); + const expectMutatable = (o) => { + try { + o.a[0].c = 3; + } catch (error) { + test.fail(); + } + } + const expectNotMutatable = (o) => { + try { + o.a[0].c = 3; + test.fail(); + } catch (error) {} + } + const handle = coll.find({run}).observe({ + addedAt: expectMutatable, + changedAt: function(id, o) { + expectMutatable(o); + } + }); + + const handle2 = coll.find({run}).observeChanges({ + added: expectNotMutatable, + changed: function(id, o) { + expectNotMutatable(o); + } + }, { nonMutatingCallbacks: true }); + + runInFence(function () { + coll.insert({run, a: [ {c: 1} ]}); + coll.update({run}, { $set: { 'a.0.c': 2 } }); + }); + + handle.stop(); + handle2.stop(); + + ObserveMultiplexer.prototype._applyCallback = origApplyCallback; + onComplete(); + }); +} + Tinytest.addAsync("mongo-livedata - stop handle in callback, " + idGeneration, function (test, onComplete) { var run = Random.id(); var coll; diff --git a/packages/mongo/observe_multiplex.js b/packages/mongo/observe_multiplex.js index aa766bc07f..6e8f9349f6 100644 --- a/packages/mongo/observe_multiplex.js +++ b/packages/mongo/observe_multiplex.js @@ -155,11 +155,7 @@ _.extend(ObserveMultiplexer.prototype, { return; // First, apply the change to the cache. - // XXX We could make applyChange callbacks promise not to hang on to any - // state from their arguments (assuming that their supplied callbacks - // don't) and skip this clone. Currently 'changed' hangs on to state - // though. - self._cache.applyChange[callbackName].apply(null, EJSON.clone(args)); + self._cache.applyChange[callbackName].apply(null, args); // If we haven't finished the initial adds, then we should only be getting // adds. @@ -179,7 +175,8 @@ _.extend(ObserveMultiplexer.prototype, { return; var callback = handle['_' + callbackName]; // clone arguments so that callbacks can mutate their arguments - callback && callback.apply(null, EJSON.clone(args)); + callback && callback.apply(null, + handle.nonMutatingCallbacks ? args : EJSON.clone(args)); }); }); }, @@ -199,8 +196,8 @@ _.extend(ObserveMultiplexer.prototype, { self._cache.docs.forEach(function (doc, id) { if (!_.has(self._handles, handle._id)) throw Error("handle got removed before sending initial adds!"); - var fields = EJSON.clone(doc); - delete fields._id; + const { _id, ...fields } = handle.nonMutatingCallbacks ? doc + : EJSON.clone(doc); if (self._ordered) add(id, fields, null); // we're going in order, so add at end else @@ -211,7 +208,9 @@ _.extend(ObserveMultiplexer.prototype, { var nextObserveHandleId = 1; -ObserveHandle = function (multiplexer, callbacks) { + +// When the callbacks do not mutate the arguments, we can skip a lot of data clones +ObserveHandle = function (multiplexer, callbacks, nonMutatingCallbacks = false) { var self = this; // The end user is only supposed to call stop(). The other fields are // accessible to the multiplexer, though. @@ -231,6 +230,7 @@ ObserveHandle = function (multiplexer, callbacks) { }); self._stopped = false; self._id = nextObserveHandleId++; + self.nonMutatingCallbacks = nonMutatingCallbacks; }; ObserveHandle.prototype.stop = function () { var self = this; diff --git a/packages/mongo/package.js b/packages/mongo/package.js index 9df74a8ba4..e0b6852bba 100644 --- a/packages/mongo/package.js +++ b/packages/mongo/package.js @@ -71,6 +71,7 @@ Package.onUse(function (api) { api.export('MongoInternals', 'server'); api.export("Mongo"); + api.export('ObserveMultiplexer', 'server', {testOnly: true}); api.addFiles(['mongo_driver.js', 'oplog_tailing.js', 'observe_multiplex.js', 'doc_fetcher.js',