Merge branch 'release-3.0-tests-ddp-client' into release-3.0-tests-dpp-client-mongo

# Conflicts:
#	packages/ddp-server/writefence.js
#	packages/minimongo/minimongo_tests_client.js
#	packages/mongo/observe_multiplex.js
#	packages/mongo/polling_observe_driver.js
#	tools/isobuild/linker.js
This commit is contained in:
denihs
2023-02-08 14:39:56 -04:00
18 changed files with 682 additions and 767 deletions

View File

@@ -52,7 +52,7 @@ Autoupdate.autoupdateVersionRefreshable = null;
Autoupdate.autoupdateVersionCordova = null;
Autoupdate.appId = __meteor_runtime_config__.appId = process.env.APP_ID;
var syncQueue = new Meteor._SynchronousQueue();
var syncQueue = new Meteor._AsynchronousQueue();
async function updateVersions(shouldReloadClientProgram) {
// Step 1: load the current client program on the server

View File

@@ -237,8 +237,8 @@ export class Connection {
// Block auto-reload while we're waiting for method responses.
if (Meteor.isClient &&
Package.reload &&
! options.reloadWithOutstanding) {
Package.reload &&
! options.reloadWithOutstanding) {
Package.reload.Reload._onMigrate(retry => {
if (! self._readyToMigrate()) {
self._retryMigrate = retry;
@@ -601,10 +601,12 @@ export class Connection {
return new Promise((resolve, reject) => {
this.applyAsync(name, args, { isFromCallAsync: true })
.then(result => {
DDP._CurrentMethodInvocation._setCallAsyncMethodRunning(false);
resolve(result);
})
.catch(reject);
.catch(reject)
.finally(() =>
DDP._CurrentMethodInvocation._setCallAsyncMethodRunning(false)
);
});
}
@@ -799,7 +801,7 @@ export class Connection {
} else {
// On the server, make the function synchronous. Throw on
// errors, return on success.
// TODO fibers: before this was a future, now it's a promise.
// TODO[fibers]: before this was a future, now it's a promise.
// Do more tests around this.
if (!options.isFromCallAsync) {
@@ -861,7 +863,9 @@ export class Connection {
// If we're using the default callback on the server,
// block waiting for the result.
if (future) {
return future;
return {
stubValuePromise: future,
};
}
return options.returnStubValue ? stubReturnValue : undefined;
}

View File

@@ -1044,7 +1044,8 @@ if (Meteor.isServer) {
async function(test, expect) {
const self = this;
if (self.conn.status().connected) {
test.equal(await self.conn.callAsync('s2s', 'foo'), 's2s foo');
const callResult = await self.conn.callAsync('s2s', 'foo');
test.equal(await callResult.stubValuePromise, 's2s foo');
}
},
]);

View File

@@ -360,12 +360,20 @@ var Session = function (server, version, socket, options) {
};
Object.assign(Session.prototype, {
_checkPublishPromiseBeforeSend(f) {
if (!this._publishCursorPromise) {
f();
return;
}
this._publishCursorPromise.finally(() => f());
},
sendReady: function (subscriptionIds) {
var self = this;
if (self._isSending)
self.send({msg: "ready", subs: subscriptionIds});
else {
if (self._isSending) {
this._checkPublishPromiseBeforeSend(() => {
self.send({msg: "ready", subs: subscriptionIds});
});
} else {
_.each(subscriptionIds, function (subscriptionId) {
self._pendingReady.push(subscriptionId);
});
@@ -379,13 +387,9 @@ Object.assign(Session.prototype, {
sendAdded(collectionName, id, fields) {
if (this._canSend(collectionName)) {
if (!this._publishCursorPromise) {
this._checkPublishPromiseBeforeSend(() => {
this.send({ msg: 'added', collection: collectionName, id, fields });
return;
}
this._publishCursorPromise.finally(() =>
this.send({ msg: 'added', collection: collectionName, id, fields })
);
});
}
},
@@ -394,18 +398,23 @@ Object.assign(Session.prototype, {
return;
if (this._canSend(collectionName)) {
this.send({
msg: "changed",
collection: collectionName,
id,
fields
this._checkPublishPromiseBeforeSend(() => {
this.send({
msg: "changed",
collection: collectionName,
id,
fields
});
});
}
},
sendRemoved(collectionName, id) {
if (this._canSend(collectionName))
this.send({msg: "removed", collection: collectionName, id});
if (this._canSend(collectionName)) {
this._checkPublishPromiseBeforeSend(() => {
this.send({msg: "removed", collection: collectionName, id});
});
}
},
getSendCallbacks: function () {
@@ -1518,9 +1527,7 @@ Server = function (options = {}) {
return;
}
Meteor._runAsync(function() {
self._handleConnect(socket, msg);
})
self._handleConnect(socket, msg);
return;
}

View File

@@ -127,8 +127,9 @@ Tinytest.addAsync(
makeTestConnection(
test,
function (clientConn, serverConn) {
clientConn.callAsync('livedata_server_test_inner').then(res => {
test.equal(res, serverConn.id);
clientConn.callAsync('livedata_server_test_inner').then(async res => {
const r = await res.stubValuePromise;
test.equal(r, serverConn.id);
clientConn.disconnect();
onComplete();
});
@@ -144,9 +145,10 @@ Tinytest.addAsync(
function (test, onComplete) {
makeTestConnection(
test,
function (clientConn, serverConn) {
clientConn.callAsync('livedata_server_test_outer').then(res => {
test.equal(res, serverConn.id);
function(clientConn, serverConn) {
clientConn.callAsync('livedata_server_test_outer').then(async res => {
const r = await res.stubValuePromise;
test.equal(r, serverConn.id);
clientConn.disconnect();
onComplete();
});
@@ -351,7 +353,7 @@ Tinytest.addAsync(
(test, onComplete) => makeTestConnection(test, async (clientConn, serverConn) => {
const testResolvedPromiseResult = await clientConn.callAsync("testResolvedPromise", "clientConn.call");
test.equal(
testResolvedPromiseResult,
await testResolvedPromiseResult.stubValuePromise,
"clientConn.call after waiting"
);

View File

@@ -1,8 +1,27 @@
Meteor._noYieldsAllowed = function (f) {
return f();
const result = f();
if (Meteor._isPromise(result)) {
throw new Error("function is a promise when calling Meteor._noYieldsAllowed");
}
return result
};
Meteor._DoubleEndedQueue = Npm.require('denque');
class FakeDoubleEndedQueue {
constructor() {
this.queue = [];
}
push(task) {
this.queue.push(task);
}
shift() {
return this.queue.shift();
}
isEmpty() {
return this.queue.length === 0;
}
}
Meteor._DoubleEndedQueue = Meteor.isServer ? Npm.require('denque') : FakeDoubleEndedQueue;
// Meteor._SynchronousQueue is a queue which runs task functions serially.
// Tasks are assumed to be synchronous: ie, it's assumed that they are
@@ -50,8 +69,15 @@ class AsynchronousQueue {
let resolve;
const promise = new Promise(r => resolve = r);
setImmediate(() => {
this._run().finally(() => resolve());
const runImmediateHandle = (fn) => {
if (Meteor.isServer) {
setImmediate(fn);
return;
}
setTimeout(fn, 0);
};
runImmediateHandle(() => {
this._run().finally(resolve);
});
return promise;
}
@@ -136,7 +162,6 @@ class AsynchronousQueue {
}
Meteor._AsynchronousQueue = AsynchronousQueue;
Meteor._SynchronousQueue = AsynchronousQueue;
// Sleep. Mostly used for debugging (eg, inserting latency into server

View File

@@ -1,129 +0,0 @@
Meteor._noYieldsAllowed = function (f) {
return f();
};
Meteor._DoubleEndedQueue = Npm.require('denque');
// Meteor._SynchronousQueue is a queue which runs task functions serially.
// Tasks are assumed to be synchronous: ie, it's assumed that they are
// done when they return.
//
// It has two methods:
// - queueTask queues a task to be run, and returns immediately.
// - runTask queues a task to be run, and then yields. It returns
// when the task finishes running.
//
// It's safe to call queueTask from within a task, but not runTask (unless
// you're calling runTask from a nested Fiber).
//
// Somewhat inspired by async.queue, but specific to blocking tasks.
// XXX break this out into an NPM module?
// XXX could maybe use the npm 'schlock' module instead, which would
// also support multiple concurrent "read" tasks
//
class AsynchronousQueue {
constructor() {
this._taskHandles = new Meteor._DoubleEndedQueue();
this._runningOrRunScheduled = false;
// This is true if we're currently draining. While we're draining, a further
// drain is a noop, to prevent infinite loops. "drain" is a heuristic type
// operation, that has a meaning like unto "what a naive person would expect
// when modifying a table from an observe"
this._draining = false;
}
queueTask(task) {
this._taskHandles.push({
task: task,
name: task.name
});
return this._scheduleRun();
}
_scheduleRun() {
// Already running or scheduled? Do nothing.
if (this._runningOrRunScheduled)
return;
this._runningOrRunScheduled = true;
let resolver;
const returnValue = new Promise(r => resolver = r);
setImmediate(() => {
Meteor._runAsync(async () => {
await this._run();
if (!resolver) {
throw new Error("Resolver not found for task");
}
resolver();
});
});
return returnValue;
}
async _run() {
if (!this._runningOrRunScheduled)
throw new Error("expected to be _runningOrRunScheduled");
if (this._taskHandles.isEmpty()) {
// Done running tasks! Don't immediately schedule another run, but
// allow future tasks to do so.
this._runningOrRunScheduled = false;
return;
}
const taskHandle = this._taskHandles.shift();
// Run the task.
try {
await taskHandle.task();
} catch (err) {
Meteor._debug("Exception in queued task", err);
}
// Soon, run the next task, if there is any.
this._runningOrRunScheduled = false;
await this._scheduleRun();
}
async runTask(task) {
const handle = {
task: Meteor.bindEnvironment(task, function(e) {
Meteor._debug('Exception from task', e);
throw e;
}),
name: task.name
};
this._taskHandles.push(handle);
// XXX: We should be doing this a different way.
await Meteor._sleepForMs(10);
return this._scheduleRun();
}
flush() {
return this.runTask(() => {});
}
async drain() {
if (this._draining)
return;
this._draining = true;
while (!this._taskHandles.isEmpty()) {
await this.flush();
}
this._draining = false;
}
}
Meteor._AsynchronousQueue = AsynchronousQueue;
Meteor._SynchronousQueue = AsynchronousQueue;
// Sleep. Mostly used for debugging (eg, inserting latency into server
// methods).
//
Meteor._sleepForMs = (ms) => new Promise(resolve => setTimeout(resolve, ms));

View File

@@ -34,11 +34,7 @@ Package.onUse(function (api) {
api.addFiles('timers.js', ['client', 'server']);
api.addFiles('errors.js', ['client', 'server']);
api.addFiles('asl-helpers.js', 'server');
if (process.env.DISABLE_FIBERS) {
api.addFiles('async_helpers.js', 'server');
} else {
api.addFiles('fiber_helpers.js', 'server');
}
api.addFiles('async_helpers.js', ['client', 'server']);
api.addFiles('fiber_stubs_client.js', 'client');
api.addFiles('asl-helpers-client.js', 'client');
api.addFiles('startup_client.js', ['client']);

View File

@@ -20,7 +20,9 @@ export default class LocalCollection {
// _id -> document (also containing id)
this._docs = new LocalCollection._IdMap;
this._observeQueue = new Meteor._SynchronousQueue();
this._observeQueue = Meteor.isClient
? new Meteor._SynchronousQueue()
: new Meteor._AsynchronousQueue();
this.next_qid = 1; // live query id generator
@@ -96,7 +98,11 @@ export default class LocalCollection {
return this.find(selector, options).fetch()[0];
}
async findOneAsync(selector, options = {}) {
return Promise.resolve(this.findOne(selector, options));
if (arguments.length === 0) {
selector = {};
}
options.limit = 1;
return (await this.find(selector, options).fetchAsync())[0];
}
prepareInsert(doc) {
assertHasValidFieldNames(doc);
@@ -131,7 +137,7 @@ export default class LocalCollection {
const query = this.queries[qid];
if (query.dirty) {
break;
continue;
}
const matchResult = query.matcher.documentMatches(doc);
@@ -174,7 +180,7 @@ export default class LocalCollection {
const query = this.queries[qid];
if (query.dirty) {
break;
continue;
}
const matchResult = query.matcher.documentMatches(doc);
@@ -623,11 +629,6 @@ export default class LocalCollection {
let updateCount = 0;
// for (const qid of Object.keys(this.queries)) {
// this._recomputeResults(this.queries[qid], qidToOriginalResults[qid]);
// }
this._eachPossiblyMatchingDocSync(selector, (doc, id) => {
const queryResult = matcher.documentMatches(doc);
@@ -637,7 +638,6 @@ export default class LocalCollection {
recomputeQids = this._modifyAndNotifySync(
doc,
mod,
recomputeQids,
queryResult.arrayIndices
);
@@ -770,7 +770,7 @@ export default class LocalCollection {
const query = this.queries[qid];
if (query.dirty) {
break;
continue;
}
const afterMatch = query.matcher.documentMatches(doc);
@@ -815,7 +815,7 @@ export default class LocalCollection {
const query = this.queries[qid];
if (query.dirty) {
break;
continue;
}
const afterMatch = query.matcher.documentMatches(doc);

File diff suppressed because it is too large Load Diff

View File

@@ -146,7 +146,7 @@ ObserveMultiplexer = class {
}
async _applyCallback(callbackName, args) {
const self = this;
await this._queue.queueTask(async function () {
this._queue.queueTask(async function () {
// If we stopped in the meantime, do nothing.
if (!self._handles)
return;
@@ -189,8 +189,6 @@ ObserveMultiplexer = class {
return;
// note: docs may be an _IdMap or an OrderedDict
await this._cache.docs.forEachAsync(async (doc, id) => {
//TODO FIXME
if (!this._handles) console.log({this:this});
if (!_.has(this._handles, handle._id))
throw Error("handle got removed before sending initial adds!");
const { _id, ...fields } = handle.nonMutatingCallbacks ? doc
@@ -215,8 +213,8 @@ ObserveHandle = class {
// ordered observe where for some reason you don't get ordering data on
// the adds. I dunno, we wrote tests for it, there must have been a
// reason.
this._addedBefore = function (id, fields, before) {
callbacks.added(id, fields);
this._addedBefore = async function (id, fields, before) {
await callbacks.added(id, fields);
};
}
});

View File

@@ -115,8 +115,8 @@ OplogObserveDriver = function (options) {
forEachTrigger(self._cursorDescription, function (trigger) {
self._stopHandles.push(self._mongoHandle._oplogHandle.onOplogEntry(
trigger, function (notification) {
Meteor._noYieldsAllowed(finishIfNeedToPollQuery(function () {
trigger, async function (notification) {
await finishIfNeedToPollQuery(function () {
var op = notification.op;
if (notification.dropCollection || notification.dropDatabase) {
// Note: this call is not allowed to block on anything (especially
@@ -131,7 +131,7 @@ OplogObserveDriver = function (options) {
return self._handleOplogEntrySteadyOrFetching(op);
}
}
}));
});
}
));
});
@@ -162,7 +162,7 @@ OplogObserveDriver = function (options) {
for (const driver of Object.values(drivers)) {
if (driver._stopped)
return;
continue;
var write = await fence.beginWrite();
if (driver._phase === PHASE.STEADY) {
@@ -568,7 +568,7 @@ _.extend(OplogObserveDriver.prototype, {
await w.committed();
}
} catch (e) {
console.log({writes}, e);
console.error("_beSteady error", {writes}, e);
}
});
},
@@ -826,16 +826,14 @@ _.extend(OplogObserveDriver.prototype, {
if (self._phase !== PHASE.QUERYING)
throw Error("Phase unexpectedly " + self._phase);
await Meteor._noYieldsAllowed(async function () {
if (self._requeryWhenDoneThisQuery) {
self._requeryWhenDoneThisQuery = false;
self._pollQuery();
} else if (self._needToFetch.empty()) {
await self._beSteady();
} else {
self._fetchModifiedDocuments();
}
});
if (self._requeryWhenDoneThisQuery) {
self._requeryWhenDoneThisQuery = false;
self._pollQuery();
} else if (self._needToFetch.empty()) {
await self._beSteady();
} else {
self._fetchModifiedDocuments();
}
},
_cursorForQuery: function (optionsOverwrite) {

View File

@@ -78,7 +78,7 @@ OplogHandle = function (oplogUrl, dbName) {
self._workerActive = false;
const shouldAwait = self._startTailing();
//TODO Why wait?
//TODO[fibers] Why wait?
};
Object.assign(OplogHandle.prototype, {

View File

@@ -202,10 +202,10 @@ _.extend(PollingObserveDriver.prototype, {
// round, mark all the writes which existed before this call as
// commmitted. (If new writes have shown up in the meantime, there'll
// already be another _pollMongo task scheduled.)
await self._multiplexer.onFlush(function () {
_.each(writesForCycle, function (w) {
w.committed();
});
await self._multiplexer.onFlush(async function () {
for (const w of writesForCycle) {
await w.committed();
}
});
},
@@ -218,8 +218,8 @@ _.extend(PollingObserveDriver.prototype, {
_.each(self._stopCallbacks, stopCallbacksCaller);
// Release any write fences that are waiting on us.
_.each(self._pendingWrites, function (w) {
w.committed();
_.each(self._pendingWrites, async function (w) {
await w.committed();
});
Package['facts-base'] && Package['facts-base'].Facts.incrementServerFact(
"mongo-livedata", "observe-drivers-polling", -1);

View File

@@ -455,7 +455,7 @@ export const TestManager = new (class TestManager {
constructor() {
this.tests = {};
this.ordered_tests = [];
this.testQueue = Meteor.isServer && new Meteor._SynchronousQueue();
this.testQueue = Meteor.isServer && new Meteor._AsynchronousQueue();
this.onlyTestsNames = [];
}

View File

@@ -796,7 +796,7 @@ onMessage('webapp-reload-client', async ({ arch }) => {
async function runWebAppServer() {
var shuttingDown = false;
var syncQueue = new Meteor._SynchronousQueue();
var syncQueue = new Meteor._AsynchronousQueue();
var getItemPathname = function(itemUrl) {
return decodeURIComponent(parseUrl(itemUrl).pathname);

View File

@@ -3441,7 +3441,7 @@ async function bundle({
if (hasCachedBundle) {
// If we already have a cached bundle, just recreate the new targets.
// XXX This might make the contents of "star.json" out of date.
for (const target of targets) {
for (const target of Object.values(targets)) {
await writeClientTarget(target);
}
} else {

View File

@@ -941,7 +941,7 @@ var getHeader = function (options) {
var isApp = options.name === null;
var chunks = [];
let orderedDeps = [];
let orderedDeps = [];
options.deps.forEach(dep => {
if (!dep.unordered) {