From 9eaa77b2cf3941bd3b8cedfd7434bc3f5a2d3c77 Mon Sep 17 00:00:00 2001 From: seke Date: Wed, 15 Jan 2020 00:55:43 +0100 Subject: [PATCH 1/5] Reduce EJSON.clone's when observing query changes --- History.md | 5 +++++ packages/accounts-base/accounts_server.js | 2 +- packages/minimongo/local_collection.js | 18 ++++++++++++------ packages/mongo/collection.js | 2 +- packages/mongo/mongo_driver.js | 15 +++++++++------ packages/mongo/observe_multiplex.js | 18 +++++++++--------- 6 files changed, 37 insertions(+), 23 deletions(-) diff --git a/History.md b/History.md index 583d249bc0..ae4939cbba 100644 --- a/History.md +++ b/History.md @@ -49,6 +49,11 @@ N/A * The `typescript` npm package has been updated to version 3.7.4. +* `cursor.observeChanges` now accepts a second options argument. + If your observer functions do not mutate the passed arguments, you can specify + `{ nonMutatingCallbacks: true }`, which improves performance by reducing + the amount of data copies. + ## v1.9, 2020-01-09 ### Breaking changes diff --git a/packages/accounts-base/accounts_server.js b/packages/accounts-base/accounts_server.js index abce89da1d..619d76eafe 100644 --- a/packages/accounts-base/accounts_server.js +++ b/packages/accounts-base/accounts_server.js @@ -895,7 +895,7 @@ export class AccountsServer extends AccountsCommon { // The onClose callback for the connection takes care of // cleaning up the observe handle and any other state we have // lying around. - }); + }, { nonMutatingCallbacks: true }); // If the user ran another login or logout command we were waiting for the // defer or added to fire (ie, another call to _setLoginToken occurred), diff --git a/packages/minimongo/local_collection.js b/packages/minimongo/local_collection.js index 5ea83843e0..d6264ac0f8 100644 --- a/packages/minimongo/local_collection.js +++ b/packages/minimongo/local_collection.js @@ -669,17 +669,18 @@ LocalCollection._CachingChangeObserver = class _CachingChangeObserver { this.docs = new OrderedDict(MongoID.idStringify); this.applyChange = { addedBefore: (id, fields, before) => { - const doc = EJSON.clone(fields); + // Take a shallow copy since the top-level properties can be changed + const doc = { ...fields }; doc._id = id; if (callbacks.addedBefore) { - callbacks.addedBefore.call(this, id, fields, before); + callbacks.addedBefore.call(this, id, EJSON.clone(fields), before); } // This line triggers if we provide added with movedBefore. if (callbacks.added) { - callbacks.added.call(this, id, fields); + callbacks.added.call(this, id, EJSON.clone(fields)); } // XXX could `before` be a falsy ID? Technically @@ -701,10 +702,11 @@ LocalCollection._CachingChangeObserver = class _CachingChangeObserver { this.docs = new LocalCollection._IdMap; this.applyChange = { added: (id, fields) => { - const doc = EJSON.clone(fields); + // Take a shallow copy since the top-level properties can be changed + const doc = { ...fields }; if (callbacks.added) { - callbacks.added.call(this, id, fields); + callbacks.added.call(this, id, EJSON.clone(fields)); } doc._id = id; @@ -1330,7 +1332,11 @@ LocalCollection._observeFromObserveChanges = (cursor, observeCallbacks) => { callbacks: observeChangesCallbacks }); - const handle = cursor.observeChanges(changeObserver.applyChange); + // CachingChangeObserver clones all received input on its callbacks + // So we can mark it as safe to reduce the ejson clones. + changeObserver.applyChange._fromObserve = true; + const handle = cursor.observeChanges(changeObserver.applyChange, + { nonMutatingCallbacks: true }); suppressed = false; diff --git a/packages/mongo/collection.js b/packages/mongo/collection.js index e1f497c861..5df2f92459 100644 --- a/packages/mongo/collection.js +++ b/packages/mongo/collection.js @@ -372,7 +372,7 @@ Object.assign(Mongo.Collection, { removed: function (id) { sub.removed(collection, id); } - }); + }, { nonMutatingCallbacks: true }); // We don't call sub.ready() here: it gets called in livedata_server, after // possibly calling _publishCursor on multiple returned cursors. diff --git a/packages/mongo/mongo_driver.js b/packages/mongo/mongo_driver.js index 4be68220e6..40a36b42bf 100644 --- a/packages/mongo/mongo_driver.js +++ b/packages/mongo/mongo_driver.js @@ -918,7 +918,7 @@ Cursor.prototype.observe = function (callbacks) { return LocalCollection._observeFromObserveChanges(self, callbacks); }; -Cursor.prototype.observeChanges = function (callbacks) { +Cursor.prototype.observeChanges = function (callbacks, options = {}) { var self = this; var methods = [ 'addedAt', @@ -931,8 +931,8 @@ Cursor.prototype.observeChanges = function (callbacks) { ]; var ordered = LocalCollection._observeChangesCallbacksAreOrdered(callbacks); - // XXX: Can we find out if callbacks are from observe? - var exceptionName = ' observe/observeChanges callback'; + let exceptionName = callbacks._fromObserve ? 'observe' : 'observeChanges'; + exceptionName += ' callback'; methods.forEach(function (method) { if (callbacks[method] && typeof callbacks[method] == "function") { callbacks[method] = Meteor.bindEnvironment(callbacks[method], method + exceptionName); @@ -940,7 +940,7 @@ Cursor.prototype.observeChanges = function (callbacks) { }); return self._mongo._observeChanges( - self._cursorDescription, ordered, callbacks); + self._cursorDescription, ordered, callbacks, options.nonMutatingCallbacks); }; MongoConnection.prototype._createSynchronousCursor = function( @@ -1241,7 +1241,7 @@ MongoConnection.prototype.tail = function (cursorDescription, docCallback, timeo }; MongoConnection.prototype._observeChanges = function ( - cursorDescription, ordered, callbacks) { + cursorDescription, ordered, callbacks, nonMutatingCallbacks) { var self = this; if (cursorDescription.options.tailable) { @@ -1282,7 +1282,10 @@ MongoConnection.prototype._observeChanges = function ( } }); - var observeHandle = new ObserveHandle(multiplexer, callbacks); + var observeHandle = new ObserveHandle(multiplexer, + callbacks, + nonMutatingCallbacks, + ); if (firstHandle) { var matcher, sorter; diff --git a/packages/mongo/observe_multiplex.js b/packages/mongo/observe_multiplex.js index aa766bc07f..6e8f9349f6 100644 --- a/packages/mongo/observe_multiplex.js +++ b/packages/mongo/observe_multiplex.js @@ -155,11 +155,7 @@ _.extend(ObserveMultiplexer.prototype, { return; // First, apply the change to the cache. - // XXX We could make applyChange callbacks promise not to hang on to any - // state from their arguments (assuming that their supplied callbacks - // don't) and skip this clone. Currently 'changed' hangs on to state - // though. - self._cache.applyChange[callbackName].apply(null, EJSON.clone(args)); + self._cache.applyChange[callbackName].apply(null, args); // If we haven't finished the initial adds, then we should only be getting // adds. @@ -179,7 +175,8 @@ _.extend(ObserveMultiplexer.prototype, { return; var callback = handle['_' + callbackName]; // clone arguments so that callbacks can mutate their arguments - callback && callback.apply(null, EJSON.clone(args)); + callback && callback.apply(null, + handle.nonMutatingCallbacks ? args : EJSON.clone(args)); }); }); }, @@ -199,8 +196,8 @@ _.extend(ObserveMultiplexer.prototype, { self._cache.docs.forEach(function (doc, id) { if (!_.has(self._handles, handle._id)) throw Error("handle got removed before sending initial adds!"); - var fields = EJSON.clone(doc); - delete fields._id; + const { _id, ...fields } = handle.nonMutatingCallbacks ? doc + : EJSON.clone(doc); if (self._ordered) add(id, fields, null); // we're going in order, so add at end else @@ -211,7 +208,9 @@ _.extend(ObserveMultiplexer.prototype, { var nextObserveHandleId = 1; -ObserveHandle = function (multiplexer, callbacks) { + +// When the callbacks do not mutate the arguments, we can skip a lot of data clones +ObserveHandle = function (multiplexer, callbacks, nonMutatingCallbacks = false) { var self = this; // The end user is only supposed to call stop(). The other fields are // accessible to the multiplexer, though. @@ -231,6 +230,7 @@ ObserveHandle = function (multiplexer, callbacks) { }); self._stopped = false; self._id = nextObserveHandleId++; + self.nonMutatingCallbacks = nonMutatingCallbacks; }; ObserveHandle.prototype.stop = function () { var self = this; From 20f197cec5f78cc5672ec4d1996c09cac3bb2808 Mon Sep 17 00:00:00 2001 From: seke Date: Thu, 16 Jan 2020 00:03:09 +0100 Subject: [PATCH 2/5] Add more extended scribble test --- packages/mongo/mongo_livedata_tests.js | 74 ++++++++++++++++++++++++++ packages/mongo/package.js | 15 +++--- 2 files changed, 82 insertions(+), 7 deletions(-) diff --git a/packages/mongo/mongo_livedata_tests.js b/packages/mongo/mongo_livedata_tests.js index 30ea3c10e1..8f77d1c822 100644 --- a/packages/mongo/mongo_livedata_tests.js +++ b/packages/mongo/mongo_livedata_tests.js @@ -560,6 +560,80 @@ Tinytest.addAsync("mongo-livedata - scribbling, " + idGeneration, function (test onComplete(); }); +if (Meteor.isServer) { + Tinytest.addAsync("mongo-livedata - extended scribbling, " + idGeneration, function (test, onComplete) { + function error() { + throw new Meteor.Error('unsafe object mutation'); + } + + const denyModifications = { + get(target, key) { + const type = Object.prototype.toString.call(target[key]); + if (type === '[object Object]' || type === '[object Array]') { + return freeze(target[key]); + } else { + return target[key]; + } + }, + set: error, + deleteProperty: error, + defineProperty: error, + }; + + // Object.freeze only throws in silent mode + // So we make our own version that always throws. + function freeze(obj) { + return new Proxy(obj, denyModifications); + } + + const origApplyCallback = ObserveMultiplexer.prototype._applyCallback; + ObserveMultiplexer.prototype._applyCallback = function(callback, args) { + // Make sure that if anything touches the original object, this will throw + return origApplyCallback.call(this, callback, freeze(args)); + } + + const run = test.runId(); + const coll = new Mongo.Collection(`livedata_test_scribble_collection_${run}`, collectionOptions); + const expectMutatable = (o) => { + try { + o.a[0].c = 3; + } catch (error) { + test.fail(); + } + } + const expectNotMutatable = (o) => { + try { + o.a[0].c = 3; + test.fail(); + } catch (error) {} + } + const handle = coll.find({run}).observe({ + addedAt: expectMutatable, + changedAt: function(id, o) { + expectMutatable(o); + } + }); + + const handle2 = coll.find({run}).observeChanges({ + added: expectNotMutatable, + changed: function(id, o) { + expectNotMutatable(o); + } + }, { nonMutatingCallbacks: true }); + + runInFence(function () { + coll.insert({run, a: [ {c: 1} ]}); + coll.update({run}, { $set: { 'a.0.c': 2 } }); + }); + + handle.stop(); + handle2.stop(); + + ObserveMultiplexer.prototype._applyCallback = origApplyCallback; + onComplete(); + }); +} + Tinytest.addAsync("mongo-livedata - stop handle in callback, " + idGeneration, function (test, onComplete) { var run = Random.id(); var coll; diff --git a/packages/mongo/package.js b/packages/mongo/package.js index c05b92ced7..086695d61b 100644 --- a/packages/mongo/package.js +++ b/packages/mongo/package.js @@ -71,6 +71,7 @@ Package.onUse(function (api) { api.export('MongoInternals', 'server'); api.export("Mongo"); + api.export('ObserveMultiplexer', 'server', {testOnly: true}); api.addFiles(['mongo_driver.js', 'oplog_tailing.js', 'observe_multiplex.js', 'doc_fetcher.js', @@ -90,11 +91,11 @@ Package.onTest(function (api) { 'ddp', 'base64']); // XXX test order dependency: the allow_tests "partial allow" test // fails if it is run before mongo_livedata_tests. - api.addFiles('mongo_livedata_tests.js', ['client', 'server']); - api.addFiles('upsert_compatibility_test.js', 'server'); - api.addFiles('allow_tests.js', ['client', 'server']); - api.addFiles('collection_tests.js', ['client', 'server']); - api.addFiles('observe_changes_tests.js', ['client', 'server']); - api.addFiles('oplog_tests.js', 'server'); - api.addFiles('doc_fetcher_tests.js', 'server'); + api.addFiles('mongo_livedata_tests.js', ['server']); + // api.addFiles('upsert_compatibility_test.js', 'server'); + // api.addFiles('allow_tests.js', ['client', 'server']); + // api.addFiles('collection_tests.js', ['client', 'server']); + // api.addFiles('observe_changes_tests.js', ['client', 'server']); + // api.addFiles('oplog_tests.js', 'server'); + // api.addFiles('doc_fetcher_tests.js', 'server'); }); From a0113ba868bea75ac7ad5e587f7d350e5122a654 Mon Sep 17 00:00:00 2001 From: seke Date: Thu, 16 Jan 2020 00:11:28 +0100 Subject: [PATCH 3/5] Add more comments about data mutation assumptions --- packages/minimongo/local_collection.js | 1 + packages/mongo/collection.js | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/minimongo/local_collection.js b/packages/minimongo/local_collection.js index d6264ac0f8..e767137089 100644 --- a/packages/minimongo/local_collection.js +++ b/packages/minimongo/local_collection.js @@ -1334,6 +1334,7 @@ LocalCollection._observeFromObserveChanges = (cursor, observeCallbacks) => { // CachingChangeObserver clones all received input on its callbacks // So we can mark it as safe to reduce the ejson clones. + // This is tested by the `mongo-livedata - (extended) scribbling` tests changeObserver.applyChange._fromObserve = true; const handle = cursor.observeChanges(changeObserver.applyChange, { nonMutatingCallbacks: true }); diff --git a/packages/mongo/collection.js b/packages/mongo/collection.js index 5df2f92459..b383d00d6c 100644 --- a/packages/mongo/collection.js +++ b/packages/mongo/collection.js @@ -372,7 +372,10 @@ Object.assign(Mongo.Collection, { removed: function (id) { sub.removed(collection, id); } - }, { nonMutatingCallbacks: true }); + }, + // Publications don't mutate the documents + // This is tested by the `livedata - publish callbacks clone` test + { nonMutatingCallbacks: true }); // We don't call sub.ready() here: it gets called in livedata_server, after // possibly calling _publishCursor on multiple returned cursors. From c55875c66cb25195e88dc882e148b24ea0d3142b Mon Sep 17 00:00:00 2001 From: seke Date: Thu, 16 Jan 2020 00:14:22 +0100 Subject: [PATCH 4/5] Revert dev changes --- packages/mongo/package.js | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/packages/mongo/package.js b/packages/mongo/package.js index 086695d61b..cdabd217ed 100644 --- a/packages/mongo/package.js +++ b/packages/mongo/package.js @@ -91,11 +91,11 @@ Package.onTest(function (api) { 'ddp', 'base64']); // XXX test order dependency: the allow_tests "partial allow" test // fails if it is run before mongo_livedata_tests. - api.addFiles('mongo_livedata_tests.js', ['server']); - // api.addFiles('upsert_compatibility_test.js', 'server'); - // api.addFiles('allow_tests.js', ['client', 'server']); - // api.addFiles('collection_tests.js', ['client', 'server']); - // api.addFiles('observe_changes_tests.js', ['client', 'server']); - // api.addFiles('oplog_tests.js', 'server'); - // api.addFiles('doc_fetcher_tests.js', 'server'); + api.addFiles('mongo_livedata_tests.js', ['client', 'server']); + api.addFiles('upsert_compatibility_test.js', 'server'); + api.addFiles('allow_tests.js', ['client', 'server']); + api.addFiles('collection_tests.js', ['client', 'server']); + api.addFiles('observe_changes_tests.js', ['client', 'server']); + api.addFiles('oplog_tests.js', 'server'); + api.addFiles('doc_fetcher_tests.js', 'server'); }); From bf7b23275c11f822e80a55e14e552dd55563e685 Mon Sep 17 00:00:00 2001 From: seke Date: Tue, 4 Feb 2020 17:23:45 +0100 Subject: [PATCH 5/5] Bump minor package versions for livequery changes --- packages/accounts-base/package.js | 2 +- packages/minimongo/package.js | 2 +- packages/mongo/package.js | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/accounts-base/package.js b/packages/accounts-base/package.js index 0ea2c05a6c..d0cae1538f 100644 --- a/packages/accounts-base/package.js +++ b/packages/accounts-base/package.js @@ -1,6 +1,6 @@ Package.describe({ summary: "A user account system", - version: "1.5.0", + version: "1.6.0", }); Package.onUse(api => { diff --git a/packages/minimongo/package.js b/packages/minimongo/package.js index 047682d2a7..51b5fbc057 100644 --- a/packages/minimongo/package.js +++ b/packages/minimongo/package.js @@ -1,6 +1,6 @@ Package.describe({ summary: "Meteor's client-side datastore: a port of MongoDB to Javascript", - version: '1.4.5' + version: '1.5.0' }); Package.onUse(api => { diff --git a/packages/mongo/package.js b/packages/mongo/package.js index cdabd217ed..fd992c6fe8 100644 --- a/packages/mongo/package.js +++ b/packages/mongo/package.js @@ -9,7 +9,7 @@ Package.describe({ summary: "Adaptor for using MongoDB and Minimongo over DDP", - version: '1.8.1' + version: '1.9.0' }); Npm.depends({