mirror of
https://github.com/meteor/meteor.git
synced 2026-05-02 03:01:46 -04:00
Finish removing underscore from ddp-client.
This commit is contained in:
@@ -1,5 +1,4 @@
|
||||
import { Meteor } from 'meteor/meteor';
|
||||
import { _ } from 'meteor/underscore';
|
||||
import { DDPCommon } from 'meteor/ddp-common';
|
||||
import { Tracker } from 'meteor/tracker';
|
||||
import { EJSON } from 'meteor/ejson';
|
||||
@@ -9,6 +8,13 @@ import { MongoID } from 'meteor/mongo-id';
|
||||
import { DDP } from './namespace.js';
|
||||
import getClientStreamClass from './getClientStreamClass.js';
|
||||
import MethodInvoker from './MethodInvoker.js';
|
||||
import {
|
||||
hasOwn,
|
||||
slice,
|
||||
keys,
|
||||
isEmpty,
|
||||
last,
|
||||
} from "./utils.js";
|
||||
|
||||
if (Meteor.isServer) {
|
||||
var Fiber = Npm.require('fibers');
|
||||
@@ -234,9 +240,11 @@ export class Connection {
|
||||
self._userIdDeps = new Tracker.Dependency();
|
||||
|
||||
// Block auto-reload while we're waiting for method responses.
|
||||
if (Meteor.isClient && Package.reload && !options.reloadWithOutstanding) {
|
||||
if (Meteor.isClient &&
|
||||
Package.reload &&
|
||||
! options.reloadWithOutstanding) {
|
||||
Package.reload.Reload._onMigrate(retry => {
|
||||
if (!self._readyToMigrate()) {
|
||||
if (! self._readyToMigrate()) {
|
||||
if (self._retryMigrate)
|
||||
throw new Error('Two migrations in progress?');
|
||||
self._retryMigrate = retry;
|
||||
@@ -288,31 +296,27 @@ export class Connection {
|
||||
// Wrap the input object in an object which makes any store method not
|
||||
// implemented by 'store' into a no-op.
|
||||
var store = Object.create(null);
|
||||
_.each(
|
||||
[
|
||||
'update',
|
||||
'beginUpdate',
|
||||
'endUpdate',
|
||||
'saveOriginals',
|
||||
'retrieveOriginals',
|
||||
'getDoc',
|
||||
'_getCollection'
|
||||
],
|
||||
method => {
|
||||
store[method] = (...args) => {
|
||||
return wrappedStore[method]
|
||||
? wrappedStore[method].apply(wrappedStore, args)
|
||||
: undefined;
|
||||
};
|
||||
}
|
||||
);
|
||||
[ 'update',
|
||||
'beginUpdate',
|
||||
'endUpdate',
|
||||
'saveOriginals',
|
||||
'retrieveOriginals',
|
||||
'getDoc',
|
||||
'_getCollection'
|
||||
].forEach(method => {
|
||||
store[method] = (...args) => {
|
||||
if (wrappedStore[method]) {
|
||||
return wrappedStore[method](...args);
|
||||
}
|
||||
};
|
||||
});
|
||||
|
||||
self._stores[name] = store;
|
||||
|
||||
var queued = self._updatesForUnknownStores[name];
|
||||
if (queued) {
|
||||
store.beginUpdate(queued.length, false);
|
||||
_.each(queued, msg => {
|
||||
queued.forEach(msg => {
|
||||
store.update(msg);
|
||||
});
|
||||
store.endUpdate();
|
||||
@@ -340,21 +344,19 @@ export class Connection {
|
||||
subscribe(name /* .. [arguments] .. (callback|callbacks) */) {
|
||||
var self = this;
|
||||
|
||||
var params = Array.prototype.slice.call(arguments, 1);
|
||||
var params = slice.call(arguments, 1);
|
||||
var callbacks = Object.create(null);
|
||||
if (params.length) {
|
||||
var lastParam = params[params.length - 1];
|
||||
if (typeof lastParam === 'function') {
|
||||
callbacks.onReady = params.pop();
|
||||
} else if (
|
||||
lastParam &&
|
||||
} else if (lastParam && [
|
||||
lastParam.onReady,
|
||||
// XXX COMPAT WITH 1.0.3.1 onError used to exist, but now we use
|
||||
// onStop with an error callback instead.
|
||||
_.any(
|
||||
[lastParam.onReady, lastParam.onError, lastParam.onStop],
|
||||
f => typeof f === 'function'
|
||||
)
|
||||
) {
|
||||
lastParam.onError,
|
||||
lastParam.onStop
|
||||
].some(f => typeof f === "function")) {
|
||||
callbacks = params.pop();
|
||||
}
|
||||
}
|
||||
@@ -377,10 +379,14 @@ export class Connection {
|
||||
// We only look for one such sub; if there are N apparently-identical subs
|
||||
// being invalidated, we will require N matching subscribe calls to keep
|
||||
// them all active.
|
||||
var existing = _.find(self._subscriptions, sub => {
|
||||
return (
|
||||
sub.inactive && sub.name === name && EJSON.equals(sub.params, params)
|
||||
);
|
||||
var existing;
|
||||
keys(self._subscriptions).some(id => {
|
||||
const sub = self._subscriptions[id];
|
||||
if (sub.inactive &&
|
||||
sub.name === name &&
|
||||
EJSON.equals(sub.params, params)) {
|
||||
return existing = sub;
|
||||
}
|
||||
});
|
||||
|
||||
var id;
|
||||
@@ -450,13 +456,16 @@ export class Connection {
|
||||
// return a handle to the application.
|
||||
var handle = {
|
||||
stop() {
|
||||
if (!_.has(self._subscriptions, id)) return;
|
||||
|
||||
if (! hasOwn.call(self._subscriptions, id)) {
|
||||
return;
|
||||
}
|
||||
self._subscriptions[id].stop();
|
||||
},
|
||||
ready() {
|
||||
// return false if we've unsubscribed.
|
||||
if (!_.has(self._subscriptions, id)) return false;
|
||||
if (! hasOwn.call(self._subscriptions, id)) {
|
||||
return false;
|
||||
}
|
||||
var record = self._subscriptions[id];
|
||||
record.readyDeps.depend();
|
||||
return record.ready;
|
||||
@@ -472,15 +481,15 @@ export class Connection {
|
||||
// be reused from the rerun. If it isn't reused, it's killed from
|
||||
// an afterFlush.
|
||||
Tracker.onInvalidate(c => {
|
||||
if (_.has(self._subscriptions, id))
|
||||
if (hasOwn.call(self._subscriptions, id)) {
|
||||
self._subscriptions[id].inactive = true;
|
||||
}
|
||||
|
||||
Tracker.afterFlush(() => {
|
||||
if (
|
||||
_.has(self._subscriptions, id) &&
|
||||
self._subscriptions[id].inactive
|
||||
)
|
||||
if (hasOwn.call(self._subscriptions, id) &&
|
||||
self._subscriptions[id].inactive) {
|
||||
handle.stop();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -514,13 +523,15 @@ export class Connection {
|
||||
}
|
||||
|
||||
methods(methods) {
|
||||
var self = this;
|
||||
_.each(methods, (func, name) => {
|
||||
if (typeof func !== 'function')
|
||||
keys(methods).forEach(name => {
|
||||
const func = methods[name];
|
||||
if (typeof func !== 'function') {
|
||||
throw new Error("Method '" + name + "' must be a function");
|
||||
if (self._methodHandlers[name])
|
||||
}
|
||||
if (this._methodHandlers[name]) {
|
||||
throw new Error("A method named '" + name + "' is already defined");
|
||||
self._methodHandlers[name] = func;
|
||||
}
|
||||
this._methodHandlers[name] = func;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -536,7 +547,7 @@ export class Connection {
|
||||
call(name /* .. [arguments] .. callback */) {
|
||||
// if it's a function, the last argument is the result callback,
|
||||
// not a parameter to the remote method.
|
||||
var args = Array.prototype.slice.call(arguments, 1);
|
||||
var args = slice.call(arguments, 1);
|
||||
if (args.length && typeof args[args.length - 1] === 'function')
|
||||
var callback = args.pop();
|
||||
return this.apply(name, args, callback);
|
||||
@@ -770,12 +781,15 @@ export class Connection {
|
||||
} else {
|
||||
// Not a wait method. Start a new block if the previous block was a wait
|
||||
// block, and add it to the last block of methods.
|
||||
if (
|
||||
_.isEmpty(self._outstandingMethodBlocks) ||
|
||||
_.last(self._outstandingMethodBlocks).wait
|
||||
)
|
||||
self._outstandingMethodBlocks.push({ wait: false, methods: [] });
|
||||
_.last(self._outstandingMethodBlocks).methods.push(methodInvoker);
|
||||
if (isEmpty(self._outstandingMethodBlocks) ||
|
||||
last(self._outstandingMethodBlocks).wait) {
|
||||
self._outstandingMethodBlocks.push({
|
||||
wait: false,
|
||||
methods: [],
|
||||
});
|
||||
}
|
||||
|
||||
last(self._outstandingMethodBlocks).methods.push(methodInvoker);
|
||||
}
|
||||
|
||||
// If we added it to the first block, send it out now.
|
||||
@@ -793,12 +807,15 @@ export class Connection {
|
||||
// _retrieveAndStoreOriginals to get the original versions of changed
|
||||
// documents.
|
||||
_saveOriginals() {
|
||||
var self = this;
|
||||
if (!self._waitingForQuiescence()) self._flushBufferedWrites();
|
||||
_.each(self._stores, s => {
|
||||
s.saveOriginals();
|
||||
if (! this._waitingForQuiescence()) {
|
||||
this._flushBufferedWrites();
|
||||
}
|
||||
|
||||
keys(this._stores).forEach(storeName => {
|
||||
this._stores[storeName].saveOriginals();
|
||||
});
|
||||
}
|
||||
|
||||
// Retrieves the original versions of all documents modified by the stub for
|
||||
// method 'methodId' from all stores and saves them to _serverDocuments (keyed
|
||||
// by document) and _documentsWrittenByStub (keyed by method ID).
|
||||
@@ -808,14 +825,16 @@ export class Connection {
|
||||
throw new Error('Duplicate methodId in _retrieveAndStoreOriginals');
|
||||
|
||||
var docsWritten = [];
|
||||
_.each(self._stores, (s, collection) => {
|
||||
var originals = s.retrieveOriginals();
|
||||
|
||||
keys(self._stores).forEach(collection => {
|
||||
var originals = self._stores[collection].retrieveOriginals();
|
||||
// not all stores define retrieveOriginals
|
||||
if (!originals) return;
|
||||
if (! originals) return;
|
||||
originals.forEach((doc, id) => {
|
||||
docsWritten.push({ collection: collection, id: id });
|
||||
if (!_.has(self._serverDocuments, collection))
|
||||
docsWritten.push({ collection, id });
|
||||
if (! hasOwn.call(self._serverDocuments, collection)) {
|
||||
self._serverDocuments[collection] = new MongoIDMap();
|
||||
}
|
||||
var serverDoc = self._serverDocuments[collection].setDefault(
|
||||
id,
|
||||
Object.create(null)
|
||||
@@ -833,7 +852,7 @@ export class Connection {
|
||||
}
|
||||
});
|
||||
});
|
||||
if (!_.isEmpty(docsWritten)) {
|
||||
if (! isEmpty(docsWritten)) {
|
||||
self._documentsWrittenByStub[methodId] = docsWritten;
|
||||
}
|
||||
}
|
||||
@@ -841,8 +860,8 @@ export class Connection {
|
||||
// This is very much a private function we use to make the tests
|
||||
// take up fewer server resources after they complete.
|
||||
_unsubscribeAll() {
|
||||
var self = this;
|
||||
_.each(_.clone(self._subscriptions), (sub, id) => {
|
||||
keys(this._subscriptions).forEach(id => {
|
||||
const sub = this._subscriptions[id];
|
||||
// Avoid killing the autoupdate subscription so that developers
|
||||
// still get hot code pushes when writing tests.
|
||||
//
|
||||
@@ -850,7 +869,7 @@ export class Connection {
|
||||
// but it doesn't seem worth it yet to have a special API for
|
||||
// subscriptions to preserve after unit tests.
|
||||
if (sub.name !== 'meteor_autoupdate_clientVersions') {
|
||||
self._subscriptions[id].stop();
|
||||
sub.stop();
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -923,15 +942,18 @@ export class Connection {
|
||||
// "wait" method to finish.
|
||||
_waitingForQuiescence() {
|
||||
return (
|
||||
!_.isEmpty(this._subsBeingRevived) ||
|
||||
!_.isEmpty(this._methodsBlockingQuiescence)
|
||||
! isEmpty(this._subsBeingRevived) ||
|
||||
! isEmpty(this._methodsBlockingQuiescence)
|
||||
);
|
||||
}
|
||||
|
||||
// Returns true if any method whose message has been sent to the server has
|
||||
// not yet invoked its user callback.
|
||||
_anyMethodsAreOutstanding() {
|
||||
return _.any(_.pluck(this._methodInvokers, 'sentMessage'));
|
||||
const invokers = this._methodInvokers;
|
||||
return keys(invokers).some(id => {
|
||||
return invokers[id].sentMessage;
|
||||
});
|
||||
}
|
||||
|
||||
_livedata_connected(msg) {
|
||||
@@ -991,8 +1013,10 @@ export class Connection {
|
||||
// XXX We should also block reconnect quiescence until unnamed subscriptions
|
||||
// (eg, autopublish) are done re-publishing to avoid flicker!
|
||||
self._subsBeingRevived = Object.create(null);
|
||||
_.each(self._subscriptions, (sub, id) => {
|
||||
if (sub.ready) self._subsBeingRevived[id] = true;
|
||||
keys(self._subscriptions).forEach(id => {
|
||||
if (self._subscriptions[id].ready) {
|
||||
self._subsBeingRevived[id] = true;
|
||||
}
|
||||
});
|
||||
|
||||
// Arrange for "half-finished" methods to have their callbacks run, and
|
||||
@@ -1004,13 +1028,17 @@ export class Connection {
|
||||
// that we drop here will be restored by the loop below.
|
||||
self._methodsBlockingQuiescence = Object.create(null);
|
||||
if (self._resetStores) {
|
||||
_.each(self._methodInvokers, invoker => {
|
||||
const invokers = self._methodInvokers;
|
||||
keys(invokers).forEach(id => {
|
||||
const invoker = invokers[id];
|
||||
if (invoker.gotResult()) {
|
||||
// This method already got its result, but it didn't call its callback
|
||||
// because its data didn't become visible. We did not resend the
|
||||
// method RPC. We'll call its callback when we get a full quiesce,
|
||||
// since that's as close as we'll get to "data must be visible".
|
||||
self._afterUpdateCallbacks.push(_.bind(invoker.dataVisible, invoker));
|
||||
self._afterUpdateCallbacks.push(
|
||||
(...args) => invoker.dataVisible(...args)
|
||||
);
|
||||
} else if (invoker.sentMessage) {
|
||||
// This method has been sent on this connection (maybe as a resend
|
||||
// from the last connection, maybe from onReconnect, maybe just very
|
||||
@@ -1030,9 +1058,10 @@ export class Connection {
|
||||
|
||||
// If we're not waiting on any methods or subs, we can reset the stores and
|
||||
// call the callbacks immediately.
|
||||
if (!self._waitingForQuiescence()) {
|
||||
if (! self._waitingForQuiescence()) {
|
||||
if (self._resetStores) {
|
||||
_.each(self._stores, s => {
|
||||
keys(self._stores).forEach(storeName => {
|
||||
const s = self._stores[storeName];
|
||||
s.beginUpdate(0, true);
|
||||
s.endUpdate();
|
||||
});
|
||||
@@ -1069,24 +1098,40 @@ export class Connection {
|
||||
if (self._waitingForQuiescence()) {
|
||||
self._messagesBufferedUntilQuiescence.push(msg);
|
||||
|
||||
if (msg.msg === 'nosub') delete self._subsBeingRevived[msg.id];
|
||||
if (msg.msg === 'nosub') {
|
||||
delete self._subsBeingRevived[msg.id];
|
||||
}
|
||||
|
||||
_.each(msg.subs || [], subId => {
|
||||
delete self._subsBeingRevived[subId];
|
||||
});
|
||||
_.each(msg.methods || [], methodId => {
|
||||
delete self._methodsBlockingQuiescence[methodId];
|
||||
});
|
||||
if (msg.subs) {
|
||||
msg.subs.forEach(subId => {
|
||||
delete self._subsBeingRevived[subId];
|
||||
});
|
||||
}
|
||||
|
||||
if (self._waitingForQuiescence()) return;
|
||||
if (msg.methods) {
|
||||
msg.methods.forEach(methodId => {
|
||||
delete self._methodsBlockingQuiescence[methodId];
|
||||
});
|
||||
}
|
||||
|
||||
if (self._waitingForQuiescence()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// No methods or subs are blocking quiescence!
|
||||
// We'll now process and all of our buffered messages, reset all stores,
|
||||
// and apply them all at once.
|
||||
_.each(self._messagesBufferedUntilQuiescence, bufferedMsg => {
|
||||
self._processOneDataMessage(bufferedMsg, self._bufferedWrites);
|
||||
|
||||
const bufferedMessages = self._messagesBufferedUntilQuiescence;
|
||||
keys(bufferedMessages).forEach(id => {
|
||||
self._processOneDataMessage(
|
||||
bufferedMessages[id],
|
||||
self._bufferedWrites
|
||||
);
|
||||
});
|
||||
|
||||
self._messagesBufferedUntilQuiescence = [];
|
||||
|
||||
} else {
|
||||
self._processOneDataMessage(msg, self._bufferedWrites);
|
||||
}
|
||||
@@ -1094,8 +1139,12 @@ export class Connection {
|
||||
// Immediately flush writes when:
|
||||
// 1. Buffering is disabled. Or;
|
||||
// 2. any non-(added/changed/removed) message arrives.
|
||||
var standardWrite = _.include(['added', 'changed', 'removed'], msg.msg);
|
||||
if (self._bufferedWritesInterval === 0 || !standardWrite) {
|
||||
var standardWrite =
|
||||
msg.msg === "added" ||
|
||||
msg.msg === "changed" ||
|
||||
msg.msg === "removed";
|
||||
|
||||
if (self._bufferedWritesInterval === 0 || ! standardWrite) {
|
||||
self._flushBufferedWrites();
|
||||
return;
|
||||
}
|
||||
@@ -1136,20 +1185,25 @@ export class Connection {
|
||||
_performWrites(updates) {
|
||||
var self = this;
|
||||
|
||||
if (self._resetStores || !_.isEmpty(updates)) {
|
||||
if (self._resetStores || ! isEmpty(updates)) {
|
||||
// Begin a transactional update of each store.
|
||||
_.each(self._stores, (s, storeName) => {
|
||||
s.beginUpdate(
|
||||
_.has(updates, storeName) ? updates[storeName].length : 0,
|
||||
|
||||
keys(self._stores).forEach(storeName => {
|
||||
self._stores[storeName].beginUpdate(
|
||||
hasOwn.call(updates, storeName)
|
||||
? updates[storeName].length
|
||||
: 0,
|
||||
self._resetStores
|
||||
);
|
||||
});
|
||||
|
||||
self._resetStores = false;
|
||||
|
||||
_.each(updates, (updateMessages, storeName) => {
|
||||
keys(updates).forEach(storeName => {
|
||||
const updateMessages = updates[storeName];
|
||||
var store = self._stores[storeName];
|
||||
if (store) {
|
||||
_.each(updateMessages, updateMessage => {
|
||||
updateMessages.forEach(updateMessage => {
|
||||
store.update(updateMessage);
|
||||
});
|
||||
} else {
|
||||
@@ -1158,18 +1212,19 @@ export class Connection {
|
||||
// XXX memory use will grow without bound if you forget to
|
||||
// create a collection or just don't care about it... going
|
||||
// to have to do something about that.
|
||||
if (!_.has(self._updatesForUnknownStores, storeName))
|
||||
self._updatesForUnknownStores[storeName] = [];
|
||||
Array.prototype.push.apply(
|
||||
self._updatesForUnknownStores[storeName],
|
||||
updateMessages
|
||||
);
|
||||
const updates = self._updatesForUnknownStores;
|
||||
|
||||
if (! hasOwn.call(updates, storeName)) {
|
||||
updates[storeName] = [];
|
||||
}
|
||||
|
||||
updates[storeName].push(...updateMessages);
|
||||
}
|
||||
});
|
||||
|
||||
// End update transaction.
|
||||
_.each(self._stores, s => {
|
||||
s.endUpdate();
|
||||
keys(self._stores).forEach(storeName => {
|
||||
self._stores[storeName].endUpdate();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1183,14 +1238,13 @@ export class Connection {
|
||||
var self = this;
|
||||
var callbacks = self._afterUpdateCallbacks;
|
||||
self._afterUpdateCallbacks = [];
|
||||
_.each(callbacks, c => {
|
||||
callbacks.forEach(c => {
|
||||
c();
|
||||
});
|
||||
}
|
||||
|
||||
_pushUpdate(updates, collection, msg) {
|
||||
var self = this;
|
||||
if (!_.has(updates, collection)) {
|
||||
if (! hasOwn.call(updates, collection)) {
|
||||
updates[collection] = [];
|
||||
}
|
||||
updates[collection].push(msg);
|
||||
@@ -1198,7 +1252,9 @@ export class Connection {
|
||||
|
||||
_getServerDoc(collection, id) {
|
||||
var self = this;
|
||||
if (!_.has(self._serverDocuments, collection)) return null;
|
||||
if (! hasOwn.call(self._serverDocuments, collection)) {
|
||||
return null;
|
||||
}
|
||||
var serverDocsForCollection = self._serverDocuments[collection];
|
||||
return serverDocsForCollection.get(id) || null;
|
||||
}
|
||||
@@ -1263,20 +1319,25 @@ export class Connection {
|
||||
_process_updated(msg, updates) {
|
||||
var self = this;
|
||||
// Process "method done" messages.
|
||||
_.each(msg.methods, methodId => {
|
||||
_.each(self._documentsWrittenByStub[methodId], written => {
|
||||
var serverDoc = self._getServerDoc(written.collection, written.id);
|
||||
if (!serverDoc)
|
||||
|
||||
msg.methods.forEach(methodId => {
|
||||
const docs = self._documentsWrittenByStub[methodId];
|
||||
keys(docs).forEach(id => {
|
||||
const written = docs[id];
|
||||
const serverDoc = self._getServerDoc(written.collection, written.id);
|
||||
if (! serverDoc) {
|
||||
throw new Error('Lost serverDoc for ' + JSON.stringify(written));
|
||||
if (!serverDoc.writtenByStubs[methodId])
|
||||
}
|
||||
if (! serverDoc.writtenByStubs[methodId]) {
|
||||
throw new Error(
|
||||
'Doc ' +
|
||||
JSON.stringify(written) +
|
||||
' not written by method ' +
|
||||
methodId
|
||||
);
|
||||
}
|
||||
delete serverDoc.writtenByStubs[methodId];
|
||||
if (_.isEmpty(serverDoc.writtenByStubs)) {
|
||||
if (isEmpty(serverDoc.writtenByStubs)) {
|
||||
// All methods whose stubs wrote this method have completed! We can
|
||||
// now copy the saved document to the database (reverting the stub's
|
||||
// change if the server did not write to this object, or applying the
|
||||
@@ -1291,7 +1352,8 @@ export class Connection {
|
||||
replace: serverDoc.document
|
||||
});
|
||||
// Call all flush callbacks.
|
||||
_.each(serverDoc.flushCallbacks, c => {
|
||||
|
||||
serverDoc.flushCallbacks.forEach(c => {
|
||||
c();
|
||||
});
|
||||
|
||||
@@ -1305,11 +1367,13 @@ export class Connection {
|
||||
|
||||
// We want to call the data-written callback, but we can't do so until all
|
||||
// currently buffered messages are flushed.
|
||||
var callbackInvoker = self._methodInvokers[methodId];
|
||||
if (!callbackInvoker)
|
||||
const callbackInvoker = self._methodInvokers[methodId];
|
||||
if (! callbackInvoker) {
|
||||
throw new Error('No callback invoker for method ' + methodId);
|
||||
}
|
||||
|
||||
self._runWhenAllServerDocsAreFlushed(
|
||||
_.bind(callbackInvoker.dataVisible, callbackInvoker)
|
||||
(...args) => callbackInvoker.dataVisible(...args)
|
||||
);
|
||||
});
|
||||
}
|
||||
@@ -1319,7 +1383,8 @@ export class Connection {
|
||||
// Process "sub ready" messages. "sub ready" messages don't take effect
|
||||
// until all current server documents have been flushed to the local
|
||||
// database. We can use a write fence to implement this.
|
||||
_.each(msg.subs, subId => {
|
||||
|
||||
msg.subs.forEach(subId => {
|
||||
self._runWhenAllServerDocsAreFlushed(() => {
|
||||
var subRecord = self._subscriptions[subId];
|
||||
// Did we already unsubscribe?
|
||||
@@ -1350,15 +1415,15 @@ export class Connection {
|
||||
runFAfterUpdates();
|
||||
}
|
||||
};
|
||||
_.each(self._serverDocuments, collectionDocs => {
|
||||
collectionDocs.forEach(serverDoc => {
|
||||
var writtenByStubForAMethodWithSentMessage = _.any(
|
||||
serverDoc.writtenByStubs,
|
||||
(dummy, methodId) => {
|
||||
|
||||
keys(self._serverDocuments).forEach(collection => {
|
||||
self._serverDocuments[collection].forEach(serverDoc => {
|
||||
const writtenByStubForAMethodWithSentMessage =
|
||||
keys(serverDoc.writtenByStubs).some(methodId => {
|
||||
var invoker = self._methodInvokers[methodId];
|
||||
return invoker && invoker.sentMessage;
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
if (writtenByStubForAMethodWithSentMessage) {
|
||||
++unflushedServerDocCount;
|
||||
serverDoc.flushCallbacks.push(onServerDocFlush);
|
||||
@@ -1383,7 +1448,9 @@ export class Connection {
|
||||
// buffering-until-quiescence.
|
||||
|
||||
// we weren't subbed anyway, or we initiated the unsub.
|
||||
if (!_.has(self._subscriptions, msg.id)) return;
|
||||
if (! hasOwn.call(self._subscriptions, msg.id)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// XXX COMPAT WITH 1.0.3.1 #errorCallback
|
||||
var errorCallback = self._subscriptions[msg.id].errorCallback;
|
||||
@@ -1419,13 +1486,13 @@ export class Connection {
|
||||
var self = this;
|
||||
|
||||
// Lets make sure there are no buffered writes before returning result.
|
||||
if (!_.isEmpty(self._bufferedWrites)) {
|
||||
if (! isEmpty(self._bufferedWrites)) {
|
||||
self._flushBufferedWrites();
|
||||
}
|
||||
|
||||
// find the outstanding request
|
||||
// should be O(1) in nearly all realistic use cases
|
||||
if (_.isEmpty(self._outstandingMethodBlocks)) {
|
||||
if (isEmpty(self._outstandingMethodBlocks)) {
|
||||
Meteor._debug('Received method result but no methods outstanding');
|
||||
return;
|
||||
}
|
||||
@@ -1446,7 +1513,7 @@ export class Connection {
|
||||
// _outstandingMethodFinished.
|
||||
currentMethodBlock.splice(i, 1);
|
||||
|
||||
if (_.has(msg, 'error')) {
|
||||
if (hasOwn.call(msg, 'error')) {
|
||||
m.receiveResult(
|
||||
new Meteor.Error(msg.error.error, msg.error.reason, msg.error.details)
|
||||
);
|
||||
@@ -1467,16 +1534,16 @@ export class Connection {
|
||||
// No methods are outstanding. This should mean that the first block of
|
||||
// methods is empty. (Or it might not exist, if this was a method that
|
||||
// half-finished before disconnect/reconnect.)
|
||||
if (!_.isEmpty(self._outstandingMethodBlocks)) {
|
||||
if (! isEmpty(self._outstandingMethodBlocks)) {
|
||||
var firstBlock = self._outstandingMethodBlocks.shift();
|
||||
if (!_.isEmpty(firstBlock.methods))
|
||||
if (! isEmpty(firstBlock.methods))
|
||||
throw new Error(
|
||||
'No methods outstanding but nonempty block: ' +
|
||||
JSON.stringify(firstBlock)
|
||||
);
|
||||
|
||||
// Send the outstanding methods now in the first block.
|
||||
if (!_.isEmpty(self._outstandingMethodBlocks))
|
||||
if (! isEmpty(self._outstandingMethodBlocks))
|
||||
self._sendOutstandingMethods();
|
||||
}
|
||||
|
||||
@@ -1488,8 +1555,12 @@ export class Connection {
|
||||
// _outstandingMethodBlocks.
|
||||
_sendOutstandingMethods() {
|
||||
var self = this;
|
||||
if (_.isEmpty(self._outstandingMethodBlocks)) return;
|
||||
_.each(self._outstandingMethodBlocks[0].methods, m => {
|
||||
|
||||
if (isEmpty(self._outstandingMethodBlocks)) {
|
||||
return;
|
||||
}
|
||||
|
||||
self._outstandingMethodBlocks[0].methods.forEach(m => {
|
||||
m.sendMessage();
|
||||
});
|
||||
}
|
||||
@@ -1510,12 +1581,12 @@ export class Connection {
|
||||
return true;
|
||||
});
|
||||
|
||||
if (_.isEmpty(oldOutstandingMethodBlocks)) return;
|
||||
if (isEmpty(oldOutstandingMethodBlocks)) return;
|
||||
|
||||
// We have at least one block worth of old outstanding methods to try
|
||||
// again. First: did onReconnect actually send anything? If not, we just
|
||||
// restore all outstanding methods and run the first block.
|
||||
if (_.isEmpty(self._outstandingMethodBlocks)) {
|
||||
if (isEmpty(self._outstandingMethodBlocks)) {
|
||||
self._outstandingMethodBlocks = oldOutstandingMethodBlocks;
|
||||
self._sendOutstandingMethods();
|
||||
return;
|
||||
@@ -1524,30 +1595,29 @@ export class Connection {
|
||||
// OK, there are blocks on both sides. Special case: merge the last block of
|
||||
// the reconnect methods with the first block of the original methods, if
|
||||
// neither of them are "wait" blocks.
|
||||
if (
|
||||
!_.last(self._outstandingMethodBlocks).wait &&
|
||||
!oldOutstandingMethodBlocks[0].wait
|
||||
) {
|
||||
_.each(oldOutstandingMethodBlocks[0].methods, m => {
|
||||
_.last(self._outstandingMethodBlocks).methods.push(m);
|
||||
if (! last(self._outstandingMethodBlocks).wait &&
|
||||
! oldOutstandingMethodBlocks[0].wait) {
|
||||
oldOutstandingMethodBlocks[0].methods.forEach(m => {
|
||||
last(self._outstandingMethodBlocks).methods.push(m);
|
||||
|
||||
// If this "last block" is also the first block, send the message.
|
||||
if (self._outstandingMethodBlocks.length === 1) m.sendMessage();
|
||||
if (self._outstandingMethodBlocks.length === 1) {
|
||||
m.sendMessage();
|
||||
}
|
||||
});
|
||||
|
||||
oldOutstandingMethodBlocks.shift();
|
||||
}
|
||||
|
||||
// Now add the rest of the original blocks on.
|
||||
_.each(oldOutstandingMethodBlocks, block => {
|
||||
oldOutstandingMethodBlocks.forEach(block => {
|
||||
self._outstandingMethodBlocks.push(block);
|
||||
});
|
||||
}
|
||||
|
||||
// We can accept a hot code push if there are no methods in flight.
|
||||
_readyToMigrate() {
|
||||
var self = this;
|
||||
return _.isEmpty(self._methodInvokers);
|
||||
return isEmpty(this._methodInvokers);
|
||||
}
|
||||
|
||||
// If we were blocking a migration, see if it's now possible to continue.
|
||||
@@ -1588,7 +1658,7 @@ export class Connection {
|
||||
this._livedata_connected(msg);
|
||||
this.options.onConnected();
|
||||
} else if (msg.msg === 'failed') {
|
||||
if (_.contains(this._supportedDDPVersions, msg.version)) {
|
||||
if (this._supportedDDPVersions.indexOf(msg.version) >= 0) {
|
||||
this._versionSuggestion = msg.version;
|
||||
this._stream.reconnect({ _force: true });
|
||||
} else {
|
||||
@@ -1681,7 +1751,7 @@ export class Connection {
|
||||
|
||||
// Mark all messages as unsent, they have not yet been sent on this
|
||||
// connection.
|
||||
Object.keys(this._methodInvokers).forEach(id => {
|
||||
keys(this._methodInvokers).forEach(id => {
|
||||
this._methodInvokers[id].sentMessage = false;
|
||||
});
|
||||
|
||||
@@ -1694,7 +1764,7 @@ export class Connection {
|
||||
|
||||
// add new subscriptions at the end. this way they take effect after
|
||||
// the handlers and we don't see flicker.
|
||||
Object.keys(this._subscriptions).forEach(id => {
|
||||
keys(this._subscriptions).forEach(id => {
|
||||
const sub = this._subscriptions[id];
|
||||
this._send({
|
||||
msg: 'sub',
|
||||
|
||||
39
packages/ddp-client/common/utils.js
Normal file
39
packages/ddp-client/common/utils.js
Normal file
@@ -0,0 +1,39 @@
|
||||
"use strict";
|
||||
|
||||
export const hasOwn = Object.prototype.hasOwnProperty;
|
||||
export const slice = Array.prototype.slice;
|
||||
|
||||
export function keys(obj) {
|
||||
return Object.keys(Object(obj));
|
||||
}
|
||||
|
||||
export function isEmpty(obj) {
|
||||
if (obj == null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (Array.isArray(obj) ||
|
||||
typeof obj === "string") {
|
||||
return obj.length === 0;
|
||||
}
|
||||
|
||||
for (const key in obj) {
|
||||
if (hasOwn.call(obj, key)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
export function last(array, n, guard) {
|
||||
if (array == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
if ((n == null) || guard) {
|
||||
return array[array.length - 1];
|
||||
}
|
||||
|
||||
return slice.call(array, Math.max(array.length - n, 0));
|
||||
}
|
||||
@@ -11,28 +11,24 @@ Npm.depends({
|
||||
});
|
||||
|
||||
Package.onUse((api) => {
|
||||
api.use(
|
||||
[
|
||||
'check',
|
||||
'random',
|
||||
'ejson',
|
||||
'underscore',
|
||||
'tracker',
|
||||
'retry',
|
||||
'id-map',
|
||||
'ecmascript',
|
||||
'callback-hook',
|
||||
'ddp-common',
|
||||
'reload',
|
||||
api.use([
|
||||
'check',
|
||||
'random',
|
||||
'ejson',
|
||||
'tracker',
|
||||
'retry',
|
||||
'id-map',
|
||||
'ecmascript',
|
||||
'callback-hook',
|
||||
'ddp-common',
|
||||
'reload',
|
||||
|
||||
// we depend on _diffObjects, _applyChanges,
|
||||
'diff-sequence',
|
||||
// we depend on _diffObjects, _applyChanges,
|
||||
'diff-sequence',
|
||||
|
||||
// _idParse, _idStringify.
|
||||
'mongo-id'
|
||||
],
|
||||
['client', 'server']
|
||||
);
|
||||
// _idParse, _idStringify.
|
||||
'mongo-id'
|
||||
], ['client', 'server']);
|
||||
|
||||
api.use('reload', 'client', { weak: true });
|
||||
|
||||
|
||||
@@ -1,6 +1,4 @@
|
||||
import { _ } from 'meteor/underscore';
|
||||
import { Meteor } from 'meteor/meteor';
|
||||
|
||||
import { DDP } from '../common/namespace.js';
|
||||
import { toWebsocketUrl } from '../common/urlHelpers.js';
|
||||
import StreamClientCommon from '../common/stream_client_common.js';
|
||||
|
||||
Reference in New Issue
Block a user