Reduce EJSON.clone's when observing query changes

This commit is contained in:
seke
2020-01-15 00:55:43 +01:00
parent 6b4d429e5d
commit 9eaa77b2cf
6 changed files with 37 additions and 23 deletions

View File

@@ -49,6 +49,11 @@ N/A
* The `typescript` npm package has been updated to version 3.7.4.
* `cursor.observeChanges` now accepts a second options argument.
If your observer functions do not mutate the passed arguments, you can specify
`{ nonMutatingCallbacks: true }`, which improves performance by reducing
the amount of data copies.
## v1.9, 2020-01-09
### Breaking changes

View File

@@ -895,7 +895,7 @@ export class AccountsServer extends AccountsCommon {
// The onClose callback for the connection takes care of
// cleaning up the observe handle and any other state we have
// lying around.
});
}, { nonMutatingCallbacks: true });
// If the user ran another login or logout command we were waiting for the
// defer or added to fire (ie, another call to _setLoginToken occurred),

View File

@@ -669,17 +669,18 @@ LocalCollection._CachingChangeObserver = class _CachingChangeObserver {
this.docs = new OrderedDict(MongoID.idStringify);
this.applyChange = {
addedBefore: (id, fields, before) => {
const doc = EJSON.clone(fields);
// Take a shallow copy since the top-level properties can be changed
const doc = { ...fields };
doc._id = id;
if (callbacks.addedBefore) {
callbacks.addedBefore.call(this, id, fields, before);
callbacks.addedBefore.call(this, id, EJSON.clone(fields), before);
}
// This line triggers if we provide added with movedBefore.
if (callbacks.added) {
callbacks.added.call(this, id, fields);
callbacks.added.call(this, id, EJSON.clone(fields));
}
// XXX could `before` be a falsy ID? Technically
@@ -701,10 +702,11 @@ LocalCollection._CachingChangeObserver = class _CachingChangeObserver {
this.docs = new LocalCollection._IdMap;
this.applyChange = {
added: (id, fields) => {
const doc = EJSON.clone(fields);
// Take a shallow copy since the top-level properties can be changed
const doc = { ...fields };
if (callbacks.added) {
callbacks.added.call(this, id, fields);
callbacks.added.call(this, id, EJSON.clone(fields));
}
doc._id = id;
@@ -1330,7 +1332,11 @@ LocalCollection._observeFromObserveChanges = (cursor, observeCallbacks) => {
callbacks: observeChangesCallbacks
});
const handle = cursor.observeChanges(changeObserver.applyChange);
// CachingChangeObserver clones all received input on its callbacks
// So we can mark it as safe to reduce the ejson clones.
changeObserver.applyChange._fromObserve = true;
const handle = cursor.observeChanges(changeObserver.applyChange,
{ nonMutatingCallbacks: true });
suppressed = false;

View File

@@ -372,7 +372,7 @@ Object.assign(Mongo.Collection, {
removed: function (id) {
sub.removed(collection, id);
}
});
}, { nonMutatingCallbacks: true });
// We don't call sub.ready() here: it gets called in livedata_server, after
// possibly calling _publishCursor on multiple returned cursors.

View File

@@ -918,7 +918,7 @@ Cursor.prototype.observe = function (callbacks) {
return LocalCollection._observeFromObserveChanges(self, callbacks);
};
Cursor.prototype.observeChanges = function (callbacks) {
Cursor.prototype.observeChanges = function (callbacks, options = {}) {
var self = this;
var methods = [
'addedAt',
@@ -931,8 +931,8 @@ Cursor.prototype.observeChanges = function (callbacks) {
];
var ordered = LocalCollection._observeChangesCallbacksAreOrdered(callbacks);
// XXX: Can we find out if callbacks are from observe?
var exceptionName = ' observe/observeChanges callback';
let exceptionName = callbacks._fromObserve ? 'observe' : 'observeChanges';
exceptionName += ' callback';
methods.forEach(function (method) {
if (callbacks[method] && typeof callbacks[method] == "function") {
callbacks[method] = Meteor.bindEnvironment(callbacks[method], method + exceptionName);
@@ -940,7 +940,7 @@ Cursor.prototype.observeChanges = function (callbacks) {
});
return self._mongo._observeChanges(
self._cursorDescription, ordered, callbacks);
self._cursorDescription, ordered, callbacks, options.nonMutatingCallbacks);
};
MongoConnection.prototype._createSynchronousCursor = function(
@@ -1241,7 +1241,7 @@ MongoConnection.prototype.tail = function (cursorDescription, docCallback, timeo
};
MongoConnection.prototype._observeChanges = function (
cursorDescription, ordered, callbacks) {
cursorDescription, ordered, callbacks, nonMutatingCallbacks) {
var self = this;
if (cursorDescription.options.tailable) {
@@ -1282,7 +1282,10 @@ MongoConnection.prototype._observeChanges = function (
}
});
var observeHandle = new ObserveHandle(multiplexer, callbacks);
var observeHandle = new ObserveHandle(multiplexer,
callbacks,
nonMutatingCallbacks,
);
if (firstHandle) {
var matcher, sorter;

View File

@@ -155,11 +155,7 @@ _.extend(ObserveMultiplexer.prototype, {
return;
// First, apply the change to the cache.
// XXX We could make applyChange callbacks promise not to hang on to any
// state from their arguments (assuming that their supplied callbacks
// don't) and skip this clone. Currently 'changed' hangs on to state
// though.
self._cache.applyChange[callbackName].apply(null, EJSON.clone(args));
self._cache.applyChange[callbackName].apply(null, args);
// If we haven't finished the initial adds, then we should only be getting
// adds.
@@ -179,7 +175,8 @@ _.extend(ObserveMultiplexer.prototype, {
return;
var callback = handle['_' + callbackName];
// clone arguments so that callbacks can mutate their arguments
callback && callback.apply(null, EJSON.clone(args));
callback && callback.apply(null,
handle.nonMutatingCallbacks ? args : EJSON.clone(args));
});
});
},
@@ -199,8 +196,8 @@ _.extend(ObserveMultiplexer.prototype, {
self._cache.docs.forEach(function (doc, id) {
if (!_.has(self._handles, handle._id))
throw Error("handle got removed before sending initial adds!");
var fields = EJSON.clone(doc);
delete fields._id;
const { _id, ...fields } = handle.nonMutatingCallbacks ? doc
: EJSON.clone(doc);
if (self._ordered)
add(id, fields, null); // we're going in order, so add at end
else
@@ -211,7 +208,9 @@ _.extend(ObserveMultiplexer.prototype, {
var nextObserveHandleId = 1;
ObserveHandle = function (multiplexer, callbacks) {
// When the callbacks do not mutate the arguments, we can skip a lot of data clones
ObserveHandle = function (multiplexer, callbacks, nonMutatingCallbacks = false) {
var self = this;
// The end user is only supposed to call stop(). The other fields are
// accessible to the multiplexer, though.
@@ -231,6 +230,7 @@ ObserveHandle = function (multiplexer, callbacks) {
});
self._stopped = false;
self._id = nextObserveHandleId++;
self.nonMutatingCallbacks = nonMutatingCallbacks;
};
ObserveHandle.prototype.stop = function () {
var self = this;