Merge pull request #10864 from meteor/mongo-performance-improvements

Reduce EJSON.clone's when observing query changes
This commit is contained in:
Filipe Névola
2020-02-25 12:10:20 -04:00
committed by GitHub
9 changed files with 117 additions and 24 deletions

View File

@@ -81,6 +81,11 @@ N/A
* Updated V8 to [release v7.8](https://v8.dev/blog/v8-release-78) which includes improvements in performance, for example, object destructuring now is as fast as the equivalent variable assignment.
* [12.15.0](https://nodejs.org/en/blog/release/v12.15.0/)
* `cursor.observeChanges` now accepts a second options argument.
If your observer functions do not mutate the passed arguments, you can specify
`{ nonMutatingCallbacks: true }`, which improves performance by reducing
the amount of data copies.
## v1.9, 2020-01-09
### Breaking changes

View File

@@ -895,7 +895,7 @@ export class AccountsServer extends AccountsCommon {
// The onClose callback for the connection takes care of
// cleaning up the observe handle and any other state we have
// lying around.
});
}, { nonMutatingCallbacks: true });
// If the user ran another login or logout command we were waiting for the
// defer or added to fire (ie, another call to _setLoginToken occurred),

View File

@@ -669,17 +669,18 @@ LocalCollection._CachingChangeObserver = class _CachingChangeObserver {
this.docs = new OrderedDict(MongoID.idStringify);
this.applyChange = {
addedBefore: (id, fields, before) => {
const doc = EJSON.clone(fields);
// Take a shallow copy since the top-level properties can be changed
const doc = { ...fields };
doc._id = id;
if (callbacks.addedBefore) {
callbacks.addedBefore.call(this, id, fields, before);
callbacks.addedBefore.call(this, id, EJSON.clone(fields), before);
}
// This line triggers if we provide added with movedBefore.
if (callbacks.added) {
callbacks.added.call(this, id, fields);
callbacks.added.call(this, id, EJSON.clone(fields));
}
// XXX could `before` be a falsy ID? Technically
@@ -701,10 +702,11 @@ LocalCollection._CachingChangeObserver = class _CachingChangeObserver {
this.docs = new LocalCollection._IdMap;
this.applyChange = {
added: (id, fields) => {
const doc = EJSON.clone(fields);
// Take a shallow copy since the top-level properties can be changed
const doc = { ...fields };
if (callbacks.added) {
callbacks.added.call(this, id, fields);
callbacks.added.call(this, id, EJSON.clone(fields));
}
doc._id = id;
@@ -1330,7 +1332,12 @@ LocalCollection._observeFromObserveChanges = (cursor, observeCallbacks) => {
callbacks: observeChangesCallbacks
});
const handle = cursor.observeChanges(changeObserver.applyChange);
// CachingChangeObserver clones all received input on its callbacks
// So we can mark it as safe to reduce the ejson clones.
// This is tested by the `mongo-livedata - (extended) scribbling` tests
changeObserver.applyChange._fromObserve = true;
const handle = cursor.observeChanges(changeObserver.applyChange,
{ nonMutatingCallbacks: true });
suppressed = false;

View File

@@ -1,6 +1,6 @@
Package.describe({
summary: "Meteor's client-side datastore: a port of MongoDB to Javascript",
version: '1.4.5'
version: '1.5.0'
});
Package.onUse(api => {

View File

@@ -372,7 +372,10 @@ Object.assign(Mongo.Collection, {
removed: function (id) {
sub.removed(collection, id);
}
});
},
// Publications don't mutate the documents
// This is tested by the `livedata - publish callbacks clone` test
{ nonMutatingCallbacks: true });
// We don't call sub.ready() here: it gets called in livedata_server, after
// possibly calling _publishCursor on multiple returned cursors.

View File

@@ -918,7 +918,7 @@ Cursor.prototype.observe = function (callbacks) {
return LocalCollection._observeFromObserveChanges(self, callbacks);
};
Cursor.prototype.observeChanges = function (callbacks) {
Cursor.prototype.observeChanges = function (callbacks, options = {}) {
var self = this;
var methods = [
'addedAt',
@@ -931,8 +931,8 @@ Cursor.prototype.observeChanges = function (callbacks) {
];
var ordered = LocalCollection._observeChangesCallbacksAreOrdered(callbacks);
// XXX: Can we find out if callbacks are from observe?
var exceptionName = ' observe/observeChanges callback';
let exceptionName = callbacks._fromObserve ? 'observe' : 'observeChanges';
exceptionName += ' callback';
methods.forEach(function (method) {
if (callbacks[method] && typeof callbacks[method] == "function") {
callbacks[method] = Meteor.bindEnvironment(callbacks[method], method + exceptionName);
@@ -940,7 +940,7 @@ Cursor.prototype.observeChanges = function (callbacks) {
});
return self._mongo._observeChanges(
self._cursorDescription, ordered, callbacks);
self._cursorDescription, ordered, callbacks, options.nonMutatingCallbacks);
};
MongoConnection.prototype._createSynchronousCursor = function(
@@ -1241,7 +1241,7 @@ MongoConnection.prototype.tail = function (cursorDescription, docCallback, timeo
};
MongoConnection.prototype._observeChanges = function (
cursorDescription, ordered, callbacks) {
cursorDescription, ordered, callbacks, nonMutatingCallbacks) {
var self = this;
if (cursorDescription.options.tailable) {
@@ -1282,7 +1282,10 @@ MongoConnection.prototype._observeChanges = function (
}
});
var observeHandle = new ObserveHandle(multiplexer, callbacks);
var observeHandle = new ObserveHandle(multiplexer,
callbacks,
nonMutatingCallbacks,
);
if (firstHandle) {
var matcher, sorter;

View File

@@ -560,6 +560,80 @@ Tinytest.addAsync("mongo-livedata - scribbling, " + idGeneration, function (test
onComplete();
});
if (Meteor.isServer) {
Tinytest.addAsync("mongo-livedata - extended scribbling, " + idGeneration, function (test, onComplete) {
function error() {
throw new Meteor.Error('unsafe object mutation');
}
const denyModifications = {
get(target, key) {
const type = Object.prototype.toString.call(target[key]);
if (type === '[object Object]' || type === '[object Array]') {
return freeze(target[key]);
} else {
return target[key];
}
},
set: error,
deleteProperty: error,
defineProperty: error,
};
// Object.freeze only throws in silent mode
// So we make our own version that always throws.
function freeze(obj) {
return new Proxy(obj, denyModifications);
}
const origApplyCallback = ObserveMultiplexer.prototype._applyCallback;
ObserveMultiplexer.prototype._applyCallback = function(callback, args) {
// Make sure that if anything touches the original object, this will throw
return origApplyCallback.call(this, callback, freeze(args));
}
const run = test.runId();
const coll = new Mongo.Collection(`livedata_test_scribble_collection_${run}`, collectionOptions);
const expectMutatable = (o) => {
try {
o.a[0].c = 3;
} catch (error) {
test.fail();
}
}
const expectNotMutatable = (o) => {
try {
o.a[0].c = 3;
test.fail();
} catch (error) {}
}
const handle = coll.find({run}).observe({
addedAt: expectMutatable,
changedAt: function(id, o) {
expectMutatable(o);
}
});
const handle2 = coll.find({run}).observeChanges({
added: expectNotMutatable,
changed: function(id, o) {
expectNotMutatable(o);
}
}, { nonMutatingCallbacks: true });
runInFence(function () {
coll.insert({run, a: [ {c: 1} ]});
coll.update({run}, { $set: { 'a.0.c': 2 } });
});
handle.stop();
handle2.stop();
ObserveMultiplexer.prototype._applyCallback = origApplyCallback;
onComplete();
});
}
Tinytest.addAsync("mongo-livedata - stop handle in callback, " + idGeneration, function (test, onComplete) {
var run = Random.id();
var coll;

View File

@@ -155,11 +155,7 @@ _.extend(ObserveMultiplexer.prototype, {
return;
// First, apply the change to the cache.
// XXX We could make applyChange callbacks promise not to hang on to any
// state from their arguments (assuming that their supplied callbacks
// don't) and skip this clone. Currently 'changed' hangs on to state
// though.
self._cache.applyChange[callbackName].apply(null, EJSON.clone(args));
self._cache.applyChange[callbackName].apply(null, args);
// If we haven't finished the initial adds, then we should only be getting
// adds.
@@ -179,7 +175,8 @@ _.extend(ObserveMultiplexer.prototype, {
return;
var callback = handle['_' + callbackName];
// clone arguments so that callbacks can mutate their arguments
callback && callback.apply(null, EJSON.clone(args));
callback && callback.apply(null,
handle.nonMutatingCallbacks ? args : EJSON.clone(args));
});
});
},
@@ -199,8 +196,8 @@ _.extend(ObserveMultiplexer.prototype, {
self._cache.docs.forEach(function (doc, id) {
if (!_.has(self._handles, handle._id))
throw Error("handle got removed before sending initial adds!");
var fields = EJSON.clone(doc);
delete fields._id;
const { _id, ...fields } = handle.nonMutatingCallbacks ? doc
: EJSON.clone(doc);
if (self._ordered)
add(id, fields, null); // we're going in order, so add at end
else
@@ -211,7 +208,9 @@ _.extend(ObserveMultiplexer.prototype, {
var nextObserveHandleId = 1;
ObserveHandle = function (multiplexer, callbacks) {
// When the callbacks do not mutate the arguments, we can skip a lot of data clones
ObserveHandle = function (multiplexer, callbacks, nonMutatingCallbacks = false) {
var self = this;
// The end user is only supposed to call stop(). The other fields are
// accessible to the multiplexer, though.
@@ -231,6 +230,7 @@ ObserveHandle = function (multiplexer, callbacks) {
});
self._stopped = false;
self._id = nextObserveHandleId++;
self.nonMutatingCallbacks = nonMutatingCallbacks;
};
ObserveHandle.prototype.stop = function () {
var self = this;

View File

@@ -71,6 +71,7 @@ Package.onUse(function (api) {
api.export('MongoInternals', 'server');
api.export("Mongo");
api.export('ObserveMultiplexer', 'server', {testOnly: true});
api.addFiles(['mongo_driver.js', 'oplog_tailing.js',
'observe_multiplex.js', 'doc_fetcher.js',