diff --git a/.envrc b/.envrc index 9cbaa921fd..e023176dc5 100644 --- a/.envrc +++ b/.envrc @@ -56,6 +56,22 @@ function @docs-migration-start { npm run docs:dev --prefix "$ROOT_DIR/v3-docs/v3-migration-docs" } +function @get-changes { + git diff --numstat HEAD~1 HEAD | awk '($1 + $2) <= 5000 {print $3}' +} + +function @summarize-changes { + changes=$(@get-changes) + + if [ -n "$changes" ]; then + changes=$(git diff HEAD~1 HEAD -- $(echo "$changes" | tr '\n' ' ')) + else + changes=$(git diff HEAD~1 HEAD) + fi + + echo "$changes" | llm -s "Summarize the following changes in a few sentences:" +} + function @packages-bumped { git diff --name-only devel...$(git branch --show-current) | grep "packages/.*/package.js$" | while IFS= read -r file; do if ! git show devel:$file > /dev/null 2>&1; then diff --git a/.github/workflows/check-syntax.yml b/.github/workflows/check-syntax.yml index 90aa5285bd..a20bb011ca 100644 --- a/.github/workflows/check-syntax.yml +++ b/.github/workflows/check-syntax.yml @@ -1,6 +1,5 @@ name: Check legacy syntax on: - - push - pull_request jobs: check-code-style: diff --git a/package-lock.json b/package-lock.json index c9d647ebc5..2f50dcb896 100644 --- a/package-lock.json +++ b/package-lock.json @@ -15,6 +15,8 @@ "@babel/preset-react": "^7.18.6", "@types/lodash.isempty": "^4.4.9", "@types/node": "^18.16.18", + "@types/sockjs": "^0.3.36", + "@types/sockjs-client": "^1.5.4", "@typescript-eslint/eslint-plugin": "^5.56.0", "@typescript-eslint/parser": "^5.56.0", "eslint": "^8.36.0", @@ -1127,6 +1129,21 @@ "integrity": "sha512-dn1l8LaMea/IjDoHNd9J52uBbInB796CDffS6VdIxvqYCPSG0V0DzHp76GpaWnlhg88uYyPbXCDIowa86ybd5A==", "dev": true }, + "node_modules/@types/sockjs": { + "version": "0.3.36", + "resolved": "https://registry.npmjs.org/@types/sockjs/-/sockjs-0.3.36.tgz", + "integrity": "sha512-MK9V6NzAS1+Ud7JV9lJLFqW85VbC9dq3LmwZCuBe4wBDgKC0Kj/jd8Xl+nSviU+Qc3+m7umHHyHg//2KSa0a0Q==", + "dev": true, + "dependencies": { + "@types/node": "*" + } + }, + "node_modules/@types/sockjs-client": { + "version": "1.5.4", + "resolved": "https://registry.npmjs.org/@types/sockjs-client/-/sockjs-client-1.5.4.tgz", + "integrity": "sha512-zk+uFZeWyvJ5ZFkLIwoGA/DfJ+pYzcZ8eH4H/EILCm2OBZyHH6Hkdna1/UWL/CFruh5wj6ES7g75SvUB0VsH5w==", + "dev": true + }, "node_modules/@typescript-eslint/eslint-plugin": { "version": "5.62.0", "resolved": "https://registry.npmjs.org/@typescript-eslint/eslint-plugin/-/eslint-plugin-5.62.0.tgz", diff --git a/package.json b/package.json index 5b0390a9b6..a21df794cd 100644 --- a/package.json +++ b/package.json @@ -19,6 +19,8 @@ "@babel/preset-react": "^7.18.6", "@types/lodash.isempty": "^4.4.9", "@types/node": "^18.16.18", + "@types/sockjs": "^0.3.36", + "@types/sockjs-client": "^1.5.4", "@typescript-eslint/eslint-plugin": "^5.56.0", "@typescript-eslint/parser": "^5.56.0", "eslint": "^8.36.0", diff --git a/packages/ddp-client/client/client_convenience.js b/packages/ddp-client/client/client_convenience.js index 78a1ff7473..052a3087bb 100644 --- a/packages/ddp-client/client/client_convenience.js +++ b/packages/ddp-client/client/client_convenience.js @@ -1,6 +1,6 @@ import { DDP } from '../common/namespace.js'; import { Meteor } from 'meteor/meteor'; -import { loadAsyncStubHelpers } from "./queueStubsHelpers"; +import { loadAsyncStubHelpers } from "./queue_stub_helpers"; // Meteor.refresh can be called on the client (if you're in common code) but it // only has an effect on the server. diff --git a/packages/ddp-client/client/queueStubsHelpers.js b/packages/ddp-client/client/queue_stub_helpers.js similarity index 100% rename from packages/ddp-client/client/queueStubsHelpers.js rename to packages/ddp-client/client/queue_stub_helpers.js diff --git a/packages/ddp-client/common/connection_stream_handlers.js b/packages/ddp-client/common/connection_stream_handlers.js new file mode 100644 index 0000000000..bfef109ff0 --- /dev/null +++ b/packages/ddp-client/common/connection_stream_handlers.js @@ -0,0 +1,202 @@ +import { DDPCommon } from 'meteor/ddp-common'; +import { Meteor } from 'meteor/meteor'; + +export class ConnectionStreamHandlers { + constructor(connection) { + this._connection = connection; + } + + /** + * Handles incoming raw messages from the DDP stream + * @param {String} raw_msg The raw message received from the stream + */ + async onMessage(raw_msg) { + let msg; + try { + msg = DDPCommon.parseDDP(raw_msg); + } catch (e) { + Meteor._debug('Exception while parsing DDP', e); + return; + } + + // Any message counts as receiving a pong, as it demonstrates that + // the server is still alive. + if (this._connection._heartbeat) { + this._connection._heartbeat.messageReceived(); + } + + if (msg === null || !msg.msg) { + if(!msg || !msg.testMessageOnConnect) { + if (Object.keys(msg).length === 1 && msg.server_id) return; + Meteor._debug('discarding invalid livedata message', msg); + } + return; + } + + // Important: This was missing from previous version + // We need to set the current version before routing the message + if (msg.msg === 'connected') { + this._connection._version = this._connection._versionSuggestion; + } + + await this._routeMessage(msg); + } + + /** + * Routes messages to their appropriate handlers based on message type + * @private + * @param {Object} msg The parsed DDP message + */ + async _routeMessage(msg) { + switch (msg.msg) { + case 'connected': + await this._connection._livedata_connected(msg); + this._connection.options.onConnected(); + break; + + case 'failed': + await this._handleFailedMessage(msg); + break; + + case 'ping': + if (this._connection.options.respondToPings) { + this._connection._send({ msg: 'pong', id: msg.id }); + } + break; + + case 'pong': + // noop, as we assume everything's a pong + break; + + case 'added': + case 'changed': + case 'removed': + case 'ready': + case 'updated': + await this._connection._livedata_data(msg); + break; + + case 'nosub': + await this._connection._livedata_nosub(msg); + break; + + case 'result': + await this._connection._livedata_result(msg); + break; + + case 'error': + this._connection._livedata_error(msg); + break; + + default: + Meteor._debug('discarding unknown livedata message type', msg); + } + } + + /** + * Handles failed connection messages + * @private + * @param {Object} msg The failed message object + */ + _handleFailedMessage(msg) { + if (this._connection._supportedDDPVersions.indexOf(msg.version) >= 0) { + this._connection._versionSuggestion = msg.version; + this._connection._stream.reconnect({ _force: true }); + } else { + const description = + 'DDP version negotiation failed; server requested version ' + + msg.version; + this._connection._stream.disconnect({ _permanent: true, _error: description }); + this._connection.options.onDDPVersionNegotiationFailure(description); + } + } + + /** + * Handles connection reset events + */ + onReset() { + // Reset is called even on the first connection, so this is + // the only place we send this message. + const msg = this._buildConnectMessage(); + this._connection._send(msg); + + // Mark non-retry calls as failed and handle outstanding methods + this._handleOutstandingMethodsOnReset(); + + // Now, to minimize setup latency, go ahead and blast out all of + // our pending methods ands subscriptions before we've even taken + // the necessary RTT to know if we successfully reconnected. + this._connection._callOnReconnectAndSendAppropriateOutstandingMethods(); + this._resendSubscriptions(); + } + + /** + * Builds the initial connect message + * @private + * @returns {Object} The connect message object + */ + _buildConnectMessage() { + const msg = { msg: 'connect' }; + if (this._connection._lastSessionId) { + msg.session = this._connection._lastSessionId; + } + msg.version = this._connection._versionSuggestion || this._connection._supportedDDPVersions[0]; + this._connection._versionSuggestion = msg.version; + msg.support = this._connection._supportedDDPVersions; + return msg; + } + + /** + * Handles outstanding methods during a reset + * @private + */ + _handleOutstandingMethodsOnReset() { + const blocks = this._connection._outstandingMethodBlocks; + if (blocks.length === 0) return; + + const currentMethodBlock = blocks[0].methods; + blocks[0].methods = currentMethodBlock.filter( + methodInvoker => { + // Methods with 'noRetry' option set are not allowed to re-send after + // recovering dropped connection. + if (methodInvoker.sentMessage && methodInvoker.noRetry) { + methodInvoker.receiveResult( + new Meteor.Error( + 'invocation-failed', + 'Method invocation might have failed due to dropped connection. ' + + 'Failing because `noRetry` option was passed to Meteor.apply.' + ) + ); + } + + // Only keep a method if it wasn't sent or it's allowed to retry. + return !(methodInvoker.sentMessage && methodInvoker.noRetry); + } + ); + + // Clear empty blocks + if (blocks.length > 0 && blocks[0].methods.length === 0) { + blocks.shift(); + } + + // Reset all method invokers as unsent + Object.values(this._connection._methodInvokers).forEach(invoker => { + invoker.sentMessage = false; + }); + } + + /** + * Resends all active subscriptions + * @private + */ + _resendSubscriptions() { + Object.entries(this._connection._subscriptions).forEach(([id, sub]) => { + this._connection._sendQueued({ + msg: 'sub', + id: id, + name: sub.name, + params: sub.params + }); + }); + } +} \ No newline at end of file diff --git a/packages/ddp-client/common/document_processors.js b/packages/ddp-client/common/document_processors.js new file mode 100644 index 0000000000..11c4b2f8c0 --- /dev/null +++ b/packages/ddp-client/common/document_processors.js @@ -0,0 +1,206 @@ +import { MongoID } from 'meteor/mongo-id'; +import { DiffSequence } from 'meteor/diff-sequence'; +import { hasOwn } from "meteor/ddp-common/utils"; +import { isEmpty } from "meteor/ddp-common/utils"; + +export class DocumentProcessors { + constructor(connection) { + this._connection = connection; + } + + /** + * @summary Process an 'added' message from the server + * @param {Object} msg The added message + * @param {Object} updates The updates accumulator + */ + async _process_added(msg, updates) { + const self = this._connection; + const id = MongoID.idParse(msg.id); + const serverDoc = self._getServerDoc(msg.collection, id); + + if (serverDoc) { + // Some outstanding stub wrote here. + const isExisting = serverDoc.document !== undefined; + + serverDoc.document = msg.fields || Object.create(null); + serverDoc.document._id = id; + + if (self._resetStores) { + // During reconnect the server is sending adds for existing ids. + // Always push an update so that document stays in the store after + // reset. Use current version of the document for this update, so + // that stub-written values are preserved. + const currentDoc = await self._stores[msg.collection].getDoc(msg.id); + if (currentDoc !== undefined) msg.fields = currentDoc; + + self._pushUpdate(updates, msg.collection, msg); + } else if (isExisting) { + throw new Error('Server sent add for existing id: ' + msg.id); + } + } else { + self._pushUpdate(updates, msg.collection, msg); + } + } + + /** + * @summary Process a 'changed' message from the server + * @param {Object} msg The changed message + * @param {Object} updates The updates accumulator + */ + _process_changed(msg, updates) { + const self = this._connection; + const serverDoc = self._getServerDoc(msg.collection, MongoID.idParse(msg.id)); + + if (serverDoc) { + if (serverDoc.document === undefined) { + throw new Error('Server sent changed for nonexisting id: ' + msg.id); + } + DiffSequence.applyChanges(serverDoc.document, msg.fields); + } else { + self._pushUpdate(updates, msg.collection, msg); + } + } + + /** + * @summary Process a 'removed' message from the server + * @param {Object} msg The removed message + * @param {Object} updates The updates accumulator + */ + _process_removed(msg, updates) { + const self = this._connection; + const serverDoc = self._getServerDoc(msg.collection, MongoID.idParse(msg.id)); + + if (serverDoc) { + // Some outstanding stub wrote here. + if (serverDoc.document === undefined) { + throw new Error('Server sent removed for nonexisting id:' + msg.id); + } + serverDoc.document = undefined; + } else { + self._pushUpdate(updates, msg.collection, { + msg: 'removed', + collection: msg.collection, + id: msg.id + }); + } + } + + /** + * @summary Process a 'ready' message from the server + * @param {Object} msg The ready message + * @param {Object} updates The updates accumulator + */ + _process_ready(msg, updates) { + const self = this._connection; + + // Process "sub ready" messages. "sub ready" messages don't take effect + // until all current server documents have been flushed to the local + // database. We can use a write fence to implement this. + msg.subs.forEach((subId) => { + self._runWhenAllServerDocsAreFlushed(() => { + const subRecord = self._subscriptions[subId]; + // Did we already unsubscribe? + if (!subRecord) return; + // Did we already receive a ready message? (Oops!) + if (subRecord.ready) return; + subRecord.ready = true; + subRecord.readyCallback && subRecord.readyCallback(); + subRecord.readyDeps.changed(); + }); + }); + } + + /** + * @summary Process an 'updated' message from the server + * @param {Object} msg The updated message + * @param {Object} updates The updates accumulator + */ + _process_updated(msg, updates) { + const self = this._connection; + // Process "method done" messages. + msg.methods.forEach((methodId) => { + const docs = self._documentsWrittenByStub[methodId] || {}; + Object.values(docs).forEach((written) => { + const serverDoc = self._getServerDoc(written.collection, written.id); + if (!serverDoc) { + throw new Error('Lost serverDoc for ' + JSON.stringify(written)); + } + if (!serverDoc.writtenByStubs[methodId]) { + throw new Error( + 'Doc ' + + JSON.stringify(written) + + ' not written by method ' + + methodId + ); + } + delete serverDoc.writtenByStubs[methodId]; + if (isEmpty(serverDoc.writtenByStubs)) { + // All methods whose stubs wrote this method have completed! We can + // now copy the saved document to the database (reverting the stub's + // change if the server did not write to this object, or applying the + // server's writes if it did). + + // This is a fake ddp 'replace' message. It's just for talking + // between livedata connections and minimongo. (We have to stringify + // the ID because it's supposed to look like a wire message.) + self._pushUpdate(updates, written.collection, { + msg: 'replace', + id: MongoID.idStringify(written.id), + replace: serverDoc.document + }); + // Call all flush callbacks. + serverDoc.flushCallbacks.forEach((c) => { + c(); + }); + + // Delete this completed serverDocument. Don't bother to GC empty + // IdMaps inside self._serverDocuments, since there probably aren't + // many collections and they'll be written repeatedly. + self._serverDocuments[written.collection].remove(written.id); + } + }); + delete self._documentsWrittenByStub[methodId]; + + // We want to call the data-written callback, but we can't do so until all + // currently buffered messages are flushed. + const callbackInvoker = self._methodInvokers[methodId]; + if (!callbackInvoker) { + throw new Error('No callback invoker for method ' + methodId); + } + + self._runWhenAllServerDocsAreFlushed( + (...args) => callbackInvoker.dataVisible(...args) + ); + }); + } + + /** + * @summary Push an update to the buffer + * @private + * @param {Object} updates The updates accumulator + * @param {String} collection The collection name + * @param {Object} msg The update message + */ + _pushUpdate(updates, collection, msg) { + if (!hasOwn.call(updates, collection)) { + updates[collection] = []; + } + updates[collection].push(msg); + } + + /** + * @summary Get a server document by collection and id + * @private + * @param {String} collection The collection name + * @param {String} id The document id + * @returns {Object|null} The server document or null + */ + _getServerDoc(collection, id) { + const self = this._connection; + if (!hasOwn.call(self._serverDocuments, collection)) { + return null; + } + const serverDocsForCollection = self._serverDocuments[collection]; + return serverDocsForCollection.get(id) || null; + } +} \ No newline at end of file diff --git a/packages/ddp-client/common/livedata_connection.js b/packages/ddp-client/common/livedata_connection.js index e8967e3651..abe1161d21 100644 --- a/packages/ddp-client/common/livedata_connection.js +++ b/packages/ddp-client/common/livedata_connection.js @@ -5,20 +5,18 @@ import { EJSON } from 'meteor/ejson'; import { Random } from 'meteor/random'; import { MongoID } from 'meteor/mongo-id'; import { DDP } from './namespace.js'; -import MethodInvoker from './MethodInvoker.js'; +import { MethodInvoker } from './method_invoker'; import { hasOwn, slice, keys, isEmpty, last, -} from "meteor/ddp-common/utils.js"; - -class MongoIDMap extends IdMap { - constructor() { - super(MongoID.idStringify, MongoID.idParse); - } -} +} from "meteor/ddp-common/utils"; +import { ConnectionStreamHandlers } from './connection_stream_handlers'; +import { MongoIDMap } from './mongo_id_map'; +import { MessageProcessors } from './message_processors'; +import { DocumentProcessors } from './document_processors'; // @param url {String|Object} URL to Meteor app, // or an object as a test hook (see code) @@ -202,12 +200,6 @@ export class Connection { self._updatesForUnknownStores = {}; // if we're blocking a migration, the retry func self._retryMigrate = null; - - self.__flushBufferedWrites = Meteor.bindEnvironment( - self._flushBufferedWrites, - 'flushing DDP buffered writes', - self - ); // Collection name -> array of messages. self._bufferedWrites = {}; // When current buffer of updates must be flushed at, in ms timestamp. @@ -249,34 +241,63 @@ export class Connection { }); } + this._streamHandlers = new ConnectionStreamHandlers(this); + const onDisconnect = () => { - if (self._heartbeat) { - self._heartbeat.stop(); - self._heartbeat = null; + if (this._heartbeat) { + this._heartbeat.stop(); + this._heartbeat = null; } }; if (Meteor.isServer) { - self._stream.on( + this._stream.on( 'message', Meteor.bindEnvironment( - this.onMessage.bind(this), + msg => this._streamHandlers.onMessage(msg), 'handling DDP message' ) ); - self._stream.on( + this._stream.on( 'reset', - Meteor.bindEnvironment(this.onReset.bind(this), 'handling DDP reset') + Meteor.bindEnvironment( + () => this._streamHandlers.onReset(), + 'handling DDP reset' + ) ); - self._stream.on( + this._stream.on( 'disconnect', Meteor.bindEnvironment(onDisconnect, 'handling DDP disconnect') ); } else { - self._stream.on('message', this.onMessage.bind(this)); - self._stream.on('reset', this.onReset.bind(this)); - self._stream.on('disconnect', onDisconnect); + this._stream.on('message', msg => this._streamHandlers.onMessage(msg)); + this._stream.on('reset', () => this._streamHandlers.onReset()); + this._stream.on('disconnect', onDisconnect); } + + this._messageProcessors = new MessageProcessors(this); + + // Expose message processor methods to maintain backward compatibility + this._livedata_connected = (msg) => this._messageProcessors._livedata_connected(msg); + this._livedata_data = (msg) => this._messageProcessors._livedata_data(msg); + this._livedata_nosub = (msg) => this._messageProcessors._livedata_nosub(msg); + this._livedata_result = (msg) => this._messageProcessors._livedata_result(msg); + this._livedata_error = (msg) => this._messageProcessors._livedata_error(msg); + + this._documentProcessors = new DocumentProcessors(this); + + // Expose document processor methods to maintain backward compatibility + this._process_added = (msg, updates) => this._documentProcessors._process_added(msg, updates); + this._process_changed = (msg, updates) => this._documentProcessors._process_changed(msg, updates); + this._process_removed = (msg, updates) => this._documentProcessors._process_removed(msg, updates); + this._process_ready = (msg, updates) => this._documentProcessors._process_ready(msg, updates); + this._process_updated = (msg, updates) => this._documentProcessors._process_updated(msg, updates); + + // Also expose utility methods used by other parts of the system + this._pushUpdate = (updates, collection, msg) => + this._documentProcessors._pushUpdate(updates, collection, msg); + this._getServerDoc = (collection, id) => + this._documentProcessors._getServerDoc(collection, id); } // 'name' is the name of the data on the wire that should go in the @@ -941,7 +962,7 @@ export class Connection { // documents. _saveOriginals() { if (! this._waitingForQuiescence()) { - this._flushBufferedWritesClient(); + this._flushBufferedWrites(); } Object.values(this._stores).forEach((store) => { @@ -1099,121 +1120,6 @@ export class Connection { return Object.values(invokers).some((invoker) => !!invoker.sentMessage); } - async _livedata_connected(msg) { - const self = this; - - if (self._version !== 'pre1' && self._heartbeatInterval !== 0) { - self._heartbeat = new DDPCommon.Heartbeat({ - heartbeatInterval: self._heartbeatInterval, - heartbeatTimeout: self._heartbeatTimeout, - onTimeout() { - self._lostConnection( - new DDP.ConnectionError('DDP heartbeat timed out') - ); - }, - sendPing() { - self._send({ msg: 'ping' }); - } - }); - self._heartbeat.start(); - } - - // If this is a reconnect, we'll have to reset all stores. - if (self._lastSessionId) self._resetStores = true; - - let reconnectedToPreviousSession; - if (typeof msg.session === 'string') { - reconnectedToPreviousSession = self._lastSessionId === msg.session; - self._lastSessionId = msg.session; - } - - if (reconnectedToPreviousSession) { - // Successful reconnection -- pick up where we left off. Note that right - // now, this never happens: the server never connects us to a previous - // session, because DDP doesn't provide enough data for the server to know - // what messages the client has processed. We need to improve DDP to make - // this possible, at which point we'll probably need more code here. - return; - } - - // Server doesn't have our data any more. Re-sync a new session. - - // Forget about messages we were buffering for unknown collections. They'll - // be resent if still relevant. - self._updatesForUnknownStores = Object.create(null); - - if (self._resetStores) { - // Forget about the effects of stubs. We'll be resetting all collections - // anyway. - self._documentsWrittenByStub = Object.create(null); - self._serverDocuments = Object.create(null); - } - - // Clear _afterUpdateCallbacks. - self._afterUpdateCallbacks = []; - - // Mark all named subscriptions which are ready (ie, we already called the - // ready callback) as needing to be revived. - // XXX We should also block reconnect quiescence until unnamed subscriptions - // (eg, autopublish) are done re-publishing to avoid flicker! - self._subsBeingRevived = Object.create(null); - Object.entries(self._subscriptions).forEach(([id, sub]) => { - if (sub.ready) { - self._subsBeingRevived[id] = true; - } - }); - - // Arrange for "half-finished" methods to have their callbacks run, and - // track methods that were sent on this connection so that we don't - // quiesce until they are all done. - // - // Start by clearing _methodsBlockingQuiescence: methods sent before - // reconnect don't matter, and any "wait" methods sent on the new connection - // that we drop here will be restored by the loop below. - self._methodsBlockingQuiescence = Object.create(null); - if (self._resetStores) { - const invokers = self._methodInvokers; - keys(invokers).forEach(id => { - const invoker = invokers[id]; - if (invoker.gotResult()) { - // This method already got its result, but it didn't call its callback - // because its data didn't become visible. We did not resend the - // method RPC. We'll call its callback when we get a full quiesce, - // since that's as close as we'll get to "data must be visible". - self._afterUpdateCallbacks.push( - (...args) => invoker.dataVisible(...args) - ); - } else if (invoker.sentMessage) { - // This method has been sent on this connection (maybe as a resend - // from the last connection, maybe from onReconnect, maybe just very - // quickly before processing the connected message). - // - // We don't need to do anything special to ensure its callbacks get - // called, but we'll count it as a method which is preventing - // reconnect quiescence. (eg, it might be a login method that was run - // from onReconnect, and we don't want to see flicker by seeing a - // logged-out state.) - self._methodsBlockingQuiescence[invoker.methodId] = true; - } - }); - } - - self._messagesBufferedUntilQuiescence = []; - - // If we're not waiting on any methods or subs, we can reset the stores and - // call the callbacks immediately. - if (! self._waitingForQuiescence()) { - if (self._resetStores) { - for (const store of Object.values(self._stores)) { - await store.beginUpdate(0, true); - await store.endUpdate(); - } - self._resetStores = false; - } - self._runAfterUpdateCallbacks(); - } - } - async _processOneDataMessage(msg, updates) { const messageType = msg.msg; @@ -1235,87 +1141,6 @@ export class Connection { } } - async _livedata_data(msg) { - const self = this; - - if (self._waitingForQuiescence()) { - self._messagesBufferedUntilQuiescence.push(msg); - - if (msg.msg === 'nosub') { - delete self._subsBeingRevived[msg.id]; - } - - if (msg.subs) { - msg.subs.forEach(subId => { - delete self._subsBeingRevived[subId]; - }); - } - - if (msg.methods) { - msg.methods.forEach(methodId => { - delete self._methodsBlockingQuiescence[methodId]; - }); - } - - if (self._waitingForQuiescence()) { - return; - } - - // No methods or subs are blocking quiescence! - // We'll now process and all of our buffered messages, reset all stores, - // and apply them all at once. - - const bufferedMessages = self._messagesBufferedUntilQuiescence; - for (const bufferedMessage of Object.values(bufferedMessages)) { - await self._processOneDataMessage( - bufferedMessage, - self._bufferedWrites - ); - } - - self._messagesBufferedUntilQuiescence = []; - - } else { - await self._processOneDataMessage(msg, self._bufferedWrites); - } - - // Immediately flush writes when: - // 1. Buffering is disabled. Or; - // 2. any non-(added/changed/removed) message arrives. - const standardWrite = - msg.msg === "added" || - msg.msg === "changed" || - msg.msg === "removed"; - - if (self._bufferedWritesInterval === 0 || ! standardWrite) { - await self._flushBufferedWrites(); - return; - } - - if (self._bufferedWritesFlushAt === null) { - self._bufferedWritesFlushAt = - new Date().valueOf() + self._bufferedWritesMaxAge; - } else if (self._bufferedWritesFlushAt < new Date().valueOf()) { - await self._flushBufferedWrites(); - return; - } - - if (self._bufferedWritesFlushHandle) { - clearTimeout(self._bufferedWritesFlushHandle); - } - self._bufferedWritesFlushHandle = setTimeout(() => { - // __flushBufferedWrites is a promise, so with this we can wait the promise to finish - // before doing something - self._liveDataWritesPromise = self.__flushBufferedWrites(); - - if (Meteor._isPromise(self._liveDataWritesPromise)) { - self._liveDataWritesPromise.finally( - () => (self._liveDataWritesPromise = undefined) - ); - } - }, self._bufferedWritesInterval); - } - _prepareBuffersToFlush() { const self = this; if (self._bufferedWritesFlushHandle) { @@ -1332,61 +1157,49 @@ export class Connection { return writes; } - async _flushBufferedWritesServer() { - const self = this; - const writes = self._prepareBuffersToFlush(); - await self._performWritesServer(writes); - } - _flushBufferedWritesClient() { - const self = this; - const writes = self._prepareBuffersToFlush(); - self._performWritesClient(writes); - } - _flushBufferedWrites() { - const self = this; - return Meteor.isClient - ? self._flushBufferedWritesClient() - : self._flushBufferedWritesServer(); - } + /** + * Server-side store updates handled asynchronously + * @private + */ async _performWritesServer(updates) { const self = this; - if (self._resetStores || ! isEmpty(updates)) { - // Begin a transactional update of each store. - - for (const [storeName, store] of Object.entries(self._stores)) { + if (self._resetStores || !isEmpty(updates)) { + // Start all store updates - keeping original loop structure + for (const store of Object.values(self._stores)) { await store.beginUpdate( - hasOwn.call(updates, storeName) - ? updates[storeName].length - : 0, + updates[store._name]?.length || 0, self._resetStores ); } self._resetStores = false; - for (const [storeName, updateMessages] of Object.entries(updates)) { + // Process each store's updates sequentially as before + for (const [storeName, messages] of Object.entries(updates)) { const store = self._stores[storeName]; if (store) { - for (const updateMessage of updateMessages) { - await store.update(updateMessage); + // Batch each store's messages in modest chunks to prevent event loop blocking + // while maintaining operation order + const CHUNK_SIZE = 100; + for (let i = 0; i < messages.length; i += CHUNK_SIZE) { + const chunk = messages.slice(i, Math.min(i + CHUNK_SIZE, messages.length)); + + for (const msg of chunk) { + await store.update(msg); + } + + await new Promise(resolve => process.nextTick(resolve)); } } else { - // Nobody's listening for this data. Queue it up until - // someone wants it. - // XXX memory use will grow without bound if you forget to - // create a collection or just don't care about it... going - // to have to do something about that. - const updates = self._updatesForUnknownStores; - - if (! hasOwn.call(updates, storeName)) { - updates[storeName] = []; - } - - updates[storeName].push(...updateMessages); + // Queue updates for uninitialized stores + self._updatesForUnknownStores[storeName] = + self._updatesForUnknownStores[storeName] || []; + self._updatesForUnknownStores[storeName].push(...messages); } } - // End update transaction. + + // Complete all updates for (const store of Object.values(self._stores)) { await store.endUpdate(); } @@ -1394,53 +1207,55 @@ export class Connection { self._runAfterUpdateCallbacks(); } + + /** + * Client-side store updates handled synchronously for optimistic UI + * @private + */ _performWritesClient(updates) { const self = this; - if (self._resetStores || ! isEmpty(updates)) { - // Begin a transactional update of each store. - - for (const [storeName, store] of Object.entries(self._stores)) { + if (self._resetStores || !isEmpty(updates)) { + // Synchronous store updates for client + Object.values(self._stores).forEach(store => { store.beginUpdate( - hasOwn.call(updates, storeName) - ? updates[storeName].length - : 0, + updates[store._name]?.length || 0, self._resetStores ); - } + }); self._resetStores = false; - for (const [storeName, updateMessages] of Object.entries(updates)) { + Object.entries(updates).forEach(([storeName, messages]) => { const store = self._stores[storeName]; if (store) { - for (const updateMessage of updateMessages) { - store.update(updateMessage); - } + messages.forEach(msg => store.update(msg)); } else { - // Nobody's listening for this data. Queue it up until - // someone wants it. - // XXX memory use will grow without bound if you forget to - // create a collection or just don't care about it... going - // to have to do something about that. - const updates = self._updatesForUnknownStores; - - if (! hasOwn.call(updates, storeName)) { - updates[storeName] = []; - } - - updates[storeName].push(...updateMessages); + self._updatesForUnknownStores[storeName] = + self._updatesForUnknownStores[storeName] || []; + self._updatesForUnknownStores[storeName].push(...messages); } - } - // End update transaction. - for (const store of Object.values(self._stores)) { - store.endUpdate(); - } + }); + + Object.values(self._stores).forEach(store => store.endUpdate()); } self._runAfterUpdateCallbacks(); } + /** + * Executes buffered writes either synchronously (client) or async (server) + * @private + */ + async _flushBufferedWrites() { + const self = this; + const writes = self._prepareBuffersToFlush(); + + return Meteor.isClient + ? self._performWritesClient(writes) + : self._performWritesServer(writes); + } + // Call any callbacks deferred with _runWhenAllServerDocsAreFlushed whose // relevant docs have been flushed, as well as dataVisible callbacks at // reconnect-quiescence time. @@ -1453,160 +1268,6 @@ export class Connection { }); } - _pushUpdate(updates, collection, msg) { - if (! hasOwn.call(updates, collection)) { - updates[collection] = []; - } - updates[collection].push(msg); - } - - _getServerDoc(collection, id) { - const self = this; - if (! hasOwn.call(self._serverDocuments, collection)) { - return null; - } - const serverDocsForCollection = self._serverDocuments[collection]; - return serverDocsForCollection.get(id) || null; - } - - async _process_added(msg, updates) { - const self = this; - const id = MongoID.idParse(msg.id); - const serverDoc = self._getServerDoc(msg.collection, id); - if (serverDoc) { - // Some outstanding stub wrote here. - const isExisting = serverDoc.document !== undefined; - - serverDoc.document = msg.fields || Object.create(null); - serverDoc.document._id = id; - - if (self._resetStores) { - // During reconnect the server is sending adds for existing ids. - // Always push an update so that document stays in the store after - // reset. Use current version of the document for this update, so - // that stub-written values are preserved. - const currentDoc = await self._stores[msg.collection].getDoc(msg.id); - if (currentDoc !== undefined) msg.fields = currentDoc; - - self._pushUpdate(updates, msg.collection, msg); - } else if (isExisting) { - throw new Error('Server sent add for existing id: ' + msg.id); - } - } else { - self._pushUpdate(updates, msg.collection, msg); - } - } - - _process_changed(msg, updates) { - const self = this; - const serverDoc = self._getServerDoc(msg.collection, MongoID.idParse(msg.id)); - if (serverDoc) { - if (serverDoc.document === undefined) - throw new Error('Server sent changed for nonexisting id: ' + msg.id); - DiffSequence.applyChanges(serverDoc.document, msg.fields); - } else { - self._pushUpdate(updates, msg.collection, msg); - } - } - - _process_removed(msg, updates) { - const self = this; - const serverDoc = self._getServerDoc(msg.collection, MongoID.idParse(msg.id)); - if (serverDoc) { - // Some outstanding stub wrote here. - if (serverDoc.document === undefined) - throw new Error('Server sent removed for nonexisting id:' + msg.id); - serverDoc.document = undefined; - } else { - self._pushUpdate(updates, msg.collection, { - msg: 'removed', - collection: msg.collection, - id: msg.id - }); - } - } - - _process_updated(msg, updates) { - const self = this; - // Process "method done" messages. - - msg.methods.forEach((methodId) => { - const docs = self._documentsWrittenByStub[methodId] || {}; - Object.values(docs).forEach((written) => { - const serverDoc = self._getServerDoc(written.collection, written.id); - if (! serverDoc) { - throw new Error('Lost serverDoc for ' + JSON.stringify(written)); - } - if (! serverDoc.writtenByStubs[methodId]) { - throw new Error( - 'Doc ' + - JSON.stringify(written) + - ' not written by method ' + - methodId - ); - } - delete serverDoc.writtenByStubs[methodId]; - if (isEmpty(serverDoc.writtenByStubs)) { - // All methods whose stubs wrote this method have completed! We can - // now copy the saved document to the database (reverting the stub's - // change if the server did not write to this object, or applying the - // server's writes if it did). - - // This is a fake ddp 'replace' message. It's just for talking - // between livedata connections and minimongo. (We have to stringify - // the ID because it's supposed to look like a wire message.) - self._pushUpdate(updates, written.collection, { - msg: 'replace', - id: MongoID.idStringify(written.id), - replace: serverDoc.document - }); - // Call all flush callbacks. - - serverDoc.flushCallbacks.forEach((c) => { - c(); - }); - - // Delete this completed serverDocument. Don't bother to GC empty - // IdMaps inside self._serverDocuments, since there probably aren't - // many collections and they'll be written repeatedly. - self._serverDocuments[written.collection].remove(written.id); - } - }); - delete self._documentsWrittenByStub[methodId]; - - // We want to call the data-written callback, but we can't do so until all - // currently buffered messages are flushed. - const callbackInvoker = self._methodInvokers[methodId]; - if (! callbackInvoker) { - throw new Error('No callback invoker for method ' + methodId); - } - - self._runWhenAllServerDocsAreFlushed( - (...args) => callbackInvoker.dataVisible(...args) - ); - }); - } - - _process_ready(msg, updates) { - const self = this; - // Process "sub ready" messages. "sub ready" messages don't take effect - // until all current server documents have been flushed to the local - // database. We can use a write fence to implement this. - - msg.subs.forEach((subId) => { - self._runWhenAllServerDocsAreFlushed(() => { - const subRecord = self._subscriptions[subId]; - // Did we already unsubscribe? - if (!subRecord) return; - // Did we already receive a ready message? (Oops!) - if (subRecord.ready) return; - subRecord.ready = true; - subRecord.readyCallback && subRecord.readyCallback(); - subRecord.readyDeps.changed(); - }); - }); - } - // Ensures that "f" will be called after all documents currently in // _serverDocuments have been written to the local cache. f will not be called // if the connection is lost before then! @@ -1646,93 +1307,6 @@ export class Connection { } } - async _livedata_nosub(msg) { - const self = this; - - // First pass it through _livedata_data, which only uses it to help get - // towards quiescence. - await self._livedata_data(msg); - - // Do the rest of our processing immediately, with no - // buffering-until-quiescence. - - // we weren't subbed anyway, or we initiated the unsub. - if (! hasOwn.call(self._subscriptions, msg.id)) { - return; - } - - // XXX COMPAT WITH 1.0.3.1 #errorCallback - const errorCallback = self._subscriptions[msg.id].errorCallback; - const stopCallback = self._subscriptions[msg.id].stopCallback; - - self._subscriptions[msg.id].remove(); - - const meteorErrorFromMsg = msgArg => { - return ( - msgArg && - msgArg.error && - new Meteor.Error( - msgArg.error.error, - msgArg.error.reason, - msgArg.error.details - ) - ); - }; - - // XXX COMPAT WITH 1.0.3.1 #errorCallback - if (errorCallback && msg.error) { - errorCallback(meteorErrorFromMsg(msg)); - } - - if (stopCallback) { - stopCallback(meteorErrorFromMsg(msg)); - } - } - - async _livedata_result(msg) { - // id, result or error. error has error (code), reason, details - - const self = this; - - // Lets make sure there are no buffered writes before returning result. - if (! isEmpty(self._bufferedWrites)) { - await self._flushBufferedWrites(); - } - - // find the outstanding request - // should be O(1) in nearly all realistic use cases - if (isEmpty(self._outstandingMethodBlocks)) { - Meteor._debug('Received method result but no methods outstanding'); - return; - } - const currentMethodBlock = self._outstandingMethodBlocks[0].methods; - let i; - const m = currentMethodBlock.find((method, idx) => { - const found = method.methodId === msg.id; - if (found) i = idx; - return found; - }); - if (!m) { - Meteor._debug("Can't match method response to original method call", msg); - return; - } - - // Remove from current method block. This may leave the block empty, but we - // don't move on to the next block until the callback has been delivered, in - // _outstandingMethodFinished. - currentMethodBlock.splice(i, 1); - - if (hasOwn.call(msg, 'error')) { - m.receiveResult( - new Meteor.Error(msg.error.error, msg.error.reason, msg.error.details) - ); - } else { - // msg.result may be undefined if the method didn't return a - // value - m.receiveResult(undefined, msg.result); - } - } - _addOutstandingMethod(methodInvoker, options) { if (options?.wait) { // It's a wait method! Wait methods go in their own block. @@ -1801,11 +1375,6 @@ export class Connection { }); } - _livedata_error(msg) { - Meteor._debug('Received error from server: ', msg.reason); - if (msg.offendingMessage) Meteor._debug('For: ', msg.offendingMessage); - } - _sendOutstandingMethodBlocksMessages(oldOutstandingMethodBlocks) { const self = this; if (isEmpty(oldOutstandingMethodBlocks)) return; @@ -1870,148 +1439,4 @@ export class Connection { self._retryMigrate = null; } } - - async onMessage(raw_msg) { - let msg; - try { - msg = DDPCommon.parseDDP(raw_msg); - } catch (e) { - Meteor._debug('Exception while parsing DDP', e); - return; - } - - // Any message counts as receiving a pong, as it demonstrates that - // the server is still alive. - if (this._heartbeat) { - this._heartbeat.messageReceived(); - } - - if (msg === null || !msg.msg) { - if(!msg || !msg.testMessageOnConnect) { - if (Object.keys(msg).length === 1 && msg.server_id) return; - Meteor._debug('discarding invalid livedata message', msg); - } - return; - } - - if (msg.msg === 'connected') { - this._version = this._versionSuggestion; - await this._livedata_connected(msg); - this.options.onConnected(); - } else if (msg.msg === 'failed') { - if (this._supportedDDPVersions.indexOf(msg.version) >= 0) { - this._versionSuggestion = msg.version; - this._stream.reconnect({ _force: true }); - } else { - const description = - 'DDP version negotiation failed; server requested version ' + - msg.version; - this._stream.disconnect({ _permanent: true, _error: description }); - this.options.onDDPVersionNegotiationFailure(description); - } - } else if (msg.msg === 'ping' && this.options.respondToPings) { - this._send({ msg: 'pong', id: msg.id }); - } else if (msg.msg === 'pong') { - // noop, as we assume everything's a pong - } else if ( - ['added', 'changed', 'removed', 'ready', 'updated'].includes(msg.msg) - ) { - await this._livedata_data(msg); - } else if (msg.msg === 'nosub') { - await this._livedata_nosub(msg); - } else if (msg.msg === 'result') { - await this._livedata_result(msg); - } else if (msg.msg === 'error') { - this._livedata_error(msg); - } else { - Meteor._debug('discarding unknown livedata message type', msg); - } - } - - onReset() { - // Send a connect message at the beginning of the stream. - // NOTE: reset is called even on the first connection, so this is - // the only place we send this message. - const msg = { msg: 'connect' }; - if (this._lastSessionId) msg.session = this._lastSessionId; - msg.version = this._versionSuggestion || this._supportedDDPVersions[0]; - this._versionSuggestion = msg.version; - msg.support = this._supportedDDPVersions; - this._send(msg); - - // Mark non-retry calls as failed. This has to be done early as getting these methods out of the - // current block is pretty important to making sure that quiescence is properly calculated, as - // well as possibly moving on to another useful block. - - // Only bother testing if there is an outstandingMethodBlock (there might not be, especially if - // we are connecting for the first time. - if (this._outstandingMethodBlocks.length > 0) { - // If there is an outstanding method block, we only care about the first one as that is the - // one that could have already sent messages with no response, that are not allowed to retry. - const currentMethodBlock = this._outstandingMethodBlocks[0].methods; - this._outstandingMethodBlocks[0].methods = currentMethodBlock.filter( - methodInvoker => { - // Methods with 'noRetry' option set are not allowed to re-send after - // recovering dropped connection. - if (methodInvoker.sentMessage && methodInvoker.noRetry) { - // Make sure that the method is told that it failed. - methodInvoker.receiveResult( - new Meteor.Error( - 'invocation-failed', - 'Method invocation might have failed due to dropped connection. ' + - 'Failing because `noRetry` option was passed to Meteor.apply.' - ) - ); - } - - // Only keep a method if it wasn't sent or it's allowed to retry. - // This may leave the block empty, but we don't move on to the next - // block until the callback has been delivered, in _outstandingMethodFinished. - return !(methodInvoker.sentMessage && methodInvoker.noRetry); - } - ); - } - - // Now, to minimize setup latency, go ahead and blast out all of - // our pending methods ands subscriptions before we've even taken - // the necessary RTT to know if we successfully reconnected. (1) - // They're supposed to be idempotent, and where they are not, - // they can block retry in apply; (2) even if we did reconnect, - // we're not sure what messages might have gotten lost - // (in either direction) since we were disconnected (TCP being - // sloppy about that.) - - // If the current block of methods all got their results (but didn't all get - // their data visible), discard the empty block now. - if ( - this._outstandingMethodBlocks.length > 0 && - this._outstandingMethodBlocks[0].methods.length === 0 - ) { - this._outstandingMethodBlocks.shift(); - } - - // Mark all messages as unsent, they have not yet been sent on this - // connection. - keys(this._methodInvokers).forEach(id => { - this._methodInvokers[id].sentMessage = false; - }); - - // If an `onReconnect` handler is set, call it first. Go through - // some hoops to ensure that methods that are called from within - // `onReconnect` get executed _before_ ones that were originally - // outstanding (since `onReconnect` is used to re-establish auth - // certificates) - this._callOnReconnectAndSendAppropriateOutstandingMethods(); - - // add new subscriptions at the end. this way they take effect after - // the handlers and we don't see flicker. - Object.entries(this._subscriptions).forEach(([id, sub]) => { - this._sendQueued({ - msg: 'sub', - id: id, - name: sub.name, - params: sub.params - }); - }); - } } diff --git a/packages/ddp-client/common/message_processors.js b/packages/ddp-client/common/message_processors.js new file mode 100644 index 0000000000..09b13f742e --- /dev/null +++ b/packages/ddp-client/common/message_processors.js @@ -0,0 +1,336 @@ +import { DDPCommon } from 'meteor/ddp-common'; +import { Meteor } from 'meteor/meteor'; +import { DDP } from './namespace.js'; +import { EJSON } from 'meteor/ejson'; +import { isEmpty, hasOwn } from "meteor/ddp-common/utils"; + +export class MessageProcessors { + constructor(connection) { + this._connection = connection; + } + + /** + * @summary Process the connection message and set up the session + * @param {Object} msg The connection message + */ + async _livedata_connected(msg) { + const self = this._connection; + + if (self._version !== 'pre1' && self._heartbeatInterval !== 0) { + self._heartbeat = new DDPCommon.Heartbeat({ + heartbeatInterval: self._heartbeatInterval, + heartbeatTimeout: self._heartbeatTimeout, + onTimeout() { + self._lostConnection( + new DDP.ConnectionError('DDP heartbeat timed out') + ); + }, + sendPing() { + self._send({ msg: 'ping' }); + } + }); + self._heartbeat.start(); + } + + // If this is a reconnect, we'll have to reset all stores. + if (self._lastSessionId) self._resetStores = true; + + let reconnectedToPreviousSession; + if (typeof msg.session === 'string') { + reconnectedToPreviousSession = self._lastSessionId === msg.session; + self._lastSessionId = msg.session; + } + + if (reconnectedToPreviousSession) { + // Successful reconnection -- pick up where we left off. + return; + } + + // Server doesn't have our data anymore. Re-sync a new session. + + // Forget about messages we were buffering for unknown collections. They'll + // be resent if still relevant. + self._updatesForUnknownStores = Object.create(null); + + if (self._resetStores) { + // Forget about the effects of stubs. We'll be resetting all collections + // anyway. + self._documentsWrittenByStub = Object.create(null); + self._serverDocuments = Object.create(null); + } + + // Clear _afterUpdateCallbacks. + self._afterUpdateCallbacks = []; + + // Mark all named subscriptions which are ready as needing to be revived. + self._subsBeingRevived = Object.create(null); + Object.entries(self._subscriptions).forEach(([id, sub]) => { + if (sub.ready) { + self._subsBeingRevived[id] = true; + } + }); + + // Arrange for "half-finished" methods to have their callbacks run, and + // track methods that were sent on this connection so that we don't + // quiesce until they are all done. + // + // Start by clearing _methodsBlockingQuiescence: methods sent before + // reconnect don't matter, and any "wait" methods sent on the new connection + // that we drop here will be restored by the loop below. + self._methodsBlockingQuiescence = Object.create(null); + if (self._resetStores) { + const invokers = self._methodInvokers; + Object.keys(invokers).forEach(id => { + const invoker = invokers[id]; + if (invoker.gotResult()) { + // This method already got its result, but it didn't call its callback + // because its data didn't become visible. We did not resend the + // method RPC. We'll call its callback when we get a full quiesce, + // since that's as close as we'll get to "data must be visible". + self._afterUpdateCallbacks.push( + (...args) => invoker.dataVisible(...args) + ); + } else if (invoker.sentMessage) { + // This method has been sent on this connection (maybe as a resend + // from the last connection, maybe from onReconnect, maybe just very + // quickly before processing the connected message). + // + // We don't need to do anything special to ensure its callbacks get + // called, but we'll count it as a method which is preventing + // reconnect quiescence. (eg, it might be a login method that was run + // from onReconnect, and we don't want to see flicker by seeing a + // logged-out state.) + self._methodsBlockingQuiescence[invoker.methodId] = true; + } + }); + } + + self._messagesBufferedUntilQuiescence = []; + + // If we're not waiting on any methods or subs, we can reset the stores and + // call the callbacks immediately. + if (!self._waitingForQuiescence()) { + if (self._resetStores) { + for (const store of Object.values(self._stores)) { + await store.beginUpdate(0, true); + await store.endUpdate(); + } + self._resetStores = false; + } + self._runAfterUpdateCallbacks(); + } + } + + /** + * @summary Process various data messages from the server + * @param {Object} msg The data message + */ + async _livedata_data(msg) { + const self = this._connection; + + if (self._waitingForQuiescence()) { + self._messagesBufferedUntilQuiescence.push(msg); + + if (msg.msg === 'nosub') { + delete self._subsBeingRevived[msg.id]; + } + + if (msg.subs) { + msg.subs.forEach(subId => { + delete self._subsBeingRevived[subId]; + }); + } + + if (msg.methods) { + msg.methods.forEach(methodId => { + delete self._methodsBlockingQuiescence[methodId]; + }); + } + + if (self._waitingForQuiescence()) { + return; + } + + // No methods or subs are blocking quiescence! + // We'll now process and all of our buffered messages, reset all stores, + // and apply them all at once. + const bufferedMessages = self._messagesBufferedUntilQuiescence; + for (const bufferedMessage of Object.values(bufferedMessages)) { + await this._processOneDataMessage( + bufferedMessage, + self._bufferedWrites + ); + } + self._messagesBufferedUntilQuiescence = []; + } else { + await this._processOneDataMessage(msg, self._bufferedWrites); + } + + // Immediately flush writes when: + // 1. Buffering is disabled. Or; + // 2. any non-(added/changed/removed) message arrives. + const standardWrite = + msg.msg === "added" || + msg.msg === "changed" || + msg.msg === "removed"; + + if (self._bufferedWritesInterval === 0 || !standardWrite) { + await self._flushBufferedWrites(); + return; + } + + if (self._bufferedWritesFlushAt === null) { + self._bufferedWritesFlushAt = + new Date().valueOf() + self._bufferedWritesMaxAge; + } else if (self._bufferedWritesFlushAt < new Date().valueOf()) { + await self._flushBufferedWrites(); + return; + } + + if (self._bufferedWritesFlushHandle) { + clearTimeout(self._bufferedWritesFlushHandle); + } + self._bufferedWritesFlushHandle = setTimeout(() => { + self._liveDataWritesPromise = self._flushBufferedWrites(); + if (Meteor._isPromise(self._liveDataWritesPromise)) { + self._liveDataWritesPromise.finally( + () => (self._liveDataWritesPromise = undefined) + ); + } + }, self._bufferedWritesInterval); + } + + /** + * @summary Process individual data messages by type + * @private + */ + async _processOneDataMessage(msg, updates) { + const messageType = msg.msg; + + switch (messageType) { + case 'added': + await this._connection._process_added(msg, updates); + break; + case 'changed': + this._connection._process_changed(msg, updates); + break; + case 'removed': + this._connection._process_removed(msg, updates); + break; + case 'ready': + this._connection._process_ready(msg, updates); + break; + case 'updated': + this._connection._process_updated(msg, updates); + break; + case 'nosub': + // ignore this + break; + default: + Meteor._debug('discarding unknown livedata data message type', msg); + } + } + + /** + * @summary Handle method results arriving from the server + * @param {Object} msg The method result message + */ + async _livedata_result(msg) { + const self = this._connection; + + // Lets make sure there are no buffered writes before returning result. + if (!isEmpty(self._bufferedWrites)) { + await self._flushBufferedWrites(); + } + + // find the outstanding request + // should be O(1) in nearly all realistic use cases + if (isEmpty(self._outstandingMethodBlocks)) { + Meteor._debug('Received method result but no methods outstanding'); + return; + } + const currentMethodBlock = self._outstandingMethodBlocks[0].methods; + let i; + const m = currentMethodBlock.find((method, idx) => { + const found = method.methodId === msg.id; + if (found) i = idx; + return found; + }); + if (!m) { + Meteor._debug("Can't match method response to original method call", msg); + return; + } + + // Remove from current method block. This may leave the block empty, but we + // don't move on to the next block until the callback has been delivered, in + // _outstandingMethodFinished. + currentMethodBlock.splice(i, 1); + + if (hasOwn.call(msg, 'error')) { + m.receiveResult( + new Meteor.Error(msg.error.error, msg.error.reason, msg.error.details) + ); + } else { + // msg.result may be undefined if the method didn't return a value + m.receiveResult(undefined, msg.result); + } + } + + /** + * @summary Handle "nosub" messages arriving from the server + * @param {Object} msg The nosub message + */ + async _livedata_nosub(msg) { + const self = this._connection; + + // First pass it through _livedata_data, which only uses it to help get + // towards quiescence. + await this._livedata_data(msg); + + // Do the rest of our processing immediately, with no + // buffering-until-quiescence. + + // we weren't subbed anyway, or we initiated the unsub. + if (!hasOwn.call(self._subscriptions, msg.id)) { + return; + } + + // XXX COMPAT WITH 1.0.3.1 #errorCallback + const errorCallback = self._subscriptions[msg.id].errorCallback; + const stopCallback = self._subscriptions[msg.id].stopCallback; + + self._subscriptions[msg.id].remove(); + + const meteorErrorFromMsg = msgArg => { + return ( + msgArg && + msgArg.error && + new Meteor.Error( + msgArg.error.error, + msgArg.error.reason, + msgArg.error.details + ) + ); + }; + + // XXX COMPAT WITH 1.0.3.1 #errorCallback + if (errorCallback && msg.error) { + errorCallback(meteorErrorFromMsg(msg)); + } + + if (stopCallback) { + stopCallback(meteorErrorFromMsg(msg)); + } + } + + /** + * @summary Handle errors from the server + * @param {Object} msg The error message + */ + _livedata_error(msg) { + Meteor._debug('Received error from server: ', msg.reason); + if (msg.offendingMessage) Meteor._debug('For: ', msg.offendingMessage); + } + + // Document change message processors will be defined in a separate class +} \ No newline at end of file diff --git a/packages/ddp-client/common/MethodInvoker.js b/packages/ddp-client/common/method_invoker.js similarity index 98% rename from packages/ddp-client/common/MethodInvoker.js rename to packages/ddp-client/common/method_invoker.js index f2490b92f8..e0ef59e478 100644 --- a/packages/ddp-client/common/MethodInvoker.js +++ b/packages/ddp-client/common/method_invoker.js @@ -3,7 +3,7 @@ // _methodInvokers map; it removes itself once the method is fully finished and // the callback is invoked. This occurs when it has both received a result, // and the data written by it is fully visible. -export default class MethodInvoker { +export class MethodInvoker { constructor(options) { // Public (within this file) fields. this.methodId = options.methodId; diff --git a/packages/ddp-client/common/mongo_id_map.js b/packages/ddp-client/common/mongo_id_map.js new file mode 100644 index 0000000000..179aa214f1 --- /dev/null +++ b/packages/ddp-client/common/mongo_id_map.js @@ -0,0 +1,7 @@ +import { MongoID } from 'meteor/mongo-id'; + +export class MongoIDMap extends IdMap { + constructor() { + super(MongoID.idStringify, MongoID.idParse); + } +} \ No newline at end of file diff --git a/packages/ddp-server/dummy_document_view.ts b/packages/ddp-server/dummy_document_view.ts new file mode 100644 index 0000000000..f87f0839aa --- /dev/null +++ b/packages/ddp-server/dummy_document_view.ts @@ -0,0 +1,40 @@ +interface ChangeCollector { + [key: string]: any; +} + +interface DataEntry { + subscriptionHandle: string; + value: any; +} + +export class DummyDocumentView { + private existsIn: Set; + private dataByKey: Map; + + constructor() { + this.existsIn = new Set(); // set of subscriptionHandle + this.dataByKey = new Map(); // key-> [ {subscriptionHandle, value} by precedence] + } + + getFields(): Record { + return {}; + } + + clearField( + subscriptionHandle: string, + key: string, + changeCollector: ChangeCollector + ): void { + changeCollector[key] = undefined; + } + + changeField( + subscriptionHandle: string, + key: string, + value: any, + changeCollector: ChangeCollector, + isAdd?: boolean + ): void { + changeCollector[key] = value; + } +} \ No newline at end of file diff --git a/packages/ddp-server/livedata_server.js b/packages/ddp-server/livedata_server.js index 0cfd4cad8d..23ca1cce52 100644 --- a/packages/ddp-server/livedata_server.js +++ b/packages/ddp-server/livedata_server.js @@ -1,6 +1,8 @@ import isEmpty from 'lodash.isempty'; -import isString from 'lodash.isstring'; import isObject from 'lodash.isobject'; +import isString from 'lodash.isstring'; +import { SessionCollectionView } from './session_collection_view'; +import { SessionDocumentView } from './session_document_view'; DDPServer = {}; @@ -55,33 +57,7 @@ DDPServer.publicationStrategies = publicationStrategies; // Session and Subscription are file scope. For now, until we freeze // the interface, Server is package scope (in the future it should be // exported). -var DummyDocumentView = function () { - var self = this; - self.existsIn = new Set(); // set of subscriptionHandle - self.dataByKey = new Map(); // key-> [ {subscriptionHandle, value} by precedence] -}; -Object.assign(DummyDocumentView.prototype, { - getFields: function () { - return {} - }, - - clearField: function (subscriptionHandle, key, changeCollector) { - changeCollector[key] = undefined - }, - - changeField: function (subscriptionHandle, key, value, - changeCollector, isAdd) { - changeCollector[key] = value - } -}); - -// Represents a single document in a SessionCollectionView -var SessionDocumentView = function () { - var self = this; - self.existsIn = new Set(); // set of subscriptionHandle - self.dataByKey = new Map(); // key-> [ {subscriptionHandle, value} by precedence] -}; DDPServer._SessionDocumentView = SessionDocumentView; @@ -94,210 +70,9 @@ DDPServer._getCurrentFence = function () { return currentInvocation ? currentInvocation.fence : undefined; }; -Object.assign(SessionDocumentView.prototype, { - - getFields: function () { - var self = this; - var ret = {}; - self.dataByKey.forEach(function (precedenceList, key) { - ret[key] = precedenceList[0].value; - }); - return ret; - }, - - clearField: function (subscriptionHandle, key, changeCollector) { - var self = this; - // Publish API ignores _id if present in fields - if (key === "_id") - return; - var precedenceList = self.dataByKey.get(key); - - // It's okay to clear fields that didn't exist. No need to throw - // an error. - if (!precedenceList) - return; - - var removedValue = undefined; - for (var i = 0; i < precedenceList.length; i++) { - var precedence = precedenceList[i]; - if (precedence.subscriptionHandle === subscriptionHandle) { - // The view's value can only change if this subscription is the one that - // used to have precedence. - if (i === 0) - removedValue = precedence.value; - precedenceList.splice(i, 1); - break; - } - } - if (precedenceList.length === 0) { - self.dataByKey.delete(key); - changeCollector[key] = undefined; - } else if (removedValue !== undefined && - !EJSON.equals(removedValue, precedenceList[0].value)) { - changeCollector[key] = precedenceList[0].value; - } - }, - - changeField: function (subscriptionHandle, key, value, - changeCollector, isAdd) { - var self = this; - // Publish API ignores _id if present in fields - if (key === "_id") - return; - - // Don't share state with the data passed in by the user. - value = EJSON.clone(value); - - if (!self.dataByKey.has(key)) { - self.dataByKey.set(key, [{subscriptionHandle: subscriptionHandle, - value: value}]); - changeCollector[key] = value; - return; - } - var precedenceList = self.dataByKey.get(key); - var elt; - if (!isAdd) { - elt = precedenceList.find(function (precedence) { - return precedence.subscriptionHandle === subscriptionHandle; - }); - } - - if (elt) { - if (elt === precedenceList[0] && !EJSON.equals(value, elt.value)) { - // this subscription is changing the value of this field. - changeCollector[key] = value; - } - elt.value = value; - } else { - // this subscription is newly caring about this field - precedenceList.push({subscriptionHandle: subscriptionHandle, value: value}); - } - - } -}); - -/** - * Represents a client's view of a single collection - * @param {String} collectionName Name of the collection it represents - * @param {Object.} sessionCallbacks The callbacks for added, changed, removed - * @class SessionCollectionView - */ -var SessionCollectionView = function (collectionName, sessionCallbacks) { - var self = this; - self.collectionName = collectionName; - self.documents = new Map(); - self.callbacks = sessionCallbacks; -}; DDPServer._SessionCollectionView = SessionCollectionView; - -Object.assign(SessionCollectionView.prototype, { - - isEmpty: function () { - var self = this; - return self.documents.size === 0; - }, - - diff: function (previous) { - var self = this; - DiffSequence.diffMaps(previous.documents, self.documents, { - both: self.diffDocument.bind(self), - - rightOnly: function (id, nowDV) { - self.callbacks.added(self.collectionName, id, nowDV.getFields()); - }, - - leftOnly: function (id, prevDV) { - self.callbacks.removed(self.collectionName, id); - } - }); - }, - - diffDocument: function (id, prevDV, nowDV) { - var self = this; - var fields = {}; - DiffSequence.diffObjects(prevDV.getFields(), nowDV.getFields(), { - both: function (key, prev, now) { - if (!EJSON.equals(prev, now)) - fields[key] = now; - }, - rightOnly: function (key, now) { - fields[key] = now; - }, - leftOnly: function(key, prev) { - fields[key] = undefined; - } - }); - self.callbacks.changed(self.collectionName, id, fields); - }, - - added: function (subscriptionHandle, id, fields) { - var self = this; - var docView = self.documents.get(id); - var added = false; - if (!docView) { - added = true; - if (Meteor.server.getPublicationStrategy(this.collectionName).useDummyDocumentView) { - docView = new DummyDocumentView(); - } else { - docView = new SessionDocumentView(); - } - - self.documents.set(id, docView); - } - docView.existsIn.add(subscriptionHandle); - var changeCollector = {}; - Object.entries(fields).forEach(function ([key, value]) { - docView.changeField( - subscriptionHandle, key, value, changeCollector, true); - }); - if (added) - self.callbacks.added(self.collectionName, id, changeCollector); - else - self.callbacks.changed(self.collectionName, id, changeCollector); - }, - - changed: function (subscriptionHandle, id, changed) { - var self = this; - var changedResult = {}; - var docView = self.documents.get(id); - if (!docView) - throw new Error("Could not find element with id " + id + " to change"); - Object.entries(changed).forEach(function ([key, value]) { - if (value === undefined) - docView.clearField(subscriptionHandle, key, changedResult); - else - docView.changeField(subscriptionHandle, key, value, changedResult); - }); - self.callbacks.changed(self.collectionName, id, changedResult); - }, - - removed: function (subscriptionHandle, id) { - var self = this; - var docView = self.documents.get(id); - if (!docView) { - var err = new Error("Removed nonexistent document " + id); - throw err; - } - docView.existsIn.delete(subscriptionHandle); - if (docView.existsIn.size === 0) { - // it is gone from everyone - self.callbacks.removed(self.collectionName, id); - self.documents.delete(id); - } else { - var changed = {}; - // remove this subscription from every precedence list - // and record the changes - docView.dataByKey.forEach(function (precedenceList, key) { - docView.clearField(subscriptionHandle, key, changed); - }); - - self.callbacks.changed(self.collectionName, id, changed); - } - } -}); - /******************************************************************************/ /* Session */ /******************************************************************************/ @@ -636,7 +411,7 @@ Object.assign(Session.prototype, { if (!blocked) return; // idempotent blocked = false; - processNext(); + setImmediate(processNext); }; self.server.onMessageHook.each(function (callback) { diff --git a/packages/ddp-server/package.js b/packages/ddp-server/package.js index d3eedde20a..2f847a0801 100644 --- a/packages/ddp-server/package.js +++ b/packages/ddp-server/package.js @@ -5,7 +5,7 @@ Package.describe({ }); Npm.depends({ - "permessage-deflate": "0.1.7", + "permessage-deflate2": "0.1.8", sockjs: "0.3.24", "lodash.once": "4.1.1", "lodash.isempty": "4.4.0", @@ -23,6 +23,7 @@ Package.onUse(function (api) { "mongo-id", "diff-sequence", "ecmascript", + "typescript", ], "server" ); diff --git a/packages/ddp-server/session_collection_view.ts b/packages/ddp-server/session_collection_view.ts new file mode 100644 index 0000000000..153c569bd0 --- /dev/null +++ b/packages/ddp-server/session_collection_view.ts @@ -0,0 +1,140 @@ +import { DummyDocumentView } from "./dummy_document_view"; +import { SessionDocumentView } from "./session_document_view"; + +interface SessionCallbacks { + added: (collectionName: string, id: string, fields: Record) => void; + changed: (collectionName: string, id: string, fields: Record) => void; + removed: (collectionName: string, id: string) => void; +} + +type DocumentView = SessionDocumentView | DummyDocumentView; + +export class SessionCollectionView { + private readonly collectionName: string; + private readonly documents: Map; + private readonly callbacks: SessionCallbacks; + + /** + * Represents a client's view of a single collection + * @param collectionName - Name of the collection it represents + * @param sessionCallbacks - The callbacks for added, changed, removed + */ + constructor(collectionName: string, sessionCallbacks: SessionCallbacks) { + this.collectionName = collectionName; + this.documents = new Map(); + this.callbacks = sessionCallbacks; + } + + public isEmpty(): boolean { + return this.documents.size === 0; + } + + public diff(previous: SessionCollectionView): void { + DiffSequence.diffMaps(previous.documents, this.documents, { + both: this.diffDocument.bind(this), + rightOnly: (id: string, nowDV: DocumentView) => { + this.callbacks.added(this.collectionName, id, nowDV.getFields()); + }, + leftOnly: (id: string, prevDV: DocumentView) => { + this.callbacks.removed(this.collectionName, id); + } + }); + } + + private diffDocument(id: string, prevDV: DocumentView, nowDV: DocumentView): void { + const fields: Record = {}; + + DiffSequence.diffObjects(prevDV.getFields(), nowDV.getFields(), { + both: (key: string, prev: any, now: any) => { + if (!EJSON.equals(prev, now)) { + fields[key] = now; + } + }, + rightOnly: (key: string, now: any) => { + fields[key] = now; + }, + leftOnly: (key: string, prev: any) => { + fields[key] = undefined; + } + }); + + this.callbacks.changed(this.collectionName, id, fields); + } + + public added(subscriptionHandle: string, id: string, fields: Record): void { + let docView: DocumentView | undefined = this.documents.get(id); + let added = false; + + if (!docView) { + added = true; + if (Meteor.server.getPublicationStrategy(this.collectionName).useDummyDocumentView) { + docView = new DummyDocumentView(); + } else { + docView = new SessionDocumentView(); + } + this.documents.set(id, docView); + } + + docView.existsIn.add(subscriptionHandle); + const changeCollector: Record = {}; + + Object.entries(fields).forEach(([key, value]) => { + docView!.changeField( + subscriptionHandle, + key, + value, + changeCollector, + true + ); + }); + + if (added) { + this.callbacks.added(this.collectionName, id, changeCollector); + } else { + this.callbacks.changed(this.collectionName, id, changeCollector); + } + } + + public changed(subscriptionHandle: string, id: string, changed: Record): void { + const changedResult: Record = {}; + const docView = this.documents.get(id); + + if (!docView) { + throw new Error(`Could not find element with id ${id} to change`); + } + + Object.entries(changed).forEach(([key, value]) => { + if (value === undefined) { + docView.clearField(subscriptionHandle, key, changedResult); + } else { + docView.changeField(subscriptionHandle, key, value, changedResult); + } + }); + + this.callbacks.changed(this.collectionName, id, changedResult); + } + + public removed(subscriptionHandle: string, id: string): void { + const docView = this.documents.get(id); + + if (!docView) { + throw new Error(`Removed nonexistent document ${id}`); + } + + docView.existsIn.delete(subscriptionHandle); + + if (docView.existsIn.size === 0) { + // it is gone from everyone + this.callbacks.removed(this.collectionName, id); + this.documents.delete(id); + } else { + const changed: Record = {}; + // remove this subscription from every precedence list + // and record the changes + docView.dataByKey.forEach((precedenceList, key) => { + docView.clearField(subscriptionHandle, key, changed); + }); + this.callbacks.changed(this.collectionName, id, changed); + } + } +} \ No newline at end of file diff --git a/packages/ddp-server/session_document_view.ts b/packages/ddp-server/session_document_view.ts new file mode 100644 index 0000000000..4bd838c177 --- /dev/null +++ b/packages/ddp-server/session_document_view.ts @@ -0,0 +1,106 @@ +interface PrecedenceItem { + subscriptionHandle: string; + value: any; +} + +interface ChangeCollector { + [key: string]: any; +} + +export class SessionDocumentView { + private existsIn: Set; + private dataByKey: Map; + + constructor() { + this.existsIn = new Set(); // set of subscriptionHandle + // Memory Growth + this.dataByKey = new Map(); // key-> [ {subscriptionHandle, value} by precedence] + } + + getFields(): Record { + const ret: Record = {}; + this.dataByKey.forEach((precedenceList, key) => { + ret[key] = precedenceList[0].value; + }); + return ret; + } + + clearField( + subscriptionHandle: string, + key: string, + changeCollector: ChangeCollector + ): void { + // Publish API ignores _id if present in fields + if (key === "_id") return; + + const precedenceList = this.dataByKey.get(key); + // It's okay to clear fields that didn't exist. No need to throw + // an error. + if (!precedenceList) return; + + let removedValue: any = undefined; + + for (let i = 0; i < precedenceList.length; i++) { + const precedence = precedenceList[i]; + if (precedence.subscriptionHandle === subscriptionHandle) { + // The view's value can only change if this subscription is the one that + // used to have precedence. + if (i === 0) removedValue = precedence.value; + precedenceList.splice(i, 1); + break; + } + } + + if (precedenceList.length === 0) { + this.dataByKey.delete(key); + changeCollector[key] = undefined; + } else if ( + removedValue !== undefined && + !EJSON.equals(removedValue, precedenceList[0].value) + ) { + changeCollector[key] = precedenceList[0].value; + } + } + + changeField( + subscriptionHandle: string, + key: string, + value: any, + changeCollector: ChangeCollector, + isAdd: boolean = false + ): void { + // Publish API ignores _id if present in fields + if (key === "_id") return; + + // Don't share state with the data passed in by the user. + value = EJSON.clone(value); + + if (!this.dataByKey.has(key)) { + this.dataByKey.set(key, [ + { subscriptionHandle: subscriptionHandle, value: value }, + ]); + changeCollector[key] = value; + return; + } + + const precedenceList = this.dataByKey.get(key)!; + let elt: PrecedenceItem | undefined; + + if (!isAdd) { + elt = precedenceList.find( + (precedence) => precedence.subscriptionHandle === subscriptionHandle + ); + } + + if (elt) { + if (elt === precedenceList[0] && !EJSON.equals(value, elt.value)) { + // this subscription is changing the value of this field. + changeCollector[key] = value; + } + elt.value = value; + } else { + // this subscription is newly caring about this field + precedenceList.push({ subscriptionHandle: subscriptionHandle, value: value }); + } + } +} \ No newline at end of file diff --git a/packages/ddp-server/stream_server.js b/packages/ddp-server/stream_server.js index d57b81456c..3691f59e1d 100644 --- a/packages/ddp-server/stream_server.js +++ b/packages/ddp-server/stream_server.js @@ -1,4 +1,5 @@ import once from 'lodash.once'; +import zlib from 'node:zlib'; // By default, we use the permessage-deflate extension with default // configuration. If $SERVER_WEBSOCKET_COMPRESSION is set, then it must be valid @@ -14,12 +15,18 @@ import once from 'lodash.once'; var websocketExtensions = once(function () { var extensions = []; - var websocketCompressionConfig = process.env.SERVER_WEBSOCKET_COMPRESSION - ? JSON.parse(process.env.SERVER_WEBSOCKET_COMPRESSION) : {}; + var websocketCompressionConfig = process.env.SERVER_WEBSOCKET_COMPRESSION ? + JSON.parse(process.env.SERVER_WEBSOCKET_COMPRESSION) : {}; + if (websocketCompressionConfig) { - extensions.push(Npm.require('permessage-deflate').configure( - websocketCompressionConfig - )); + extensions.push(Npm.require('permessage-deflate2').configure({ + threshold: 1024, + level: zlib.constants.Z_BEST_SPEED, + memLevel: zlib.constants.Z_MIN_MEMLEVEL, + noContextTakeover: true, + maxWindowBits: zlib.constants.Z_MIN_WINDOWBITS, + ...(websocketCompressionConfig || {}) + })); } return extensions; diff --git a/packages/ddp-server/writefence.js b/packages/ddp-server/writefence.js index a07ab575fc..aee0f1fde7 100644 --- a/packages/ddp-server/writefence.js +++ b/packages/ddp-server/writefence.js @@ -1,7 +1,3 @@ -// A write fence collects a group of writes, and provides a callback -// when all of the writes are fully committed and propagated (all -// observers have been notified of the write and acknowledged it.) -// DDPServer._WriteFence = class { constructor() { this.armed = false; @@ -12,58 +8,49 @@ DDPServer._WriteFence = class { this.completion_callbacks = []; } - // Start tracking a write, and return an object to represent it. The - // object has a single method, committed(). This method should be - // called when the write is fully committed and propagated. You can - // continue to add writes to the WriteFence up until it is triggered - // (calls its callbacks because all writes have committed.) beginWrite() { - if (this.retired) - return { committed: function () {} }; + if (this.retired) { + return { committed: () => {} }; + } - if (this.fired) + if (this.fired) { throw new Error("fence has already activated -- too late to add writes"); + } this.outstanding_writes++; let committed = false; - const _committedFn = async () => { - if (committed) - throw new Error("committed called twice on the same write"); - committed = true; - this.outstanding_writes--; - await this._maybeFire(); - }; return { - committed: _committedFn, + committed: async () => { + if (committed) { + throw new Error("committed called twice on the same write"); + } + committed = true; + this.outstanding_writes--; + await this._maybeFire(); + } }; } - // Arm the fence. Once the fence is armed, and there are no more - // uncommitted writes, it will activate. arm() { - - if (this === DDPServer._getCurrentFence()) + if (this === DDPServer._getCurrentFence()) { throw Error("Can't arm the current fence"); + } this.armed = true; return this._maybeFire(); } - // Register a function to be called once before firing the fence. - // Callback function can add new writes to the fence, in which case - // it won't fire until those writes are done as well. onBeforeFire(func) { - if (this.fired) - throw new Error("fence has already activated -- too late to " + - "add a callback"); + if (this.fired) { + throw new Error("fence has already activated -- too late to add a callback"); + } this.before_fire_callbacks.push(func); } - // Register a function to be called when the fence fires. onAllCommitted(func) { - if (this.fired) - throw new Error("fence has already activated -- too late to " + - "add a callback"); + if (this.fired) { + throw new Error("fence has already activated -- too late to add a callback"); + } this.completion_callbacks.push(func); } @@ -72,56 +59,54 @@ DDPServer._WriteFence = class { const returnValue = new Promise(r => resolver = r); this.onAllCommitted(resolver); await this.arm(); - return returnValue; } - // Convenience function. Arms the fence, then blocks until it fires. - async armAndWait() { + + armAndWait() { return this._armAndWait(); } async _maybeFire() { - if (this.fired) + if (this.fired) { throw new Error("write fence already activated?"); - if (this.armed && !this.outstanding_writes) { - const invokeCallback = async (func) => { - try { - await func(this); - } catch (err) { - Meteor._debug("exception in write fence callback:", err); - } - }; + } - this.outstanding_writes++; - while (this.before_fire_callbacks.length > 0) { - const cb = this.before_fire_callbacks.shift(); - await invokeCallback(cb); - } - this.outstanding_writes--; + if (!this.armed || this.outstanding_writes > 0) { + return; + } - if (!this.outstanding_writes) { - this.fired = true; - const callbacks = this.completion_callbacks || []; - this.completion_callbacks = []; - while (callbacks.length > 0) { - const cb = callbacks.shift(); - await invokeCallback(cb); - } + const invokeCallback = async (func) => { + try { + await func(this); + } catch (err) { + Meteor._debug("exception in write fence callback:", err); } + }; + + this.outstanding_writes++; + + // Process all before_fire callbacks in parallel + const beforeCallbacks = [...this.before_fire_callbacks]; + this.before_fire_callbacks = []; + await Promise.all(beforeCallbacks.map(cb => invokeCallback(cb))); + + this.outstanding_writes--; + + if (this.outstanding_writes === 0) { + this.fired = true; + // Process all completion callbacks in parallel + const callbacks = [...this.completion_callbacks]; + this.completion_callbacks = []; + await Promise.all(callbacks.map(cb => invokeCallback(cb))); } } - // Deactivate this fence so that adding more writes has no effect. - // The fence must have already fired. retire() { - if (!this.fired) + if (!this.fired) { throw new Error("Can't retire a fence that hasn't fired."); + } this.retired = true; } }; -// The current write fence. When there is a current write fence, code -// that writes to databases should register their writes with it using -// beginWrite(). -// -DDPServer._CurrentWriteFence = new Meteor.EnvironmentVariable; +DDPServer._CurrentWriteFence = new Meteor.EnvironmentVariable; \ No newline at end of file diff --git a/packages/meteor/async_helpers.js b/packages/meteor/async_helpers.js index 240915eeed..bbf8bdc1bf 100644 --- a/packages/meteor/async_helpers.js +++ b/packages/meteor/async_helpers.js @@ -24,145 +24,6 @@ FakeDoubleEndedQueue.prototype.isEmpty = function () { Meteor._DoubleEndedQueue = Meteor.isServer ? Npm.require('denque') : FakeDoubleEndedQueue; -// Meteor._SynchronousQueue is a queue which runs task functions serially. -// Tasks are assumed to be synchronous: ie, it's assumed that they are -// done when they return. -// -// It has two methods: -// - queueTask queues a task to be run, and returns immediately. -// - runTask queues a task to be run, and then yields. It returns -// when the task finishes running. -// -// It's safe to call queueTask from within a task, but not runTask (unless -// you're calling runTask from a nested Fiber). -// -// Somewhat inspired by async.queue, but specific to blocking tasks. -// XXX break this out into an NPM module? -// XXX could maybe use the npm 'schlock' module instead, which would -// also support multiple concurrent "read" tasks -// -function AsynchronousQueue () { - this._taskHandles = new Meteor._DoubleEndedQueue(); - this._runningOrRunScheduled = false; - // This is true if we're currently draining. While we're draining, a further - // drain is a noop, to prevent infinite loops. "drain" is a heuristic type - // operation, that has a meaning like unto "what a naive person would expect - // when modifying a table from an observe" - this._draining = false; -} -Object.assign(AsynchronousQueue.prototype, { - queueTask(task) { - const self = this; - self._taskHandles.push({ - task: Meteor.bindEnvironment(task, function (e) { - Meteor._debug('Exception from task', e); - throw e; - }), - name: task.name - }); - self._scheduleRun(); - }, - - async _scheduleRun() { - // Already running or scheduled? Do nothing. - if (this._runningOrRunScheduled) - return; - - this._runningOrRunScheduled = true; - - let resolve; - const promise = new Promise(r => resolve = r); - const runImmediateHandle = (fn) => { - if (Meteor.isServer) { - Meteor._runFresh(() => setImmediate(fn)) - return; - } - setTimeout(fn, 0); - }; - runImmediateHandle(() => { - this._run().finally(resolve); - }); - return promise; - }, - - async _run() { - if (!this._runningOrRunScheduled) - throw new Error("expected to be _runningOrRunScheduled"); - - if (this._taskHandles.isEmpty()) { - // Done running tasks! Don't immediately schedule another run, but - // allow future tasks to do so. - this._runningOrRunScheduled = false; - return; - } - const taskHandle = this._taskHandles.shift(); - let exception; - // Run the task. - try { - await taskHandle.task(); - } catch (err) { - if (taskHandle.resolver) { - // We'll throw this exception through runTask. - exception = err; - } else { - Meteor._debug("Exception in queued task", err); - } - } - - // Soon, run the next task, if there is any. - this._runningOrRunScheduled = false; - this._scheduleRun(); - - if (taskHandle.resolver) { - if (exception) { - taskHandle.resolver(null, exception); - } else { - taskHandle.resolver(); - } - } - }, - - async runTask(task) { - let resolver; - const promise = new Promise( - (resolve, reject) => - (resolver = (res, rej) => { - if (rej) { - reject(rej); - return; - } - resolve(res); - }) - ); - - const handle = { - task, - name: task.name, - resolver, - }; - this._taskHandles.push(handle); - await this._scheduleRun(); - return promise; - }, - - flush() { - return this.runTask(() => { }); - }, - - async drain() { - if (this._draining) - return; - - this._draining = true; - while (!this._taskHandles.isEmpty()) { - await this.flush(); - } - this._draining = false; - } -}); - -Meteor._AsynchronousQueue = AsynchronousQueue; - // Sleep. Mostly used for debugging (eg, inserting latency into server // methods). diff --git a/packages/meteor/asynchronous_queue.js b/packages/meteor/asynchronous_queue.js new file mode 100644 index 0000000000..3d0a80de35 --- /dev/null +++ b/packages/meteor/asynchronous_queue.js @@ -0,0 +1,171 @@ +class AsynchronousQueue { + /** + * Creates a queue that processes tasks in parallel batches while preserving completion order + * when needed. Configurable batch size and concurrency limits help optimize throughput. + * + * Batch size and concurrency are configured via environment variables: + * - METEOR_ASYNC_QUEUE_BATCH_SIZE: Number of tasks to process in each batch (default: 128) + * - METEOR_ASYNC_QUEUE_MAX_CONCURRENT: Maximum number of concurrent tasks (default: 16) + * + * @param {Object} options + * @param {boolean} [options.orderMatters=true] Whether task completion order should be preserved + */ + constructor({ orderMatters = true } = {}) { + this._batchSize = parseInt( + process.env.METEOR_ASYNC_QUEUE_BATCH_SIZE || + '128' + ); + + this._maxConcurrent = parseInt( + process.env.METEOR_ASYNC_QUEUE_MAX_CONCURRENT || + '16' + ); + + this._orderMatters = orderMatters; + + this._taskHandles = new Meteor._DoubleEndedQueue(); + this._runningOrRunScheduled = false; + this._draining = false; + this._activePromises = new Set(); + } + + queueTask(task) { + const wrappedTask = Meteor.bindEnvironment(task, function (e) { + Meteor._debug('Exception from task', e); + throw e; + }); + + this._taskHandles.push({ + task: wrappedTask, + name: task.name + }); + + this._scheduleRun(); + } + + async _scheduleRun() { + if (this._runningOrRunScheduled) return; + this._runningOrRunScheduled = true; + + const runImmediateHandle = (fn) => { + if (Meteor.isServer) { + Meteor._runFresh(() => setImmediate(fn)); + return; + } + setTimeout(fn, 0); + }; + + return new Promise(resolve => { + runImmediateHandle(() => { + this._run().finally(resolve); + }); + }); + } + + async _run() { + if (!this._runningOrRunScheduled) { + throw new Error("expected to be _runningOrRunScheduled"); + } + + if (this._taskHandles.isEmpty()) { + this._runningOrRunScheduled = false; + return; + } + + // Collect tasks for the current batch + const batch = []; + while (batch.length < this._batchSize && !this._taskHandles.isEmpty()) { + batch.push(this._taskHandles.shift()); + } + + // Process batch + if (this._orderMatters) { + await this._processOrderedBatch(batch); + } else { + await this._processParallelBatch(batch); + } + + // Schedule next batch if there are more tasks + this._runningOrRunScheduled = false; + if (!this._taskHandles.isEmpty()) { + this._scheduleRun(); + } + } + + async _processParallelBatch(batch) { + const taskPromises = batch.map(async taskHandle => { + try { + const promise = taskHandle.task(); + this._activePromises.add(promise); + const result = await promise; + this._activePromises.delete(promise); + + if (taskHandle.resolver) { + taskHandle.resolver(result); + } + } catch (err) { + if (taskHandle.resolver) { + taskHandle.resolver(null, err); + } else { + Meteor._debug("Exception in queued task", err); + } + } + }); + + // Process in chunks to control concurrency + for (let i = 0; i < taskPromises.length; i += this._maxConcurrent) { + const chunk = taskPromises.slice(i, i + this._maxConcurrent); + await Promise.all(chunk); + } + } + + async _processOrderedBatch(batch) { + for (const taskHandle of batch) { + try { + const result = await taskHandle.task(); + if (taskHandle.resolver) { + taskHandle.resolver(result); + } + } catch (err) { + if (taskHandle.resolver) { + taskHandle.resolver(null, err); + } else { + Meteor._debug("Exception in queued task", err); + } + } + } + } + + async runTask(task) { + return new Promise((resolve, reject) => { + const resolver = (res, err) => err ? reject(err) : resolve(res); + this._taskHandles.push({ task, name: task.name, resolver }); + this._scheduleRun(); + }); + } + + flush() { + return this.runTask(() => {}); + } + + async drain() { + if (this._draining) return; + this._draining = true; + + while (!this._taskHandles.isEmpty() || this._activePromises.size > 0) { + await this.flush(); + if (this._activePromises.size > 0) { + await Promise.all(Array.from(this._activePromises)); + } + } + + this._draining = false; + } +} + +Meteor._AsynchronousQueue = AsynchronousQueue; + +/** + * Backwards compatibility + */ +Meteor._SynchronousQueue = AsynchronousQueue; \ No newline at end of file diff --git a/packages/meteor/package.js b/packages/meteor/package.js index ba8439fc19..f9e81a4def 100644 --- a/packages/meteor/package.js +++ b/packages/meteor/package.js @@ -34,6 +34,7 @@ Package.onUse(function (api) { api.addFiles('timers.js', ['client', 'server']); api.addFiles('errors.js', ['client', 'server']); api.addFiles('asl-helpers.js', 'server'); + api.addFiles('asynchronous_queue.js', 'server'); api.addFiles('async_helpers.js', ['client', 'server']); api.addFiles('fiber_stubs_client.js', 'client'); api.addFiles('asl-helpers-client.js', 'client'); diff --git a/packages/mongo/observe_handle.ts b/packages/mongo/observe_handle.ts index b2d1924088..66235fa920 100644 --- a/packages/mongo/observe_handle.ts +++ b/packages/mongo/observe_handle.ts @@ -4,24 +4,30 @@ let nextObserveHandleId = 1; export type ObserveHandleCallbackInternal = '_added' | '_addedBefore' | '_changed' | '_movedBefore' | '_removed'; + +export type Callback = (...args: T[]) => Promise | void; + /** * The "observe handle" returned from observeChanges. * Contains a reference to an ObserveMultiplexer. * Used to stop observation and clean up resources. */ -export class ObserveHandle { +export class ObserveHandle { _id: number; _multiplexer: ObserveMultiplexer; nonMutatingCallbacks: boolean; _stopped: boolean; - _added?: (...args: any[]) => void; - _addedBefore?: (...args: any[]) => void; - _changed?: (...args: any[]) => void; - _movedBefore?: (...args: any[]) => void; - _removed?: (...args: any[]) => void; + public initialAddsSentResolver: (value: void) => void = () => {}; + public initialAddsSent: Promise - constructor(multiplexer: any, callbacks: Record, nonMutatingCallbacks: boolean) { + _added?: Callback; + _addedBefore?: Callback; + _changed?: Callback; + _movedBefore?: Callback; + _removed?: Callback; + + constructor(multiplexer: any, callbacks: Record>, nonMutatingCallbacks: boolean) { this._multiplexer = multiplexer; multiplexer.callbackNames().forEach((name: ObserveHandleCallback) => { @@ -40,6 +46,20 @@ export class ObserveHandle { this._stopped = false; this._id = nextObserveHandleId++; this.nonMutatingCallbacks = nonMutatingCallbacks; + + this.initialAddsSent = new Promise(resolve => { + const ready = () => { + resolve(); + this.initialAddsSent = Promise.resolve(); + } + + const timeout = setTimeout(ready, 30000) + + this.initialAddsSentResolver = () => { + ready(); + clearTimeout(timeout); + }; + }); } async stop() { diff --git a/packages/mongo/observe_multiplex.ts b/packages/mongo/observe_multiplex.ts index 559c983ab1..a99d64d562 100644 --- a/packages/mongo/observe_multiplex.ts +++ b/packages/mongo/observe_multiplex.ts @@ -1,5 +1,5 @@ -import { ObserveHandle } from './observe_handle'; import isEmpty from 'lodash.isempty'; +import { ObserveHandle } from './observe_handle'; interface ObserveMultiplexerOptions { ordered: boolean; @@ -34,7 +34,6 @@ export class ObserveMultiplexer { this._ordered = ordered; this._onStop = onStop; - // @ts-ignore this._queue = new Meteor._AsynchronousQueue(); this._handles = {}; this._resolver = null; @@ -140,7 +139,7 @@ export class ObserveMultiplexer { return !!this._isReady; } - _applyCallback(callbackName: string, args: any[]): void { + _applyCallback(callbackName: string, args: any[]) { this._queue.queueTask(async () => { if (!this._handles) return; @@ -152,31 +151,43 @@ export class ObserveMultiplexer { for (const handleId of Object.keys(this._handles)) { const handle = this._handles && this._handles[handleId]; + if (!handle) return; + const callback = (handle as any)[`_${callbackName}`]; - callback && (await callback.apply( + + if (!callback) continue; + + handle.initialAddsSent.then(callback.apply( null, handle.nonMutatingCallbacks ? args : EJSON.clone(args) - )); + )) } }); } async _sendAdds(handle: ObserveHandle): Promise { const add = this._ordered ? handle._addedBefore : handle._added; - if (!add) return; - await this._cache.docs.forEachAsync(async (doc: any, id: string) => { - if (!(handle._id in this._handles!)) + const addPromises: Promise[] = []; + + this._cache.docs.forEach((doc: any, id: string) => { + if (!(handle._id in this._handles!)) { throw Error("handle got removed before sending initial adds!"); + } const { _id, ...fields } = handle.nonMutatingCallbacks ? doc : EJSON.clone(doc); - if (this._ordered) - await add(id, fields, null); - else - await add(id, fields); + const promise = this._ordered ? + add(id, fields, null) : + add(id, fields); + + addPromises.push(promise); }); + + await Promise.all(addPromises); + + handle.initialAddsSentResolver(); } } \ No newline at end of file diff --git a/packages/mongo/oplog_observe_driver.js b/packages/mongo/oplog_observe_driver.js index c857a14f48..cb67ac7fc5 100644 --- a/packages/mongo/oplog_observe_driver.js +++ b/packages/mongo/oplog_observe_driver.js @@ -87,6 +87,7 @@ export const OplogObserveDriver = function (options) { self._comparator = null; self._sorter = null; self._unpublishedBuffer = null; + // Memory Growth self._published = new LocalCollection._IdMap; } diff --git a/packages/mongo/oplog_tailing.ts b/packages/mongo/oplog_tailing.ts index 440339fa6f..d56bf8e8eb 100644 --- a/packages/mongo/oplog_tailing.ts +++ b/packages/mongo/oplog_tailing.ts @@ -11,7 +11,7 @@ export const OPLOG_COLLECTION = 'oplog.rs'; let TOO_FAR_BEHIND = +(process.env.METEOR_OPLOG_TOO_FAR_BEHIND || 2000); const TAIL_TIMEOUT = +(process.env.METEOR_OPLOG_TAIL_TIMEOUT || 30000); -interface OplogEntry { +export interface OplogEntry { op: string; o: any; o2?: any; @@ -19,12 +19,12 @@ interface OplogEntry { ns: string; } -interface CatchingUpResolver { +export interface CatchingUpResolver { ts: any; resolver: () => void; } -interface OplogTrigger { +export interface OplogTrigger { dropCollection: boolean; dropDatabase: boolean; op: OplogEntry; @@ -34,7 +34,7 @@ interface OplogTrigger { export class OplogHandle { private _oplogUrl: string; - private _dbName: string; + public _dbName: string; private _oplogLastEntryConnection: MongoConnection | null; private _oplogTailConnection: MongoConnection | null; private _oplogOptions: { excludeCollections?: string[]; includeCollections?: string[] } | null; @@ -42,16 +42,18 @@ export class OplogHandle { private _tailHandle: any; private _readyPromiseResolver: (() => void) | null; private _readyPromise: Promise; - private _crossbar: any; + public _crossbar: any; private _baseOplogSelector: any; private _catchingUpResolvers: CatchingUpResolver[]; private _lastProcessedTS: any; private _onSkippedEntriesHook: any; - private _entryQueue: any; - private _workerActive: boolean; private _startTrailingPromise: Promise; private _resolveTimeout: any; + private _entryQueue = new Meteor._DoubleEndedQueue(); + private _workerActive = false; + private _workerPromise: Promise | null = null; + constructor(oplogUrl: string, dbName: string) { this._oplogUrl = oplogUrl; this._dbName = dbName; @@ -90,10 +92,6 @@ export class OplogHandle { debugPrintExceptions: "onSkippedEntries callback" }); - // @ts-ignore - this._entryQueue = new Meteor._DoubleEndedQueue(); - this._workerActive = false; - this._startTrailingPromise = this._startTailing(); } @@ -298,64 +296,11 @@ export class OplogHandle { } private _maybeStartWorker(): void { - if (this._workerActive) return; + if (this._workerPromise) return; this._workerActive = true; - Meteor.defer(async () => { - // May be called recursively in case of transactions. - const handleDoc = async (doc: OplogEntry): Promise => { - if (doc.ns === "admin.$cmd") { - if (doc.o.applyOps) { - // This was a successful transaction, so we need to apply the - // operations that were involved. - let nextTimestamp = doc.ts; - for (const op of doc.o.applyOps) { - // See https://github.com/meteor/meteor/issues/10420. - if (!op.ts) { - op.ts = nextTimestamp; - nextTimestamp = nextTimestamp.add(Long.ONE); - } - await handleDoc(op); - } - return; - } - throw new Error("Unknown command " + JSON.stringify(doc)); - } - - const trigger: OplogTrigger = { - dropCollection: false, - dropDatabase: false, - op: doc, - }; - - if (typeof doc.ns === "string" && doc.ns.startsWith(this._dbName + ".")) { - trigger.collection = doc.ns.slice(this._dbName.length + 1); - } - - // Is it a special command and the collection name is hidden - // somewhere in operator? - if (trigger.collection === "$cmd") { - if (doc.o.dropDatabase) { - delete trigger.collection; - trigger.dropDatabase = true; - } else if ("drop" in doc.o) { - trigger.collection = doc.o.drop; - trigger.dropCollection = true; - trigger.id = null; - } else if ("create" in doc.o && "idIndex" in doc.o) { - // A collection got implicitly created within a transaction. There's - // no need to do anything about it. - } else { - throw Error("Unknown command " + JSON.stringify(doc)); - } - } else { - // All other ops have an id. - trigger.id = idForOp(doc); - } - - await this._crossbar.fire(trigger); - }; - + // Convert to a proper promise-based queue processor + this._workerPromise = (async () => { try { while (!this._stopped && !this._entryQueue.isEmpty()) { // Are we too far behind? Just tell our observers that they need to @@ -375,23 +320,25 @@ export class OplogHandle { continue; } + // Process next batch from the queue const doc = this._entryQueue.shift(); - // Fire trigger(s) for this doc. - await handleDoc(doc); - - // Now that we've processed this operation, process pending - // sequencers. - if (doc.ts) { - this._setLastProcessedTS(doc.ts); - } else { - throw Error("oplog entry without ts: " + JSON.stringify(doc)); + try { + await handleDoc(this, doc); + // Process any waiting fence callbacks + if (doc.ts) { + this._setLastProcessedTS(doc.ts); + } + } catch (e) { + // Keep processing queue even if one entry fails + console.error('Error processing oplog entry:', e); } } } finally { + this._workerPromise = null; this._workerActive = false; } - }); + })(); } _setLastProcessedTS(ts: any): void { @@ -422,3 +369,58 @@ export function idForOp(op: OplogEntry): string { throw Error("Unknown op: " + JSON.stringify(op)); } } + +async function handleDoc(handle: OplogHandle, doc: OplogEntry): Promise { + if (doc.ns === "admin.$cmd") { + if (doc.o.applyOps) { + // This was a successful transaction, so we need to apply the + // operations that were involved. + let nextTimestamp = doc.ts; + for (const op of doc.o.applyOps) { + // See https://github.com/meteor/meteor/issues/10420. + if (!op.ts) { + op.ts = nextTimestamp; + nextTimestamp = nextTimestamp.add(Long.ONE); + } + await handleDoc(handle, op); + } + return; + } + throw new Error("Unknown command " + JSON.stringify(doc)); + } + + const trigger: OplogTrigger = { + dropCollection: false, + dropDatabase: false, + op: doc, + }; + + if (typeof doc.ns === "string" && doc.ns.startsWith(handle._dbName + ".")) { + trigger.collection = doc.ns.slice(handle._dbName.length + 1); + } + + // Is it a special command and the collection name is hidden + // somewhere in operator? + if (trigger.collection === "$cmd") { + if (doc.o.dropDatabase) { + delete trigger.collection; + trigger.dropDatabase = true; + } else if ("drop" in doc.o) { + trigger.collection = doc.o.drop; + trigger.dropCollection = true; + trigger.id = null; + } else if ("create" in doc.o && "idIndex" in doc.o) { + // A collection got implicitly created within a transaction. There's + // no need to do anything about it. + } else { + throw Error("Unknown command " + JSON.stringify(doc)); + } + } else { + // All other ops have an id. + trigger.id = idForOp(doc); + } + + await handle._crossbar.fire(trigger); + + await new Promise(resolve => setImmediate(resolve)); +} \ No newline at end of file diff --git a/packages/mongo/oplog_v2_converter.js b/packages/mongo/oplog_v2_converter.js deleted file mode 100644 index 6cb6b4c726..0000000000 --- a/packages/mongo/oplog_v2_converter.js +++ /dev/null @@ -1,127 +0,0 @@ -// Converter of the new MongoDB Oplog format (>=5.0) to the one that Meteor -// handles well, i.e., `$set` and `$unset`. The new format is completely new, -// and looks as follows: -// -// { $v: 2, diff: Diff } -// -// where `Diff` is a recursive structure: -// -// { -// // Nested updates (sometimes also represented with an s-field). -// // Example: `{ $set: { 'foo.bar': 1 } }`. -// i: { : , ... }, -// -// // Top-level updates. -// // Example: `{ $set: { foo: { bar: 1 } } }`. -// u: { : , ... }, -// -// // Unsets. -// // Example: `{ $unset: { foo: '' } }`. -// d: { : false, ... }, -// -// // Array operations. -// // Example: `{ $push: { foo: 'bar' } }`. -// s: { a: true, u: , ... }, -// ... -// -// // Nested operations (sometimes also represented in the `i` field). -// // Example: `{ $set: { 'foo.bar': 1 } }`. -// s: Diff, -// ... -// } -// -// (all fields are optional). - -function join(prefix, key) { - return prefix ? `${prefix}.${key}` : key; -} - -const arrayOperatorKeyRegex = /^(a|[su]\d+)$/; - -function isArrayOperatorKey(field) { - return arrayOperatorKeyRegex.test(field); -} - -function isArrayOperator(operator) { - return operator.a === true && Object.keys(operator).every(isArrayOperatorKey); -} - -function flattenObjectInto(target, source, prefix) { - if (Array.isArray(source) || typeof source !== 'object' || source === null || - source instanceof Mongo.ObjectID) { - target[prefix] = source; - } else { - const entries = Object.entries(source); - if (entries.length) { - entries.forEach(([key, value]) => { - flattenObjectInto(target, value, join(prefix, key)); - }); - } else { - target[prefix] = source; - } - } -} - -const logDebugMessages = !!process.env.OPLOG_CONVERTER_DEBUG; - -function convertOplogDiff(oplogEntry, diff, prefix) { - if (logDebugMessages) { - console.log(`convertOplogDiff(${JSON.stringify(oplogEntry)}, ${JSON.stringify(diff)}, ${JSON.stringify(prefix)})`); - } - - Object.entries(diff).forEach(([diffKey, value]) => { - if (diffKey === 'd') { - // Handle `$unset`s. - oplogEntry.$unset ??= {}; - Object.keys(value).forEach(key => { - oplogEntry.$unset[join(prefix, key)] = true; - }); - } else if (diffKey === 'i') { - // Handle (potentially) nested `$set`s. - oplogEntry.$set ??= {}; - flattenObjectInto(oplogEntry.$set, value, prefix); - } else if (diffKey === 'u') { - // Handle flat `$set`s. - oplogEntry.$set ??= {}; - Object.entries(value).forEach(([key, value]) => { - oplogEntry.$set[join(prefix, key)] = value; - }); - } else { - // Handle s-fields. - const key = diffKey.slice(1); - if (isArrayOperator(value)) { - // Array operator. - Object.entries(value).forEach(([position, value]) => { - if (position === 'a') { - return; - } - - const positionKey = join(join(prefix, key), position.slice(1)); - if (position[0] === 's') { - convertOplogDiff(oplogEntry, value, positionKey); - } else if (value === null) { - oplogEntry.$unset ??= {}; - oplogEntry.$unset[positionKey] = true; - } else { - oplogEntry.$set ??= {}; - oplogEntry.$set[positionKey] = value; - } - }); - } else if (key) { - // Nested object. - convertOplogDiff(oplogEntry, value, join(prefix, key)); - } - } - }); -} - -export function oplogV2V1Converter(oplogEntry) { - // Pass-through v1 and (probably) invalid entries. - if (oplogEntry.$v !== 2 || !oplogEntry.diff) { - return oplogEntry; - } - - const convertedOplogEntry = { $v: 2 }; - convertOplogDiff(convertedOplogEntry, oplogEntry.diff, ''); - return convertedOplogEntry; -} diff --git a/packages/mongo/oplog_v2_converter.ts b/packages/mongo/oplog_v2_converter.ts new file mode 100644 index 0000000000..708f2d00c8 --- /dev/null +++ b/packages/mongo/oplog_v2_converter.ts @@ -0,0 +1,204 @@ +/** + * Converter module for the new MongoDB Oplog format (>=5.0) to the one that Meteor + * handles well, i.e., `$set` and `$unset`. The new format is completely new, + * and looks as follows: + * + * ```js + * { $v: 2, diff: Diff } + * ``` + * + * where `Diff` is a recursive structure: + * ```js + * { + * // Nested updates (sometimes also represented with an s-field). + * // Example: `{ $set: { 'foo.bar': 1 } }`. + * i: { : , ... }, + * + * // Top-level updates. + * // Example: `{ $set: { foo: { bar: 1 } } }`. + * u: { : , ... }, + * + * // Unsets. + * // Example: `{ $unset: { foo: '' } }`. + * d: { : false, ... }, + * + * // Array operations. + * // Example: `{ $push: { foo: 'bar' } }`. + * s: { a: true, u: , ... }, + * ... + * + * // Nested operations (sometimes also represented in the `i` field). + * // Example: `{ $set: { 'foo.bar': 1 } }`. + * s: Diff, + * ... + * } + * ``` + * + * (all fields are optional) + */ + +import { EJSON } from 'meteor/ejson'; + +interface OplogEntry { + $v: number; + diff?: OplogDiff; + $set?: Record; + $unset?: Record; +} + +interface OplogDiff { + i?: Record; + u?: Record; + d?: Record; + [key: `s${string}`]: ArrayOperator | Record; +} + +interface ArrayOperator { + a: true; + [key: `u${number}`]: any; +} + +const arrayOperatorKeyRegex = /^(a|[su]\d+)$/; + +/** + * Checks if a field is an array operator key of form 'a' or 's1' or 'u1' etc + */ +function isArrayOperatorKey(field: string): boolean { + return arrayOperatorKeyRegex.test(field); +} + +/** + * Type guard to check if an operator is a valid array operator. + * Array operators have 'a: true' and keys that match the arrayOperatorKeyRegex + */ +function isArrayOperator(operator: unknown): operator is ArrayOperator { + return ( + operator !== null && + typeof operator === 'object' && + 'a' in operator && + (operator as ArrayOperator).a === true && + Object.keys(operator).every(isArrayOperatorKey) + ); +} + +/** + * Joins two parts of a field path with a dot. + * Returns the key itself if prefix is empty. + */ +function join(prefix: string, key: string): string { + return prefix ? `${prefix}.${key}` : key; +} + +/** + * Recursively flattens an object into a target object with dot notation paths. + * Handles special cases: + * - Arrays are assigned directly + * - Custom EJSON types are preserved + * - Mongo.ObjectIDs are preserved + * - Plain objects are recursively flattened + * - Empty objects are assigned directly + */ +function flattenObjectInto( + target: Record, + source: any, + prefix: string +): void { + if ( + Array.isArray(source) || + typeof source !== 'object' || + source === null || + source instanceof Mongo.ObjectID || + EJSON._isCustomType(source) + ) { + target[prefix] = source; + return; + } + + const entries = Object.entries(source); + if (entries.length) { + entries.forEach(([key, value]) => { + flattenObjectInto(target, value, join(prefix, key)); + }); + } else { + target[prefix] = source; + } +} + +/** + * Converts an oplog diff to a series of $set and $unset operations. + * Handles several types of operations: + * - Direct unsets via 'd' field + * - Nested sets via 'i' field + * - Top-level sets via 'u' field + * - Array operations and nested objects via 's' prefixed fields + * + * Preserves the structure of EJSON custom types and ObjectIDs while + * flattening paths into dot notation for MongoDB updates. + */ +function convertOplogDiff( + oplogEntry: OplogEntry, + diff: OplogDiff, + prefix = '' +): void { + Object.entries(diff).forEach(([diffKey, value]) => { + if (diffKey === 'd') { + // Handle `$unset`s + oplogEntry.$unset ??= {}; + Object.keys(value).forEach(key => { + oplogEntry.$unset![join(prefix, key)] = true; + }); + } else if (diffKey === 'i') { + // Handle (potentially) nested `$set`s + oplogEntry.$set ??= {}; + flattenObjectInto(oplogEntry.$set, value, prefix); + } else if (diffKey === 'u') { + // Handle flat `$set`s + oplogEntry.$set ??= {}; + Object.entries(value).forEach(([key, fieldValue]) => { + oplogEntry.$set![join(prefix, key)] = fieldValue; + }); + } else if (diffKey.startsWith('s')) { + // Handle s-fields (array operations and nested objects) + const key = diffKey.slice(1); + if (isArrayOperator(value)) { + // Array operator + Object.entries(value).forEach(([position, fieldValue]) => { + if (position === 'a') return; + + const positionKey = join(prefix, `${key}.${position.slice(1)}`); + if (position[0] === 's') { + convertOplogDiff(oplogEntry, fieldValue, positionKey); + } else if (fieldValue === null) { + oplogEntry.$unset ??= {}; + oplogEntry.$unset[positionKey] = true; + } else { + oplogEntry.$set ??= {}; + oplogEntry.$set[positionKey] = fieldValue; + } + }); + } else if (key) { + // Nested object + convertOplogDiff(oplogEntry, value, join(prefix, key)); + } + } + }); +} + +/** + * Converts a MongoDB v2 oplog entry to v1 format. + * Returns the original entry unchanged if it's not a v2 oplog entry + * or doesn't contain a diff field. + * + * The converted entry will contain $set and $unset operations that are + * equivalent to the v2 diff format, with paths flattened to dot notation + * and special handling for EJSON custom types and ObjectIDs. + */ +export function oplogV2V1Converter(oplogEntry: OplogEntry): OplogEntry { + if (oplogEntry.$v !== 2 || !oplogEntry.diff) { + return oplogEntry; + } + + const convertedOplogEntry: OplogEntry = { $v: 2 }; + convertOplogDiff(convertedOplogEntry, oplogEntry.diff); + return convertedOplogEntry; +} \ No newline at end of file diff --git a/packages/mongo/package.js b/packages/mongo/package.js index 16d28e6e72..5d7a744985 100644 --- a/packages/mongo/package.js +++ b/packages/mongo/package.js @@ -89,7 +89,7 @@ Package.onUse(function (api) { "doc_fetcher.js", "polling_observe_driver.ts", "oplog_observe_driver.js", - "oplog_v2_converter.js", + "oplog_v2_converter.ts", "cursor_description.ts", "mongo_connection.js", "mongo_common.js", diff --git a/packages/npm-mongo/package.js b/packages/npm-mongo/package.js index cc7a598051..1e546f940b 100644 --- a/packages/npm-mongo/package.js +++ b/packages/npm-mongo/package.js @@ -3,12 +3,12 @@ Package.describe({ summary: "Wrapper around the mongo npm package", - version: "6.10.0", + version: "6.10.1", documentation: null, }); Npm.depends({ - mongodb: "6.10.0" + mongodb: "6.9.0" }); Package.onUse(function (api) { diff --git a/packages/socket-stream-client/node.js b/packages/socket-stream-client/node.js index 708c8a1711..26bae21b50 100644 --- a/packages/socket-stream-client/node.js +++ b/packages/socket-stream-client/node.js @@ -1,6 +1,6 @@ import { Meteor } from "meteor/meteor"; -import { toWebsocketUrl } from "./urls.js"; import { StreamClientCommon } from "./common.js"; +import { toWebsocketUrl } from "./urls.js"; // @param endpoint {String} URL to Meteor app // "http://subdomain.meteor.com/" or "/" or @@ -132,7 +132,7 @@ export class ClientStream extends StreamClientCommon { // require the module if we actually create a server-to-server // connection. var FayeWebSocket = Npm.require('faye-websocket'); - var deflate = Npm.require('permessage-deflate'); + var deflate = Npm.require('permessage-deflate2'); var targetUrl = toWebsocketUrl(this.endpoint); var fayeOptions = { diff --git a/packages/socket-stream-client/package.js b/packages/socket-stream-client/package.js index a49c533e6d..b15e88267a 100644 --- a/packages/socket-stream-client/package.js +++ b/packages/socket-stream-client/package.js @@ -7,7 +7,7 @@ Package.describe({ Npm.depends({ "faye-websocket": "0.11.4", - "permessage-deflate": "0.1.7", + "permessage-deflate2": "0.1.8", "lodash.isequal": "4.5.0", "lodash.once": "4.1.1" }); diff --git a/scripts/admin/check-legacy-syntax/check-syntax.js b/scripts/admin/check-legacy-syntax/check-syntax.js index 66fb4fd815..fdbe82b061 100644 --- a/scripts/admin/check-legacy-syntax/check-syntax.js +++ b/scripts/admin/check-legacy-syntax/check-syntax.js @@ -34,6 +34,7 @@ const packages = { // Ignored server files that has a features > 2016 ignoredFiles: [ "async_helpers.js", + "asynchronous_queue.js", ] }, "accounts-ui": {}, diff --git a/v3-docs/docs/.vitepress/config.mts b/v3-docs/docs/.vitepress/config.mts index a004c2867b..b199d78b91 100644 --- a/v3-docs/docs/.vitepress/config.mts +++ b/v3-docs/docs/.vitepress/config.mts @@ -431,6 +431,15 @@ export default defineConfig({ ], collapsed: true, }, + { + text: "Performance", + items: [ + { + text: "WebSocket Compression", + link: "/performance/websocket-compression", + }, + ], + }, ], socialLinks: [ diff --git a/v3-docs/docs/performance/websocket-compression.md b/v3-docs/docs/performance/websocket-compression.md new file mode 100644 index 0000000000..e7c117f155 --- /dev/null +++ b/v3-docs/docs/performance/websocket-compression.md @@ -0,0 +1,104 @@ +# Websocket Compression in Meteor + +::: warning +Modifying websocket compression settings without understanding your application's DDP messaging patterns can negatively impact performance. Before changing these settings, you should: +- Use [Meteor DevTools Evolved](https://chromewebstore.google.com/detail/meteor-devtools-evolved/ibniinmoafhgbifjojidlagmggecmpgf) or your browser's Network tab to monitor WebSocket traffic +- Analyze your DDP message frequency and payload sizes +- Test changes in a staging environment with realistic data and user load +::: + +Meteor's stream server uses the permessage-deflate extension for websocket compression by default. While compression can help reduce bandwidth usage, it may impact performance in reactivity-intensive applications due to the computational overhead of compressing numerous DDP messages. + +## Configuration + +### Disabling Compression + +You can disable websocket compression by setting the `SERVER_WEBSOCKET_COMPRESSION` environment variable to `false`: + +```bash +SERVER_WEBSOCKET_COMPRESSION=false +``` + +### Custom Compression Settings + +To customize compression settings, set `SERVER_WEBSOCKET_COMPRESSION` to a JSON string with your desired configuration: + +```bash +# Example with custom settings +SERVER_WEBSOCKET_COMPRESSION='{"threshold": 2048, "level": 1}' +``` + +Available configuration options: + +- `threshold`: Minimum message size (in bytes) before compression is applied (default: 1024) +- `level`: Compression level (0-9, where 0=none, 1=fastest, 9=best compression) +- `memLevel`: Memory level (1-9, lower uses less memory) +- `noContextTakeover`: When true, compressor resets for each message (default: true) +- `maxWindowBits`: Window size for compression (9-15, lower uses less memory) + +## Configuration Examples + +Here are recommended configurations for different types of applications: + +### High-Frequency Updates / Real-Time Dashboard + +For applications with frequent small updates (e.g., real-time dashboards, trading platforms): + +```bash +# Disable compression for optimal performance with small, frequent updates +SERVER_WEBSOCKET_COMPRESSION=false +``` + +### Large Data Transfers + +For applications transferring large datasets (e.g., file sharing, data visualization): + +```bash +# Optimize for large data transfers +SERVER_WEBSOCKET_COMPRESSION='{"threshold": 1024, "level": 6, "memLevel": 8}' +``` + +### Memory-Constrained Environment + +For deployments with limited memory resources: + +```bash +# Minimize memory usage while maintaining compression +SERVER_WEBSOCKET_COMPRESSION='{"threshold": 2048, "level": 1, "memLevel": 1, "maxWindowBits": 9}' +``` + +### Balanced Configuration + +For typical applications with mixed message sizes: + +```bash +# Balance between compression and performance +SERVER_WEBSOCKET_COMPRESSION='{"threshold": 1536, "level": 3, "memLevel": 4}' +``` + +## Verifying Compression Status + +You can check if compression is enabled through the Meteor shell: + +```javascript +Meteor.server.stream_server.server.options.faye_server_options.extensions +``` + +Results interpretation: +- `[]` (empty array): Compression is disabled +- `[{}]` (array with object): Compression is enabled + +## Performance Considerations + +- For apps with high message throughput or frequent small updates, disabling compression may improve performance +- Large message payloads may benefit from compression, especially over slower network connections +- Consider monitoring CPU usage and response times when adjusting compression settings + +## Default Configuration + +When enabled, the default configuration uses: +- Compression threshold: 1024 bytes +- Compression level: Z_BEST_SPEED (fastest) +- Memory level: Z_MIN_MEMLEVEL (minimum memory usage) +- Context takeover: Disabled +- Window bits: Z_MIN_WINDOWBITS (minimum window size)