Merge branch 'async-ddp' into async-accounts-base

This commit is contained in:
Gabriel Grubba
2022-12-08 14:41:14 -03:00
3 changed files with 260 additions and 204 deletions

View File

@@ -109,8 +109,8 @@ Ledger.allow({
fetch: []
});
Meteor.startup(function() {
if (Meteor.isServer) Ledger.remove({}); // XXX can this please be Ledger.remove()?
Meteor.startup(async function() {
if (Meteor.isServer) await Ledger.removeAsync({});
});
if (Meteor.isServer)
@@ -120,14 +120,14 @@ if (Meteor.isServer)
});
Meteor.methods({
'ledger/transfer': function(world, from_name, to_name, amount, cheat) {
'ledger/transfer': async function(world, from_name, to_name, amount, cheat) {
check(world, String);
check(from_name, String);
check(to_name, String);
check(amount, Number);
check(cheat, Match.Optional(Boolean));
const from = Ledger.findOne({ name: from_name, world: world });
const to = Ledger.findOne({ name: to_name, world: world });
const from = await Ledger.findOneAsync({ name: from_name, world: world });
const to = await Ledger.findOneAsync({ name: to_name, world: world });
if (Meteor.isServer) cheat = false;
@@ -146,8 +146,8 @@ Meteor.methods({
if (from.balance < amount && !cheat)
throw new Meteor.Error(409, 'Insufficient funds');
Ledger.update(from._id, { $inc: { balance: -amount } });
Ledger.update(to._id, { $inc: { balance: amount } });
await Ledger.updateAsync({_id: from._id}, { $inc: { balance: -amount } });
await Ledger.updateAsync({_id: to._id, }, { $inc: { balance: amount } });
}
});
@@ -157,47 +157,48 @@ Meteor.methods({
objectsWithUsers = new Mongo.Collection('objectsWithUsers');
if (Meteor.isServer) {
objectsWithUsers.remove({});
objectsWithUsers.insert({ name: 'owned by none', ownerUserIds: [null] });
objectsWithUsers.insert({ name: 'owned by one - a', ownerUserIds: ['1'] });
objectsWithUsers.insert({
name: 'owned by one/two - a',
ownerUserIds: ['1', '2']
});
objectsWithUsers.insert({
name: 'owned by one/two - b',
ownerUserIds: ['1', '2']
});
objectsWithUsers.insert({ name: 'owned by two - a', ownerUserIds: ['2'] });
objectsWithUsers.insert({ name: 'owned by two - b', ownerUserIds: ['2'] });
Meteor.startup(async function() {
if (Meteor.isServer) {
await objectsWithUsers.removeAsync({});
await objectsWithUsers.insertAsync({name: 'owned by none', ownerUserIds: [null]});
await objectsWithUsers.insertAsync({name: 'owned by one - a', ownerUserIds: ['1']});
await objectsWithUsers.insertAsync({
name: 'owned by one/two - a',
ownerUserIds: ['1', '2']
});
await objectsWithUsers.insertAsync({
name: 'owned by one/two - b',
ownerUserIds: ['1', '2']
});
await objectsWithUsers.insertAsync({name: 'owned by two - a', ownerUserIds: ['2']});
await objectsWithUsers.insertAsync({name: 'owned by two - b', ownerUserIds: ['2']});
Meteor.publish('objectsWithUsers', function() {
return objectsWithUsers.find(
{ ownerUserIds: this.userId },
{ fields: { ownerUserIds: 0 } }
);
});
(function() {
const userIdWhenStopped = Object.create(null);
Meteor.publish('recordUserIdOnStop', function(key) {
check(key, String);
const self = this;
self.onStop(function() {
userIdWhenStopped[key] = self.userId;
});
Meteor.publish('objectsWithUsers', function () {
return objectsWithUsers.find(
{ownerUserIds: this.userId},
{fields: {ownerUserIds: 0}}
);
});
Meteor.methods({
userIdWhenStopped: function(key) {
(function () {
const userIdWhenStopped = Object.create(null);
Meteor.publish('recordUserIdOnStop', function (key) {
check(key, String);
return userIdWhenStopped[key];
}
});
})();
}
const self = this;
self.onStop(function () {
userIdWhenStopped[key] = self.userId;
});
});
Meteor.methods({
userIdWhenStopped: function (key) {
check(key, String);
return userIdWhenStopped[key];
}
});
})();
}
});
/*****/
/// Helper for "livedata - setUserId fails when called on server"
@@ -332,36 +333,38 @@ if (Meteor.isServer) {
One = new Mongo.Collection('collectionOne');
Two = new Mongo.Collection('collectionTwo');
if (Meteor.isServer) {
One.remove({});
One.insert({ name: 'value1' });
One.insert({ name: 'value2' });
Meteor.startup(async () => {
if (Meteor.isServer) {
await One.removeAsync({});
await One.insertAsync({ name: 'value1' });
await One.insertAsync({ name: 'value2' });
Two.remove({});
Two.insert({ name: 'value3' });
Two.insert({ name: 'value4' });
Two.insert({ name: 'value5' });
await Two.removeAsync({});
await Two.insertAsync({ name: 'value3' });
await Two.insertAsync({ name: 'value4' });
await Two.insertAsync({ name: 'value5' });
Meteor.publish('multiPublish', function(options) {
// See below to see what options are accepted.
check(options, Object);
if (options.normal) {
return [One.find(), Two.find()];
} else if (options.dup) {
// Suppress the log of the expected internal error.
Meteor._suppress_log(1);
return [
One.find(),
One.find({ name: 'value2' }), // multiple cursors for one collection - error
Two.find()
];
} else if (options.notCursor) {
// Suppress the log of the expected internal error.
Meteor._suppress_log(1);
return [One.find(), 'not a cursor', Two.find()];
} else throw 'unexpected options';
});
}
Meteor.publish('multiPublish', function(options) {
// See below to see what options are accepted.
check(options, Object);
if (options.normal) {
return [One.find(), Two.find()];
} else if (options.dup) {
// Suppress the log of the expected internal error.
Meteor._suppress_log(1);
return [
One.find(),
One.find({ name: 'value2' }), // multiple cursors for one collection - error
Two.find(),
];
} else if (options.notCursor) {
// Suppress the log of the expected internal error.
Meteor._suppress_log(1);
return [One.find(), 'not a cursor', Two.find()];
} else throw 'unexpected options';
});
}
});
/// Helper for "livedata - result by value"
const resultByValueArrays = Object.create(null);

View File

@@ -1,6 +1,24 @@
import { DDP } from '../common/namespace.js';
import { Connection } from '../common/livedata_connection.js';
const _sleep = ms => new Promise(resolve => setTimeout(resolve, ms));
const callWhenSubReady = async (subName, handle, cb = () => {}) => {
let control = 0;
while (!handle.ready()) {
if (!handle.ready()) {
// Just in case something happens with the subscription, we have this control
if (control++ === 1000) {
throw new Error(`Subscribe to ${subName} is taking too long!`);
}
await _sleep(0);
return;
}
await cb();
}
};
// XXX should check error codes
const failure = function(test, code, reason) {
return function(error, result) {
@@ -129,7 +147,8 @@ testAsyncMulti('livedata - basic method invocation', [
// make sure 'undefined' is preserved as such, instead of turning
// into null (JSON does not have 'undefined' so there is special
// code for this)
if (Meteor.isServer) test.equal(await Meteor.callAsync('nothing'), undefined);
if (Meteor.isServer)
test.equal(await Meteor.callAsync('nothing'), undefined);
if (Meteor.isClient) test.equal(Meteor.call('nothing'), undefined);
test.equal(Meteor.call('nothing', expect(undefined, undefined)), undefined);
@@ -155,7 +174,10 @@ testAsyncMulti('livedata - basic method invocation', [
async function(test, expect) {
if (Meteor.isServer)
test.equal(await Meteor.callAsync('echo', 12, { x: 13 }), [12, { x: 13 }]);
test.equal(await Meteor.callAsync('echo', 12, { x: 13 }), [
12,
{ x: 13 },
]);
if (Meteor.isClient)
test.equal(Meteor.call('echo', 12, { x: 13 }), undefined);
@@ -292,7 +314,7 @@ testAsyncMulti('livedata - basic method invocation', [
try {
await Meteor.callAsync('exception', 'both', {
intended: true,
throwThroughFuture: true
throwThroughFuture: true,
});
} catch (e) {
threw = true;
@@ -327,74 +349,81 @@ testAsyncMulti('livedata - basic method invocation', [
'server',
{
intended: true,
throwThroughFuture: true
throwThroughFuture: true,
},
expect(failure(test, 999, 'Client-visible test exception'))
),
undefined
);
}
}
},
]);
const checkBalances = function(test, a, b) {
const alice = Ledger.findOne({ name: 'alice', world: test.runId() });
const bob = Ledger.findOne({ name: 'bob', world: test.runId() });
const checkBalances = async function(test, a, b) {
const alice = await Ledger.findOneAsync({
name: 'alice',
world: test.runId(),
});
const bob = await Ledger.findOneAsync({ name: 'bob', world: test.runId() });
test.equal(alice.balance, a);
test.equal(bob.balance, b);
};
const subscribeBeforeRun = async (subName, testId, cb) => {
if (Meteor.isClient) {
const handle = Meteor.subscribe(subName, testId);
await callWhenSubReady(subName, handle);
handle.stop();
}
await cb();
};
// would be nice to have a database-aware test harness of some kind --
// this is a big hack (and XXX pollutes the global test namespace)
testAsyncMulti('livedata - compound methods', [
function(test, expect) {
if (Meteor.isClient) Meteor.subscribe('ledger', test.runId(), expect());
Ledger.insert(
{ name: 'alice', balance: 100, world: test.runId() },
expect(function() {})
);
Ledger.insert(
{ name: 'bob', balance: 50, world: test.runId() },
expect(function() {})
);
async function(test) {
await Ledger.insertAsync({
name: 'alice',
balance: 100,
world: test.runId(),
});
await Ledger.insertAsync({ name: 'bob', balance: 50, world: test.runId() });
},
function(test, expect) {
Meteor.call(
'ledger/transfer',
test.runId(),
'alice',
'bob',
10,
expect(function(err, result) {
test.equal(err, undefined);
test.equal(result, undefined);
checkBalances(test, 90, 60);
})
);
checkBalances(test, 90, 60);
async function(test) {
await subscribeBeforeRun('ledger', test.runId(), async () => {
await Meteor.callAsync(
'ledger/transfer',
test.runId(),
'alice',
'bob',
10
);
await checkBalances(test, 90, 60);
});
},
function(test, expect) {
Meteor.call(
'ledger/transfer',
test.runId(),
'alice',
'bob',
100,
true,
expect(function(err, result) {
failure(test, 409)(err, result);
// Balances are reverted back to pre-stub values.
checkBalances(test, 90, 60);
})
);
async function(test) {
await subscribeBeforeRun('ledger', test.runId(), async () => {
try {
await Meteor.callAsync(
'ledger/transfer',
test.runId(),
'alice',
'bob',
100,
true
);
} catch (e) {}
if (Meteor.isClient)
// client can fool itself by cheating, but only until the sync
// finishes
checkBalances(test, -10, 160);
else checkBalances(test, 90, 60);
}
if (Meteor.isClient) {
// client can fool itself by cheating, but only until the sync
// finishes
await checkBalances(test, -10, 160);
} else {
await checkBalances(test, 90, 60);
}
});
},
]);
// Replaces the Connection's `_livedata_data` method to push incoming
@@ -435,7 +464,7 @@ if (Meteor.isClient) {
testAsyncMulti(
'livedata - changing userid reruns subscriptions without flapping data on the wire',
[
function(test, expect) {
async function(test, expect) {
const messages = [];
const undoEavesdrop = eavesdropOnCollection(
Meteor.connection,
@@ -484,53 +513,53 @@ if (Meteor.isClient) {
let afterSecondSetUserId;
let afterThirdSetUserId;
Meteor.subscribe(
'objectsWithUsers',
expect(function() {
expectMessages(1, 0, ['owned by none']);
const handle = Meteor.subscribe('objectsWithUsers');
// Just make sure the subscription is ready before running the tests
// As everything now runs async, the tests were running before the data fully came in
await callWhenSubReady('objectsWithUsers', handle, () => {
expectMessages(1, 0, ['owned by none']);
Meteor.apply('setUserId', ['1'], { wait: true }, afterFirstSetUserId);
afterFirstSetUserId = expect(function() {
expectMessages(3, 1, [
'owned by one - a',
'owned by one/two - a',
'owned by one/two - b',
]);
Meteor.apply(
'setUserId',
['1'],
['2'],
{ wait: true },
afterFirstSetUserId
afterSecondSetUserId
);
})
);
});
afterFirstSetUserId = expect(function() {
expectMessages(3, 1, [
'owned by one - a',
'owned by one/two - a',
'owned by one/two - b'
]);
Meteor.apply(
'setUserId',
['2'],
{ wait: true },
afterSecondSetUserId
);
});
afterSecondSetUserId = expect(function() {
expectMessages(2, 1, [
'owned by one/two - a',
'owned by one/two - b',
'owned by two - a',
'owned by two - b',
]);
Meteor.apply(
'setUserId',
['2'],
{ wait: true },
afterThirdSetUserId
);
});
afterSecondSetUserId = expect(function() {
expectMessages(2, 1, [
'owned by one/two - a',
'owned by one/two - b',
'owned by two - a',
'owned by two - b'
]);
Meteor.apply('setUserId', ['2'], { wait: true }, afterThirdSetUserId);
});
afterThirdSetUserId = expect(function() {
// Nothing should have been sent since the results of the
// query are the same ("don't flap data on the wire")
expectMessages(0, 0, [
'owned by one/two - a',
'owned by one/two - b',
'owned by two - a',
'owned by two - b'
]);
undoEavesdrop();
afterThirdSetUserId = expect(function() {
// Nothing should have been sent since the results of the
// query are the same ("don't flap data on the wire")
expectMessages(0, 0, [
'owned by one/two - a',
'owned by one/two - b',
'owned by two - a',
'owned by two - b',
]);
undoEavesdrop();
});
});
},
function(test, expect) {
@@ -563,7 +592,7 @@ if (Meteor.isClient) {
{ wait: true },
expect(function() {})
);
}
},
]
);
}
@@ -614,7 +643,7 @@ Meteor.methods({
return 2;
}
return 0;
}
},
});
if (Meteor.isClient) {
@@ -623,12 +652,22 @@ if (Meteor.isClient) {
const id = Random.id();
testAsyncMulti('livedata - added from two different subs', [
function(test, expect) {
Meteor.call('livedata/setup', id, expect(function() {}));
Meteor.call(
'livedata/setup',
id,
expect(function() {})
);
},
function(test, expect) {
MultiPub = new Mongo.Collection('MultiPubCollection' + id);
const sub1 = Meteor.subscribe('pub1' + id, expect(function() {}));
const sub2 = Meteor.subscribe('pub2' + id, expect(function() {}));
const sub1 = Meteor.subscribe(
'pub1' + id,
expect(function() {})
);
const sub2 = Meteor.subscribe(
'pub2' + id,
expect(function() {})
);
},
function(test, expect) {
Meteor.call(
@@ -653,7 +692,7 @@ if (Meteor.isClient) {
},
function(test, expect) {
test.equal(MultiPub.findOne('foo'), { _id: 'foo', a: 'aa', b: 'bb' });
}
},
]);
})();
}
@@ -672,7 +711,7 @@ if (Meteor.isClient) {
test.isTrue(coll.findOne(token));
})
);
}
},
]);
testAsyncMulti('livedata - runtime universal sub creation', [
@@ -688,7 +727,7 @@ if (Meteor.isClient) {
test.isTrue(coll.findOne(token));
})
);
}
},
]);
testAsyncMulti('livedata - no setUserId after unblock', [
@@ -700,7 +739,7 @@ if (Meteor.isClient) {
test.isTrue(result);
})
);
}
},
]);
testAsyncMulti(
@@ -714,7 +753,7 @@ if (Meteor.isClient) {
// Use a separate connection so that we can safely check to see if
// conn._subscriptions is empty.
conn = new Connection('/', {
reloadWithOutstanding: true
reloadWithOutstanding: true,
});
collName = Random.id();
coll = new Mongo.Collection(collName, { connection: conn });
@@ -730,7 +769,7 @@ if (Meteor.isClient) {
? 'Internal server error'
: 'Explicit error'
)
)
),
});
};
testSubError({ throwInHandler: true });
@@ -752,7 +791,7 @@ if (Meteor.isClient) {
onReady: expect(),
onError: function(error) {
errorFromRerun = error;
}
},
}
);
},
@@ -761,7 +800,11 @@ if (Meteor.isClient) {
test.equal(coll.find().count(), 1);
test.isFalse(errorFromRerun);
test.equal(_.size(conn._subscriptions), 1); // white-box test
conn.call('setUserId', 'bla', expect(function() {}));
conn.call(
'setUserId',
'bla',
expect(function() {})
);
},
function(test, expect) {
// Now that we've re-run, we should have stopped the subscription,
@@ -780,13 +823,16 @@ if (Meteor.isClient) {
{
onError: function() {
gotErrorFromStopper = true;
}
},
}
);
// Call a method. This method won't be processed until the publisher's
// function returns, so blocking on it being done ensures that we've
// gotten the removed/nosub/etc.
conn.call('nothing', expect(function() {}));
conn.call(
'nothing',
expect(function() {})
);
},
function(test, expect) {
test.equal(coll.find().count(), 0);
@@ -794,7 +840,7 @@ if (Meteor.isClient) {
test.isFalse(gotErrorFromStopper);
test.equal(_.size(conn._subscriptions), 0); // white-box test
conn._stream.disconnect({ _permanent: true });
}
},
];
})()
);
@@ -810,7 +856,7 @@ if (Meteor.isClient) {
// Use a separate connection so that we can safely check to see if
// conn._subscriptions is empty.
conn = new Connection('/', {
reloadWithOutstanding: true
reloadWithOutstanding: true,
});
collName = Random.id();
coll = new Mongo.Collection(collName, { connection: conn });
@@ -826,7 +872,7 @@ if (Meteor.isClient) {
? 'Internal server error'
: 'Explicit error'
)
)
),
});
};
testSubError({ throwInHandler: true });
@@ -848,7 +894,7 @@ if (Meteor.isClient) {
onReady: expect(),
onStop: function(error) {
errorFromRerun = error;
}
},
}
);
},
@@ -857,7 +903,11 @@ if (Meteor.isClient) {
test.equal(coll.find().count(), 1);
test.isFalse(errorFromRerun);
test.equal(_.size(conn._subscriptions), 1); // white-box test
conn.call('setUserId', 'bla', expect(function() {}));
conn.call(
'setUserId',
'bla',
expect(function() {})
);
},
function(test, expect) {
// Now that we've re-run, we should have stopped the subscription,
@@ -878,13 +928,16 @@ if (Meteor.isClient) {
if (error) {
gotErrorFromStopper = true;
}
}
},
}
);
// Call a method. This method won't be processed until the publisher's
// function returns, so blocking on it being done ensures that we've
// gotten the removed/nosub/etc.
conn.call('nothing', expect(function() {}));
conn.call(
'nothing',
expect(function() {})
);
},
function(test, expect) {
test.equal(coll.find().count(), 0);
@@ -892,7 +945,7 @@ if (Meteor.isClient) {
test.isFalse(gotErrorFromStopper);
test.equal(_.size(conn._subscriptions), 0); // white-box test
conn._stream.disconnect({ _permanent: true });
}
},
];
})()
);
@@ -908,7 +961,7 @@ if (Meteor.isClient) {
test.equal(One.find().count(), 2);
test.equal(Two.find().count(), 3);
}),
onError: failure()
onError: failure(),
}
);
},
@@ -918,7 +971,7 @@ if (Meteor.isClient) {
{ dup: 1 },
{
onReady: failure(),
onError: expect(failure(test, 500, 'Internal server error'))
onError: expect(failure(test, 500, 'Internal server error')),
}
);
},
@@ -928,10 +981,10 @@ if (Meteor.isClient) {
{ notCursor: 1 },
{
onReady: failure(),
onError: expect(failure(test, 500, 'Internal server error'))
onError: expect(failure(test, 500, 'Internal server error')),
}
);
}
},
]);
}
@@ -944,7 +997,7 @@ if (Meteor.isServer) {
s2s: function(arg) {
check(arg, String);
return 's2s ' + arg;
}
},
});
}
(function() {
@@ -973,7 +1026,7 @@ if (Meteor.isServer) {
})
);
}
}
},
]);
})();
@@ -997,7 +1050,7 @@ if (Meteor.isServer) {
if (self.conn.status().connected) {
test.equal(await self.conn.callAsync('s2s', 'foo'), 's2s foo');
}
}
},
]);
})();
}
@@ -1014,7 +1067,7 @@ if (Meteor.isServer) {
}),
500
);
}
},
]);
})();
@@ -1038,13 +1091,13 @@ if (Meteor.isServer) {
onReady: expect(function() {
test.equal(PublisherCloningCollection.findOne(), {
_id: 'a',
x: { y: 43 }
x: { y: 43 },
});
}),
onError: failure()
onError: failure(),
}
);
}
},
]);
}
@@ -1083,7 +1136,7 @@ testAsyncMulti('livedata - result by value', [
test.equal(self.firstResult.length + 1, secondResult.length);
})
);
}
},
]);
// XXX some things to test in greater detail:

View File

@@ -1,8 +1,8 @@
Tinytest.add('livedata - DDP.randomStream', function(test) {
Tinytest.addAsync('livedata - DDP.randomStream', async function(test) {
const randomSeed = Random.id();
const context = { randomSeed: randomSeed };
let sequence = DDP._CurrentMethodInvocation.withValue(context, function() {
let sequence = await DDP._CurrentMethodInvocation.withValue(context, function() {
return DDP.randomStream('1');
});
@@ -21,7 +21,7 @@ Tinytest.add('livedata - DDP.randomStream', function(test) {
test.equal(id1, id1Cloned);
// We should get the same sequence when we use the same key
sequence = DDP._CurrentMethodInvocation.withValue(context, function() {
sequence = await DDP._CurrentMethodInvocation.withValue(context, function() {
return DDP.randomStream('1');
});
seeds = sequence.alea.args;