Merge pull request #13445 from meteor/leonardo/refactor-and-perf-phase-2

feat: performance improvements
This commit is contained in:
Leonardo Venturini
2024-12-04 17:01:36 -04:00
committed by GitHub
35 changed files with 1876 additions and 1354 deletions

16
.envrc
View File

@@ -56,6 +56,22 @@ function @docs-migration-start {
npm run docs:dev --prefix "$ROOT_DIR/v3-docs/v3-migration-docs"
}
function @get-changes {
  # Print the paths of files changed in the last commit (HEAD~1..HEAD),
  # skipping any file whose combined added+deleted line count exceeds
  # 5000 (numstat columns are: added, deleted, path). This filters out
  # huge generated diffs such as lockfiles.
  # NOTE(review): numstat prints "-" for binary files; awk coerces that
  # to 0, so binary files are always included — confirm that's intended.
  git diff --numstat HEAD~1 HEAD | awk '($1 + $2) <= 5000 {print $3}'
}
function @summarize-changes {
  # Summarize the last commit via the external `llm` CLI.
  # The diff is restricted to the files selected by @get-changes (the
  # ones under the size cutoff); if none qualify, fall back to the full
  # HEAD~1..HEAD diff.
  changes=$(@get-changes)
  if [ -n "$changes" ]; then
    # NOTE(review): the inner $(...) is intentionally unquoted so each
    # newline-separated path becomes a separate argument; paths with
    # spaces would break here.
    changes=$(git diff HEAD~1 HEAD -- $(echo "$changes" | tr '\n' ' '))
  else
    changes=$(git diff HEAD~1 HEAD)
  fi
  echo "$changes" | llm -s "Summarize the following changes in a few sentences:"
}
function @packages-bumped {
git diff --name-only devel...$(git branch --show-current) | grep "packages/.*/package.js$" | while IFS= read -r file; do
if ! git show devel:$file > /dev/null 2>&1; then

View File

@@ -1,6 +1,5 @@
name: Check legacy syntax
on:
- push
- pull_request
jobs:
check-code-style:

17
package-lock.json generated
View File

@@ -15,6 +15,8 @@
"@babel/preset-react": "^7.18.6",
"@types/lodash.isempty": "^4.4.9",
"@types/node": "^18.16.18",
"@types/sockjs": "^0.3.36",
"@types/sockjs-client": "^1.5.4",
"@typescript-eslint/eslint-plugin": "^5.56.0",
"@typescript-eslint/parser": "^5.56.0",
"eslint": "^8.36.0",
@@ -1127,6 +1129,21 @@
"integrity": "sha512-dn1l8LaMea/IjDoHNd9J52uBbInB796CDffS6VdIxvqYCPSG0V0DzHp76GpaWnlhg88uYyPbXCDIowa86ybd5A==",
"dev": true
},
"node_modules/@types/sockjs": {
"version": "0.3.36",
"resolved": "https://registry.npmjs.org/@types/sockjs/-/sockjs-0.3.36.tgz",
"integrity": "sha512-MK9V6NzAS1+Ud7JV9lJLFqW85VbC9dq3LmwZCuBe4wBDgKC0Kj/jd8Xl+nSviU+Qc3+m7umHHyHg//2KSa0a0Q==",
"dev": true,
"dependencies": {
"@types/node": "*"
}
},
"node_modules/@types/sockjs-client": {
"version": "1.5.4",
"resolved": "https://registry.npmjs.org/@types/sockjs-client/-/sockjs-client-1.5.4.tgz",
"integrity": "sha512-zk+uFZeWyvJ5ZFkLIwoGA/DfJ+pYzcZ8eH4H/EILCm2OBZyHH6Hkdna1/UWL/CFruh5wj6ES7g75SvUB0VsH5w==",
"dev": true
},
"node_modules/@typescript-eslint/eslint-plugin": {
"version": "5.62.0",
"resolved": "https://registry.npmjs.org/@typescript-eslint/eslint-plugin/-/eslint-plugin-5.62.0.tgz",

View File

@@ -19,6 +19,8 @@
"@babel/preset-react": "^7.18.6",
"@types/lodash.isempty": "^4.4.9",
"@types/node": "^18.16.18",
"@types/sockjs": "^0.3.36",
"@types/sockjs-client": "^1.5.4",
"@typescript-eslint/eslint-plugin": "^5.56.0",
"@typescript-eslint/parser": "^5.56.0",
"eslint": "^8.36.0",

View File

@@ -1,6 +1,6 @@
import { DDP } from '../common/namespace.js';
import { Meteor } from 'meteor/meteor';
import { loadAsyncStubHelpers } from "./queueStubsHelpers";
import { loadAsyncStubHelpers } from "./queue_stub_helpers";
// Meteor.refresh can be called on the client (if you're in common code) but it
// only has an effect on the server.

View File

@@ -0,0 +1,202 @@
import { DDPCommon } from 'meteor/ddp-common';
import { Meteor } from 'meteor/meteor';
export class ConnectionStreamHandlers {
  /**
   * Handles DDP stream lifecycle events (incoming messages and stream
   * resets) on behalf of a Connection. Holds no state of its own: all
   * reads and writes go through the wrapped connection.
   * @param {Object} connection The owning Connection instance
   */
  constructor(connection) {
    this._connection = connection;
  }

  /**
   * Handles incoming raw messages from the DDP stream.
   * Parses the raw text, treats any message as an implicit pong for the
   * heartbeat, discards invalid messages, and routes the rest.
   * @param {String} raw_msg The raw message received from the stream
   */
  async onMessage(raw_msg) {
    let msg;
    try {
      msg = DDPCommon.parseDDP(raw_msg);
    } catch (e) {
      Meteor._debug('Exception while parsing DDP', e);
      return;
    }

    // Any message counts as receiving a pong, as it demonstrates that
    // the server is still alive.
    if (this._connection._heartbeat) {
      this._connection._heartbeat.messageReceived();
    }

    if (msg === null || !msg.msg) {
      if (!msg || !msg.testMessageOnConnect) {
        // Guard against msg === null: parseDDP can yield null, and
        // Object.keys(null) would throw a TypeError before the invalid
        // message could be logged and discarded.
        if (msg && Object.keys(msg).length === 1 && msg.server_id) return;
        Meteor._debug('discarding invalid livedata message', msg);
      }
      return;
    }

    // The negotiated version must be recorded before routing, since the
    // 'connected' handler (and everything processed after it) relies on
    // this._connection._version being current.
    if (msg.msg === 'connected') {
      this._connection._version = this._connection._versionSuggestion;
    }

    await this._routeMessage(msg);
  }

  /**
   * Routes messages to their appropriate handlers based on message type.
   * @private
   * @param {Object} msg The parsed DDP message
   */
  async _routeMessage(msg) {
    switch (msg.msg) {
      case 'connected':
        await this._connection._livedata_connected(msg);
        this._connection.options.onConnected();
        break;
      case 'failed':
        await this._handleFailedMessage(msg);
        break;
      case 'ping':
        if (this._connection.options.respondToPings) {
          this._connection._send({ msg: 'pong', id: msg.id });
        }
        break;
      case 'pong':
        // noop, as we assume everything's a pong
        break;
      case 'added':
      case 'changed':
      case 'removed':
      case 'ready':
      case 'updated':
        await this._connection._livedata_data(msg);
        break;
      case 'nosub':
        await this._connection._livedata_nosub(msg);
        break;
      case 'result':
        await this._connection._livedata_result(msg);
        break;
      case 'error':
        this._connection._livedata_error(msg);
        break;
      default:
        Meteor._debug('discarding unknown livedata message type', msg);
    }
  }

  /**
   * Handles failed connection messages: retries with the server's
   * suggested DDP version if we support it, otherwise disconnects
   * permanently and reports the negotiation failure.
   * @private
   * @param {Object} msg The failed message object
   */
  _handleFailedMessage(msg) {
    if (this._connection._supportedDDPVersions.indexOf(msg.version) >= 0) {
      this._connection._versionSuggestion = msg.version;
      this._connection._stream.reconnect({ _force: true });
    } else {
      const description =
        'DDP version negotiation failed; server requested version ' +
        msg.version;
      this._connection._stream.disconnect({ _permanent: true, _error: description });
      this._connection.options.onDDPVersionNegotiationFailure(description);
    }
  }

  /**
   * Handles connection reset events.
   */
  onReset() {
    // Reset is called even on the first connection, so this is
    // the only place we send this message.
    const msg = this._buildConnectMessage();
    this._connection._send(msg);

    // Mark non-retry calls as failed and handle outstanding methods
    this._handleOutstandingMethodsOnReset();

    // Now, to minimize setup latency, go ahead and blast out all of
    // our pending methods and subscriptions before we've even taken
    // the necessary RTT to know if we successfully reconnected.
    this._connection._callOnReconnectAndSendAppropriateOutstandingMethods();
    this._resendSubscriptions();
  }

  /**
   * Builds the initial connect message. Also records the version being
   * suggested on the connection so a subsequent 'failed'/'connected'
   * exchange can refer to it.
   * @private
   * @returns {Object} The connect message object
   */
  _buildConnectMessage() {
    const msg = { msg: 'connect' };
    if (this._connection._lastSessionId) {
      msg.session = this._connection._lastSessionId;
    }
    msg.version = this._connection._versionSuggestion || this._connection._supportedDDPVersions[0];
    this._connection._versionSuggestion = msg.version;
    msg.support = this._connection._supportedDDPVersions;
    return msg;
  }

  /**
   * Handles outstanding methods during a reset: fails already-sent
   * methods marked noRetry, drops them from the first block, and marks
   * every remaining invoker as unsent so it can be resent.
   * @private
   */
  _handleOutstandingMethodsOnReset() {
    const blocks = this._connection._outstandingMethodBlocks;
    if (blocks.length === 0) return;

    const currentMethodBlock = blocks[0].methods;
    blocks[0].methods = currentMethodBlock.filter(
      methodInvoker => {
        // Methods with 'noRetry' option set are not allowed to re-send after
        // recovering dropped connection.
        if (methodInvoker.sentMessage && methodInvoker.noRetry) {
          methodInvoker.receiveResult(
            new Meteor.Error(
              'invocation-failed',
              'Method invocation might have failed due to dropped connection. ' +
              'Failing because `noRetry` option was passed to Meteor.apply.'
            )
          );
        }
        // Only keep a method if it wasn't sent or it's allowed to retry.
        return !(methodInvoker.sentMessage && methodInvoker.noRetry);
      }
    );

    // Clear empty blocks
    if (blocks.length > 0 && blocks[0].methods.length === 0) {
      blocks.shift();
    }

    // Reset all method invokers as unsent
    Object.values(this._connection._methodInvokers).forEach(invoker => {
      invoker.sentMessage = false;
    });
  }

  /**
   * Resends all active subscriptions via the queued-send path.
   * @private
   */
  _resendSubscriptions() {
    Object.entries(this._connection._subscriptions).forEach(([id, sub]) => {
      this._connection._sendQueued({
        msg: 'sub',
        id: id,
        name: sub.name,
        params: sub.params
      });
    });
  }
}

View File

@@ -0,0 +1,206 @@
import { MongoID } from 'meteor/mongo-id';
import { DiffSequence } from 'meteor/diff-sequence';
import { hasOwn } from "meteor/ddp-common/utils";
import { isEmpty } from "meteor/ddp-common/utils";
export class DocumentProcessors {
  /**
   * Applies server-sent DDP data messages (added / changed / removed /
   * ready / updated) on behalf of a Connection. Holds no state of its
   * own: every read and write goes through the wrapped connection
   * (_serverDocuments, _stores, _subscriptions, etc.).
   * @param {Object} connection The owning Connection instance
   */
  constructor(connection) {
    this._connection = connection;
  }
  /**
   * @summary Process an 'added' message from the server
   * @param {Object} msg The added message
   * @param {Object} updates The updates accumulator (collection name -> array of messages)
   */
  async _process_added(msg, updates) {
    const self = this._connection;
    const id = MongoID.idParse(msg.id);
    // A serverDoc exists only when an outstanding method stub wrote to
    // this document; in that case the server's copy is buffered on the
    // serverDoc instead of being pushed straight to the store.
    const serverDoc = self._getServerDoc(msg.collection, id);
    if (serverDoc) {
      // Some outstanding stub wrote here.
      const isExisting = serverDoc.document !== undefined;
      serverDoc.document = msg.fields || Object.create(null);
      serverDoc.document._id = id;
      if (self._resetStores) {
        // During reconnect the server is sending adds for existing ids.
        // Always push an update so that document stays in the store after
        // reset. Use current version of the document for this update, so
        // that stub-written values are preserved.
        // NOTE: this mutates the incoming msg (msg.fields) — the pushed
        // update aliases it.
        const currentDoc = await self._stores[msg.collection].getDoc(msg.id);
        if (currentDoc !== undefined) msg.fields = currentDoc;
        self._pushUpdate(updates, msg.collection, msg);
      } else if (isExisting) {
        throw new Error('Server sent add for existing id: ' + msg.id);
      }
    } else {
      self._pushUpdate(updates, msg.collection, msg);
    }
  }
  /**
   * @summary Process a 'changed' message from the server
   * @param {Object} msg The changed message
   * @param {Object} updates The updates accumulator
   */
  _process_changed(msg, updates) {
    const self = this._connection;
    const serverDoc = self._getServerDoc(msg.collection, MongoID.idParse(msg.id));
    if (serverDoc) {
      // A stub wrote this doc: apply the change to the buffered server
      // copy rather than the store.
      if (serverDoc.document === undefined) {
        throw new Error('Server sent changed for nonexisting id: ' + msg.id);
      }
      DiffSequence.applyChanges(serverDoc.document, msg.fields);
    } else {
      self._pushUpdate(updates, msg.collection, msg);
    }
  }
  /**
   * @summary Process a 'removed' message from the server
   * @param {Object} msg The removed message
   * @param {Object} updates The updates accumulator
   */
  _process_removed(msg, updates) {
    const self = this._connection;
    const serverDoc = self._getServerDoc(msg.collection, MongoID.idParse(msg.id));
    if (serverDoc) {
      // Some outstanding stub wrote here.
      if (serverDoc.document === undefined) {
        throw new Error('Server sent removed for nonexisting id:' + msg.id);
      }
      // Record the removal on the buffered copy; the store is updated
      // later, when the last stub that wrote this doc completes.
      serverDoc.document = undefined;
    } else {
      self._pushUpdate(updates, msg.collection, {
        msg: 'removed',
        collection: msg.collection,
        id: msg.id
      });
    }
  }
  /**
   * @summary Process a 'ready' message from the server
   * @param {Object} msg The ready message
   * @param {Object} updates The updates accumulator (unused here; kept for a
   *   uniform _process_* dispatch signature)
   */
  _process_ready(msg, updates) {
    const self = this._connection;
    // Process "sub ready" messages. "sub ready" messages don't take effect
    // until all current server documents have been flushed to the local
    // database. We can use a write fence to implement this.
    msg.subs.forEach((subId) => {
      self._runWhenAllServerDocsAreFlushed(() => {
        const subRecord = self._subscriptions[subId];
        // Did we already unsubscribe?
        if (!subRecord) return;
        // Did we already receive a ready message? (Oops!)
        if (subRecord.ready) return;
        subRecord.ready = true;
        subRecord.readyCallback && subRecord.readyCallback();
        subRecord.readyDeps.changed();
      });
    });
  }
  /**
   * @summary Process an 'updated' message from the server
   * @param {Object} msg The updated message (msg.methods lists completed method ids)
   * @param {Object} updates The updates accumulator
   */
  _process_updated(msg, updates) {
    const self = this._connection;
    // Process "method done" messages.
    msg.methods.forEach((methodId) => {
      const docs = self._documentsWrittenByStub[methodId] || {};
      Object.values(docs).forEach((written) => {
        const serverDoc = self._getServerDoc(written.collection, written.id);
        if (!serverDoc) {
          throw new Error('Lost serverDoc for ' + JSON.stringify(written));
        }
        if (!serverDoc.writtenByStubs[methodId]) {
          throw new Error(
            'Doc ' +
            JSON.stringify(written) +
            ' not written by method ' +
            methodId
          );
        }
        delete serverDoc.writtenByStubs[methodId];
        if (isEmpty(serverDoc.writtenByStubs)) {
          // All methods whose stubs wrote this method have completed! We can
          // now copy the saved document to the database (reverting the stub's
          // change if the server did not write to this object, or applying the
          // server's writes if it did).
          // This is a fake ddp 'replace' message. It's just for talking
          // between livedata connections and minimongo. (We have to stringify
          // the ID because it's supposed to look like a wire message.)
          self._pushUpdate(updates, written.collection, {
            msg: 'replace',
            id: MongoID.idStringify(written.id),
            replace: serverDoc.document
          });
          // Call all flush callbacks.
          serverDoc.flushCallbacks.forEach((c) => {
            c();
          });
          // Delete this completed serverDocument. Don't bother to GC empty
          // IdMaps inside self._serverDocuments, since there probably aren't
          // many collections and they'll be written repeatedly.
          self._serverDocuments[written.collection].remove(written.id);
        }
      });
      delete self._documentsWrittenByStub[methodId];
      // We want to call the data-written callback, but we can't do so until all
      // currently buffered messages are flushed.
      const callbackInvoker = self._methodInvokers[methodId];
      if (!callbackInvoker) {
        throw new Error('No callback invoker for method ' + methodId);
      }
      self._runWhenAllServerDocsAreFlushed(
        (...args) => callbackInvoker.dataVisible(...args)
      );
    });
  }
  /**
   * @summary Push an update to the buffer
   * @private
   * @param {Object} updates The updates accumulator
   * @param {String} collection The collection name
   * @param {Object} msg The update message
   */
  _pushUpdate(updates, collection, msg) {
    // hasOwn guards against prototype keys shadowing a collection name.
    if (!hasOwn.call(updates, collection)) {
      updates[collection] = [];
    }
    updates[collection].push(msg);
  }
  /**
   * @summary Get a server document by collection and id
   * @private
   * @param {String} collection The collection name
   * @param {String} id The document id
   * @returns {Object|null} The server document, or null when no stub has
   *   written to this document
   */
  _getServerDoc(collection, id) {
    const self = this._connection;
    if (!hasOwn.call(self._serverDocuments, collection)) {
      return null;
    }
    const serverDocsForCollection = self._serverDocuments[collection];
    return serverDocsForCollection.get(id) || null;
  }
}

View File

@@ -5,20 +5,18 @@ import { EJSON } from 'meteor/ejson';
import { Random } from 'meteor/random';
import { MongoID } from 'meteor/mongo-id';
import { DDP } from './namespace.js';
import MethodInvoker from './MethodInvoker.js';
import { MethodInvoker } from './method_invoker';
import {
hasOwn,
slice,
keys,
isEmpty,
last,
} from "meteor/ddp-common/utils.js";
class MongoIDMap extends IdMap {
constructor() {
super(MongoID.idStringify, MongoID.idParse);
}
}
} from "meteor/ddp-common/utils";
import { ConnectionStreamHandlers } from './connection_stream_handlers';
import { MongoIDMap } from './mongo_id_map';
import { MessageProcessors } from './message_processors';
import { DocumentProcessors } from './document_processors';
// @param url {String|Object} URL to Meteor app,
// or an object as a test hook (see code)
@@ -202,12 +200,6 @@ export class Connection {
self._updatesForUnknownStores = {};
// if we're blocking a migration, the retry func
self._retryMigrate = null;
self.__flushBufferedWrites = Meteor.bindEnvironment(
self._flushBufferedWrites,
'flushing DDP buffered writes',
self
);
// Collection name -> array of messages.
self._bufferedWrites = {};
// When current buffer of updates must be flushed at, in ms timestamp.
@@ -249,34 +241,63 @@ export class Connection {
});
}
this._streamHandlers = new ConnectionStreamHandlers(this);
const onDisconnect = () => {
if (self._heartbeat) {
self._heartbeat.stop();
self._heartbeat = null;
if (this._heartbeat) {
this._heartbeat.stop();
this._heartbeat = null;
}
};
if (Meteor.isServer) {
self._stream.on(
this._stream.on(
'message',
Meteor.bindEnvironment(
this.onMessage.bind(this),
msg => this._streamHandlers.onMessage(msg),
'handling DDP message'
)
);
self._stream.on(
this._stream.on(
'reset',
Meteor.bindEnvironment(this.onReset.bind(this), 'handling DDP reset')
Meteor.bindEnvironment(
() => this._streamHandlers.onReset(),
'handling DDP reset'
)
);
self._stream.on(
this._stream.on(
'disconnect',
Meteor.bindEnvironment(onDisconnect, 'handling DDP disconnect')
);
} else {
self._stream.on('message', this.onMessage.bind(this));
self._stream.on('reset', this.onReset.bind(this));
self._stream.on('disconnect', onDisconnect);
this._stream.on('message', msg => this._streamHandlers.onMessage(msg));
this._stream.on('reset', () => this._streamHandlers.onReset());
this._stream.on('disconnect', onDisconnect);
}
this._messageProcessors = new MessageProcessors(this);
// Expose message processor methods to maintain backward compatibility
this._livedata_connected = (msg) => this._messageProcessors._livedata_connected(msg);
this._livedata_data = (msg) => this._messageProcessors._livedata_data(msg);
this._livedata_nosub = (msg) => this._messageProcessors._livedata_nosub(msg);
this._livedata_result = (msg) => this._messageProcessors._livedata_result(msg);
this._livedata_error = (msg) => this._messageProcessors._livedata_error(msg);
this._documentProcessors = new DocumentProcessors(this);
// Expose document processor methods to maintain backward compatibility
this._process_added = (msg, updates) => this._documentProcessors._process_added(msg, updates);
this._process_changed = (msg, updates) => this._documentProcessors._process_changed(msg, updates);
this._process_removed = (msg, updates) => this._documentProcessors._process_removed(msg, updates);
this._process_ready = (msg, updates) => this._documentProcessors._process_ready(msg, updates);
this._process_updated = (msg, updates) => this._documentProcessors._process_updated(msg, updates);
// Also expose utility methods used by other parts of the system
this._pushUpdate = (updates, collection, msg) =>
this._documentProcessors._pushUpdate(updates, collection, msg);
this._getServerDoc = (collection, id) =>
this._documentProcessors._getServerDoc(collection, id);
}
// 'name' is the name of the data on the wire that should go in the
@@ -941,7 +962,7 @@ export class Connection {
// documents.
_saveOriginals() {
if (! this._waitingForQuiescence()) {
this._flushBufferedWritesClient();
this._flushBufferedWrites();
}
Object.values(this._stores).forEach((store) => {
@@ -1099,121 +1120,6 @@ export class Connection {
return Object.values(invokers).some((invoker) => !!invoker.sentMessage);
}
async _livedata_connected(msg) {
const self = this;
if (self._version !== 'pre1' && self._heartbeatInterval !== 0) {
self._heartbeat = new DDPCommon.Heartbeat({
heartbeatInterval: self._heartbeatInterval,
heartbeatTimeout: self._heartbeatTimeout,
onTimeout() {
self._lostConnection(
new DDP.ConnectionError('DDP heartbeat timed out')
);
},
sendPing() {
self._send({ msg: 'ping' });
}
});
self._heartbeat.start();
}
// If this is a reconnect, we'll have to reset all stores.
if (self._lastSessionId) self._resetStores = true;
let reconnectedToPreviousSession;
if (typeof msg.session === 'string') {
reconnectedToPreviousSession = self._lastSessionId === msg.session;
self._lastSessionId = msg.session;
}
if (reconnectedToPreviousSession) {
// Successful reconnection -- pick up where we left off. Note that right
// now, this never happens: the server never connects us to a previous
// session, because DDP doesn't provide enough data for the server to know
// what messages the client has processed. We need to improve DDP to make
// this possible, at which point we'll probably need more code here.
return;
}
// Server doesn't have our data any more. Re-sync a new session.
// Forget about messages we were buffering for unknown collections. They'll
// be resent if still relevant.
self._updatesForUnknownStores = Object.create(null);
if (self._resetStores) {
// Forget about the effects of stubs. We'll be resetting all collections
// anyway.
self._documentsWrittenByStub = Object.create(null);
self._serverDocuments = Object.create(null);
}
// Clear _afterUpdateCallbacks.
self._afterUpdateCallbacks = [];
// Mark all named subscriptions which are ready (ie, we already called the
// ready callback) as needing to be revived.
// XXX We should also block reconnect quiescence until unnamed subscriptions
// (eg, autopublish) are done re-publishing to avoid flicker!
self._subsBeingRevived = Object.create(null);
Object.entries(self._subscriptions).forEach(([id, sub]) => {
if (sub.ready) {
self._subsBeingRevived[id] = true;
}
});
// Arrange for "half-finished" methods to have their callbacks run, and
// track methods that were sent on this connection so that we don't
// quiesce until they are all done.
//
// Start by clearing _methodsBlockingQuiescence: methods sent before
// reconnect don't matter, and any "wait" methods sent on the new connection
// that we drop here will be restored by the loop below.
self._methodsBlockingQuiescence = Object.create(null);
if (self._resetStores) {
const invokers = self._methodInvokers;
keys(invokers).forEach(id => {
const invoker = invokers[id];
if (invoker.gotResult()) {
// This method already got its result, but it didn't call its callback
// because its data didn't become visible. We did not resend the
// method RPC. We'll call its callback when we get a full quiesce,
// since that's as close as we'll get to "data must be visible".
self._afterUpdateCallbacks.push(
(...args) => invoker.dataVisible(...args)
);
} else if (invoker.sentMessage) {
// This method has been sent on this connection (maybe as a resend
// from the last connection, maybe from onReconnect, maybe just very
// quickly before processing the connected message).
//
// We don't need to do anything special to ensure its callbacks get
// called, but we'll count it as a method which is preventing
// reconnect quiescence. (eg, it might be a login method that was run
// from onReconnect, and we don't want to see flicker by seeing a
// logged-out state.)
self._methodsBlockingQuiescence[invoker.methodId] = true;
}
});
}
self._messagesBufferedUntilQuiescence = [];
// If we're not waiting on any methods or subs, we can reset the stores and
// call the callbacks immediately.
if (! self._waitingForQuiescence()) {
if (self._resetStores) {
for (const store of Object.values(self._stores)) {
await store.beginUpdate(0, true);
await store.endUpdate();
}
self._resetStores = false;
}
self._runAfterUpdateCallbacks();
}
}
async _processOneDataMessage(msg, updates) {
const messageType = msg.msg;
@@ -1235,87 +1141,6 @@ export class Connection {
}
}
async _livedata_data(msg) {
const self = this;
if (self._waitingForQuiescence()) {
self._messagesBufferedUntilQuiescence.push(msg);
if (msg.msg === 'nosub') {
delete self._subsBeingRevived[msg.id];
}
if (msg.subs) {
msg.subs.forEach(subId => {
delete self._subsBeingRevived[subId];
});
}
if (msg.methods) {
msg.methods.forEach(methodId => {
delete self._methodsBlockingQuiescence[methodId];
});
}
if (self._waitingForQuiescence()) {
return;
}
// No methods or subs are blocking quiescence!
// We'll now process all of our buffered messages, reset all stores,
// and apply them all at once.
const bufferedMessages = self._messagesBufferedUntilQuiescence;
for (const bufferedMessage of Object.values(bufferedMessages)) {
await self._processOneDataMessage(
bufferedMessage,
self._bufferedWrites
);
}
self._messagesBufferedUntilQuiescence = [];
} else {
await self._processOneDataMessage(msg, self._bufferedWrites);
}
// Immediately flush writes when:
// 1. Buffering is disabled. Or;
// 2. any non-(added/changed/removed) message arrives.
const standardWrite =
msg.msg === "added" ||
msg.msg === "changed" ||
msg.msg === "removed";
if (self._bufferedWritesInterval === 0 || ! standardWrite) {
await self._flushBufferedWrites();
return;
}
if (self._bufferedWritesFlushAt === null) {
self._bufferedWritesFlushAt =
new Date().valueOf() + self._bufferedWritesMaxAge;
} else if (self._bufferedWritesFlushAt < new Date().valueOf()) {
await self._flushBufferedWrites();
return;
}
if (self._bufferedWritesFlushHandle) {
clearTimeout(self._bufferedWritesFlushHandle);
}
self._bufferedWritesFlushHandle = setTimeout(() => {
// __flushBufferedWrites is a promise, so with this we can wait the promise to finish
// before doing something
self._liveDataWritesPromise = self.__flushBufferedWrites();
if (Meteor._isPromise(self._liveDataWritesPromise)) {
self._liveDataWritesPromise.finally(
() => (self._liveDataWritesPromise = undefined)
);
}
}, self._bufferedWritesInterval);
}
_prepareBuffersToFlush() {
const self = this;
if (self._bufferedWritesFlushHandle) {
@@ -1332,61 +1157,49 @@ export class Connection {
return writes;
}
async _flushBufferedWritesServer() {
const self = this;
const writes = self._prepareBuffersToFlush();
await self._performWritesServer(writes);
}
_flushBufferedWritesClient() {
const self = this;
const writes = self._prepareBuffersToFlush();
self._performWritesClient(writes);
}
_flushBufferedWrites() {
const self = this;
return Meteor.isClient
? self._flushBufferedWritesClient()
: self._flushBufferedWritesServer();
}
/**
* Server-side store updates handled asynchronously
* @private
*/
async _performWritesServer(updates) {
const self = this;
if (self._resetStores || ! isEmpty(updates)) {
// Begin a transactional update of each store.
for (const [storeName, store] of Object.entries(self._stores)) {
if (self._resetStores || !isEmpty(updates)) {
// Start all store updates - keeping original loop structure
for (const store of Object.values(self._stores)) {
await store.beginUpdate(
hasOwn.call(updates, storeName)
? updates[storeName].length
: 0,
updates[store._name]?.length || 0,
self._resetStores
);
}
self._resetStores = false;
for (const [storeName, updateMessages] of Object.entries(updates)) {
// Process each store's updates sequentially as before
for (const [storeName, messages] of Object.entries(updates)) {
const store = self._stores[storeName];
if (store) {
for (const updateMessage of updateMessages) {
await store.update(updateMessage);
// Batch each store's messages in modest chunks to prevent event loop blocking
// while maintaining operation order
const CHUNK_SIZE = 100;
for (let i = 0; i < messages.length; i += CHUNK_SIZE) {
const chunk = messages.slice(i, Math.min(i + CHUNK_SIZE, messages.length));
for (const msg of chunk) {
await store.update(msg);
}
await new Promise(resolve => process.nextTick(resolve));
}
} else {
// Nobody's listening for this data. Queue it up until
// someone wants it.
// XXX memory use will grow without bound if you forget to
// create a collection or just don't care about it... going
// to have to do something about that.
const updates = self._updatesForUnknownStores;
if (! hasOwn.call(updates, storeName)) {
updates[storeName] = [];
}
updates[storeName].push(...updateMessages);
// Queue updates for uninitialized stores
self._updatesForUnknownStores[storeName] =
self._updatesForUnknownStores[storeName] || [];
self._updatesForUnknownStores[storeName].push(...messages);
}
}
// End update transaction.
// Complete all updates
for (const store of Object.values(self._stores)) {
await store.endUpdate();
}
@@ -1394,53 +1207,55 @@ export class Connection {
self._runAfterUpdateCallbacks();
}
/**
* Client-side store updates handled synchronously for optimistic UI
* @private
*/
_performWritesClient(updates) {
const self = this;
if (self._resetStores || ! isEmpty(updates)) {
// Begin a transactional update of each store.
for (const [storeName, store] of Object.entries(self._stores)) {
if (self._resetStores || !isEmpty(updates)) {
// Synchronous store updates for client
Object.values(self._stores).forEach(store => {
store.beginUpdate(
hasOwn.call(updates, storeName)
? updates[storeName].length
: 0,
updates[store._name]?.length || 0,
self._resetStores
);
}
});
self._resetStores = false;
for (const [storeName, updateMessages] of Object.entries(updates)) {
Object.entries(updates).forEach(([storeName, messages]) => {
const store = self._stores[storeName];
if (store) {
for (const updateMessage of updateMessages) {
store.update(updateMessage);
}
messages.forEach(msg => store.update(msg));
} else {
// Nobody's listening for this data. Queue it up until
// someone wants it.
// XXX memory use will grow without bound if you forget to
// create a collection or just don't care about it... going
// to have to do something about that.
const updates = self._updatesForUnknownStores;
if (! hasOwn.call(updates, storeName)) {
updates[storeName] = [];
}
updates[storeName].push(...updateMessages);
self._updatesForUnknownStores[storeName] =
self._updatesForUnknownStores[storeName] || [];
self._updatesForUnknownStores[storeName].push(...messages);
}
}
// End update transaction.
for (const store of Object.values(self._stores)) {
store.endUpdate();
}
});
Object.values(self._stores).forEach(store => store.endUpdate());
}
self._runAfterUpdateCallbacks();
}
/**
* Executes buffered writes either synchronously (client) or async (server)
* @private
*/
async _flushBufferedWrites() {
const self = this;
const writes = self._prepareBuffersToFlush();
return Meteor.isClient
? self._performWritesClient(writes)
: self._performWritesServer(writes);
}
// Call any callbacks deferred with _runWhenAllServerDocsAreFlushed whose
// relevant docs have been flushed, as well as dataVisible callbacks at
// reconnect-quiescence time.
@@ -1453,160 +1268,6 @@ export class Connection {
});
}
_pushUpdate(updates, collection, msg) {
if (! hasOwn.call(updates, collection)) {
updates[collection] = [];
}
updates[collection].push(msg);
}
_getServerDoc(collection, id) {
const self = this;
if (! hasOwn.call(self._serverDocuments, collection)) {
return null;
}
const serverDocsForCollection = self._serverDocuments[collection];
return serverDocsForCollection.get(id) || null;
}
async _process_added(msg, updates) {
const self = this;
const id = MongoID.idParse(msg.id);
const serverDoc = self._getServerDoc(msg.collection, id);
if (serverDoc) {
// Some outstanding stub wrote here.
const isExisting = serverDoc.document !== undefined;
serverDoc.document = msg.fields || Object.create(null);
serverDoc.document._id = id;
if (self._resetStores) {
// During reconnect the server is sending adds for existing ids.
// Always push an update so that document stays in the store after
// reset. Use current version of the document for this update, so
// that stub-written values are preserved.
const currentDoc = await self._stores[msg.collection].getDoc(msg.id);
if (currentDoc !== undefined) msg.fields = currentDoc;
self._pushUpdate(updates, msg.collection, msg);
} else if (isExisting) {
throw new Error('Server sent add for existing id: ' + msg.id);
}
} else {
self._pushUpdate(updates, msg.collection, msg);
}
}
_process_changed(msg, updates) {
const self = this;
const serverDoc = self._getServerDoc(msg.collection, MongoID.idParse(msg.id));
if (serverDoc) {
if (serverDoc.document === undefined)
throw new Error('Server sent changed for nonexisting id: ' + msg.id);
DiffSequence.applyChanges(serverDoc.document, msg.fields);
} else {
self._pushUpdate(updates, msg.collection, msg);
}
}
_process_removed(msg, updates) {
const self = this;
const serverDoc = self._getServerDoc(msg.collection, MongoID.idParse(msg.id));
if (serverDoc) {
// Some outstanding stub wrote here.
if (serverDoc.document === undefined)
throw new Error('Server sent removed for nonexisting id:' + msg.id);
serverDoc.document = undefined;
} else {
self._pushUpdate(updates, msg.collection, {
msg: 'removed',
collection: msg.collection,
id: msg.id
});
}
}
// Apply a server "updated" message: for each finished method, release the
// documents its stub wrote. Once no outstanding stub holds a document, a
// fake 'replace' message restores the authoritative server copy, and the
// method's data-visible callback is scheduled after all buffered writes
// flush.
_process_updated(msg, updates) {
  const self = this;
  // Process "method done" messages.
  msg.methods.forEach((methodId) => {
    // Documents this method's stub touched; may be absent if the stub
    // wrote nothing.
    const docs = self._documentsWrittenByStub[methodId] || {};
    Object.values(docs).forEach((written) => {
      const serverDoc = self._getServerDoc(written.collection, written.id);
      if (! serverDoc) {
        throw new Error('Lost serverDoc for ' + JSON.stringify(written));
      }
      if (! serverDoc.writtenByStubs[methodId]) {
        throw new Error(
          'Doc ' +
            JSON.stringify(written) +
            ' not written by method ' +
            methodId
        );
      }
      delete serverDoc.writtenByStubs[methodId];
      if (isEmpty(serverDoc.writtenByStubs)) {
        // All methods whose stubs wrote this method have completed! We can
        // now copy the saved document to the database (reverting the stub's
        // change if the server did not write to this object, or applying the
        // server's writes if it did).

        // This is a fake ddp 'replace' message. It's just for talking
        // between livedata connections and minimongo. (We have to stringify
        // the ID because it's supposed to look like a wire message.)
        self._pushUpdate(updates, written.collection, {
          msg: 'replace',
          id: MongoID.idStringify(written.id),
          replace: serverDoc.document
        });
        // Call all flush callbacks.
        serverDoc.flushCallbacks.forEach((c) => {
          c();
        });

        // Delete this completed serverDocument. Don't bother to GC empty
        // IdMaps inside self._serverDocuments, since there probably aren't
        // many collections and they'll be written repeatedly.
        self._serverDocuments[written.collection].remove(written.id);
      }
    });
    delete self._documentsWrittenByStub[methodId];

    // We want to call the data-written callback, but we can't do so until all
    // currently buffered messages are flushed.
    const callbackInvoker = self._methodInvokers[methodId];
    if (! callbackInvoker) {
      throw new Error('No callback invoker for method ' + methodId);
    }
    self._runWhenAllServerDocsAreFlushed(
      (...args) => callbackInvoker.dataVisible(...args)
    );
  });
}
_process_ready(msg, updates) {
const self = this;
// Process "sub ready" messages. "sub ready" messages don't take effect
// until all current server documents have been flushed to the local
// database. We can use a write fence to implement this.
msg.subs.forEach((subId) => {
self._runWhenAllServerDocsAreFlushed(() => {
const subRecord = self._subscriptions[subId];
// Did we already unsubscribe?
if (!subRecord) return;
// Did we already receive a ready message? (Oops!)
if (subRecord.ready) return;
subRecord.ready = true;
subRecord.readyCallback && subRecord.readyCallback();
subRecord.readyDeps.changed();
});
});
}
// Ensures that "f" will be called after all documents currently in
// _serverDocuments have been written to the local cache. f will not be called
// if the connection is lost before then!
@@ -1646,93 +1307,6 @@ export class Connection {
}
}
async _livedata_nosub(msg) {
const self = this;
// First pass it through _livedata_data, which only uses it to help get
// towards quiescence.
await self._livedata_data(msg);
// Do the rest of our processing immediately, with no
// buffering-until-quiescence.
// we weren't subbed anyway, or we initiated the unsub.
if (! hasOwn.call(self._subscriptions, msg.id)) {
return;
}
// XXX COMPAT WITH 1.0.3.1 #errorCallback
const errorCallback = self._subscriptions[msg.id].errorCallback;
const stopCallback = self._subscriptions[msg.id].stopCallback;
self._subscriptions[msg.id].remove();
const meteorErrorFromMsg = msgArg => {
return (
msgArg &&
msgArg.error &&
new Meteor.Error(
msgArg.error.error,
msgArg.error.reason,
msgArg.error.details
)
);
};
// XXX COMPAT WITH 1.0.3.1 #errorCallback
if (errorCallback && msg.error) {
errorCallback(meteorErrorFromMsg(msg));
}
if (stopCallback) {
stopCallback(meteorErrorFromMsg(msg));
}
}
async _livedata_result(msg) {
// id, result or error. error has error (code), reason, details
const self = this;
// Lets make sure there are no buffered writes before returning result.
if (! isEmpty(self._bufferedWrites)) {
await self._flushBufferedWrites();
}
// find the outstanding request
// should be O(1) in nearly all realistic use cases
if (isEmpty(self._outstandingMethodBlocks)) {
Meteor._debug('Received method result but no methods outstanding');
return;
}
const currentMethodBlock = self._outstandingMethodBlocks[0].methods;
let i;
const m = currentMethodBlock.find((method, idx) => {
const found = method.methodId === msg.id;
if (found) i = idx;
return found;
});
if (!m) {
Meteor._debug("Can't match method response to original method call", msg);
return;
}
// Remove from current method block. This may leave the block empty, but we
// don't move on to the next block until the callback has been delivered, in
// _outstandingMethodFinished.
currentMethodBlock.splice(i, 1);
if (hasOwn.call(msg, 'error')) {
m.receiveResult(
new Meteor.Error(msg.error.error, msg.error.reason, msg.error.details)
);
} else {
// msg.result may be undefined if the method didn't return a
// value
m.receiveResult(undefined, msg.result);
}
}
_addOutstandingMethod(methodInvoker, options) {
if (options?.wait) {
// It's a wait method! Wait methods go in their own block.
@@ -1801,11 +1375,6 @@ export class Connection {
});
}
_livedata_error(msg) {
Meteor._debug('Received error from server: ', msg.reason);
if (msg.offendingMessage) Meteor._debug('For: ', msg.offendingMessage);
}
_sendOutstandingMethodBlocksMessages(oldOutstandingMethodBlocks) {
const self = this;
if (isEmpty(oldOutstandingMethodBlocks)) return;
@@ -1870,148 +1439,4 @@ export class Connection {
self._retryMigrate = null;
}
}
async onMessage(raw_msg) {
let msg;
try {
msg = DDPCommon.parseDDP(raw_msg);
} catch (e) {
Meteor._debug('Exception while parsing DDP', e);
return;
}
// Any message counts as receiving a pong, as it demonstrates that
// the server is still alive.
if (this._heartbeat) {
this._heartbeat.messageReceived();
}
if (msg === null || !msg.msg) {
if(!msg || !msg.testMessageOnConnect) {
if (Object.keys(msg).length === 1 && msg.server_id) return;
Meteor._debug('discarding invalid livedata message', msg);
}
return;
}
if (msg.msg === 'connected') {
this._version = this._versionSuggestion;
await this._livedata_connected(msg);
this.options.onConnected();
} else if (msg.msg === 'failed') {
if (this._supportedDDPVersions.indexOf(msg.version) >= 0) {
this._versionSuggestion = msg.version;
this._stream.reconnect({ _force: true });
} else {
const description =
'DDP version negotiation failed; server requested version ' +
msg.version;
this._stream.disconnect({ _permanent: true, _error: description });
this.options.onDDPVersionNegotiationFailure(description);
}
} else if (msg.msg === 'ping' && this.options.respondToPings) {
this._send({ msg: 'pong', id: msg.id });
} else if (msg.msg === 'pong') {
// noop, as we assume everything's a pong
} else if (
['added', 'changed', 'removed', 'ready', 'updated'].includes(msg.msg)
) {
await this._livedata_data(msg);
} else if (msg.msg === 'nosub') {
await this._livedata_nosub(msg);
} else if (msg.msg === 'result') {
await this._livedata_result(msg);
} else if (msg.msg === 'error') {
this._livedata_error(msg);
} else {
Meteor._debug('discarding unknown livedata message type', msg);
}
}
// Called whenever the underlying stream (re)connects. Sends the DDP
// 'connect' handshake, fails noRetry methods that were in flight, then
// eagerly resends outstanding methods and subscriptions without waiting
// for the server's response.
onReset() {
  // Send a connect message at the beginning of the stream.
  // NOTE: reset is called even on the first connection, so this is
  // the only place we send this message.
  const msg = { msg: 'connect' };
  if (this._lastSessionId) msg.session = this._lastSessionId;
  msg.version = this._versionSuggestion || this._supportedDDPVersions[0];
  this._versionSuggestion = msg.version;
  msg.support = this._supportedDDPVersions;
  this._send(msg);

  // Mark non-retry calls as failed. This has to be done early as getting these methods out of the
  // current block is pretty important to making sure that quiescence is properly calculated, as
  // well as possibly moving on to another useful block.

  // Only bother testing if there is an outstandingMethodBlock (there might not be, especially if
  // we are connecting for the first time.
  if (this._outstandingMethodBlocks.length > 0) {
    // If there is an outstanding method block, we only care about the first one as that is the
    // one that could have already sent messages with no response, that are not allowed to retry.
    const currentMethodBlock = this._outstandingMethodBlocks[0].methods;
    this._outstandingMethodBlocks[0].methods = currentMethodBlock.filter(
      methodInvoker => {
        // Methods with 'noRetry' option set are not allowed to re-send after
        // recovering dropped connection.
        if (methodInvoker.sentMessage && methodInvoker.noRetry) {
          // Make sure that the method is told that it failed.
          methodInvoker.receiveResult(
            new Meteor.Error(
              'invocation-failed',
              'Method invocation might have failed due to dropped connection. ' +
                'Failing because `noRetry` option was passed to Meteor.apply.'
            )
          );
        }

        // Only keep a method if it wasn't sent or it's allowed to retry.
        // This may leave the block empty, but we don't move on to the next
        // block until the callback has been delivered, in _outstandingMethodFinished.
        return !(methodInvoker.sentMessage && methodInvoker.noRetry);
      }
    );
  }

  // Now, to minimize setup latency, go ahead and blast out all of
  // our pending methods ands subscriptions before we've even taken
  // the necessary RTT to know if we successfully reconnected. (1)
  // They're supposed to be idempotent, and where they are not,
  // they can block retry in apply; (2) even if we did reconnect,
  // we're not sure what messages might have gotten lost
  // (in either direction) since we were disconnected (TCP being
  // sloppy about that.)

  // If the current block of methods all got their results (but didn't all get
  // their data visible), discard the empty block now.
  if (
    this._outstandingMethodBlocks.length > 0 &&
    this._outstandingMethodBlocks[0].methods.length === 0
  ) {
    this._outstandingMethodBlocks.shift();
  }

  // Mark all messages as unsent, they have not yet been sent on this
  // connection.
  keys(this._methodInvokers).forEach(id => {
    this._methodInvokers[id].sentMessage = false;
  });

  // If an `onReconnect` handler is set, call it first. Go through
  // some hoops to ensure that methods that are called from within
  // `onReconnect` get executed _before_ ones that were originally
  // outstanding (since `onReconnect` is used to re-establish auth
  // certificates)
  this._callOnReconnectAndSendAppropriateOutstandingMethods();

  // add new subscriptions at the end. this way they take effect after
  // the handlers and we don't see flicker.
  Object.entries(this._subscriptions).forEach(([id, sub]) => {
    this._sendQueued({
      msg: 'sub',
      id: id,
      name: sub.name,
      params: sub.params
    });
  });
}
}

View File

@@ -0,0 +1,336 @@
import { DDPCommon } from 'meteor/ddp-common';
import { Meteor } from 'meteor/meteor';
import { DDP } from './namespace.js';
import { EJSON } from 'meteor/ejson';
import { isEmpty, hasOwn } from "meteor/ddp-common/utils";
/**
 * Processes DDP session, data, and result messages on behalf of a
 * Connection. All state lives on the wrapped connection
 * (`this._connection`); this class only groups the message-handling
 * logic that was refactored out of Connection.
 */
export class MessageProcessors {
  constructor(connection) {
    // The Connection whose stores, subscriptions, and method bookkeeping
    // these processors read and mutate.
    this._connection = connection;
  }

  /**
   * @summary Process the connection message and set up the session
   * @param {Object} msg The connection message
   */
  async _livedata_connected(msg) {
    const self = this._connection;

    // Start heartbeats unless the protocol predates them ('pre1') or the
    // user disabled them (interval 0).
    if (self._version !== 'pre1' && self._heartbeatInterval !== 0) {
      self._heartbeat = new DDPCommon.Heartbeat({
        heartbeatInterval: self._heartbeatInterval,
        heartbeatTimeout: self._heartbeatTimeout,
        onTimeout() {
          self._lostConnection(
            new DDP.ConnectionError('DDP heartbeat timed out')
          );
        },
        sendPing() {
          self._send({ msg: 'ping' });
        }
      });
      self._heartbeat.start();
    }

    // If this is a reconnect, we'll have to reset all stores.
    if (self._lastSessionId) self._resetStores = true;

    let reconnectedToPreviousSession;
    if (typeof msg.session === 'string') {
      reconnectedToPreviousSession = self._lastSessionId === msg.session;
      self._lastSessionId = msg.session;
    }

    if (reconnectedToPreviousSession) {
      // Successful reconnection -- pick up where we left off.
      return;
    }

    // Server doesn't have our data anymore. Re-sync a new session.

    // Forget about messages we were buffering for unknown collections. They'll
    // be resent if still relevant.
    self._updatesForUnknownStores = Object.create(null);

    if (self._resetStores) {
      // Forget about the effects of stubs. We'll be resetting all collections
      // anyway.
      self._documentsWrittenByStub = Object.create(null);
      self._serverDocuments = Object.create(null);
    }

    // Clear _afterUpdateCallbacks.
    self._afterUpdateCallbacks = [];

    // Mark all named subscriptions which are ready as needing to be revived.
    self._subsBeingRevived = Object.create(null);
    Object.entries(self._subscriptions).forEach(([id, sub]) => {
      if (sub.ready) {
        self._subsBeingRevived[id] = true;
      }
    });

    // Arrange for "half-finished" methods to have their callbacks run, and
    // track methods that were sent on this connection so that we don't
    // quiesce until they are all done.
    //
    // Start by clearing _methodsBlockingQuiescence: methods sent before
    // reconnect don't matter, and any "wait" methods sent on the new connection
    // that we drop here will be restored by the loop below.
    self._methodsBlockingQuiescence = Object.create(null);
    if (self._resetStores) {
      const invokers = self._methodInvokers;
      Object.keys(invokers).forEach(id => {
        const invoker = invokers[id];
        if (invoker.gotResult()) {
          // This method already got its result, but it didn't call its callback
          // because its data didn't become visible. We did not resend the
          // method RPC. We'll call its callback when we get a full quiesce,
          // since that's as close as we'll get to "data must be visible".
          self._afterUpdateCallbacks.push(
            (...args) => invoker.dataVisible(...args)
          );
        } else if (invoker.sentMessage) {
          // This method has been sent on this connection (maybe as a resend
          // from the last connection, maybe from onReconnect, maybe just very
          // quickly before processing the connected message).
          //
          // We don't need to do anything special to ensure its callbacks get
          // called, but we'll count it as a method which is preventing
          // reconnect quiescence. (eg, it might be a login method that was run
          // from onReconnect, and we don't want to see flicker by seeing a
          // logged-out state.)
          self._methodsBlockingQuiescence[invoker.methodId] = true;
        }
      });
    }

    self._messagesBufferedUntilQuiescence = [];

    // If we're not waiting on any methods or subs, we can reset the stores and
    // call the callbacks immediately.
    if (!self._waitingForQuiescence()) {
      if (self._resetStores) {
        for (const store of Object.values(self._stores)) {
          await store.beginUpdate(0, true);
          await store.endUpdate();
        }
        self._resetStores = false;
      }
      self._runAfterUpdateCallbacks();
    }
  }

  /**
   * @summary Process various data messages from the server
   * @param {Object} msg The data message
   */
  async _livedata_data(msg) {
    const self = this._connection;

    if (self._waitingForQuiescence()) {
      // Still reviving subs / waiting on methods: buffer the message and
      // strike off any subs/methods it resolves.
      self._messagesBufferedUntilQuiescence.push(msg);

      if (msg.msg === 'nosub') {
        delete self._subsBeingRevived[msg.id];
      }

      if (msg.subs) {
        msg.subs.forEach(subId => {
          delete self._subsBeingRevived[subId];
        });
      }

      if (msg.methods) {
        msg.methods.forEach(methodId => {
          delete self._methodsBlockingQuiescence[methodId];
        });
      }

      if (self._waitingForQuiescence()) {
        return;
      }

      // No methods or subs are blocking quiescence!
      // We'll now process all of our buffered messages, reset all stores,
      // and apply them all at once.
      const bufferedMessages = self._messagesBufferedUntilQuiescence;
      for (const bufferedMessage of Object.values(bufferedMessages)) {
        await this._processOneDataMessage(
          bufferedMessage,
          self._bufferedWrites
        );
      }

      self._messagesBufferedUntilQuiescence = [];
    } else {
      await this._processOneDataMessage(msg, self._bufferedWrites);
    }

    // Immediately flush writes when:
    // 1. Buffering is disabled. Or;
    // 2. any non-(added/changed/removed) message arrives.
    const standardWrite =
      msg.msg === "added" ||
      msg.msg === "changed" ||
      msg.msg === "removed";

    if (self._bufferedWritesInterval === 0 || !standardWrite) {
      await self._flushBufferedWrites();
      return;
    }

    // First buffered write starts the max-age clock; once the deadline
    // passes we flush regardless of continued activity.
    if (self._bufferedWritesFlushAt === null) {
      self._bufferedWritesFlushAt =
        new Date().valueOf() + self._bufferedWritesMaxAge;
    } else if (self._bufferedWritesFlushAt < new Date().valueOf()) {
      await self._flushBufferedWrites();
      return;
    }

    // Debounce: push the pending flush out by another interval.
    if (self._bufferedWritesFlushHandle) {
      clearTimeout(self._bufferedWritesFlushHandle);
    }
    self._bufferedWritesFlushHandle = setTimeout(() => {
      self._liveDataWritesPromise = self._flushBufferedWrites();
      if (Meteor._isPromise(self._liveDataWritesPromise)) {
        self._liveDataWritesPromise.finally(
          () => (self._liveDataWritesPromise = undefined)
        );
      }
    }, self._bufferedWritesInterval);
  }

  /**
   * @summary Process individual data messages by type
   * @private
   */
  async _processOneDataMessage(msg, updates) {
    const messageType = msg.msg;

    // Dispatch to the per-type processors, which still live on Connection.
    switch (messageType) {
      case 'added':
        await this._connection._process_added(msg, updates);
        break;
      case 'changed':
        this._connection._process_changed(msg, updates);
        break;
      case 'removed':
        this._connection._process_removed(msg, updates);
        break;
      case 'ready':
        this._connection._process_ready(msg, updates);
        break;
      case 'updated':
        this._connection._process_updated(msg, updates);
        break;
      case 'nosub':
        // ignore this
        break;
      default:
        Meteor._debug('discarding unknown livedata data message type', msg);
    }
  }

  /**
   * @summary Handle method results arriving from the server
   * @param {Object} msg The method result message
   */
  async _livedata_result(msg) {
    const self = this._connection;
    // Lets make sure there are no buffered writes before returning result.
    if (!isEmpty(self._bufferedWrites)) {
      await self._flushBufferedWrites();
    }

    // find the outstanding request
    // should be O(1) in nearly all realistic use cases
    if (isEmpty(self._outstandingMethodBlocks)) {
      Meteor._debug('Received method result but no methods outstanding');
      return;
    }

    const currentMethodBlock = self._outstandingMethodBlocks[0].methods;
    let i;

    const m = currentMethodBlock.find((method, idx) => {
      const found = method.methodId === msg.id;
      if (found) i = idx;
      return found;
    });

    if (!m) {
      Meteor._debug("Can't match method response to original method call", msg);
      return;
    }

    // Remove from current method block. This may leave the block empty, but we
    // don't move on to the next block until the callback has been delivered, in
    // _outstandingMethodFinished.
    currentMethodBlock.splice(i, 1);

    if (hasOwn.call(msg, 'error')) {
      m.receiveResult(
        new Meteor.Error(msg.error.error, msg.error.reason, msg.error.details)
      );
    } else {
      // msg.result may be undefined if the method didn't return a value
      m.receiveResult(undefined, msg.result);
    }
  }

  /**
   * @summary Handle "nosub" messages arriving from the server
   * @param {Object} msg The nosub message
   */
  async _livedata_nosub(msg) {
    const self = this._connection;

    // First pass it through _livedata_data, which only uses it to help get
    // towards quiescence.
    await this._livedata_data(msg);

    // Do the rest of our processing immediately, with no
    // buffering-until-quiescence.

    // we weren't subbed anyway, or we initiated the unsub.
    if (!hasOwn.call(self._subscriptions, msg.id)) {
      return;
    }

    // XXX COMPAT WITH 1.0.3.1 #errorCallback
    const errorCallback = self._subscriptions[msg.id].errorCallback;
    const stopCallback = self._subscriptions[msg.id].stopCallback;

    self._subscriptions[msg.id].remove();

    // Wrap the wire-format error (if any) in a Meteor.Error for callbacks.
    const meteorErrorFromMsg = msgArg => {
      return (
        msgArg &&
        msgArg.error &&
        new Meteor.Error(
          msgArg.error.error,
          msgArg.error.reason,
          msgArg.error.details
        )
      );
    };

    // XXX COMPAT WITH 1.0.3.1 #errorCallback
    if (errorCallback && msg.error) {
      errorCallback(meteorErrorFromMsg(msg));
    }

    if (stopCallback) {
      stopCallback(meteorErrorFromMsg(msg));
    }
  }

  /**
   * @summary Handle errors from the server
   * @param {Object} msg The error message
   */
  _livedata_error(msg) {
    Meteor._debug('Received error from server: ', msg.reason);
    if (msg.offendingMessage) Meteor._debug('For: ', msg.offendingMessage);
  }

  // Document change message processors will be defined in a separate class
}

View File

@@ -3,7 +3,7 @@
// _methodInvokers map; it removes itself once the method is fully finished and
// the callback is invoked. This occurs when it has both received a result,
// and the data written by it is fully visible.
export default class MethodInvoker {
export class MethodInvoker {
constructor(options) {
// Public (within this file) fields.
this.methodId = options.methodId;

View File

@@ -0,0 +1,7 @@
import { MongoID } from 'meteor/mongo-id';
// An id map whose keys are Mongo-style document ids: ids are serialized
// with MongoID.idStringify and parsed back with MongoID.idParse, so
// string ids and ObjectIDs hash consistently.
// NOTE(review): assumes `IdMap` is in scope as a package global (from the
// id-map package) since it is not imported here — confirm.
export class MongoIDMap extends IdMap {
  constructor() {
    super(MongoID.idStringify, MongoID.idParse);
  }
}

View File

@@ -0,0 +1,40 @@
interface ChangeCollector {
  [key: string]: any;
}

interface DataEntry {
  subscriptionHandle: string;
  value: any;
}

/**
 * A minimal document view used when a publication strategy opts out of
 * per-field merge bookkeeping (useDummyDocumentView): it tracks which
 * subscriptions a document exists in, reports an empty field set, and
 * echoes field changes/clears straight into the change collector.
 */
export class DummyDocumentView {
  // BUG FIX: these were declared `private`, but SessionCollectionView
  // mutates `docView.existsIn` and iterates `docView.dataByKey` from
  // outside this class (see added()/removed()), which TypeScript rejects
  // for private members. They must be publicly accessible.
  existsIn: Set<string>;
  dataByKey: Map<string, DataEntry[]>;

  constructor() {
    this.existsIn = new Set<string>(); // set of subscriptionHandle
    this.dataByKey = new Map<string, DataEntry[]>(); // key-> [ {subscriptionHandle, value} by precedence]
  }

  // No fields are tracked, so the merged view is always empty.
  getFields(): Record<string, never> {
    return {};
  }

  // Report the field as removed; no precedence bookkeeping is kept.
  clearField(
    subscriptionHandle: string,
    key: string,
    changeCollector: ChangeCollector
  ): void {
    changeCollector[key] = undefined;
  }

  // Report the new value directly; no precedence bookkeeping is kept.
  changeField(
    subscriptionHandle: string,
    key: string,
    value: any,
    changeCollector: ChangeCollector,
    isAdd?: boolean
  ): void {
    changeCollector[key] = value;
  }
}

View File

@@ -1,6 +1,8 @@
import isEmpty from 'lodash.isempty';
import isObject from 'lodash.isobject';
// BUG FIX: `isString` was imported twice; duplicate bindings of the same
// identifier are a SyntaxError in ES modules. Keep a single import.
import isString from 'lodash.isstring';

import { SessionCollectionView } from './session_collection_view';
import { SessionDocumentView } from './session_document_view';

DDPServer = {};
@@ -55,33 +57,7 @@ DDPServer.publicationStrategies = publicationStrategies;
// Session and Subscription are file scope. For now, until we freeze
// the interface, Server is package scope (in the future it should be
// exported).
// A stand-in document view for publication strategies that skip per-field
// merge bookkeeping: it records which subscriptions the document exists
// in, but the merged field view is always empty and field operations are
// echoed straight into the change collector.
var DummyDocumentView = function () {
  this.existsIn = new Set(); // set of subscriptionHandle
  this.dataByKey = new Map(); // key-> [ {subscriptionHandle, value} by precedence]
};

Object.assign(DummyDocumentView.prototype, {
  // No fields are ever tracked.
  getFields() {
    return {};
  },

  // Report the field as removed.
  clearField(subscriptionHandle, key, changeCollector) {
    changeCollector[key] = undefined;
  },

  // Report the new value directly.
  changeField(subscriptionHandle, key, value, changeCollector, isAdd) {
    changeCollector[key] = value;
  }
});
// Represents a single document in a SessionCollectionView
// Tracks, per field, an ordered precedence list of {subscriptionHandle,
// value} entries so overlapping publications merge deterministically.
var SessionDocumentView = function () {
  var self = this;
  self.existsIn = new Set(); // set of subscriptionHandle
  self.dataByKey = new Map(); // key-> [ {subscriptionHandle, value} by precedence]
};

// Exposed on the package-scope DDPServer object.
DDPServer._SessionDocumentView = SessionDocumentView;
@@ -94,210 +70,9 @@ DDPServer._getCurrentFence = function () {
return currentInvocation ? currentInvocation.fence : undefined;
};
// Field-level merge bookkeeping for SessionDocumentView: each field keeps a
// precedence list of the values contributed by each subscription; the list
// head is the value the client currently sees.
Object.assign(SessionDocumentView.prototype, {
  // The client-visible document: for each field, the head of its
  // precedence list wins.
  getFields: function () {
    var self = this;
    var ret = {};
    self.dataByKey.forEach(function (precedenceList, key) {
      ret[key] = precedenceList[0].value;
    });
    return ret;
  },

  // Remove this subscription's contribution to `key`, recording in
  // changeCollector only if the client-visible value actually changes.
  clearField: function (subscriptionHandle, key, changeCollector) {
    var self = this;
    // Publish API ignores _id if present in fields
    if (key === "_id")
      return;
    var precedenceList = self.dataByKey.get(key);

    // It's okay to clear fields that didn't exist. No need to throw
    // an error.
    if (!precedenceList)
      return;

    var removedValue = undefined;
    for (var i = 0; i < precedenceList.length; i++) {
      var precedence = precedenceList[i];
      if (precedence.subscriptionHandle === subscriptionHandle) {
        // The view's value can only change if this subscription is the one that
        // used to have precedence.
        if (i === 0)
          removedValue = precedence.value;
        precedenceList.splice(i, 1);
        break;
      }
    }
    if (precedenceList.length === 0) {
      // No subscription publishes this field any more.
      self.dataByKey.delete(key);
      changeCollector[key] = undefined;
    } else if (removedValue !== undefined &&
               !EJSON.equals(removedValue, precedenceList[0].value)) {
      // Precedence passed to the next subscription, and its value differs.
      changeCollector[key] = precedenceList[0].value;
    }
  },

  // Record this subscription's value for `key`. With isAdd, always append a
  // new precedence entry; otherwise update this subscription's existing
  // entry if present.
  changeField: function (subscriptionHandle, key, value,
                         changeCollector, isAdd) {
    var self = this;
    // Publish API ignores _id if present in fields
    if (key === "_id")
      return;

    // Don't share state with the data passed in by the user.
    value = EJSON.clone(value);

    if (!self.dataByKey.has(key)) {
      self.dataByKey.set(key, [{subscriptionHandle: subscriptionHandle,
                                value: value}]);
      changeCollector[key] = value;
      return;
    }
    var precedenceList = self.dataByKey.get(key);
    var elt;
    if (!isAdd) {
      elt = precedenceList.find(function (precedence) {
        return precedence.subscriptionHandle === subscriptionHandle;
      });
    }

    if (elt) {
      if (elt === precedenceList[0] && !EJSON.equals(value, elt.value)) {
        // this subscription is changing the value of this field.
        changeCollector[key] = value;
      }
      elt.value = value;
    } else {
      // this subscription is newly caring about this field
      precedenceList.push({subscriptionHandle: subscriptionHandle, value: value});
    }
  }
});
/**
 * Represents a client's view of a single collection
 * @param {String} collectionName Name of the collection it represents
 * @param {Object.<String, Function>} sessionCallbacks The callbacks for added, changed, removed
 * @class SessionCollectionView
 */
var SessionCollectionView = function (collectionName, sessionCallbacks) {
  var self = this;
  self.collectionName = collectionName;
  // id -> document view (SessionDocumentView or DummyDocumentView).
  self.documents = new Map();
  // Hooks used to emit added/changed/removed messages to the client.
  self.callbacks = sessionCallbacks;
};

// Exposed on the package-scope DDPServer object.
DDPServer._SessionCollectionView = SessionCollectionView;
// Collection-level merge box: maintains per-document views and translates
// subscription-level added/changed/removed events into the net
// added/changed/removed messages the client should see.
Object.assign(SessionCollectionView.prototype, {
  isEmpty: function () {
    var self = this;
    return self.documents.size === 0;
  },

  // Emit the added/changed/removed messages needed to move a client from
  // `previous`'s view of this collection to ours.
  diff: function (previous) {
    var self = this;
    DiffSequence.diffMaps(previous.documents, self.documents, {
      both: self.diffDocument.bind(self),

      rightOnly: function (id, nowDV) {
        self.callbacks.added(self.collectionName, id, nowDV.getFields());
      },

      leftOnly: function (id, prevDV) {
        self.callbacks.removed(self.collectionName, id);
      }
    });
  },

  // Emit a single changed message covering every field that differs
  // between the two document views (undefined marks a removed field).
  diffDocument: function (id, prevDV, nowDV) {
    var self = this;
    var fields = {};
    DiffSequence.diffObjects(prevDV.getFields(), nowDV.getFields(), {
      both: function (key, prev, now) {
        if (!EJSON.equals(prev, now))
          fields[key] = now;
      },
      rightOnly: function (key, now) {
        fields[key] = now;
      },
      leftOnly: function(key, prev) {
        fields[key] = undefined;
      }
    });
    self.callbacks.changed(self.collectionName, id, fields);
  },

  // A subscription added this document. Sends `added` to the client only
  // the first time any subscription adds it; otherwise a `changed`.
  added: function (subscriptionHandle, id, fields) {
    var self = this;
    var docView = self.documents.get(id);
    var added = false;
    if (!docView) {
      added = true;
      // Publication strategy decides whether per-field merge bookkeeping
      // is needed for this collection.
      if (Meteor.server.getPublicationStrategy(this.collectionName).useDummyDocumentView) {
        docView = new DummyDocumentView();
      } else {
        docView = new SessionDocumentView();
      }

      self.documents.set(id, docView);
    }
    docView.existsIn.add(subscriptionHandle);
    var changeCollector = {};
    Object.entries(fields).forEach(function ([key, value]) {
      docView.changeField(
        subscriptionHandle, key, value, changeCollector, true);
    });
    if (added)
      self.callbacks.added(self.collectionName, id, changeCollector);
    else
      self.callbacks.changed(self.collectionName, id, changeCollector);
  },

  // A subscription changed fields of this document; undefined values
  // clear fields. Forwards only the net client-visible changes.
  changed: function (subscriptionHandle, id, changed) {
    var self = this;
    var changedResult = {};
    var docView = self.documents.get(id);
    if (!docView)
      throw new Error("Could not find element with id " + id + " to change");
    Object.entries(changed).forEach(function ([key, value]) {
      if (value === undefined)
        docView.clearField(subscriptionHandle, key, changedResult);
      else
        docView.changeField(subscriptionHandle, key, value, changedResult);
    });
    self.callbacks.changed(self.collectionName, id, changedResult);
  },

  // A subscription removed this document. Only when the last subscription
  // lets go does the client see `removed`; otherwise the remaining
  // subscriptions' values are re-resolved and sent as `changed`.
  removed: function (subscriptionHandle, id) {
    var self = this;
    var docView = self.documents.get(id);
    if (!docView) {
      var err = new Error("Removed nonexistent document " + id);
      throw err;
    }
    docView.existsIn.delete(subscriptionHandle);
    if (docView.existsIn.size === 0) {
      // it is gone from everyone
      self.callbacks.removed(self.collectionName, id);
      self.documents.delete(id);
    } else {
      var changed = {};
      // remove this subscription from every precedence list
      // and record the changes
      docView.dataByKey.forEach(function (precedenceList, key) {
        docView.clearField(subscriptionHandle, key, changed);
      });

      self.callbacks.changed(self.collectionName, id, changed);
    }
  }
});
/******************************************************************************/
/* Session */
/******************************************************************************/
@@ -636,7 +411,7 @@ Object.assign(Session.prototype, {
if (!blocked)
return; // idempotent
blocked = false;
processNext();
setImmediate(processNext);
};
self.server.onMessageHook.each(function (callback) {

View File

@@ -5,7 +5,7 @@ Package.describe({
});
Npm.depends({
"permessage-deflate": "0.1.7",
"permessage-deflate2": "0.1.8",
sockjs: "0.3.24",
"lodash.once": "4.1.1",
"lodash.isempty": "4.4.0",
@@ -23,6 +23,7 @@ Package.onUse(function (api) {
"mongo-id",
"diff-sequence",
"ecmascript",
"typescript",
],
"server"
);

View File

@@ -0,0 +1,140 @@
import { DummyDocumentView } from "./dummy_document_view";
import { SessionDocumentView } from "./session_document_view";
interface SessionCallbacks {
added: (collectionName: string, id: string, fields: Record<string, any>) => void;
changed: (collectionName: string, id: string, fields: Record<string, any>) => void;
removed: (collectionName: string, id: string) => void;
}
type DocumentView = SessionDocumentView | DummyDocumentView;
// TypeScript port of the collection-level merge box: maintains per-document
// views and translates subscription-level events into the net
// added/changed/removed messages a client should see.
// NOTE(review): relies on DiffSequence, EJSON and Meteor being available as
// package globals (they are not imported here) — confirm.
export class SessionCollectionView {
  private readonly collectionName: string;
  // id -> document view for every doc visible to this client.
  private readonly documents: Map<string, DocumentView>;
  // Hooks used to emit added/changed/removed messages to the client.
  private readonly callbacks: SessionCallbacks;

  /**
   * Represents a client's view of a single collection
   * @param collectionName - Name of the collection it represents
   * @param sessionCallbacks - The callbacks for added, changed, removed
   */
  constructor(collectionName: string, sessionCallbacks: SessionCallbacks) {
    this.collectionName = collectionName;
    this.documents = new Map();
    this.callbacks = sessionCallbacks;
  }

  public isEmpty(): boolean {
    return this.documents.size === 0;
  }

  // Emit the messages needed to move a client from `previous`'s view of
  // this collection to ours.
  public diff(previous: SessionCollectionView): void {
    DiffSequence.diffMaps(previous.documents, this.documents, {
      both: this.diffDocument.bind(this),

      rightOnly: (id: string, nowDV: DocumentView) => {
        this.callbacks.added(this.collectionName, id, nowDV.getFields());
      },

      leftOnly: (id: string, prevDV: DocumentView) => {
        this.callbacks.removed(this.collectionName, id);
      }
    });
  }

  // Emit one changed message covering every field that differs between the
  // two document views (undefined marks a removed field).
  private diffDocument(id: string, prevDV: DocumentView, nowDV: DocumentView): void {
    const fields: Record<string, any> = {};
    DiffSequence.diffObjects(prevDV.getFields(), nowDV.getFields(), {
      both: (key: string, prev: any, now: any) => {
        if (!EJSON.equals(prev, now)) {
          fields[key] = now;
        }
      },
      rightOnly: (key: string, now: any) => {
        fields[key] = now;
      },
      leftOnly: (key: string, prev: any) => {
        fields[key] = undefined;
      }
    });
    this.callbacks.changed(this.collectionName, id, fields);
  }

  // A subscription added this document. Sends `added` to the client only
  // the first time any subscription adds it; otherwise a `changed`.
  public added(subscriptionHandle: string, id: string, fields: Record<string, any>): void {
    let docView: DocumentView | undefined = this.documents.get(id);
    let added = false;

    if (!docView) {
      added = true;
      // Publication strategy decides whether per-field merge bookkeeping
      // is needed for this collection.
      if (Meteor.server.getPublicationStrategy(this.collectionName).useDummyDocumentView) {
        docView = new DummyDocumentView();
      } else {
        docView = new SessionDocumentView();
      }
      this.documents.set(id, docView);
    }

    docView.existsIn.add(subscriptionHandle);
    const changeCollector: Record<string, any> = {};

    Object.entries(fields).forEach(([key, value]) => {
      docView!.changeField(
        subscriptionHandle,
        key,
        value,
        changeCollector,
        true
      );
    });

    if (added) {
      this.callbacks.added(this.collectionName, id, changeCollector);
    } else {
      this.callbacks.changed(this.collectionName, id, changeCollector);
    }
  }

  // A subscription changed fields of this document; undefined values clear
  // fields. Forwards only the net client-visible changes.
  public changed(subscriptionHandle: string, id: string, changed: Record<string, any>): void {
    const changedResult: Record<string, any> = {};
    const docView = this.documents.get(id);

    if (!docView) {
      throw new Error(`Could not find element with id ${id} to change`);
    }

    Object.entries(changed).forEach(([key, value]) => {
      if (value === undefined) {
        docView.clearField(subscriptionHandle, key, changedResult);
      } else {
        docView.changeField(subscriptionHandle, key, value, changedResult);
      }
    });

    this.callbacks.changed(this.collectionName, id, changedResult);
  }

  // A subscription removed this document. Only when the last subscription
  // lets go does the client see `removed`; otherwise remaining values are
  // re-resolved and sent as `changed`.
  public removed(subscriptionHandle: string, id: string): void {
    const docView = this.documents.get(id);
    if (!docView) {
      throw new Error(`Removed nonexistent document ${id}`);
    }

    docView.existsIn.delete(subscriptionHandle);

    if (docView.existsIn.size === 0) {
      // it is gone from everyone
      this.callbacks.removed(this.collectionName, id);
      this.documents.delete(id);
    } else {
      const changed: Record<string, any> = {};
      // remove this subscription from every precedence list
      // and record the changes
      docView.dataByKey.forEach((precedenceList, key) => {
        docView.clearField(subscriptionHandle, key, changed);
      });

      this.callbacks.changed(this.collectionName, id, changed);
    }
  }
}

View File

@@ -0,0 +1,106 @@
/**
 * One entry in a field's precedence list: the value that a particular
 * subscription has published for that field. Element 0 of a precedence
 * list is the value the client currently sees.
 */
interface PrecedenceItem {
  subscriptionHandle: string;
  value: any;
}
/**
 * Accumulates field changes to send to the client in a single message:
 * key -> new visible value, or `undefined` for a field that went away.
 */
interface ChangeCollector {
  [key: string]: any;
}
/**
 * A client's view of a single document, merged across every subscription
 * that publishes it. Each field keeps a precedence list of
 * {subscriptionHandle, value} pairs; the first entry holds the value the
 * client currently sees.
 */
export class SessionDocumentView {
  // Handles of the subscriptions that publish this document. Mutated
  // directly by SessionCollectionView (existsIn.add/delete), so it must be
  // public — `private` here is a TS compile error at those call sites.
  public existsIn: Set<string>;
  // key -> [{subscriptionHandle, value}, ...] ordered by precedence;
  // element 0 wins. Iterated directly by SessionCollectionView, so it
  // must be public as well.
  public dataByKey: Map<string, PrecedenceItem[]>;

  constructor() {
    this.existsIn = new Set(); // set of subscriptionHandle
    // Memory Growth
    this.dataByKey = new Map(); // key-> [ {subscriptionHandle, value} by precedence]
  }

  /**
   * Returns the document as the client sees it: for each field, the
   * highest-precedence value.
   */
  getFields(): Record<string, any> {
    const ret: Record<string, any> = {};
    this.dataByKey.forEach((precedenceList, key) => {
      ret[key] = precedenceList[0].value;
    });
    return ret;
  }

  /**
   * Removes `subscriptionHandle`'s value for `key`. If that subscription held
   * precedence, records the newly visible value (or `undefined` if the field
   * is now gone) in `changeCollector`.
   */
  clearField(
    subscriptionHandle: string,
    key: string,
    changeCollector: ChangeCollector
  ): void {
    // Publish API ignores _id if present in fields
    if (key === "_id") return;
    const precedenceList = this.dataByKey.get(key);
    // It's okay to clear fields that didn't exist. No need to throw
    // an error.
    if (!precedenceList) return;
    let removedValue: any = undefined;
    for (let i = 0; i < precedenceList.length; i++) {
      const precedence = precedenceList[i];
      if (precedence.subscriptionHandle === subscriptionHandle) {
        // The view's value can only change if this subscription is the one that
        // used to have precedence.
        if (i === 0) removedValue = precedence.value;
        precedenceList.splice(i, 1);
        break;
      }
    }
    if (precedenceList.length === 0) {
      this.dataByKey.delete(key);
      changeCollector[key] = undefined;
    } else if (
      removedValue !== undefined &&
      !EJSON.equals(removedValue, precedenceList[0].value)
    ) {
      changeCollector[key] = precedenceList[0].value;
    }
  }

  /**
   * Records `subscriptionHandle`'s value for `key`. If the visible value
   * changes as a result, writes the new value to `changeCollector`.
   * `isAdd` skips the existing-entry lookup (the caller knows this
   * subscription has not published the field before).
   */
  changeField(
    subscriptionHandle: string,
    key: string,
    value: any,
    changeCollector: ChangeCollector,
    isAdd: boolean = false
  ): void {
    // Publish API ignores _id if present in fields
    if (key === "_id") return;
    // Don't share state with the data passed in by the user.
    value = EJSON.clone(value);
    if (!this.dataByKey.has(key)) {
      this.dataByKey.set(key, [
        { subscriptionHandle: subscriptionHandle, value: value },
      ]);
      changeCollector[key] = value;
      return;
    }
    const precedenceList = this.dataByKey.get(key)!;
    let elt: PrecedenceItem | undefined;
    if (!isAdd) {
      elt = precedenceList.find(
        (precedence) => precedence.subscriptionHandle === subscriptionHandle
      );
    }
    if (elt) {
      if (elt === precedenceList[0] && !EJSON.equals(value, elt.value)) {
        // this subscription is changing the value of this field.
        changeCollector[key] = value;
      }
      elt.value = value;
    } else {
      // this subscription is newly caring about this field
      precedenceList.push({ subscriptionHandle: subscriptionHandle, value: value });
    }
  }
}

View File

@@ -1,4 +1,5 @@
import once from 'lodash.once';
import zlib from 'node:zlib';
// By default, we use the permessage-deflate extension with default
// configuration. If $SERVER_WEBSOCKET_COMPRESSION is set, then it must be valid
@@ -14,12 +15,18 @@ import once from 'lodash.once';
var websocketExtensions = once(function () {
var extensions = [];
var websocketCompressionConfig = process.env.SERVER_WEBSOCKET_COMPRESSION
? JSON.parse(process.env.SERVER_WEBSOCKET_COMPRESSION) : {};
var websocketCompressionConfig = process.env.SERVER_WEBSOCKET_COMPRESSION ?
JSON.parse(process.env.SERVER_WEBSOCKET_COMPRESSION) : {};
if (websocketCompressionConfig) {
extensions.push(Npm.require('permessage-deflate').configure(
websocketCompressionConfig
));
extensions.push(Npm.require('permessage-deflate2').configure({
threshold: 1024,
level: zlib.constants.Z_BEST_SPEED,
memLevel: zlib.constants.Z_MIN_MEMLEVEL,
noContextTakeover: true,
maxWindowBits: zlib.constants.Z_MIN_WINDOWBITS,
...(websocketCompressionConfig || {})
}));
}
return extensions;

View File

@@ -1,7 +1,3 @@
// A write fence collects a group of writes, and provides a callback
// when all of the writes are fully committed and propagated (all
// observers have been notified of the write and acknowledged it.)
//
DDPServer._WriteFence = class {
constructor() {
this.armed = false;
@@ -12,58 +8,49 @@ DDPServer._WriteFence = class {
this.completion_callbacks = [];
}
// Start tracking a write, and return an object to represent it. The
// object has a single method, committed(). This method should be
// called when the write is fully committed and propagated. You can
// continue to add writes to the WriteFence up until it is triggered
// (calls its callbacks because all writes have committed.)
beginWrite() {
if (this.retired)
return { committed: function () {} };
if (this.retired) {
return { committed: () => {} };
}
if (this.fired)
if (this.fired) {
throw new Error("fence has already activated -- too late to add writes");
}
this.outstanding_writes++;
let committed = false;
const _committedFn = async () => {
if (committed)
throw new Error("committed called twice on the same write");
committed = true;
this.outstanding_writes--;
await this._maybeFire();
};
return {
committed: _committedFn,
committed: async () => {
if (committed) {
throw new Error("committed called twice on the same write");
}
committed = true;
this.outstanding_writes--;
await this._maybeFire();
}
};
}
// Arm the fence. Once the fence is armed, and there are no more
// uncommitted writes, it will activate.
arm() {
if (this === DDPServer._getCurrentFence())
if (this === DDPServer._getCurrentFence()) {
throw Error("Can't arm the current fence");
}
this.armed = true;
return this._maybeFire();
}
// Register a function to be called once before firing the fence.
// Callback function can add new writes to the fence, in which case
// it won't fire until those writes are done as well.
onBeforeFire(func) {
if (this.fired)
throw new Error("fence has already activated -- too late to " +
"add a callback");
if (this.fired) {
throw new Error("fence has already activated -- too late to add a callback");
}
this.before_fire_callbacks.push(func);
}
// Register a function to be called when the fence fires.
onAllCommitted(func) {
if (this.fired)
throw new Error("fence has already activated -- too late to " +
"add a callback");
if (this.fired) {
throw new Error("fence has already activated -- too late to add a callback");
}
this.completion_callbacks.push(func);
}
@@ -72,56 +59,54 @@ DDPServer._WriteFence = class {
const returnValue = new Promise(r => resolver = r);
this.onAllCommitted(resolver);
await this.arm();
return returnValue;
}
// Convenience function. Arms the fence, then blocks until it fires.
async armAndWait() {
armAndWait() {
return this._armAndWait();
}
async _maybeFire() {
if (this.fired)
if (this.fired) {
throw new Error("write fence already activated?");
if (this.armed && !this.outstanding_writes) {
const invokeCallback = async (func) => {
try {
await func(this);
} catch (err) {
Meteor._debug("exception in write fence callback:", err);
}
};
}
this.outstanding_writes++;
while (this.before_fire_callbacks.length > 0) {
const cb = this.before_fire_callbacks.shift();
await invokeCallback(cb);
}
this.outstanding_writes--;
if (!this.armed || this.outstanding_writes > 0) {
return;
}
if (!this.outstanding_writes) {
this.fired = true;
const callbacks = this.completion_callbacks || [];
this.completion_callbacks = [];
while (callbacks.length > 0) {
const cb = callbacks.shift();
await invokeCallback(cb);
}
const invokeCallback = async (func) => {
try {
await func(this);
} catch (err) {
Meteor._debug("exception in write fence callback:", err);
}
};
this.outstanding_writes++;
// Process all before_fire callbacks in parallel
const beforeCallbacks = [...this.before_fire_callbacks];
this.before_fire_callbacks = [];
await Promise.all(beforeCallbacks.map(cb => invokeCallback(cb)));
this.outstanding_writes--;
if (this.outstanding_writes === 0) {
this.fired = true;
// Process all completion callbacks in parallel
const callbacks = [...this.completion_callbacks];
this.completion_callbacks = [];
await Promise.all(callbacks.map(cb => invokeCallback(cb)));
}
}
// Deactivate this fence so that adding more writes has no effect.
// The fence must have already fired.
retire() {
if (!this.fired)
if (!this.fired) {
throw new Error("Can't retire a fence that hasn't fired.");
}
this.retired = true;
}
};
// The current write fence. When there is a current write fence, code
// that writes to databases should register their writes with it using
// beginWrite().
//
DDPServer._CurrentWriteFence = new Meteor.EnvironmentVariable;
DDPServer._CurrentWriteFence = new Meteor.EnvironmentVariable;

View File

@@ -24,145 +24,6 @@ FakeDoubleEndedQueue.prototype.isEmpty = function () {
Meteor._DoubleEndedQueue = Meteor.isServer ? Npm.require('denque') : FakeDoubleEndedQueue;
// Meteor._SynchronousQueue is a queue which runs task functions serially.
// Tasks are assumed to be synchronous: ie, it's assumed that they are
// done when they return.
//
// It has two methods:
// - queueTask queues a task to be run, and returns immediately.
// - runTask queues a task to be run, and then yields. It returns
// when the task finishes running.
//
// It's safe to call queueTask from within a task, but not runTask (unless
// you're calling runTask from a nested Fiber).
//
// Somewhat inspired by async.queue, but specific to blocking tasks.
// XXX break this out into an NPM module?
// XXX could maybe use the npm 'schlock' module instead, which would
// also support multiple concurrent "read" tasks
//
// Constructor for the serial task queue. Tasks are stored as handles
// ({task, name, resolver?}) in a double-ended queue and executed one at a
// time by _run().
function AsynchronousQueue () {
  // FIFO of pending task handles.
  this._taskHandles = new Meteor._DoubleEndedQueue();
  // True while a _run() pass is executing or scheduled; prevents
  // scheduling more than one concurrent pass.
  this._runningOrRunScheduled = false;
  // This is true if we're currently draining. While we're draining, a further
  // drain is a noop, to prevent infinite loops. "drain" is a heuristic type
  // operation, that has a meaning like unto "what a naive person would expect
  // when modifying a table from an observe"
  this._draining = false;
}
Object.assign(AsynchronousQueue.prototype, {
  // Queue a task to run and return immediately. The task is bound to the
  // current Meteor environment; exceptions it throws are logged via
  // Meteor._debug and rethrown by the bindEnvironment error handler.
  queueTask(task) {
    const self = this;
    self._taskHandles.push({
      task: Meteor.bindEnvironment(task, function (e) {
        Meteor._debug('Exception from task', e);
        throw e;
      }),
      name: task.name
    });
    self._scheduleRun();
  },
  // Schedule a _run() pass on the next tick (setImmediate on the server,
  // setTimeout(0) on the client), unless a pass is already running or
  // scheduled. Returns a promise that resolves when that pass completes.
  async _scheduleRun() {
    // Already running or scheduled? Do nothing.
    if (this._runningOrRunScheduled)
      return;
    this._runningOrRunScheduled = true;
    let resolve;
    const promise = new Promise(r => resolve = r);
    const runImmediateHandle = (fn) => {
      if (Meteor.isServer) {
        // _runFresh escapes the current async context before scheduling.
        Meteor._runFresh(() => setImmediate(fn))
        return;
      }
      setTimeout(fn, 0);
    };
    runImmediateHandle(() => {
      this._run().finally(resolve);
    });
    return promise;
  },
  // Run a single queued task, then re-schedule until the queue is empty.
  // Handles queued via runTask() carry a resolver that receives the task's
  // outcome; errors from plain queueTask() tasks are only logged.
  async _run() {
    if (!this._runningOrRunScheduled)
      throw new Error("expected to be _runningOrRunScheduled");
    if (this._taskHandles.isEmpty()) {
      // Done running tasks! Don't immediately schedule another run, but
      // allow future tasks to do so.
      this._runningOrRunScheduled = false;
      return;
    }
    const taskHandle = this._taskHandles.shift();
    let exception;
    // Run the task.
    try {
      await taskHandle.task();
    } catch (err) {
      if (taskHandle.resolver) {
        // We'll throw this exception through runTask.
        exception = err;
      } else {
        Meteor._debug("Exception in queued task", err);
      }
    }
    // Soon, run the next task, if there is any.
    this._runningOrRunScheduled = false;
    this._scheduleRun();
    // Settle the runTask() promise after the next pass is scheduled.
    if (taskHandle.resolver) {
      if (exception) {
        taskHandle.resolver(null, exception);
      } else {
        taskHandle.resolver();
      }
    }
  },
  // Queue a task and return a promise that resolves with undefined (or
  // rejects with the task's exception) once the task has run.
  async runTask(task) {
    let resolver;
    const promise = new Promise(
      (resolve, reject) =>
        (resolver = (res, rej) => {
          if (rej) {
            reject(rej);
            return;
          }
          resolve(res);
        })
    );
    const handle = {
      task,
      name: task.name,
      resolver,
    };
    this._taskHandles.push(handle);
    await this._scheduleRun();
    return promise;
  },
  // Wait for all previously queued tasks to finish by queueing (and
  // awaiting) a no-op task.
  flush() {
    return this.runTask(() => { });
  },
  // Repeatedly flush until the queue stays empty. Reentrant calls while
  // draining are no-ops (see _draining in the constructor).
  async drain() {
    if (this._draining)
      return;
    this._draining = true;
    while (!this._taskHandles.isEmpty()) {
      await this.flush();
    }
    this._draining = false;
  }
});
// Expose the queue implementation on the Meteor namespace.
Meteor._AsynchronousQueue = AsynchronousQueue;
// Sleep. Mostly used for debugging (eg, inserting latency into server
// methods).

View File

@@ -0,0 +1,171 @@
class AsynchronousQueue {
  /**
   * Creates a queue that processes tasks in batches. With orderMatters=true
   * (the default) tasks run strictly one after another; with
   * orderMatters=false tasks in a batch run concurrently, at most
   * _maxConcurrent at a time.
   *
   * Batch size and concurrency are configured via environment variables:
   * - METEOR_ASYNC_QUEUE_BATCH_SIZE: Number of tasks to process in each batch (default: 128)
   * - METEOR_ASYNC_QUEUE_MAX_CONCURRENT: Maximum number of concurrent tasks (default: 16)
   *
   * @param {Object} options
   * @param {boolean} [options.orderMatters=true] Whether task completion order should be preserved
   */
  constructor({ orderMatters = true } = {}) {
    // Always pass a radix to parseInt to avoid surprising parses.
    this._batchSize = parseInt(
      process.env.METEOR_ASYNC_QUEUE_BATCH_SIZE || '128',
      10
    );
    this._maxConcurrent = parseInt(
      process.env.METEOR_ASYNC_QUEUE_MAX_CONCURRENT || '16',
      10
    );
    this._orderMatters = orderMatters;
    this._taskHandles = new Meteor._DoubleEndedQueue();
    this._runningOrRunScheduled = false;
    this._draining = false;
    // Promises of tasks currently in flight (parallel mode only);
    // drain() waits on these.
    this._activePromises = new Set();
  }

  /**
   * Queues a task and returns immediately. The task is bound to the current
   * Meteor environment; exceptions are logged via Meteor._debug and rethrown
   * by the bindEnvironment error handler.
   */
  queueTask(task) {
    const wrappedTask = Meteor.bindEnvironment(task, function (e) {
      Meteor._debug('Exception from task', e);
      throw e;
    });
    this._taskHandles.push({
      task: wrappedTask,
      name: task.name
    });
    this._scheduleRun();
  }

  /**
   * Schedules a _run() pass on the next tick unless one is already running
   * or scheduled. Resolves when that pass finishes.
   */
  async _scheduleRun() {
    if (this._runningOrRunScheduled) return;
    this._runningOrRunScheduled = true;
    const runImmediateHandle = (fn) => {
      if (Meteor.isServer) {
        // _runFresh escapes the current async context before scheduling.
        Meteor._runFresh(() => setImmediate(fn));
        return;
      }
      setTimeout(fn, 0);
    };
    return new Promise(resolve => {
      runImmediateHandle(() => {
        this._run().finally(resolve);
      });
    });
  }

  /**
   * Pulls up to _batchSize tasks off the queue, processes them (ordered or
   * parallel), then reschedules itself if more tasks remain.
   */
  async _run() {
    if (!this._runningOrRunScheduled) {
      throw new Error("expected to be _runningOrRunScheduled");
    }
    if (this._taskHandles.isEmpty()) {
      this._runningOrRunScheduled = false;
      return;
    }
    // Collect tasks for the current batch
    const batch = [];
    while (batch.length < this._batchSize && !this._taskHandles.isEmpty()) {
      batch.push(this._taskHandles.shift());
    }
    // Process batch
    if (this._orderMatters) {
      await this._processOrderedBatch(batch);
    } else {
      await this._processParallelBatch(batch);
    }
    // Schedule next batch if there are more tasks
    this._runningOrRunScheduled = false;
    if (!this._taskHandles.isEmpty()) {
      this._scheduleRun();
    }
  }

  /**
   * Runs a batch with at most _maxConcurrent tasks in flight at once.
   *
   * Tasks are started lazily, chunk by chunk: starting the whole batch up
   * front (and merely awaiting in chunks) would run everything concurrently
   * and make the concurrency limit a no-op. Each task's promise is tracked
   * in _activePromises and removed again in a finally block, so a failed
   * task cannot leak into (and permanently reject) drain().
   */
  async _processParallelBatch(batch) {
    const runOne = async (taskHandle) => {
      // Promise.resolve().then() also captures synchronous throws.
      const promise = Promise.resolve().then(() => taskHandle.task());
      this._activePromises.add(promise);
      try {
        const result = await promise;
        if (taskHandle.resolver) {
          taskHandle.resolver(result);
        }
      } catch (err) {
        if (taskHandle.resolver) {
          taskHandle.resolver(null, err);
        } else {
          Meteor._debug("Exception in queued task", err);
        }
      } finally {
        this._activePromises.delete(promise);
      }
    };
    // Start at most _maxConcurrent tasks per chunk.
    for (let i = 0; i < batch.length; i += this._maxConcurrent) {
      const chunk = batch.slice(i, i + this._maxConcurrent);
      await Promise.all(chunk.map(runOne));
    }
  }

  /**
   * Runs a batch strictly sequentially, settling each runTask() promise with
   * the task's result or error.
   */
  async _processOrderedBatch(batch) {
    for (const taskHandle of batch) {
      try {
        const result = await taskHandle.task();
        if (taskHandle.resolver) {
          taskHandle.resolver(result);
        }
      } catch (err) {
        if (taskHandle.resolver) {
          taskHandle.resolver(null, err);
        } else {
          Meteor._debug("Exception in queued task", err);
        }
      }
    }
  }

  /**
   * Queues a task and returns a promise that resolves with the task's result
   * (or rejects with its error) once the task has run.
   */
  async runTask(task) {
    return new Promise((resolve, reject) => {
      const resolver = (res, err) => err ? reject(err) : resolve(res);
      this._taskHandles.push({ task, name: task.name, resolver });
      this._scheduleRun();
    });
  }

  /**
   * Waits for all previously queued tasks to finish by queueing a no-op.
   */
  flush() {
    return this.runTask(() => {});
  }

  /**
   * Waits until the queue is empty and all in-flight tasks have settled.
   * Reentrant calls while draining are no-ops. Uses Promise.allSettled so a
   * rejected task cannot abort the drain, and clears _draining in a finally
   * block so a failure cannot leave the queue permanently "draining".
   */
  async drain() {
    if (this._draining) return;
    this._draining = true;
    try {
      while (!this._taskHandles.isEmpty() || this._activePromises.size > 0) {
        await this.flush();
        if (this._activePromises.size > 0) {
          await Promise.allSettled(Array.from(this._activePromises));
        }
      }
    } finally {
      this._draining = false;
    }
  }
}
// Expose the queue implementation on the Meteor namespace.
Meteor._AsynchronousQueue = AsynchronousQueue;
/**
 * Backwards compatibility: _SynchronousQueue is now an alias for the
 * asynchronous implementation above.
 */
Meteor._SynchronousQueue = AsynchronousQueue;

View File

@@ -34,6 +34,7 @@ Package.onUse(function (api) {
api.addFiles('timers.js', ['client', 'server']);
api.addFiles('errors.js', ['client', 'server']);
api.addFiles('asl-helpers.js', 'server');
api.addFiles('asynchronous_queue.js', 'server');
api.addFiles('async_helpers.js', ['client', 'server']);
api.addFiles('fiber_stubs_client.js', 'client');
api.addFiles('asl-helpers-client.js', 'client');

View File

@@ -4,24 +4,30 @@ let nextObserveHandleId = 1;
export type ObserveHandleCallbackInternal = '_added' | '_addedBefore' | '_changed' | '_movedBefore' | '_removed';
export type Callback<T = any> = (...args: T[]) => Promise<void> | void;
/**
* The "observe handle" returned from observeChanges.
* Contains a reference to an ObserveMultiplexer.
* Used to stop observation and clean up resources.
*/
export class ObserveHandle {
export class ObserveHandle<T = any> {
_id: number;
_multiplexer: ObserveMultiplexer;
nonMutatingCallbacks: boolean;
_stopped: boolean;
_added?: (...args: any[]) => void;
_addedBefore?: (...args: any[]) => void;
_changed?: (...args: any[]) => void;
_movedBefore?: (...args: any[]) => void;
_removed?: (...args: any[]) => void;
public initialAddsSentResolver: (value: void) => void = () => {};
public initialAddsSent: Promise<void>
constructor(multiplexer: any, callbacks: Record<ObserveHandleCallback, any>, nonMutatingCallbacks: boolean) {
_added?: Callback<T>;
_addedBefore?: Callback<T>;
_changed?: Callback<T>;
_movedBefore?: Callback<T>;
_removed?: Callback<T>;
constructor(multiplexer: any, callbacks: Record<ObserveHandleCallback, Callback<T>>, nonMutatingCallbacks: boolean) {
this._multiplexer = multiplexer;
multiplexer.callbackNames().forEach((name: ObserveHandleCallback) => {
@@ -40,6 +46,20 @@ export class ObserveHandle {
this._stopped = false;
this._id = nextObserveHandleId++;
this.nonMutatingCallbacks = nonMutatingCallbacks;
this.initialAddsSent = new Promise(resolve => {
const ready = () => {
resolve();
this.initialAddsSent = Promise.resolve();
}
const timeout = setTimeout(ready, 30000)
this.initialAddsSentResolver = () => {
ready();
clearTimeout(timeout);
};
});
}
async stop() {

View File

@@ -1,5 +1,5 @@
import { ObserveHandle } from './observe_handle';
import isEmpty from 'lodash.isempty';
import { ObserveHandle } from './observe_handle';
interface ObserveMultiplexerOptions {
ordered: boolean;
@@ -34,7 +34,6 @@ export class ObserveMultiplexer {
this._ordered = ordered;
this._onStop = onStop;
// @ts-ignore
this._queue = new Meteor._AsynchronousQueue();
this._handles = {};
this._resolver = null;
@@ -140,7 +139,7 @@ export class ObserveMultiplexer {
return !!this._isReady;
}
_applyCallback(callbackName: string, args: any[]): void {
_applyCallback(callbackName: string, args: any[]) {
this._queue.queueTask(async () => {
if (!this._handles) return;
@@ -152,31 +151,43 @@ export class ObserveMultiplexer {
for (const handleId of Object.keys(this._handles)) {
const handle = this._handles && this._handles[handleId];
if (!handle) return;
const callback = (handle as any)[`_${callbackName}`];
callback && (await callback.apply(
if (!callback) continue;
handle.initialAddsSent.then(callback.apply(
null,
handle.nonMutatingCallbacks ? args : EJSON.clone(args)
));
))
}
});
}
async _sendAdds(handle: ObserveHandle): Promise<void> {
const add = this._ordered ? handle._addedBefore : handle._added;
if (!add) return;
await this._cache.docs.forEachAsync(async (doc: any, id: string) => {
if (!(handle._id in this._handles!))
const addPromises: Promise<void>[] = [];
this._cache.docs.forEach((doc: any, id: string) => {
if (!(handle._id in this._handles!)) {
throw Error("handle got removed before sending initial adds!");
}
const { _id, ...fields } = handle.nonMutatingCallbacks ? doc : EJSON.clone(doc);
if (this._ordered)
await add(id, fields, null);
else
await add(id, fields);
const promise = this._ordered ?
add(id, fields, null) :
add(id, fields);
addPromises.push(promise);
});
await Promise.all(addPromises);
handle.initialAddsSentResolver();
}
}

View File

@@ -87,6 +87,7 @@ export const OplogObserveDriver = function (options) {
self._comparator = null;
self._sorter = null;
self._unpublishedBuffer = null;
// Memory Growth
self._published = new LocalCollection._IdMap;
}

View File

@@ -11,7 +11,7 @@ export const OPLOG_COLLECTION = 'oplog.rs';
let TOO_FAR_BEHIND = +(process.env.METEOR_OPLOG_TOO_FAR_BEHIND || 2000);
const TAIL_TIMEOUT = +(process.env.METEOR_OPLOG_TAIL_TIMEOUT || 30000);
interface OplogEntry {
export interface OplogEntry {
op: string;
o: any;
o2?: any;
@@ -19,12 +19,12 @@ interface OplogEntry {
ns: string;
}
interface CatchingUpResolver {
export interface CatchingUpResolver {
ts: any;
resolver: () => void;
}
interface OplogTrigger {
export interface OplogTrigger {
dropCollection: boolean;
dropDatabase: boolean;
op: OplogEntry;
@@ -34,7 +34,7 @@ interface OplogTrigger {
export class OplogHandle {
private _oplogUrl: string;
private _dbName: string;
public _dbName: string;
private _oplogLastEntryConnection: MongoConnection | null;
private _oplogTailConnection: MongoConnection | null;
private _oplogOptions: { excludeCollections?: string[]; includeCollections?: string[] } | null;
@@ -42,16 +42,18 @@ export class OplogHandle {
private _tailHandle: any;
private _readyPromiseResolver: (() => void) | null;
private _readyPromise: Promise<void>;
private _crossbar: any;
public _crossbar: any;
private _baseOplogSelector: any;
private _catchingUpResolvers: CatchingUpResolver[];
private _lastProcessedTS: any;
private _onSkippedEntriesHook: any;
private _entryQueue: any;
private _workerActive: boolean;
private _startTrailingPromise: Promise<void>;
private _resolveTimeout: any;
private _entryQueue = new Meteor._DoubleEndedQueue();
private _workerActive = false;
private _workerPromise: Promise<void> | null = null;
constructor(oplogUrl: string, dbName: string) {
this._oplogUrl = oplogUrl;
this._dbName = dbName;
@@ -90,10 +92,6 @@ export class OplogHandle {
debugPrintExceptions: "onSkippedEntries callback"
});
// @ts-ignore
this._entryQueue = new Meteor._DoubleEndedQueue();
this._workerActive = false;
this._startTrailingPromise = this._startTailing();
}
@@ -298,64 +296,11 @@ export class OplogHandle {
}
private _maybeStartWorker(): void {
if (this._workerActive) return;
if (this._workerPromise) return;
this._workerActive = true;
Meteor.defer(async () => {
// May be called recursively in case of transactions.
const handleDoc = async (doc: OplogEntry): Promise<void> => {
if (doc.ns === "admin.$cmd") {
if (doc.o.applyOps) {
// This was a successful transaction, so we need to apply the
// operations that were involved.
let nextTimestamp = doc.ts;
for (const op of doc.o.applyOps) {
// See https://github.com/meteor/meteor/issues/10420.
if (!op.ts) {
op.ts = nextTimestamp;
nextTimestamp = nextTimestamp.add(Long.ONE);
}
await handleDoc(op);
}
return;
}
throw new Error("Unknown command " + JSON.stringify(doc));
}
const trigger: OplogTrigger = {
dropCollection: false,
dropDatabase: false,
op: doc,
};
if (typeof doc.ns === "string" && doc.ns.startsWith(this._dbName + ".")) {
trigger.collection = doc.ns.slice(this._dbName.length + 1);
}
// Is it a special command and the collection name is hidden
// somewhere in operator?
if (trigger.collection === "$cmd") {
if (doc.o.dropDatabase) {
delete trigger.collection;
trigger.dropDatabase = true;
} else if ("drop" in doc.o) {
trigger.collection = doc.o.drop;
trigger.dropCollection = true;
trigger.id = null;
} else if ("create" in doc.o && "idIndex" in doc.o) {
// A collection got implicitly created within a transaction. There's
// no need to do anything about it.
} else {
throw Error("Unknown command " + JSON.stringify(doc));
}
} else {
// All other ops have an id.
trigger.id = idForOp(doc);
}
await this._crossbar.fire(trigger);
};
// Convert to a proper promise-based queue processor
this._workerPromise = (async () => {
try {
while (!this._stopped && !this._entryQueue.isEmpty()) {
// Are we too far behind? Just tell our observers that they need to
@@ -375,23 +320,25 @@ export class OplogHandle {
continue;
}
// Process next batch from the queue
const doc = this._entryQueue.shift();
// Fire trigger(s) for this doc.
await handleDoc(doc);
// Now that we've processed this operation, process pending
// sequencers.
if (doc.ts) {
this._setLastProcessedTS(doc.ts);
} else {
throw Error("oplog entry without ts: " + JSON.stringify(doc));
try {
await handleDoc(this, doc);
// Process any waiting fence callbacks
if (doc.ts) {
this._setLastProcessedTS(doc.ts);
}
} catch (e) {
// Keep processing queue even if one entry fails
console.error('Error processing oplog entry:', e);
}
}
} finally {
this._workerPromise = null;
this._workerActive = false;
}
});
})();
}
_setLastProcessedTS(ts: any): void {
@@ -422,3 +369,58 @@ export function idForOp(op: OplogEntry): string {
throw Error("Unknown op: " + JSON.stringify(op));
}
}
/**
 * Processes a single oplog entry: unpacks transactions (applyOps)
 * recursively, builds an OplogTrigger describing the affected
 * collection/operation, fires it on the handle's crossbar, and then yields
 * to the event loop before returning.
 */
async function handleDoc(handle: OplogHandle, doc: OplogEntry): Promise<void> {
  if (doc.ns === "admin.$cmd") {
    if (!doc.o.applyOps) {
      throw new Error("Unknown command " + JSON.stringify(doc));
    }
    // A successful transaction: replay each inner operation. Some inner
    // ops lack a timestamp (see https://github.com/meteor/meteor/issues/10420),
    // so synthesize increasing ones from the outer entry's timestamp.
    let nextTimestamp = doc.ts;
    for (const op of doc.o.applyOps) {
      if (!op.ts) {
        op.ts = nextTimestamp;
        nextTimestamp = nextTimestamp.add(Long.ONE);
      }
      await handleDoc(handle, op);
    }
    return;
  }
  const trigger: OplogTrigger = {
    dropCollection: false,
    dropDatabase: false,
    op: doc,
  };
  const dbPrefix = handle._dbName + ".";
  if (typeof doc.ns === "string" && doc.ns.startsWith(dbPrefix)) {
    trigger.collection = doc.ns.slice(dbPrefix.length);
  }
  if (trigger.collection === "$cmd") {
    // Special command: the real collection name is hidden in the operator.
    if (doc.o.dropDatabase) {
      delete trigger.collection;
      trigger.dropDatabase = true;
    } else if ("drop" in doc.o) {
      trigger.collection = doc.o.drop;
      trigger.dropCollection = true;
      trigger.id = null;
    } else if ("create" in doc.o && "idIndex" in doc.o) {
      // A collection got implicitly created within a transaction. There's
      // no need to do anything about it.
    } else {
      throw Error("Unknown command " + JSON.stringify(doc));
    }
  } else {
    // All other ops carry a document id.
    trigger.id = idForOp(doc);
  }
  await handle._crossbar.fire(trigger);
  // Yield so long oplog bursts don't monopolize the event loop.
  await new Promise(resolve => setImmediate(resolve));
}

View File

@@ -1,127 +0,0 @@
// Converter of the new MongoDB Oplog format (>=5.0) to the one that Meteor
// handles well, i.e., `$set` and `$unset`. The new format is completely new,
// and looks as follows:
//
// { $v: 2, diff: Diff }
//
// where `Diff` is a recursive structure:
//
// {
// // Nested updates (sometimes also represented with an s-field).
// // Example: `{ $set: { 'foo.bar': 1 } }`.
// i: { <key>: <value>, ... },
//
// // Top-level updates.
// // Example: `{ $set: { foo: { bar: 1 } } }`.
// u: { <key>: <value>, ... },
//
// // Unsets.
// // Example: `{ $unset: { foo: '' } }`.
// d: { <key>: false, ... },
//
// // Array operations.
// // Example: `{ $push: { foo: 'bar' } }`.
// s<key>: { a: true, u<index>: <value>, ... },
// ...
//
// // Nested operations (sometimes also represented in the `i` field).
// // Example: `{ $set: { 'foo.bar': 1 } }`.
// s<key>: Diff,
// ...
// }
//
// (all fields are optional).
// Joins a dotted path prefix with a key; an empty prefix yields the key alone.
function join(prefix, key) {
  if (!prefix) return key;
  return prefix + '.' + key;
}
// Keys allowed inside an array operator: the marker 'a' or positional keys
// like 's3' / 'u12'.
const arrayOperatorKeyRegex = /^(a|[su]\d+)$/;
function isArrayOperatorKey(field) {
  return arrayOperatorKeyRegex.test(field);
}
// An array operator is an object with `a: true` whose keys are all valid
// array-operator keys.
function isArrayOperator(operator) {
  if (operator.a !== true) return false;
  return Object.keys(operator).every(isArrayOperatorKey);
}
// Recursively flattens `source` into `target` under dot-notation paths rooted
// at `prefix`. Arrays, primitives, null, Mongo.ObjectIDs and empty objects are
// stored as-is; plain non-empty objects are descended into.
function flattenObjectInto(target, source, prefix) {
  const isLeaf =
    Array.isArray(source) ||
    typeof source !== 'object' ||
    source === null ||
    source instanceof Mongo.ObjectID;
  if (isLeaf) {
    target[prefix] = source;
    return;
  }
  const entries = Object.entries(source);
  if (!entries.length) {
    target[prefix] = source;
    return;
  }
  for (const [key, value] of entries) {
    flattenObjectInto(target, value, join(prefix, key));
  }
}
// Verbose tracing of the conversion, enabled via OPLOG_CONVERTER_DEBUG.
const logDebugMessages = !!process.env.OPLOG_CONVERTER_DEBUG;
/**
 * Converts one level of a v2 oplog `diff` into `$set`/`$unset` operations on
 * `oplogEntry`, recursing into nested diffs. `prefix` is the dotted path to
 * the current level ('' at the top). Diff keys handled:
 * - 'd': unsets
 * - 'i': (potentially nested) sets, flattened into dot notation
 * - 'u': flat sets
 * - 's<key>': array operations or nested object diffs
 */
function convertOplogDiff(oplogEntry, diff, prefix) {
  if (logDebugMessages) {
    console.log(`convertOplogDiff(${JSON.stringify(oplogEntry)}, ${JSON.stringify(diff)}, ${JSON.stringify(prefix)})`);
  }
  Object.entries(diff).forEach(([diffKey, value]) => {
    if (diffKey === 'd') {
      // Handle `$unset`s.
      oplogEntry.$unset ??= {};
      Object.keys(value).forEach(key => {
        oplogEntry.$unset[join(prefix, key)] = true;
      });
    } else if (diffKey === 'i') {
      // Handle (potentially) nested `$set`s.
      oplogEntry.$set ??= {};
      flattenObjectInto(oplogEntry.$set, value, prefix);
    } else if (diffKey === 'u') {
      // Handle flat `$set`s.
      oplogEntry.$set ??= {};
      Object.entries(value).forEach(([key, value]) => {
        oplogEntry.$set[join(prefix, key)] = value;
      });
    } else if (diffKey.startsWith('s')) {
      // Handle s-fields. The startsWith guard is required: without it any
      // unrecognized diff key would fall through here and be misparsed as
      // an s-field (its first character silently stripped).
      const key = diffKey.slice(1);
      if (isArrayOperator(value)) {
        // Array operator.
        Object.entries(value).forEach(([position, value]) => {
          if (position === 'a') {
            return;
          }
          const positionKey = join(join(prefix, key), position.slice(1));
          if (position[0] === 's') {
            convertOplogDiff(oplogEntry, value, positionKey);
          } else if (value === null) {
            oplogEntry.$unset ??= {};
            oplogEntry.$unset[positionKey] = true;
          } else {
            oplogEntry.$set ??= {};
            oplogEntry.$set[positionKey] = value;
          }
        });
      } else if (key) {
        // Nested object.
        convertOplogDiff(oplogEntry, value, join(prefix, key));
      }
    }
  });
}
// Converts a v2 oplog update entry to the v1-style `$set`/`$unset` form the
// oplog driver understands. Entries that are not v2 (or carry no diff) are
// returned untouched.
export function oplogV2V1Converter(oplogEntry) {
  const isConvertibleV2Entry = oplogEntry.$v === 2 && !!oplogEntry.diff;
  if (!isConvertibleV2Entry) {
    return oplogEntry;
  }
  const convertedOplogEntry = { $v: 2 };
  convertOplogDiff(convertedOplogEntry, oplogEntry.diff, '');
  return convertedOplogEntry;
}

View File

@@ -0,0 +1,204 @@
/**
* Converter module for the new MongoDB Oplog format (>=5.0) to the one that Meteor
* handles well, i.e., `$set` and `$unset`. The new format is completely new,
* and looks as follows:
*
* ```js
* { $v: 2, diff: Diff }
* ```
*
* where `Diff` is a recursive structure:
* ```js
* {
* // Nested updates (sometimes also represented with an s-field).
* // Example: `{ $set: { 'foo.bar': 1 } }`.
* i: { <key>: <value>, ... },
*
* // Top-level updates.
* // Example: `{ $set: { foo: { bar: 1 } } }`.
* u: { <key>: <value>, ... },
*
* // Unsets.
* // Example: `{ $unset: { foo: '' } }`.
* d: { <key>: false, ... },
*
* // Array operations.
* // Example: `{ $push: { foo: 'bar' } }`.
* s<key>: { a: true, u<index>: <value>, ... },
* ...
*
* // Nested operations (sometimes also represented in the `i` field).
* // Example: `{ $set: { 'foo.bar': 1 } }`.
* s<key>: Diff,
* ...
* }
* ```
*
* (all fields are optional)
*/
import { EJSON } from 'meteor/ejson';
/** An oplog update description: v2 input (`diff`) and/or v1 output (`$set`/`$unset`). */
interface OplogEntry {
  $v: number;
  diff?: OplogDiff;
  $set?: Record<string, any>;
  $unset?: Record<string, true>;
}
/** Recursive v2 diff structure; see the module header comment for field meanings. */
interface OplogDiff {
  i?: Record<string, any>;
  u?: Record<string, any>;
  d?: Record<string, boolean>;
  [key: `s${string}`]: ArrayOperator | Record<string, any>;
}
/** Array operation: the `a: true` marker plus positional `u<index>`/`s<index>` keys. */
interface ArrayOperator {
  a: true;
  [key: `u${number}`]: any;
}
const arrayOperatorKeyRegex = /^(a|[su]\d+)$/;
/**
 * Checks if a field is an array operator key of form 'a' or 's1' or 'u1' etc
 */
function isArrayOperatorKey(field: string): boolean {
  return arrayOperatorKeyRegex.test(field);
}
/**
 * Type guard to check if an operator is a valid array operator: a non-null
 * object with `a: true` whose keys all match arrayOperatorKeyRegex.
 */
function isArrayOperator(operator: unknown): operator is ArrayOperator {
  if (operator === null || typeof operator !== 'object') {
    return false;
  }
  const record = operator as Record<string, unknown>;
  if (record.a !== true) {
    return false;
  }
  return Object.keys(record).every(isArrayOperatorKey);
}
/**
 * Joins two parts of a field path with a dot.
 * Returns the key itself if prefix is empty.
 */
function join(prefix: string, key: string): string {
  if (!prefix) return key;
  return `${prefix}.${key}`;
}
/**
 * Recursively flattens an object into a target object with dot-notation
 * paths. Leaves (arrays, primitives, null, Mongo.ObjectIDs, custom EJSON
 * types, and empty objects) are assigned directly; plain non-empty objects
 * are descended into.
 */
function flattenObjectInto(
  target: Record<string, any>,
  source: any,
  prefix: string
): void {
  const isLeafValue =
    Array.isArray(source) ||
    typeof source !== 'object' ||
    source === null ||
    source instanceof Mongo.ObjectID ||
    EJSON._isCustomType(source);
  if (isLeafValue) {
    target[prefix] = source;
    return;
  }
  const entries = Object.entries(source);
  if (!entries.length) {
    target[prefix] = source;
    return;
  }
  for (const [key, value] of entries) {
    flattenObjectInto(target, value, join(prefix, key));
  }
}
/**
 * Converts an oplog v2 diff into $set/$unset operations on `oplogEntry`,
 * accumulating dot-notation paths rooted at `prefix`.
 *
 * Handled diff fields:
 *  - 'd': keys to $unset at this level
 *  - 'i': (potentially nested) inserts, flattened into $set
 *  - 'u': flat updates, copied into $set
 *  - 's<key>': array operators (positional set/unset/recurse) or nested
 *    sub-document diffs, recursed with an extended prefix
 *
 * EJSON custom types and ObjectIDs are preserved by flattenObjectInto.
 */
function convertOplogDiff(
  oplogEntry: OplogEntry,
  diff: OplogDiff,
  prefix = ''
): void {
  for (const [diffKey, value] of Object.entries(diff)) {
    if (diffKey === 'd') {
      // Deletions become $unset entries.
      const unsets = (oplogEntry.$unset ??= {});
      for (const key of Object.keys(value)) {
        unsets[join(prefix, key)] = true;
      }
    } else if (diffKey === 'i') {
      // Inserts may nest arbitrarily; flatten them into $set.
      oplogEntry.$set ??= {};
      flattenObjectInto(oplogEntry.$set, value, prefix);
    } else if (diffKey === 'u') {
      // Updates are flat at this level.
      const sets = (oplogEntry.$set ??= {});
      for (const [key, fieldValue] of Object.entries(value)) {
        sets[join(prefix, key)] = fieldValue;
      }
    } else if (diffKey.startsWith('s')) {
      // Sub-field: either an array operator or a nested sub-diff.
      const key = diffKey.slice(1);
      if (isArrayOperator(value)) {
        for (const [position, fieldValue] of Object.entries(value)) {
          if (position === 'a') continue; // marker key, not a position
          const positionKey = join(prefix, `${key}.${position.slice(1)}`);
          if (position[0] === 's') {
            // Sub-diff for an array element.
            convertOplogDiff(oplogEntry, fieldValue, positionKey);
          } else if (fieldValue === null) {
            // Positional null update means "remove the element's value".
            (oplogEntry.$unset ??= {})[positionKey] = true;
          } else {
            (oplogEntry.$set ??= {})[positionKey] = fieldValue;
          }
        }
      } else if (key) {
        // Plain nested object diff.
        convertOplogDiff(oplogEntry, value, join(prefix, key));
      }
    }
  }
}
/**
 * Converts a MongoDB v2 (diff-based) oplog entry into v1-style $set/$unset
 * operations. Entries that are not `$v: 2` or carry no `diff` are returned
 * untouched.
 *
 * Paths in the result are flattened to dot notation; EJSON custom types and
 * ObjectIDs are preserved intact.
 */
export function oplogV2V1Converter(oplogEntry: OplogEntry): OplogEntry {
  const convertible = oplogEntry.$v === 2 && Boolean(oplogEntry.diff);
  if (!convertible) {
    return oplogEntry;
  }
  const converted: OplogEntry = { $v: 2 };
  convertOplogDiff(converted, oplogEntry.diff!);
  return converted;
}

View File

@@ -89,7 +89,7 @@ Package.onUse(function (api) {
"doc_fetcher.js",
"polling_observe_driver.ts",
"oplog_observe_driver.js",
"oplog_v2_converter.js",
"oplog_v2_converter.ts",
"cursor_description.ts",
"mongo_connection.js",
"mongo_common.js",

View File

@@ -3,12 +3,12 @@
Package.describe({
summary: "Wrapper around the mongo npm package",
version: "6.10.0",
version: "6.10.1",
documentation: null,
});
Npm.depends({
mongodb: "6.10.0"
  mongodb: "6.10.0"
});
Package.onUse(function (api) {

View File

@@ -1,6 +1,6 @@
import { Meteor } from "meteor/meteor";
import { toWebsocketUrl } from "./urls.js";
import { StreamClientCommon } from "./common.js";
import { toWebsocketUrl } from "./urls.js";
// @param endpoint {String} URL to Meteor app
// "http://subdomain.meteor.com/" or "/" or
@@ -132,7 +132,7 @@ export class ClientStream extends StreamClientCommon {
// require the module if we actually create a server-to-server
// connection.
var FayeWebSocket = Npm.require('faye-websocket');
var deflate = Npm.require('permessage-deflate');
    var deflate = Npm.require('permessage-deflate');
var targetUrl = toWebsocketUrl(this.endpoint);
var fayeOptions = {

View File

@@ -7,7 +7,7 @@ Package.describe({
Npm.depends({
"faye-websocket": "0.11.4",
"permessage-deflate": "0.1.7",
"permessage-deflate2": "0.1.8",
"lodash.isequal": "4.5.0",
"lodash.once": "4.1.1"
});

View File

@@ -34,6 +34,7 @@ const packages = {
// Ignored server files that has a features > 2016
ignoredFiles: [
"async_helpers.js",
"asynchronous_queue.js",
]
},
"accounts-ui": {},

View File

@@ -431,6 +431,15 @@ export default defineConfig({
],
collapsed: true,
},
{
text: "Performance",
items: [
{
text: "WebSocket Compression",
link: "/performance/websocket-compression",
},
],
},
],
socialLinks: [

View File

@@ -0,0 +1,104 @@
# Websocket Compression in Meteor
::: warning
Modifying websocket compression settings without understanding your application's DDP messaging patterns can negatively impact performance. Before changing these settings, you should:
- Use [Meteor DevTools Evolved](https://chromewebstore.google.com/detail/meteor-devtools-evolved/ibniinmoafhgbifjojidlagmggecmpgf) or your browser's Network tab to monitor WebSocket traffic
- Analyze your DDP message frequency and payload sizes
- Test changes in a staging environment with realistic data and user load
:::
Meteor's stream server uses the permessage-deflate extension for websocket compression by default. While compression can help reduce bandwidth usage, it may impact performance in reactivity-intensive applications due to the computational overhead of compressing numerous DDP messages.
## Configuration
### Disabling Compression
You can disable websocket compression by setting the `SERVER_WEBSOCKET_COMPRESSION` environment variable to `false`:
```bash
SERVER_WEBSOCKET_COMPRESSION=false
```
### Custom Compression Settings
To customize compression settings, set `SERVER_WEBSOCKET_COMPRESSION` to a JSON string with your desired configuration:
```bash
# Example with custom settings
SERVER_WEBSOCKET_COMPRESSION='{"threshold": 2048, "level": 1}'
```
Available configuration options:
- `threshold`: Minimum message size (in bytes) before compression is applied (default: 1024)
- `level`: Compression level (0-9, where 0=none, 1=fastest, 9=best compression)
- `memLevel`: Memory level (1-9, lower uses less memory)
- `noContextTakeover`: When true, compressor resets for each message (default: true)
- `maxWindowBits`: Window size for compression (9-15, lower uses less memory)
## Configuration Examples
Here are recommended configurations for different types of applications:
### High-Frequency Updates / Real-Time Dashboard
For applications with frequent small updates (e.g., real-time dashboards, trading platforms):
```bash
# Disable compression for optimal performance with small, frequent updates
SERVER_WEBSOCKET_COMPRESSION=false
```
### Large Data Transfers
For applications transferring large datasets (e.g., file sharing, data visualization):
```bash
# Optimize for large data transfers
SERVER_WEBSOCKET_COMPRESSION='{"threshold": 1024, "level": 6, "memLevel": 8}'
```
### Memory-Constrained Environment
For deployments with limited memory resources:
```bash
# Minimize memory usage while maintaining compression
SERVER_WEBSOCKET_COMPRESSION='{"threshold": 2048, "level": 1, "memLevel": 1, "maxWindowBits": 9}'
```
### Balanced Configuration
For typical applications with mixed message sizes:
```bash
# Balance between compression and performance
SERVER_WEBSOCKET_COMPRESSION='{"threshold": 1536, "level": 3, "memLevel": 4}'
```
## Verifying Compression Status
You can check if compression is enabled through the Meteor shell:
```javascript
Meteor.server.stream_server.server.options.faye_server_options.extensions
```
Results interpretation:
- `[]` (empty array): Compression is disabled
- `[{}]` (array with object): Compression is enabled
## Performance Considerations
- For apps with high message throughput or frequent small updates, disabling compression may improve performance
- Large message payloads may benefit from compression, especially over slower network connections
- Consider monitoring CPU usage and response times when adjusting compression settings
## Default Configuration
When enabled, the default configuration uses:
- Compression threshold: 1024 bytes
- Compression level: Z_BEST_SPEED (fastest)
- Memory level: Z_MIN_MEMLEVEL (minimum memory usage)
- Context takeover: Disabled
- Window bits: Z_MIN_WINDOWBITS (minimum window size)