mirror of
https://github.com/meteor/meteor.git
synced 2026-05-02 03:01:46 -04:00
Correctly implement forEach (make it synchronously run the callbacks) and apply that to map.
Also, correctly implement the withValue from the Meteor.EVp. Seems that the values were being set the wrong way.
This commit is contained in:
@@ -79,8 +79,7 @@ _.extend(DDPServer._WriteFence.prototype, {
|
||||
self.completion_callbacks.push(func);
|
||||
},
|
||||
|
||||
// Convenience function. Arms the fence, then blocks until it fires.
|
||||
armAndWait: function () {
|
||||
_armAndWaitFibers: function () {
|
||||
var self = this;
|
||||
var future = new Future;
|
||||
self.onAllCommitted(function () {
|
||||
@@ -89,6 +88,29 @@ _.extend(DDPServer._WriteFence.prototype, {
|
||||
self.arm();
|
||||
future.wait();
|
||||
},
|
||||
_armAndWaitNoFibers: function () {
|
||||
var self = this;
|
||||
|
||||
let _resolver;
|
||||
self.onAllCommitted(function () {
|
||||
if (!_resolver) {
|
||||
console.warn("oops, no resolver");
|
||||
return;
|
||||
}
|
||||
|
||||
_resolver();
|
||||
});
|
||||
|
||||
return new Promise((r) => {
|
||||
_resolver = r;
|
||||
self.arm();
|
||||
} );
|
||||
},
|
||||
|
||||
// Convenience function. Arms the fence, then blocks until it fires.
|
||||
armAndWait: function () {
|
||||
return Meteor._isFibersEnabled ? this._armAndWaitFibers() : this._armAndWaitNoFibers();
|
||||
},
|
||||
|
||||
_maybeFire: function () {
|
||||
var self = this;
|
||||
|
||||
@@ -97,15 +97,15 @@ EVp.withValue = function (value, func) {
|
||||
let meteorDynamics = Meteor._getValueFromAslStore('_meteor_dynamics');
|
||||
if (!meteorDynamics) {
|
||||
meteorDynamics = [];
|
||||
Meteor._updateAslStore('_meteor_dynamics', []);
|
||||
}
|
||||
|
||||
const saved = meteorDynamics[this.slot];
|
||||
try {
|
||||
Meteor._updateAslStore('_meteor_dynamics', value);
|
||||
meteorDynamics[this.slot] = value;
|
||||
return func();
|
||||
} finally {
|
||||
Meteor._updateAslStore('_meteor_dynamics', saved);
|
||||
meteorDynamics[this.slot] = saved;
|
||||
Meteor._updateAslStore('_meteor_dynamics', meteorDynamics);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -162,12 +162,12 @@ SQp._scheduleRun = function () {
|
||||
*/
|
||||
if (Meteor._isFibersEnabled) {
|
||||
setImmediate(function() {
|
||||
Fiber(function() {
|
||||
Meteor._runAsync(function() {
|
||||
self._run();
|
||||
}).run();
|
||||
});
|
||||
});
|
||||
} else {
|
||||
global.asyncLocalStorage.run(Meteor._getAslStore(), () => {
|
||||
Meteor._runAsync(() => {
|
||||
self._run();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -689,8 +689,6 @@ LocalCollection._CachingChangeObserver = class _CachingChangeObserver {
|
||||
this.docs.putBefore(id, doc, before || null);
|
||||
},
|
||||
movedBefore: (id, before) => {
|
||||
const doc = this.docs.get(id);
|
||||
|
||||
if (callbacks.movedBefore) {
|
||||
callbacks.movedBefore.call(this, id, before);
|
||||
}
|
||||
@@ -1201,7 +1199,7 @@ LocalCollection._modify = (doc, modifier, options = {}) => {
|
||||
});
|
||||
};
|
||||
|
||||
LocalCollection._observeFromObserveChanges = (cursor, observeCallbacks) => {
|
||||
LocalCollection._observeFromObserveChangesFibers = (cursor, observeCallbacks) => {
|
||||
const transform = cursor.getTransform() || (doc => doc);
|
||||
let suppressed = !!observeCallbacks._suppress_initial;
|
||||
|
||||
@@ -1344,6 +1342,155 @@ LocalCollection._observeFromObserveChanges = (cursor, observeCallbacks) => {
|
||||
return handle;
|
||||
};
|
||||
|
||||
LocalCollection._observeFromObserveChangesNoFibers = async (cursor, observeCallbacks) => {
|
||||
const transform = cursor.getTransform() || (doc => doc);
|
||||
let suppressed = !!observeCallbacks._suppress_initial;
|
||||
|
||||
let observeChangesCallbacks;
|
||||
if (LocalCollection._observeCallbacksAreOrdered(observeCallbacks)) {
|
||||
// The "_no_indices" option sets all index arguments to -1 and skips the
|
||||
// linear scans required to generate them. This lets observers that don't
|
||||
// need absolute indices benefit from the other features of this API --
|
||||
// relative order, transforms, and applyChanges -- without the speed hit.
|
||||
const indices = !observeCallbacks._no_indices;
|
||||
|
||||
observeChangesCallbacks = {
|
||||
addedBefore(id, fields, before) {
|
||||
if (suppressed || !(observeCallbacks.addedAt || observeCallbacks.added)) {
|
||||
return;
|
||||
}
|
||||
|
||||
const doc = transform(Object.assign(fields, {_id: id}));
|
||||
|
||||
if (observeCallbacks.addedAt) {
|
||||
observeCallbacks.addedAt(
|
||||
doc,
|
||||
indices
|
||||
? before
|
||||
? this.docs.indexOf(before)
|
||||
: this.docs.size()
|
||||
: -1,
|
||||
before
|
||||
);
|
||||
} else {
|
||||
observeCallbacks.added(doc);
|
||||
}
|
||||
},
|
||||
changed(id, fields) {
|
||||
if (!(observeCallbacks.changedAt || observeCallbacks.changed)) {
|
||||
return;
|
||||
}
|
||||
|
||||
let doc = EJSON.clone(this.docs.get(id));
|
||||
if (!doc) {
|
||||
throw new Error(`Unknown id for changed: ${id}`);
|
||||
}
|
||||
|
||||
const oldDoc = transform(EJSON.clone(doc));
|
||||
|
||||
DiffSequence.applyChanges(doc, fields);
|
||||
|
||||
if (observeCallbacks.changedAt) {
|
||||
observeCallbacks.changedAt(
|
||||
transform(doc),
|
||||
oldDoc,
|
||||
indices ? this.docs.indexOf(id) : -1
|
||||
);
|
||||
} else {
|
||||
observeCallbacks.changed(transform(doc), oldDoc);
|
||||
}
|
||||
},
|
||||
movedBefore(id, before) {
|
||||
if (!observeCallbacks.movedTo) {
|
||||
return;
|
||||
}
|
||||
|
||||
const from = indices ? this.docs.indexOf(id) : -1;
|
||||
let to = indices
|
||||
? before
|
||||
? this.docs.indexOf(before)
|
||||
: this.docs.size()
|
||||
: -1;
|
||||
|
||||
// When not moving backwards, adjust for the fact that removing the
|
||||
// document slides everything back one slot.
|
||||
if (to > from) {
|
||||
--to;
|
||||
}
|
||||
|
||||
observeCallbacks.movedTo(
|
||||
transform(EJSON.clone(this.docs.get(id))),
|
||||
from,
|
||||
to,
|
||||
before || null
|
||||
);
|
||||
},
|
||||
removed(id) {
|
||||
if (!(observeCallbacks.removedAt || observeCallbacks.removed)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// technically maybe there should be an EJSON.clone here, but it's about
|
||||
// to be removed from this.docs!
|
||||
const doc = transform(this.docs.get(id));
|
||||
|
||||
if (observeCallbacks.removedAt) {
|
||||
observeCallbacks.removedAt(doc, indices ? this.docs.indexOf(id) : -1);
|
||||
} else {
|
||||
observeCallbacks.removed(doc);
|
||||
}
|
||||
},
|
||||
};
|
||||
} else {
|
||||
observeChangesCallbacks = {
|
||||
added(id, fields) {
|
||||
if (!suppressed && observeCallbacks.added) {
|
||||
observeCallbacks.added(transform(Object.assign(fields, {_id: id})));
|
||||
}
|
||||
},
|
||||
changed(id, fields) {
|
||||
if (observeCallbacks.changed) {
|
||||
const oldDoc = this.docs.get(id);
|
||||
const doc = EJSON.clone(oldDoc);
|
||||
|
||||
DiffSequence.applyChanges(doc, fields);
|
||||
|
||||
observeCallbacks.changed(
|
||||
transform(doc),
|
||||
transform(EJSON.clone(oldDoc))
|
||||
);
|
||||
}
|
||||
},
|
||||
removed(id) {
|
||||
if (observeCallbacks.removed) {
|
||||
observeCallbacks.removed(transform(this.docs.get(id)));
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
const changeObserver = new LocalCollection._CachingChangeObserver({
|
||||
callbacks: observeChangesCallbacks
|
||||
});
|
||||
|
||||
// CachingChangeObserver clones all received input on its callbacks
|
||||
// So we can mark it as safe to reduce the ejson clones.
|
||||
// This is tested by the `mongo-livedata - (extended) scribbling` tests
|
||||
changeObserver.applyChange._fromObserve = true;
|
||||
const handle = await cursor.observeChanges(changeObserver.applyChange,
|
||||
{ nonMutatingCallbacks: true });
|
||||
|
||||
suppressed = false;
|
||||
|
||||
return handle;
|
||||
};
|
||||
|
||||
LocalCollection._observeFromObserveChanges = (cursor, observeCallbacks) => {
|
||||
return Meteor._isFibersEnabled
|
||||
? LocalCollection._observeFromObserveChangesFibers(cursor, observeCallbacks)
|
||||
: LocalCollection._observeFromObserveChangesNoFibers(cursor, observeCallbacks);
|
||||
};
|
||||
|
||||
LocalCollection._observeCallbacksAreOrdered = callbacks => {
|
||||
if (callbacks.added && callbacks.addedAt) {
|
||||
throw new Error('Please specify only one of added() and addedAt()');
|
||||
|
||||
@@ -1125,23 +1125,23 @@ class AsynchronousCursor {
|
||||
});
|
||||
}
|
||||
|
||||
forEach(callback, thisArg) {
|
||||
async forEach(callback, thisArg) {
|
||||
// Get back to the beginning.
|
||||
this._rewind();
|
||||
|
||||
return this._cursor.forEach((doc, index) => {
|
||||
callback.call(thisArg, doc, index)
|
||||
}, this._selfForIteration);
|
||||
let idx = 0;
|
||||
while (true) {
|
||||
const doc = await this._nextObjectPromise();
|
||||
if (!doc) return;
|
||||
await callback.call(thisArg, doc, idx++, this._selfForIteration);
|
||||
}
|
||||
}
|
||||
|
||||
async map(callback, thisArg) {
|
||||
const results = [];
|
||||
|
||||
let idx = 0;
|
||||
for await (const doc of this._cursor) {
|
||||
results.push(await callback.call(thisArg, doc, idx, this._selfForIteration))
|
||||
idx++;
|
||||
}
|
||||
await this.forEach(async (doc, index) => {
|
||||
results.push(await callback.call(thisArg, doc, index, this._selfForIteration));
|
||||
});
|
||||
|
||||
return results;
|
||||
}
|
||||
@@ -1159,7 +1159,7 @@ class AsynchronousCursor {
|
||||
}
|
||||
|
||||
fetch() {
|
||||
return this._cursor.toArray();
|
||||
return this.map(_.identity);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -1172,13 +1172,13 @@ class AsynchronousCursor {
|
||||
}
|
||||
|
||||
// This method is NOT wrapped in Cursor.
|
||||
getRawObjects(ordered) {
|
||||
async getRawObjects(ordered) {
|
||||
var self = this;
|
||||
if (ordered) {
|
||||
return self.fetch();
|
||||
} else {
|
||||
var results = new LocalCollection._IdMap;
|
||||
self.forEach(function (doc) {
|
||||
await self.forEach(function (doc) {
|
||||
results.set(doc._id, doc);
|
||||
});
|
||||
return results;
|
||||
@@ -1590,6 +1590,10 @@ Object.assign(MongoConnection.prototype, {
|
||||
_testOnlyPollCallback: callbacks._testOnlyPollCallback
|
||||
});
|
||||
|
||||
if (observeDriver._init) {
|
||||
await observeDriver._init();
|
||||
}
|
||||
|
||||
// This field is only set for use in tests.
|
||||
multiplexer._observeDriver = observeDriver;
|
||||
}
|
||||
@@ -1697,6 +1701,10 @@ Object.assign(MongoConnection.prototype, {
|
||||
_testOnlyPollCallback: callbacks._testOnlyPollCallback
|
||||
});
|
||||
|
||||
if (observeDriver._init) {
|
||||
observeDriver._init();
|
||||
}
|
||||
|
||||
// This field is only set for use in tests.
|
||||
multiplexer._observeDriver = observeDriver;
|
||||
}
|
||||
|
||||
@@ -11,7 +11,7 @@ PollingObserveDriver = function (options) {
|
||||
self._stopCallbacks = [];
|
||||
self._stopped = false;
|
||||
|
||||
self._synchronousCursor = self._mongoHandle._createSynchronousCursor(
|
||||
self._cursor = self._mongoHandle._createSynchronousCursor(
|
||||
self._cursorDescription);
|
||||
|
||||
// previous results snapshot. on each poll cycle, diffs against
|
||||
@@ -74,15 +74,28 @@ PollingObserveDriver = function (options) {
|
||||
Meteor.clearInterval(intervalHandle);
|
||||
});
|
||||
}
|
||||
|
||||
// Make sure we actually poll soon!
|
||||
self._unthrottledEnsurePollIsScheduled();
|
||||
|
||||
Package['facts-base'] && Package['facts-base'].Facts.incrementServerFact(
|
||||
"mongo-livedata", "observe-drivers-polling", 1);
|
||||
};
|
||||
|
||||
_.extend(PollingObserveDriver.prototype, {
|
||||
_initAsync: async function () {
|
||||
// Make sure we actually poll soon!
|
||||
await this._unthrottledEnsurePollIsScheduled();
|
||||
|
||||
Package['facts-base'] && Package['facts-base'].Facts.incrementServerFact(
|
||||
"mongo-livedata", "observe-drivers-polling", 1);
|
||||
},
|
||||
_init() {
|
||||
if (!Meteor._isFibersEnabled) {
|
||||
return this._initAsync();
|
||||
}
|
||||
|
||||
var self = this;
|
||||
// Make sure we actually poll soon!
|
||||
self._unthrottledEnsurePollIsScheduled();
|
||||
|
||||
Package['facts-base'] && Package['facts-base'].Facts.incrementServerFact(
|
||||
"mongo-livedata", "observe-drivers-polling", 1);
|
||||
},
|
||||
// This is always called through _.throttle (except once at startup).
|
||||
_unthrottledEnsurePollIsScheduled: function () {
|
||||
var self = this;
|
||||
@@ -129,7 +142,7 @@ _.extend(PollingObserveDriver.prototype, {
|
||||
});
|
||||
},
|
||||
|
||||
_pollMongo: function () {
|
||||
_pollMongoFibers: function () {
|
||||
var self = this;
|
||||
--self._pollsScheduledButNotStarted;
|
||||
|
||||
@@ -153,7 +166,7 @@ _.extend(PollingObserveDriver.prototype, {
|
||||
|
||||
// Get the new query results. (This yields.)
|
||||
try {
|
||||
newResults = self._synchronousCursor.getRawObjects(self._ordered);
|
||||
newResults = self._cursor.getRawObjects(self._ordered);
|
||||
} catch (e) {
|
||||
if (first && typeof(e.code) === 'number') {
|
||||
// This is an error document sent to us by mongod, not a connection
|
||||
@@ -208,6 +221,89 @@ _.extend(PollingObserveDriver.prototype, {
|
||||
});
|
||||
},
|
||||
|
||||
async _pollMongoNoFibers() {
|
||||
var self = this;
|
||||
--self._pollsScheduledButNotStarted;
|
||||
|
||||
if (self._stopped)
|
||||
return;
|
||||
|
||||
var first = false;
|
||||
var newResults;
|
||||
var oldResults = self._results;
|
||||
if (!oldResults) {
|
||||
first = true;
|
||||
// XXX maybe use OrderedDict instead?
|
||||
oldResults = self._ordered ? [] : new LocalCollection._IdMap;
|
||||
}
|
||||
|
||||
self._testOnlyPollCallback && self._testOnlyPollCallback();
|
||||
|
||||
// Save the list of pending writes which this round will commit.
|
||||
var writesForCycle = self._pendingWrites;
|
||||
self._pendingWrites = [];
|
||||
|
||||
// Get the new query results. (This yields.)
|
||||
try {
|
||||
newResults = await self._cursor.getRawObjects(self._ordered);
|
||||
} catch (e) {
|
||||
if (first && typeof(e.code) === 'number') {
|
||||
// This is an error document sent to us by mongod, not a connection
|
||||
// error generated by the client. And we've never seen this query work
|
||||
// successfully. Probably it's a bad selector or something, so we should
|
||||
// NOT retry. Instead, we should halt the observe (which ends up calling
|
||||
// `stop` on us).
|
||||
self._multiplexer.queryError(
|
||||
new Error(
|
||||
"Exception while polling query " +
|
||||
JSON.stringify(self._cursorDescription) + ": " + e.message));
|
||||
return;
|
||||
}
|
||||
|
||||
// getRawObjects can throw if we're having trouble talking to the
|
||||
// database. That's fine --- we will repoll later anyway. But we should
|
||||
// make sure not to lose track of this cycle's writes.
|
||||
// (It also can throw if there's just something invalid about this query;
|
||||
// unfortunately the ObserveDriver API doesn't provide a good way to
|
||||
// "cancel" the observe from the inside in this case.
|
||||
Array.prototype.push.apply(self._pendingWrites, writesForCycle);
|
||||
Meteor._debug("Exception while polling query " +
|
||||
JSON.stringify(self._cursorDescription), e);
|
||||
return;
|
||||
}
|
||||
|
||||
// Run diffs.
|
||||
if (!self._stopped) {
|
||||
LocalCollection._diffQueryChanges(
|
||||
self._ordered, oldResults, newResults, self._multiplexer);
|
||||
}
|
||||
|
||||
// Signals the multiplexer to allow all observeChanges calls that share this
|
||||
// multiplexer to return. (This happens asynchronously, via the
|
||||
// multiplexer's queue.)
|
||||
if (first)
|
||||
self._multiplexer.ready();
|
||||
|
||||
// Replace self._results atomically. (This assignment is what makes `first`
|
||||
// stay through on the next cycle, so we've waited until after we've
|
||||
// committed to ready-ing the multiplexer.)
|
||||
self._results = newResults;
|
||||
|
||||
// Once the ObserveMultiplexer has processed everything we've done in this
|
||||
// round, mark all the writes which existed before this call as
|
||||
// commmitted. (If new writes have shown up in the meantime, there'll
|
||||
// already be another _pollMongo task scheduled.)
|
||||
self._multiplexer.onFlush(function () {
|
||||
_.each(writesForCycle, function (w) {
|
||||
w.committed();
|
||||
});
|
||||
});
|
||||
},
|
||||
|
||||
_pollMongo: function () {
|
||||
return Meteor._isFibersEnabled ? this._pollMongoFibers() : this._pollMongoNoFibers();
|
||||
},
|
||||
|
||||
stop: function () {
|
||||
var self = this;
|
||||
self._stopped = true;
|
||||
|
||||
Reference in New Issue
Block a user