From a30809863322cd71cb89d8f37b0fd17ef236da3a Mon Sep 17 00:00:00 2001 From: denihs Date: Tue, 6 Dec 2022 18:16:35 -0400 Subject: [PATCH] fixing _scheduleRun: It was adding an infity amount of tasks. fixing tests --- .../ddp-client/common/livedata_connection.js | 31 -- .../test/livedata_connection_tests.js | 518 +++++++++--------- packages/ddp-client/test/stub_stream.js | 16 +- packages/meteor/async_helpers.js | 26 +- packages/meteor/dynamics_nodejs.js | 6 +- 5 files changed, 276 insertions(+), 321 deletions(-) diff --git a/packages/ddp-client/common/livedata_connection.js b/packages/ddp-client/common/livedata_connection.js index 1855a0e0f5..03b42c2a5a 100644 --- a/packages/ddp-client/common/livedata_connection.js +++ b/packages/ddp-client/common/livedata_connection.js @@ -15,13 +15,6 @@ import { last, } from "meteor/ddp-common/utils.js"; -let Fiber; -let Future; -if (Meteor.isServer) { - Fiber = Npm.require('fibers'); - Future = Npm.require('fibers/future'); -} - class MongoIDMap extends IdMap { constructor() { super(MongoID.idStringify, MongoID.idParse); @@ -494,30 +487,6 @@ export class Connection { return handle; } - // options: - // - onLateError {Function(error)} called if an error was received after the ready event. - // (errors received before ready cause an error to be thrown) - _subscribeAndWait(name, args, options) { - const self = this; - const f = new Future(); - let ready = false; - args = args || []; - args.push({ - onReady() { - ready = true; - f['return'](); - }, - onError(e) { - if (!ready) f['throw'](e); - else options && options.onLateError && options.onLateError(e); - } - }); - - const handle = self.subscribe.apply(self, [name].concat(args)); - f.wait(); - return handle; - } - methods(methods) { Object.entries(methods).forEach(([name, func]) => { if (typeof func !== 'function') { diff --git a/packages/ddp-client/test/livedata_connection_tests.js b/packages/ddp-client/test/livedata_connection_tests.js index 32e014ccbf..3b9d4ce904 100644 --- a/packages/ddp-client/test/livedata_connection_tests.js +++ b/packages/ddp-client/test/livedata_connection_tests.js @@ -63,27 +63,27 @@ const testGotMessage = function(test, stream, expected) { return got; }; -const startAndConnect = function(test, stream) { - stream.reset(); // initial connection start. +const startAndConnect = async function(test, stream) { + await stream.reset(); // initial connection start. testGotMessage(test, stream, makeConnectMessage()); test.length(stream.sent, 0); - stream.receive({ msg: 'connected', session: SESSION_ID }); + await stream.receive({ msg: 'connected', session: SESSION_ID }); test.length(stream.sent, 0); }; const SESSION_ID = '17'; -Tinytest.add('livedata stub - receive data', function(test) { +Tinytest.addAsync('livedata stub - receive data', async function(test) { const stream = new StubStream(); const conn = newConnection(stream); - startAndConnect(test, stream); + await startAndConnect(test, stream); // data comes in for unknown collection. const coll_name = Random.id(); - stream.receive({ + await stream.receive({ msg: 'added', collection: coll_name, id: '1234', @@ -101,7 +101,7 @@ Tinytest.add('livedata stub - receive data', function(test) { test.equal(coll.find({}).fetch(), [{ _id: '1234', a: 1 }]); // second message. applied directly to the db. - stream.receive({ + await stream.receive({ msg: 'changed', collection: coll_name, id: '1234', @@ -111,7 +111,7 @@ Tinytest.add('livedata stub - receive data', function(test) { test.isUndefined(conn._updatesForUnknownStores[coll_name]); }); -Tinytest.add('livedata stub - buffering data', function(test) { +Tinytest.addAsync('livedata stub - buffering data', async function(test) { // Install special setTimeout that allows tick-by-tick control in tests using sinonjs 'lolex' // This needs to be before the connection is instantiated. const clock = FakeTimers.install(); @@ -123,15 +123,15 @@ Tinytest.add('livedata stub - buffering data', function(test) { bufferedWritesMaxAge: 40 }); - startAndConnect(test, stream); + await startAndConnect(test, stream); const coll_name = Random.id(); const coll = new Mongo.Collection(coll_name, conn); const testDocCount = count => test.equal(coll.find({}).count(), count); - const addDoc = () => { - stream.receive({ + const addDoc = async () => { + await stream.receive({ msg: 'added', collection: coll_name, id: Random.id(), @@ -141,31 +141,31 @@ Tinytest.add('livedata stub - buffering data', function(test) { // Starting at 0 ticks. At this point we haven't advanced the fake clock at all. - addDoc(); // 1st Doc + await addDoc(); // 1st Doc testDocCount(0); // No doc been recognized yet because it's buffered, waiting for more. tick(6); // 6 total ticks testDocCount(0); // Ensure that the doc still hasn't shown up, despite the clock moving forward. tick(4); // 10 total ticks, 1st buffer interval testDocCount(1); // No other docs have arrived, so we 'see' the 1st doc. - addDoc(); // 2nd doc + await addDoc(); // 2nd doc tick(1); // 11 total ticks (1 since last flush) testDocCount(1); // Again, second doc hasn't arrived because we're waiting for more... tick(9); // 20 total ticks (10 ticks since last flush & the 2nd 10-tick interval) testDocCount(2); // Now we're here and got the second document. // Add several docs, frequently enough that we buffer multiple times before the next flush. - addDoc(); // 3 docs + await addDoc(); // 3 docs tick(6); // 26 ticks (6 since last flush) - addDoc(); // 4 docs + await addDoc(); // 4 docs tick(6); // 32 ticks (12 since last flush) - addDoc(); // 5 docs + await addDoc(); // 5 docs tick(6); // 38 ticks (18 since last flush) - addDoc(); // 6 docs + await addDoc(); // 6 docs tick(6); // 44 ticks (24 since last flush) - addDoc(); // 7 docs + await addDoc(); // 7 docs tick(9); // 53 ticks (33 since last flush) - addDoc(); // 8 docs + await addDoc(); // 8 docs tick(9); // 62 ticks! (42 ticks since last flush, over max-age - next interval triggers flush) testDocCount(2); // Still at 2 from before! (Just making sure) tick(1); // Ok, 63 ticks (10 since last doc, so this should cause the flush of all the docs) @@ -175,11 +175,11 @@ Tinytest.add('livedata stub - buffering data', function(test) { clock.uninstall(); }); -Tinytest.add('livedata stub - subscribe', function(test) { +Tinytest.addAsync('livedata stub - subscribe', async function(test) { const stream = new StubStream(); const conn = newConnection(stream); - startAndConnect(test, stream); + await startAndConnect(test, stream); // subscribe let callback_fired = false; @@ -201,7 +201,7 @@ Tinytest.add('livedata stub - subscribe', function(test) { test.isFalse(reactivelyReady); // get the sub satisfied. callback fires. - stream.receive({ msg: 'ready', subs: [id] }); + await stream.receive({ msg: 'ready', subs: [id] }); test.isTrue(callback_fired); Tracker.flush(); test.isTrue(reactivelyReady); @@ -224,11 +224,11 @@ Tinytest.add('livedata stub - subscribe', function(test) { test.equal(message, { msg: 'sub', name: 'my_data', params: [] }); }); -Tinytest.add('livedata stub - reactive subscribe', function(test) { +Tinytest.addAsync('livedata stub - reactive subscribe', async function(test) { const stream = new StubStream(); const conn = newConnection(stream); - startAndConnect(test, stream); + await startAndConnect(test, stream); const rFoo = new ReactiveVar('foo1'); const rBar = new ReactiveVar('bar1'); @@ -283,7 +283,7 @@ Tinytest.add('livedata stub - reactive subscribe', function(test) { test.isFalse(completerReady); // "completer" gets ready now. its callback should fire. - stream.receive({ msg: 'ready', subs: [idCompleter] }); + await stream.receive({ msg: 'ready', subs: [idCompleter] }); test.equal(onReadyCount, { completer: 1 }); test.length(stream.sent, 0); Tracker.flush(); @@ -331,7 +331,7 @@ Tinytest.add('livedata stub - reactive subscribe', function(test) { // the client; completing bar should call the onReady from the new // subscription because we always call onReady for a given reactively-saved // subscription. - stream.receive({ msg: 'ready', subs: [idStopperAgain, idBar1] }); + await stream.receive({ msg: 'ready', subs: [idStopperAgain, idBar1] }); test.equal(onReadyCount, { completer: 2, bar1: 1, stopper: 1 }); // Shut down the autorun. This should unsub us from all current subs at flush @@ -353,13 +353,13 @@ Tinytest.add('livedata stub - reactive subscribe', function(test) { test.equal(actualIds, expectedIds); }); -Tinytest.add('livedata stub - reactive subscribe handle correct', function( +Tinytest.addAsync('livedata stub - reactive subscribe handle correct', async function( test ) { const stream = new StubStream(); const conn = newConnection(stream); - startAndConnect(test, stream); + await startAndConnect(test, stream); const rFoo = new ReactiveVar('foo1'); @@ -402,7 +402,7 @@ Tinytest.add('livedata stub - reactive subscribe handle correct', function( test.isFalse(fooReady); // "foo" gets ready now. The handle should be ready and the autorun rerun - stream.receive({ msg: 'ready', subs: [idFoo2] }); + await stream.receive({ msg: 'ready', subs: [idFoo2] }); test.length(stream.sent, 0); Tracker.flush(); test.isTrue(fooHandle.ready()); @@ -427,7 +427,7 @@ Tinytest.add('livedata stub - reactive subscribe handle correct', function( test.isFalse(fooReady); // "foo" gets ready again - stream.receive({ msg: 'ready', subs: [idFoo3] }); + await stream.receive({ msg: 'ready', subs: [idFoo3] }); test.length(stream.sent, 0); Tracker.flush(); test.isTrue(fooHandle.ready()); @@ -436,11 +436,11 @@ Tinytest.add('livedata stub - reactive subscribe handle correct', function( autorunHandle.stop(); }); -Tinytest.add('livedata stub - this', function(test) { +Tinytest.addAsync('livedata stub - this', async function(test) { const stream = new StubStream(); const conn = newConnection(stream); - startAndConnect(test, stream); + await startAndConnect(test, stream); conn.methods({ test_this: function() { test.isTrue(this.isSimulation); @@ -461,30 +461,30 @@ Tinytest.add('livedata stub - this', function(test) { }); test.length(stream.sent, 0); - stream.receive({ msg: 'result', id: message.id, result: null }); - stream.receive({ msg: 'updated', methods: [message.id] }); + await stream.receive({ msg: 'result', id: message.id, result: null }); + await stream.receive({ msg: 'updated', methods: [message.id] }); }); if (Meteor.isClient) { - Tinytest.add('livedata stub - methods', function(test) { + Tinytest.addAsync('livedata stub - methods', async function(test) { const stream = new StubStream(); const conn = newConnection(stream); - startAndConnect(test, stream); + await startAndConnect(test, stream); const collName = Random.id(); const coll = new Mongo.Collection(collName, { connection: conn }); // setup method conn.methods({ - do_something: function(x) { - coll.insert({ value: x }); + do_something: async function(x) { + await coll.insertAsync({ value: x }); } }); // setup observers const counts = { added: 0, removed: 0, changed: 0, moved: 0 }; - const handle = coll.find({}).observe({ + const handle = await coll.find({}).observe({ addedAt: function() { counts.added += 1; }, @@ -520,29 +520,29 @@ if (Meteor.isClient) { randomSeed: '*' }); - test.equal(coll.find({}).count(), 1); - test.equal(coll.find({ value: 'friday!' }).count(), 1); - const docId = coll.findOne({ value: 'friday!' })._id; + test.equal(await coll.find({}).count(), 1); + test.equal(await coll.find({ value: 'friday!' }).count(), 1); + const docId = await coll.findOneAsync({ value: 'friday!' })._id; // results does not yet result in callback, because data is not // ready. - stream.receive({ msg: 'result', id: message.id, result: '1234' }); + await stream.receive({ msg: 'result', id: message.id, result: '1234' }); test.isFalse(callback1Fired); // result message doesn't affect data - test.equal(coll.find({}).count(), 1); - test.equal(coll.find({ value: 'friday!' }).count(), 1); + test.equal(await coll.find({}).count(), 1); + test.equal(await coll.find({ value: 'friday!' }).count(), 1); test.equal(counts, { added: 1, removed: 0, changed: 0, moved: 0 }); // data methods do not show up (not quiescent yet) - stream.receive({ + await stream.receive({ msg: 'added', collection: collName, id: MongoID.idStringify(docId), fields: { value: 'tuesday' } }); - test.equal(coll.find({}).count(), 1); - test.equal(coll.find({ value: 'friday!' }).count(), 1); + test.equal(await coll.find({}).count(), 1); + test.equal(await coll.find({ value: 'friday!' }).count(), 1); test.equal(counts, { added: 1, removed: 0, changed: 0, moved: 0 }); // send another methods (unknown on client) @@ -566,35 +566,35 @@ if (Meteor.isClient) { // get the first data satisfied message. changes are applied to database even // though another method is outstanding, because the other method didn't have // a stub. and its callback is called. - stream.receive({ msg: 'updated', methods: [message.id] }); + await stream.receive({ msg: 'updated', methods: [message.id] }); test.isTrue(callback1Fired); test.isFalse(callback2Fired); - test.equal(coll.find({}).count(), 1); - test.equal(coll.find({ value: 'tuesday' }).count(), 1); + test.equal(await coll.find({}).count(), 1); + test.equal(await coll.find({ value: 'tuesday' }).count(), 1); test.equal(counts, { added: 1, removed: 0, changed: 1, moved: 0 }); // second result - stream.receive({ msg: 'result', id: message2.id, result: 'bupkis' }); + await stream.receive({ msg: 'result', id: message2.id, result: 'bupkis' }); test.isFalse(callback2Fired); // get second satisfied; no new changes are applied. - stream.receive({ msg: 'updated', methods: [message2.id] }); + await stream.receive({ msg: 'updated', methods: [message2.id] }); test.isTrue(callback2Fired); - test.equal(coll.find({}).count(), 1); - test.equal(coll.find({ value: 'tuesday', _id: docId }).count(), 1); + test.equal(await coll.find({}).count(), 1); + test.equal(await coll.find({ value: 'tuesday', _id: docId }).count(), 1); test.equal(counts, { added: 1, removed: 0, changed: 1, moved: 0 }); handle.stop(); }); } -Tinytest.add('livedata stub - mutating method args', function(test) { +Tinytest.addAsync('livedata stub - mutating method args', async function(test) { const stream = new StubStream(); const conn = newConnection(stream); - startAndConnect(test, stream); + await startAndConnect(test, stream); conn.methods({ mutateArgs: function(arg) { @@ -616,10 +616,10 @@ Tinytest.add('livedata stub - mutating method args', function(test) { test.length(stream.sent, 0); }); -const observeCursor = function(test, cursor) { +const observeCursor = async function(test, cursor) { const counts = { added: 0, removed: 0, changed: 0, moved: 0 }; const expectedCounts = _.clone(counts); - const handle = cursor.observe({ + const handle = await cursor.observe({ addedAt: function() { counts.added += 1; }, @@ -646,11 +646,11 @@ const observeCursor = function(test, cursor) { // method calls another method in simulation. see not sent. if (Meteor.isClient) { - Tinytest.add('livedata stub - methods calling methods', function(test) { + Tinytest.addAsync('livedata stub - methods calling methods', async function(test) { const stream = new StubStream(); const conn = newConnection(stream); - startAndConnect(test, stream); + await startAndConnect(test, stream); const coll_name = Random.id(); const coll = new Mongo.Collection(coll_name, { connection: conn }); @@ -665,7 +665,7 @@ if (Meteor.isClient) { } }); - const o = observeCursor(test, coll.find()); + const o = await observeCursor(test, coll.find()); // call method. conn.call('do_something', _.identity); @@ -683,40 +683,40 @@ if (Meteor.isClient) { // but inner method runs locally. o.expectCallbacks({ added: 1 }); test.equal(coll.find().count(), 1); - const docId = coll.findOne()._id; - test.equal(coll.findOne(), { _id: docId, a: 1 }); + const docId = await coll.findOneAsync()._id; + test.equal(await coll.findOneAsync(), { _id: docId, a: 1 }); // we get the results - stream.receive({ msg: 'result', id: message.id, result: '1234' }); + await stream.receive({ msg: 'result', id: message.id, result: '1234' }); // get data from the method. data from this doc does not show up yet, but data // from another doc does. - stream.receive({ + await stream.receive({ msg: 'added', collection: coll_name, id: MongoID.idStringify(docId), fields: { value: 'tuesday' } }); o.expectCallbacks(); - test.equal(coll.findOne(docId), { _id: docId, a: 1 }); - stream.receive({ + test.equal(await coll.findOneAsync(docId), { _id: docId, a: 1 }); + await stream.receive({ msg: 'added', collection: coll_name, id: 'monkey', fields: { value: 'bla' } }); o.expectCallbacks({ added: 1 }); - test.equal(coll.findOne(docId), { _id: docId, a: 1 }); - const newDoc = coll.findOne({ value: 'bla' }); + test.equal(await coll.findOneAsync(docId), { _id: docId, a: 1 }); + const newDoc = await coll.findOneAsync({ value: 'bla' }); test.isTrue(newDoc); test.equal(newDoc, { _id: newDoc._id, value: 'bla' }); // get method satisfied. all data shows up. the 'a' field is reverted and // 'value' field is set. - stream.receive({ msg: 'updated', methods: [message.id] }); + await stream.receive({ msg: 'updated', methods: [message.id] }); o.expectCallbacks({ changed: 1 }); - test.equal(coll.findOne(docId), { _id: docId, value: 'tuesday' }); - test.equal(coll.findOne(newDoc._id), { _id: newDoc._id, value: 'bla' }); + test.equal(await coll.findOneAsync(docId), { _id: docId, value: 'tuesday' }); + test.equal(await coll.findOneAsync(newDoc._id), { _id: newDoc._id, value: 'bla' }); o.stop(); }); @@ -746,7 +746,7 @@ Tinytest.add('livedata stub - method call before connect', function(test) { }); }); -Tinytest.add('livedata stub - reconnect', function(test) { +Tinytest.addAsync('livedata stub - reconnect', async function(test, onComplete) { const stream = new StubStream(); const conn = newConnection(stream); @@ -755,7 +755,7 @@ Tinytest.add('livedata stub - reconnect', function(test) { const collName = Random.id(); const coll = new Mongo.Collection(collName, { connection: conn }); - const o = observeCursor(test, coll.find()); + const o = await observeCursor(test, coll.find()); // subscribe let subCallbackFired = false; @@ -773,31 +773,31 @@ Tinytest.add('livedata stub - reconnect', function(test) { }); // get some data. it shows up. - stream.receive({ + await stream.receive({ msg: 'added', collection: collName, id: '1234', fields: { a: 1 } }); - test.equal(coll.find({}).count(), 1); + test.equal(await coll.find({}).count(), 1); o.expectCallbacks({ added: 1 }); test.isFalse(subCallbackFired); - stream.receive({ + await stream.receive({ msg: 'changed', collection: collName, id: '1234', fields: { b: 2 } }); - stream.receive({ + await stream.receive({ msg: 'ready', subs: [subMessage.id] // satisfy sub }); test.isTrue(subCallbackFired); subCallbackFired = false; // re-arm for test that it doesn't fire again. - test.equal(coll.find({ a: 1, b: 2 }).count(), 1); + test.equal(await coll.find({ a: 1, b: 2 }).count(), 1); o.expectCallbacks({ changed: 1 }); // call method. @@ -823,69 +823,69 @@ Tinytest.add('livedata stub - reconnect', function(test) { test.equal(stream.sent.length, 0); // more data. shows up immediately because there was no relevant method stub. - stream.receive({ + await stream.receive({ msg: 'changed', collection: collName, id: '1234', fields: { c: 3 } }); - test.equal(coll.findOne('1234'), { _id: '1234', a: 1, b: 2, c: 3 }); + test.equal(await coll.findOneAsync('1234'), { _id: '1234', a: 1, b: 2, c: 3 }); o.expectCallbacks({ changed: 1 }); // stream reset. reconnect! we send a connect, our pending method, and our // sub. The wait method still is blocked. - stream.reset(); + await stream.reset(); testGotMessage(test, stream, makeConnectMessage(SESSION_ID)); testGotMessage(test, stream, methodMessage); testGotMessage(test, stream, subMessage); // reconnect with different session id - stream.receive({ msg: 'connected', session: SESSION_ID + 1 }); + await stream.receive({ msg: 'connected', session: SESSION_ID + 1 }); // resend data. doesn't show up: we're in reconnect quiescence. - stream.receive({ + await stream.receive({ msg: 'added', collection: collName, id: '1234', fields: { a: 1, b: 2, c: 3, d: 4 } }); - stream.receive({ + await stream.receive({ msg: 'added', collection: collName, id: '2345', fields: { e: 5 } }); - test.equal(coll.findOne('1234'), { _id: '1234', a: 1, b: 2, c: 3 }); - test.isFalse(coll.findOne('2345')); + test.equal(await await coll.findOneAsync('1234'), { _id: '1234', a: 1, b: 2, c: 3 }); + test.isFalse(await coll.findOneAsync('2345')); o.expectCallbacks(); // satisfy and return the method - stream.receive({ + await stream.receive({ msg: 'updated', methods: [methodMessage.id] }); test.isFalse(methodCallbackFired); - stream.receive({ msg: 'result', id: methodMessage.id, result: 'bupkis' }); + await stream.receive({ msg: 'result', id: methodMessage.id, result: 'bupkis' }); // The callback still doesn't fire (and we don't send the wait method): we're // still in global quiescence test.isFalse(methodCallbackFired); test.equal(stream.sent.length, 0); // still no update. - test.equal(coll.findOne('1234'), { _id: '1234', a: 1, b: 2, c: 3 }); - test.isFalse(coll.findOne('2345')); + test.equal(await coll.findOneAsync('1234'), { _id: '1234', a: 1, b: 2, c: 3 }); + test.isFalse(await coll.findOneAsync('2345')); o.expectCallbacks(); // re-satisfy sub - stream.receive({ msg: 'ready', subs: [subMessage.id] }); + await stream.receive({ msg: 'ready', subs: [subMessage.id] }); // now the doc changes and method callback is called, and the wait method is // sent. the sub callback isn't re-called. test.isTrue(methodCallbackFired); test.isFalse(subCallbackFired); - test.equal(coll.findOne('1234'), { _id: '1234', a: 1, b: 2, c: 3, d: 4 }); - test.equal(coll.findOne('2345'), { _id: '2345', e: 5 }); + test.equal(await coll.findOneAsync('1234'), { _id: '1234', a: 1, b: 2, c: 3, d: 4 }); + test.equal(await coll.findOneAsync('2345'), { _id: '2345', e: 5 }); o.expectCallbacks({ added: 1, changed: 1 }); let waitMethodMessage = JSON.parse(stream.sent.shift()); @@ -897,9 +897,9 @@ Tinytest.add('livedata stub - reconnect', function(test) { id: waitMethodMessage.id }); test.equal(stream.sent.length, 0); - stream.receive({ msg: 'result', id: waitMethodMessage.id, result: 'bupkis' }); + await stream.receive({ msg: 'result', id: waitMethodMessage.id, result: 'bupkis' }); test.equal(stream.sent.length, 0); - stream.receive({ msg: 'updated', methods: [waitMethodMessage.id] }); + await stream.receive({ msg: 'updated', methods: [waitMethodMessage.id] }); // wait method done means we can send the third method test.equal(stream.sent.length, 1); @@ -916,14 +916,14 @@ Tinytest.add('livedata stub - reconnect', function(test) { }); if (Meteor.isClient) { - Tinytest.add('livedata stub - reconnect non-idempotent method', function( + Tinytest.addAsync('livedata stub - reconnect non-idempotent method', async function( test ) { // This test is for https://github.com/meteor/meteor/issues/6108 const stream = new StubStream(); const conn = newConnection(stream); - startAndConnect(test, stream); + await startAndConnect(test, stream); let firstMethodCallbackFired = false; let firstMethodCallbackErrored = false; @@ -954,12 +954,12 @@ if (Meteor.isClient) { stream.sent.shift(); stream.sent.shift(); // reconnect - stream.reset(); + await stream.reset(); // verify that a reconnect message was sent. testGotMessage(test, stream, makeConnectMessage(SESSION_ID)); // Make sure that the stream triggers connection. - stream.receive({ msg: 'connected', session: SESSION_ID + 1 }); + await stream.receive({ msg: 'connected', session: SESSION_ID + 1 }); //The method callback should fire even though the stream has not sent a response. //the callback should have been fired with an error. @@ -974,14 +974,14 @@ if (Meteor.isClient) { } function addReconnectTests(name, testFunc) { - Tinytest.add(name + ' (deprecated)', function(test) { + Tinytest.addAsync(name + ' (deprecated)', async function(test) { function deprecatedSetOnReconnect(conn, handler) { conn.onReconnect = handler; } - testFunc.call(this, test, deprecatedSetOnReconnect); + await testFunc.call(this, test, deprecatedSetOnReconnect); }); - Tinytest.add(name, function(test) { + Tinytest.addAsync(name, async function(test) { let stopper; function setOnReconnect(conn, handler) { stopper && stopper.stop(); @@ -991,7 +991,7 @@ function addReconnectTests(name, testFunc) { } }); } - testFunc.call(this, test, setOnReconnect); + await testFunc.call(this, test, setOnReconnect); stopper && stopper.stop(); }); } @@ -999,10 +999,10 @@ function addReconnectTests(name, testFunc) { if (Meteor.isClient) { addReconnectTests( 'livedata stub - reconnect method which only got result', - function(test, setOnReconnect) { + async function(test, setOnReconnect) { const stream = new StubStream(); const conn = newConnection(stream); - startAndConnect(test, stream); + await startAndConnect(test, stream); const collName = Random.id(); const coll = new Mongo.Collection(collName, { connection: conn }); @@ -1034,7 +1034,7 @@ if (Meteor.isClient) { ); // Stub write is visible. test.equal(coll.find({ foo: 'bar' }).count(), 1); - const stubWrittenId = coll.findOne({ foo: 'bar' })._id; + const stubWrittenId = await coll.findOneAsync({ foo: 'bar' })._id; o.expectCallbacks({ added: 1 }); // Callback not called. test.equal(callbackOutput, []); @@ -1050,7 +1050,7 @@ if (Meteor.isClient) { test.equal(stream.sent.length, 0); // Get some data. - stream.receive({ + await stream.receive({ msg: 'added', collection: collName, id: MongoID.idStringify(stubWrittenId), @@ -1058,17 +1058,17 @@ if (Meteor.isClient) { }); // It doesn't show up yet. test.equal(coll.find().count(), 1); - test.equal(coll.findOne(stubWrittenId), { + test.equal(await coll.findOneAsync(stubWrittenId), { _id: stubWrittenId, foo: 'bar' }); o.expectCallbacks(); // Get the result. - stream.receive({ msg: 'result', id: methodId, result: 'bla' }); + await stream.receive({ msg: 'result', id: methodId, result: 'bla' }); // Data unaffected. test.equal(coll.find().count(), 1); - test.equal(coll.findOne(stubWrittenId), { + test.equal(await coll.findOneAsync(stubWrittenId), { _id: stubWrittenId, foo: 'bar' }); @@ -1080,11 +1080,11 @@ if (Meteor.isClient) { // Reset stream. Method does NOT get resent, because its result is already // in. Reconnect quiescence happens as soon as 'connected' is received because // there are no pending methods or subs in need of revival. - stream.reset(); + await stream.reset(); testGotMessage(test, stream, makeConnectMessage(SESSION_ID)); // Still holding out hope for session resumption, so nothing updated yet. test.equal(coll.find().count(), 1); - test.equal(coll.findOne(stubWrittenId), { + test.equal(await coll.findOneAsync(stubWrittenId), { _id: stubWrittenId, foo: 'bar' }); @@ -1093,18 +1093,18 @@ if (Meteor.isClient) { // Receive 'connected': time for reconnect quiescence! Data gets updated // locally (ie, data is reset) and callback gets called. - stream.receive({ msg: 'connected', session: SESSION_ID + 1 }); + await stream.receive({ msg: 'connected', session: SESSION_ID + 1 }); test.equal(coll.find().count(), 0); o.expectCallbacks({ removed: 1 }); test.equal(callbackOutput, ['bla']); test.equal(onResultReceivedOutput, ['bla']); - stream.receive({ + await stream.receive({ msg: 'added', collection: collName, id: MongoID.idStringify(stubWrittenId), fields: { baz: 42 } }); - test.equal(coll.findOne(stubWrittenId), { _id: stubWrittenId, baz: 42 }); + test.equal(await coll.findOneAsync(stubWrittenId), { _id: stubWrittenId, baz: 42 }); o.expectCallbacks({ added: 1 }); // Run method again. We're going to do the same thing this time, except we're @@ -1124,7 +1124,7 @@ if (Meteor.isClient) { ); // Stub write is visible. test.equal(coll.find({ foo: 'bar' }).count(), 1); - const stubWrittenId2 = coll.findOne({ foo: 'bar' })._id; + const stubWrittenId2 = await coll.findOneAsync({ foo: 'bar' })._id; o.expectCallbacks({ added: 1 }); // Callback not called. test.equal(callbackOutput, ['bla']); @@ -1140,7 +1140,7 @@ if (Meteor.isClient) { test.equal(stream.sent.length, 0); // Get some data. - stream.receive({ + await stream.receive({ msg: 'added', collection: collName, id: MongoID.idStringify(stubWrittenId2), @@ -1148,17 +1148,17 @@ if (Meteor.isClient) { }); // It doesn't show up yet. test.equal(coll.find().count(), 2); - test.equal(coll.findOne(stubWrittenId2), { + test.equal(await coll.findOneAsync(stubWrittenId2), { _id: stubWrittenId2, foo: 'bar' }); o.expectCallbacks(); // Get the result. - stream.receive({ msg: 'result', id: methodId2, result: 'blab' }); + await stream.receive({ msg: 'result', id: methodId2, result: 'blab' }); // Data unaffected. test.equal(coll.find().count(), 2); - test.equal(coll.findOne(stubWrittenId2), { + test.equal(await coll.findOneAsync(stubWrittenId2), { _id: stubWrittenId2, foo: 'bar' }); @@ -1175,7 +1175,7 @@ if (Meteor.isClient) { // Reset stream. Method does NOT get resent, because its result is already in, // but slowMethod gets called via onReconnect. Reconnect quiescence is now // blocking on slowMethod. - stream.reset(); + await stream.reset(); testGotMessage(test, stream, makeConnectMessage(SESSION_ID + 1)); const slowMethodId = testGotMessage(test, stream, { msg: 'method', @@ -1185,7 +1185,7 @@ if (Meteor.isClient) { }).id; // Still holding out hope for session resumption, so nothing updated yet. test.equal(coll.find().count(), 2); - test.equal(coll.findOne(stubWrittenId2), { + test.equal(await coll.findOneAsync(stubWrittenId2), { _id: stubWrittenId2, foo: 'bar' }); @@ -1193,9 +1193,9 @@ if (Meteor.isClient) { test.equal(callbackOutput, ['bla']); // Receive 'connected'... but no reconnect quiescence yet due to slowMethod. - stream.receive({ msg: 'connected', session: SESSION_ID + 2 }); + await stream.receive({ msg: 'connected', session: SESSION_ID + 2 }); test.equal(coll.find().count(), 2); - test.equal(coll.findOne(stubWrittenId2), { + test.equal(await coll.findOneAsync(stubWrittenId2), { _id: stubWrittenId2, foo: 'bar' }); @@ -1203,7 +1203,7 @@ if (Meteor.isClient) { test.equal(callbackOutput, ['bla']); // Receive data matching our stub. It doesn't take effect yet. - stream.receive({ + await stream.receive({ msg: 'added', collection: collName, id: MongoID.idStringify(stubWrittenId2), @@ -1215,9 +1215,9 @@ if (Meteor.isClient) { // slowMethod callback)... ie, a reset followed by applying the data we just // got, as well as calling the callback from the method that half-finished // before reset. The net effect is deleting doc 'stubWrittenId'. - stream.receive({ msg: 'updated', methods: [slowMethodId] }); + await stream.receive({ msg: 'updated', methods: [slowMethodId] }); test.equal(coll.find().count(), 1); - test.equal(coll.findOne(stubWrittenId2), { + test.equal(await coll.findOneAsync(stubWrittenId2), { _id: stubWrittenId2, foo: 'bar' }); @@ -1225,7 +1225,7 @@ if (Meteor.isClient) { test.equal(callbackOutput, ['bla', 'blab']); // slowMethod returns a value now. - stream.receive({ msg: 'result', id: slowMethodId, result: 'slow' }); + await stream.receive({ msg: 'result', id: slowMethodId, result: 'slow' }); o.expectCallbacks(); test.equal(callbackOutput, ['bla', 'blab', 'slow']); @@ -1233,16 +1233,16 @@ if (Meteor.isClient) { } ); } -Tinytest.add('livedata stub - reconnect method which only got data', function( +Tinytest.addAsync('livedata stub - reconnect method which only got data', async function( test ) { const stream = new StubStream(); const conn = newConnection(stream); - startAndConnect(test, stream); + await startAndConnect(test, stream); const collName = Random.id(); const coll = new Mongo.Collection(collName, { connection: conn }); - const o = observeCursor(test, coll.find()); + const o = await observeCursor(test, coll.find()); // Call a method. We'll get the data-done message but not the result before // reconnect. @@ -1273,7 +1273,7 @@ Tinytest.add('livedata stub - reconnect method which only got data', function( test.equal(stream.sent.length, 0); // Get some data. - stream.receive({ + await stream.receive({ msg: 'added', collection: collName, id: 'photo', @@ -1281,14 +1281,14 @@ Tinytest.add('livedata stub - reconnect method which only got data', function( }); // It shows up instantly because the stub didn't write anything. test.equal(coll.find().count(), 1); - test.equal(coll.findOne('photo'), { _id: 'photo', baz: 42 }); + test.equal(await coll.findOneAsync('photo'), { _id: 'photo', baz: 42 }); o.expectCallbacks({ added: 1 }); // Get the data-done message. - stream.receive({ msg: 'updated', methods: [methodId] }); + await stream.receive({ msg: 'updated', methods: [methodId] }); // Data still here. test.equal(coll.find().count(), 1); - test.equal(coll.findOne('photo'), { _id: 'photo', baz: 42 }); + test.equal(await coll.findOneAsync('photo'), { _id: 'photo', baz: 42 }); o.expectCallbacks(); // Method callback not called yet (no result yet). test.equal(callbackOutput, []); @@ -1296,7 +1296,7 @@ Tinytest.add('livedata stub - reconnect method which only got data', function( // Reset stream. Method gets resent (with same ID), and blocks reconnect // quiescence. - stream.reset(); + await stream.reset(); testGotMessage(test, stream, makeConnectMessage(SESSION_ID)); testGotMessage(test, stream, { msg: 'method', @@ -1306,15 +1306,15 @@ Tinytest.add('livedata stub - reconnect method which only got data', function( }); // Still holding out hope for session resumption, so nothing updated yet. test.equal(coll.find().count(), 1); - test.equal(coll.findOne('photo'), { _id: 'photo', baz: 42 }); + test.equal(await coll.findOneAsync('photo'), { _id: 'photo', baz: 42 }); o.expectCallbacks(); test.equal(callbackOutput, []); test.equal(onResultReceivedOutput, []); // Receive 'connected'. Still blocking on reconnect quiescence. - stream.receive({ msg: 'connected', session: SESSION_ID + 1 }); + await stream.receive({ msg: 'connected', session: SESSION_ID + 1 }); test.equal(coll.find().count(), 1); - test.equal(coll.findOne('photo'), { _id: 'photo', baz: 42 }); + test.equal(await coll.findOneAsync('photo'), { _id: 'photo', baz: 42 }); o.expectCallbacks(); test.equal(callbackOutput, []); test.equal(onResultReceivedOutput, []); @@ -1322,12 +1322,12 @@ Tinytest.add('livedata stub - reconnect method which only got data', function( // Receive method result. onResultReceived is called but the main callback // isn't (ie, we don't get confused by the fact that we got data-done the // *FIRST* time through). - stream.receive({ msg: 'result', id: methodId, result: 'res' }); + await stream.receive({ msg: 'result', id: methodId, result: 'res' }); test.equal(callbackOutput, []); test.equal(onResultReceivedOutput, ['res']); // Now we get data-done. Collection is reset and callback is called. - stream.receive({ msg: 'updated', methods: [methodId] }); + await stream.receive({ msg: 'updated', methods: [methodId] }); test.equal(coll.find().count(), 0); o.expectCallbacks({ removed: 1 }); test.equal(callbackOutput, ['res']); @@ -1336,10 +1336,10 @@ Tinytest.add('livedata stub - reconnect method which only got data', function( o.stop(); }); if (Meteor.isClient) { - Tinytest.add('livedata stub - multiple stubs same doc', function(test) { + Tinytest.addAsync('livedata stub - multiple stubs same doc', async function(test) { const stream = new StubStream(); const conn = newConnection(stream); - startAndConnect(test, stream); + await startAndConnect(test, stream); const collName = Random.id(); const coll = new Mongo.Collection(collName, { connection: conn }); @@ -1361,7 +1361,7 @@ if (Meteor.isClient) { conn.call('insertSomething', _.identity); // Stub write is visible. test.equal(coll.find({ foo: 'bar' }).count(), 1); - const stubWrittenId = coll.findOne({ foo: 'bar' })._id; + const stubWrittenId = await coll.findOneAsync({ foo: 'bar' })._id; o.expectCallbacks({ added: 1 }); // Method sent. const insertMethodId = testGotMessage(test, stream, { @@ -1377,7 +1377,7 @@ if (Meteor.isClient) { conn.call('updateIt', stubWrittenId, _.identity); // This stub write is visible too. test.equal(coll.find().count(), 1); - test.equal(coll.findOne(stubWrittenId), { + test.equal(await coll.findOneAsync(stubWrittenId), { _id: stubWrittenId, foo: 'bar', baz: 42 @@ -1393,7 +1393,7 @@ if (Meteor.isClient) { test.equal(stream.sent.length, 0); // Get some data... slightly different than what we wrote. - stream.receive({ + await stream.receive({ msg: 'added', collection: collName, id: MongoID.idStringify(stubWrittenId), @@ -1405,7 +1405,7 @@ if (Meteor.isClient) { }); // It doesn't show up yet. test.equal(coll.find().count(), 1); - test.equal(coll.findOne(stubWrittenId), { + test.equal(await coll.findOneAsync(stubWrittenId), { _id: stubWrittenId, foo: 'bar', baz: 42 @@ -1414,9 +1414,9 @@ if (Meteor.isClient) { // And get the first method-done. Still no updates to minimongo: we can't // quiesce the doc until the second method is done. - stream.receive({ msg: 'updated', methods: [insertMethodId] }); + await stream.receive({ msg: 'updated', methods: [insertMethodId] }); test.equal(coll.find().count(), 1); - test.equal(coll.findOne(stubWrittenId), { + test.equal(await coll.findOneAsync(stubWrittenId), { _id: stubWrittenId, foo: 'bar', baz: 42 @@ -1424,7 +1424,7 @@ if (Meteor.isClient) { o.expectCallbacks(); // More data. Not quite what we wrote. Also ignored for now. - stream.receive({ + await stream.receive({ msg: 'changed', collection: collName, id: MongoID.idStringify(stubWrittenId), @@ -1432,7 +1432,7 @@ if (Meteor.isClient) { cleared: ['other'] }); test.equal(coll.find().count(), 1); - test.equal(coll.findOne(stubWrittenId), { + test.equal(await coll.findOneAsync(stubWrittenId), { _id: stubWrittenId, foo: 'bar', baz: 42 @@ -1440,9 +1440,9 @@ if (Meteor.isClient) { o.expectCallbacks(); // Second data-ready. Now everything takes effect! - stream.receive({ msg: 'updated', methods: [updateMethodId] }); + await stream.receive({ msg: 'updated', methods: [updateMethodId] }); test.equal(coll.find().count(), 1); - test.equal(coll.findOne(stubWrittenId), { + test.equal(await coll.findOneAsync(stubWrittenId), { _id: stubWrittenId, foo: 'barb', other2: 'bla', @@ -1455,14 +1455,14 @@ if (Meteor.isClient) { } if (Meteor.isClient) { - Tinytest.add( + Tinytest.addAsync( "livedata stub - unsent methods don't block quiescence", - function(test) { + async function(test) { // This test is for https://github.com/meteor/meteor/issues/555 const stream = new StubStream(); const conn = newConnection(stream); - startAndConnect(test, stream); + await startAndConnect(test, stream); const collName = Random.id(); const coll = new Mongo.Collection(collName, { connection: conn }); @@ -1485,7 +1485,7 @@ if (Meteor.isClient) { // Stub write is visible. test.equal(coll.find({ foo: 'bar' }).count(), 1); - const stubWrittenId = coll.findOne({ foo: 'bar' })._id; + const stubWrittenId = await coll.findOneAsync({ foo: 'bar' })._id; // first method sent const firstMethodId = testGotMessage(test, stream, { @@ -1497,8 +1497,8 @@ if (Meteor.isClient) { test.equal(stream.sent.length, 0); // ack the first method - stream.receive({ msg: 'updated', methods: [firstMethodId] }); - stream.receive({ msg: 'result', id: firstMethodId }); + await stream.receive({ msg: 'updated', methods: [firstMethodId] }); + await stream.receive({ msg: 'result', id: firstMethodId }); // Wait method sent. const waitMethodId = testGotMessage(test, stream, { @@ -1510,8 +1510,8 @@ if (Meteor.isClient) { test.equal(stream.sent.length, 0); // ack the wait method - stream.receive({ msg: 'updated', methods: [waitMethodId] }); - stream.receive({ msg: 'result', id: waitMethodId }); + await stream.receive({ msg: 'updated', methods: [waitMethodId] }); + await stream.receive({ msg: 'result', id: waitMethodId }); // insert method sent. const insertMethodId = testGotMessage(test, stream, { @@ -1524,19 +1524,19 @@ if (Meteor.isClient) { test.equal(stream.sent.length, 0); // ack the insert method - stream.receive({ msg: 'updated', methods: [insertMethodId] }); - stream.receive({ msg: 'result', id: insertMethodId }); + await stream.receive({ msg: 'updated', methods: [insertMethodId] }); + await stream.receive({ msg: 'result', id: insertMethodId }); // simulation reverted. test.equal(coll.find({ foo: 'bar' }).count(), 0); } ); } -Tinytest.add('livedata stub - reactive resub', function(test) { +Tinytest.addAsync('livedata stub - reactive resub', async function(test) { const stream = new StubStream(); const conn = newConnection(stream); - startAndConnect(test, stream); + await startAndConnect(test, stream); const readiedSubs = {}; const markAllReady = function() { @@ -1627,10 +1627,10 @@ Tinytest.add('livedata connection - reactive userId', function(test) { test.equal(conn.userId(), 1337); }); -Tinytest.add('livedata connection - two wait methods', function(test) { +Tinytest.addAsync('livedata connection - two wait methods', async function(test) { const stream = new StubStream(); const conn = newConnection(stream); - startAndConnect(test, stream); + await startAndConnect(test, stream); const collName = Random.id(); const coll = new Mongo.Collection(collName, { connection: conn }); @@ -1673,19 +1673,19 @@ Tinytest.add('livedata connection - two wait methods', function(test) { // Receive some data. "one" is not a wait method and there are no stubs, so it // gets applied immediately. test.equal(coll.find().count(), 0); - stream.receive({ + await stream.receive({ msg: 'added', collection: collName, id: 'foo', fields: { x: 1 } }); test.equal(coll.find().count(), 1); - test.equal(coll.findOne('foo'), { _id: 'foo', x: 1 }); + test.equal(await coll.findOneAsync('foo'), { _id: 'foo', x: 1 }); // Let "one!" finish. Both messages are required to fire the callback. - stream.receive({ msg: 'result', id: one_message.id }); + await stream.receive({ msg: 'result', id: one_message.id }); test.equal(responses, []); - stream.receive({ msg: 'updated', methods: [one_message.id] }); + await stream.receive({ msg: 'updated', methods: [one_message.id] }); test.equal(responses, ['one']); // Now we've send out "two!". @@ -1697,23 +1697,23 @@ Tinytest.add('livedata connection - two wait methods', function(test) { // Receive more data. "two" is a wait method, so the data doesn't get applied // yet. - stream.receive({ + await stream.receive({ msg: 'changed', collection: collName, id: 'foo', fields: { y: 3 } }); test.equal(coll.find().count(), 1); - test.equal(coll.findOne('foo'), { _id: 'foo', x: 1 }); + test.equal(await coll.findOneAsync('foo'), { _id: 'foo', x: 1 }); // Let "two!" finish, with its end messages in the opposite order to "one!". - stream.receive({ msg: 'updated', methods: [two_message.id] }); + await stream.receive({ msg: 'updated', methods: [two_message.id] }); test.equal(responses, ['one']); test.equal(stream.sent.length, 0); // data-done message is enough to allow data to be written. test.equal(coll.find().count(), 1); - test.equal(coll.findOne('foo'), { _id: 'foo', x: 1, y: 3 }); - stream.receive({ msg: 'result', id: two_message.id }); + test.equal(await coll.findOneAsync('foo'), { _id: 'foo', x: 1, y: 3 }); + await stream.receive({ msg: 'result', id: two_message.id }); test.equal(responses, ['one', 'two']); // Verify that we just sent "three!" and "four!" now that we got @@ -1725,14 +1725,14 @@ Tinytest.add('livedata connection - two wait methods', function(test) { test.equal(four_message.params, ['four!']); // Out of order response is OK for non-wait methods. - stream.receive({ msg: 'result', id: three_message.id }); - stream.receive({ msg: 'result', id: four_message.id }); - stream.receive({ msg: 'updated', methods: [four_message.id] }); + await stream.receive({ msg: 'result', id: three_message.id }); + await stream.receive({ msg: 'result', id: four_message.id }); + await stream.receive({ msg: 'updated', methods: [four_message.id] }); test.equal(responses, ['one', 'two', 'four']); test.equal(stream.sent.length, 0); // Let three finish too. - stream.receive({ msg: 'updated', methods: [three_message.id] }); + await stream.receive({ msg: 'updated', methods: [three_message.id] }); test.equal(responses, ['one', 'two', 'four', 'three']); // Verify that we just sent "five!" (the next wait method). @@ -1742,8 +1742,8 @@ Tinytest.add('livedata connection - two wait methods', function(test) { test.equal(responses, ['one', 'two', 'four', 'three']); // Let five finish. - stream.receive({ msg: 'result', id: five_message.id }); - stream.receive({ msg: 'updated', methods: [five_message.id] }); + await stream.receive({ msg: 'result', id: five_message.id }); + await stream.receive({ msg: 'updated', methods: [five_message.id] }); test.equal(responses, ['one', 'two', 'four', 'three', 'five']); let six_message = JSON.parse(stream.sent.shift()); @@ -1752,10 +1752,10 @@ Tinytest.add('livedata connection - two wait methods', function(test) { addReconnectTests( 'livedata connection - onReconnect prepends messages correctly with a wait method', - function(test, setOnReconnect) { + async function(test, setOnReconnect) { const stream = new StubStream(); const conn = newConnection(stream); - startAndConnect(test, stream); + await startAndConnect(test, stream); // setup method conn.methods({ do_something: function(x) {} }); @@ -1773,7 +1773,7 @@ addReconnectTests( // reconnect stream.sent = []; - stream.reset(); + await stream.reset(); testGotMessage(test, stream, makeConnectMessage(conn._lastSessionId)); // Test that we sent what we expect to send, and we're blocked on @@ -1807,22 +1807,22 @@ addReconnectTests( } ); -Tinytest.add('livedata connection - ping without id', function(test) { +Tinytest.addAsync('livedata connection - ping without id', async function(test) { const stream = new StubStream(); const conn = newConnection(stream); - startAndConnect(test, stream); + await startAndConnect(test, stream); - stream.receive({ msg: 'ping' }); + await stream.receive({ msg: 'ping' }); testGotMessage(test, stream, { msg: 'pong' }); }); -Tinytest.add('livedata connection - ping with id', function(test) { +Tinytest.addAsync('livedata connection - ping with id', async function(test) { const stream = new StubStream(); const conn = newConnection(stream); - startAndConnect(test, stream); + await startAndConnect(test, stream); const id = Random.id(); - stream.receive({ msg: 'ping', id: id }); + await stream.receive({ msg: 'ping', id: id }); testGotMessage(test, stream, { msg: 'pong', id: id }); }); @@ -1978,10 +1978,10 @@ Tinytest.addAsync('livedata connection - version negotiation error', function( addReconnectTests( 'livedata connection - onReconnect prepends messages correctly without a wait method', - function(test, setOnReconnect) { + async function(test, setOnReconnect) { const stream = new StubStream(); const conn = newConnection(stream); - startAndConnect(test, stream); + await startAndConnect(test, stream); // setup method conn.methods({ do_something: function(x) {} }); @@ -1999,7 +1999,7 @@ addReconnectTests( // reconnect stream.sent = []; - stream.reset(); + await stream.reset(); testGotMessage(test, stream, makeConnectMessage(conn._lastSessionId)); // Test that we sent what we expect to send, and we're blocked on @@ -2034,10 +2034,10 @@ addReconnectTests( addReconnectTests( 'livedata connection - onReconnect with sent messages', - function(test, setOnReconnect) { + async function(test, setOnReconnect) { const stream = new StubStream(); const conn = newConnection(stream); - startAndConnect(test, stream); + await startAndConnect(test, stream); // setup method conn.methods({ do_something: function(x) {} }); @@ -2050,7 +2050,7 @@ addReconnectTests( // initial connect stream.sent = []; - stream.reset(); + await stream.reset(); testGotMessage(test, stream, makeConnectMessage(conn._lastSessionId)); // Test that we sent just the login message. @@ -2062,15 +2062,15 @@ addReconnectTests( }).id; // we connect. - stream.receive({ msg: 'connected', session: Random.id() }); + await stream.receive({ msg: 'connected', session: Random.id() }); test.length(stream.sent, 0); // login got result (but not yet data) - stream.receive({ msg: 'result', id: loginId, result: 'foo' }); + await stream.receive({ msg: 'result', id: loginId, result: 'foo' }); test.length(stream.sent, 0); // login got data. now we send next method. - stream.receive({ msg: 'updated', methods: [loginId] }); + await stream.receive({ msg: 'updated', methods: [loginId] }); testGotMessage(test, stream, { msg: 'method', @@ -2081,13 +2081,13 @@ addReconnectTests( } ); -addReconnectTests('livedata stub - reconnect double wait method', function( +addReconnectTests('livedata stub - reconnect double wait method', async function( test, setOnReconnect ) { const stream = new StubStream(); const conn = newConnection(stream); - startAndConnect(test, stream); + await startAndConnect(test, stream); const output = []; setOnReconnect(conn, function() { @@ -2111,13 +2111,13 @@ addReconnectTests('livedata stub - reconnect double wait method', function( test.equal(stream.sent.length, 0); // Get the result. This means it will not be resent. - stream.receive({ msg: 'result', id: halfwayId, result: 'bla' }); + await stream.receive({ msg: 'result', id: halfwayId, result: 'bla' }); // Callback not called. test.equal(output, []); // Reset stream. halfwayMethod does NOT get resent, but reconnectMethod does! // Reconnect quiescence happens when reconnectMethod is done. - stream.reset(); + await stream.reset(); testGotMessage(test, stream, makeConnectMessage(SESSION_ID)); const reconnectId = testGotMessage(test, stream, { msg: 'method', @@ -2131,18 +2131,18 @@ addReconnectTests('livedata stub - reconnect double wait method', function( // Receive 'connected', but reconnect quiescence is blocking on // reconnectMethod. - stream.receive({ msg: 'connected', session: SESSION_ID + 1 }); + await stream.receive({ msg: 'connected', session: SESSION_ID + 1 }); test.equal(output, []); // Data-done for reconnectMethod. This gets us to reconnect quiescence, so // halfwayMethod's callback fires. reconnectMethod's is still waiting on its // result. - stream.receive({ msg: 'updated', methods: [reconnectId] }); + await stream.receive({ msg: 'updated', methods: [reconnectId] }); test.equal(output.shift(), 'halfway'); test.equal(output, []); // Get result of reconnectMethod. Its callback fires. - stream.receive({ msg: 'result', id: reconnectId, result: 'foo' }); + await stream.receive({ msg: 'result', id: reconnectId, result: 'foo' }); test.equal(output.shift(), 'reconnect'); test.equal(output, []); @@ -2158,11 +2158,11 @@ addReconnectTests('livedata stub - reconnect double wait method', function( }); }); -Tinytest.add('livedata stub - subscribe errors', function(test) { +Tinytest.addAsync('livedata stub - subscribe errors', async function(test) { const stream = new StubStream(); const conn = newConnection(stream); - startAndConnect(test, stream); + await startAndConnect(test, stream); // subscribe let onReadyFired = false; @@ -2202,7 +2202,7 @@ Tinytest.add('livedata stub - subscribe errors', function(test) { }); // Reject the sub. - stream.receive({ + await stream.receive({ msg: 'nosub', id: subMessage.id, error: new Meteor.Error(404, 'Subscription not found') @@ -2221,7 +2221,7 @@ Tinytest.add('livedata stub - subscribe errors', function(test) { test.equal(subErrorInError.reason, 'Subscription not found'); // stream reset: reconnect! - stream.reset(); + await stream.reset(); // We send a connect. testGotMessage(test, stream, makeConnectMessage(SESSION_ID)); // We should NOT re-sub to the sub, because we processed the error. @@ -2229,11 +2229,11 @@ Tinytest.add('livedata stub - subscribe errors', function(test) { test.isFalse(onReadyFired); }); -Tinytest.add('livedata stub - subscribe stop', function(test) { +Tinytest.addAsync('livedata stub - subscribe stop', async function(test) { const stream = new StubStream(); const conn = newConnection(stream); - startAndConnect(test, stream); + await startAndConnect(test, stream); // subscribe let onReadyFired = false; @@ -2256,7 +2256,7 @@ Tinytest.add('livedata stub - subscribe stop', function(test) { }); if (Meteor.isClient) { - Tinytest.add('livedata stub - stubs before connected', function(test) { + Tinytest.addAsync('livedata stub - stubs before connected', async function(test) { const stream = new StubStream(); const conn = newConnection(stream); @@ -2264,7 +2264,7 @@ if (Meteor.isClient) { const coll = new Mongo.Collection(collName, { connection: conn }); // Start and send "connect", but DON'T get 'connected' quite yet. - stream.reset(); // initial connection start. + await stream.reset(); // initial connection start. testGotMessage(test, stream, makeConnectMessage()); test.length(stream.sent, 0); @@ -2272,7 +2272,7 @@ if (Meteor.isClient) { // Insert a document. The stub updates "conn" directly. coll.insert({ _id: 'foo', bar: 42 }, _.identity); test.equal(coll.find().count(), 1); - test.equal(coll.findOne(), { _id: 'foo', bar: 42 }); + test.equal(await coll.findOneAsync(), { _id: 'foo', bar: 42 }); // It also sends the method message. let methodMessage = JSON.parse(stream.sent.shift()); test.isUndefined(methodMessage.randomSeed); @@ -2286,33 +2286,33 @@ if (Meteor.isClient) { // Now receive a connected message. This should not clear the // _documentsWrittenByStub state! - stream.receive({ msg: 'connected', session: SESSION_ID }); + await stream.receive({ msg: 'connected', session: SESSION_ID }); test.length(stream.sent, 0); test.equal(coll.find().count(), 1); // Now receive the "updated" message for the method. This should revert the // insert. - stream.receive({ msg: 'updated', methods: [methodMessage.id] }); + await stream.receive({ msg: 'updated', methods: [methodMessage.id] }); test.length(stream.sent, 0); test.equal(coll.find().count(), 0); }); } if (Meteor.isClient) { - Tinytest.add( + Tinytest.addAsync ( 'livedata stub - method call between reset and quiescence', - function(test) { + async function(test) { const stream = new StubStream(); const conn = newConnection(stream); - startAndConnect(test, stream); + await startAndConnect(test, stream); const collName = Random.id(); const coll = new Mongo.Collection(collName, { connection: conn }); conn.methods({ - update_value: function() { - coll.update('aaa', { value: 222 }); + update_value: async function() { + await coll.updateAsync('aaa', { value: 222 }); } }); @@ -2336,23 +2336,23 @@ if (Meteor.isClient) { const subReadyMessage = { msg: 'ready', subs: [subMessage.id] }; - stream.receive(subDocMessage); - stream.receive(subReadyMessage); - test.isTrue(coll.findOne('aaa').value == 111); + await stream.receive(subDocMessage); + await stream.receive(subReadyMessage); + test.isTrue(await coll.findOneAsync('aaa').value == 111); // Initiate reconnect. - stream.reset(); + await stream.reset(); testGotMessage(test, stream, makeConnectMessage(SESSION_ID)); testGotMessage(test, stream, subMessage); - stream.receive({ msg: 'connected', session: SESSION_ID + 1 }); + await stream.receive({ msg: 'connected', session: SESSION_ID + 1 }); // Now in reconnect, can still see the document. - test.isTrue(coll.findOne('aaa').value == 111); + test.isTrue(await coll.findOneAsync('aaa').value == 111); - conn.call('update_value'); + await conn.callAsync('update_value'); // Observe the stub-written value. - test.isTrue(coll.findOne('aaa').value == 222); + test.isTrue(await coll.findOneAsync('aaa').value == 222); let methodMessage = JSON.parse(stream.sent.shift()); test.equal(methodMessage, { @@ -2363,29 +2363,29 @@ if (Meteor.isClient) { }); test.length(stream.sent, 0); - stream.receive(subDocMessage); - stream.receive(subReadyMessage); + await stream.receive(subDocMessage); + await stream.receive(subReadyMessage); // By this point quiescence is reached and stores have been reset. // The stub-written value is still there. - test.isTrue(coll.findOne('aaa').value == 222); + test.isTrue(await coll.findOneAsync('aaa').value == 222); - stream.receive({ + await stream.receive({ msg: 'changed', collection: collName, id: 'aaa', fields: { value: 333 } }); - stream.receive({ msg: 'updated', methods: [methodMessage.id] }); - stream.receive({ msg: 'result', id: methodMessage.id, result: null }); + await stream.receive({ msg: 'updated', methods: [methodMessage.id] }); + await stream.receive({ msg: 'result', id: methodMessage.id, result: null }); // Server wrote a different value, make sure it's visible now. - test.isTrue(coll.findOne('aaa').value == 333); + test.isTrue(await coll.findOneAsync('aaa').value == 333); } ); - Tinytest.add('livedata stub - buffering and methods interaction', function( + Tinytest.addAsync('livedata stub - buffering and methods interaction', async function( test ) { const stream = new StubStream(); @@ -2395,14 +2395,14 @@ if (Meteor.isClient) { bufferedWritesMaxAge: 10000 }); - startAndConnect(test, stream); + await startAndConnect(test, stream); const collName = Random.id(); const coll = new Mongo.Collection(collName, { connection: conn }); conn.methods({ - update_value: function() { - const value = coll.findOne('aaa').subscription; + update_value: async function() { + const value = await coll.findOneAsync('aaa').subscription; // Method should have access to the latest value of the collection. coll.update('aaa', { $set: { method: value + 110 } }); } @@ -2428,9 +2428,9 @@ if (Meteor.isClient) { const subReadyMessage = { msg: 'ready', subs: [subMessage.id] }; - stream.receive(subDocMessage); - stream.receive(subReadyMessage); - test.equal(coll.findOne('aaa').subscription, 111); + await stream.receive(subDocMessage); + await stream.receive(subReadyMessage); + test.equal(await coll.findOneAsync('aaa').subscription, 111); const subDocChangeMessage = { msg: 'changed', @@ -2439,18 +2439,18 @@ if (Meteor.isClient) { fields: { subscription: 112 } }; - stream.receive(subDocChangeMessage); + await stream.receive(subDocChangeMessage); // Still 111 because buffer has not been flushed. - test.equal(coll.findOne('aaa').subscription, 111); + test.equal(await coll.findOneAsync('aaa').subscription, 111); // Call updates the stub. - conn.call('update_value'); + await conn.callAsync('update_value'); // Observe the stub-written value. - test.equal(coll.findOne('aaa').method, 222); + test.equal(await coll.findOneAsync('aaa').method, 222); // subscription field is updated to the latest value // because of the method call. - test.equal(coll.findOne('aaa').subscription, 112); + test.equal(await coll.findOneAsync('aaa').subscription, 112); let methodMessage = JSON.parse(stream.sent.shift()); test.equal(methodMessage, { @@ -2464,23 +2464,23 @@ if (Meteor.isClient) { // "Server-side" change from the method arrives and method returns. // With potentially fixed value for method field, if stub didn't // use 112 as the subscription field value. - stream.receive({ + await stream.receive({ msg: 'changed', collection: collName, id: 'aaa', fields: { method: 222 } }); - stream.receive({ msg: 'updated', methods: [methodMessage.id] }); - stream.receive({ msg: 'result', id: methodMessage.id, result: null }); + await stream.receive({ msg: 'updated', methods: [methodMessage.id] }); + await stream.receive({ msg: 'result', id: methodMessage.id, result: null }); - test.equal(coll.findOne('aaa').method, 222); - test.equal(coll.findOne('aaa').subscription, 112); + test.equal(await coll.findOneAsync('aaa').method, 222); + test.equal(await coll.findOneAsync('aaa').subscription, 112); // Buffer should already be flushed because of a non-update message. // And after a flush we really want subscription field to be 112. conn._flushBufferedWrites(); - test.equal(coll.findOne('aaa').method, 222); - test.equal(coll.findOne('aaa').subscription, 112); + test.equal(await coll.findOneAsync('aaa').method, 222); + test.equal(await coll.findOneAsync('aaa').subscription, 112); }); } diff --git a/packages/ddp-client/test/stub_stream.js b/packages/ddp-client/test/stub_stream.js index 1b0186a348..d65990055d 100644 --- a/packages/ddp-client/test/stub_stream.js +++ b/packages/ddp-client/test/stub_stream.js @@ -32,23 +32,23 @@ _.extend(StubStream.prototype, { }, // Methods for tests - receive: function(data) { + receive: async function(data) { const self = this; if (typeof data === 'object') { data = EJSON.stringify(data); } - _.each(self.callbacks['message'], function(cb) { - cb(data); - }); + for (const cb of self.callbacks['message']) { + await cb(data); + } }, - reset: function() { + reset: async function() { const self = this; - _.each(self.callbacks['reset'], function(cb) { - cb(); - }); + for (const cb of self.callbacks['reset']) { + await cb(); + } }, // Provide a tag to detect stub streams. diff --git a/packages/meteor/async_helpers.js b/packages/meteor/async_helpers.js index e326265c03..7be1653c76 100644 --- a/packages/meteor/async_helpers.js +++ b/packages/meteor/async_helpers.js @@ -35,36 +35,22 @@ class AsynchronousQueue { this._draining = false; } - queueTask(task) { + async queueTask(task) { this._taskHandles.push({ task: task, name: task.name }); - return this._scheduleRun(); + await this._scheduleRun(); } - _scheduleRun() { + async _scheduleRun() { // Already running or scheduled? Do nothing. if (this._runningOrRunScheduled) return; this._runningOrRunScheduled = true; - let resolver; - const returnValue = new Promise(r => resolver = r); - setImmediate(() => { - Meteor._runAsync(async () => { - await this._run(); - - if (!resolver) { - throw new Error("Resolver not found for task"); - } - - resolver(); - }); - }); - - return returnValue; + await this._run(); } async _run() { @@ -91,7 +77,7 @@ class AsynchronousQueue { await this._scheduleRun(); } - runTask(task) { + async runTask(task) { const handle = { task: Meteor.bindEnvironment(task, function(e) { Meteor._debug('Exception from task', e); @@ -100,7 +86,7 @@ class AsynchronousQueue { name: task.name }; this._taskHandles.push(handle); - return this._scheduleRun(); + await this._scheduleRun(); } flush() { diff --git a/packages/meteor/dynamics_nodejs.js b/packages/meteor/dynamics_nodejs.js index 62ea44205c..8d2f3f9406 100644 --- a/packages/meteor/dynamics_nodejs.js +++ b/packages/meteor/dynamics_nodejs.js @@ -245,12 +245,12 @@ const bindEnvironmentAsync = (func, onException, _this) => { async () => { let ret; try { - Meteor._updateAslStore(CURRENT_VALUE_KEY_NAME, dynamics); + if (currentSlot) { + Meteor._updateAslStore(CURRENT_VALUE_KEY_NAME, dynamics); + } ret = await func.apply(_this, args); } catch (e) { onException(e); - } finally { - Meteor._updateAslStore(CURRENT_VALUE_KEY_NAME, undefined); } return ret; },