Remove callback and transform mongodb methods in async

This commit is contained in:
Edimar Cardoso
2023-01-03 11:25:17 -03:00
parent 65b76c7223
commit fcb7f8f8e7
8 changed files with 300 additions and 290 deletions

View File

@@ -1,22 +1,19 @@
import { DocFetcher } from "./doc_fetcher.js";
testAsyncMulti("mongo-livedata - doc fetcher", [
function (test, expect) {
async function (test, expect) {
var self = this;
var collName = "docfetcher-" + Random.id();
var collection = new Mongo.Collection(collName);
// var id1 = await ;
// var id2 = await collection.insert({y: 2});
Promise.all([collection.insert({x: 1}), collection.insert({y: 2})]).then(([id1, id2]) => {
console.trace({id1, id2});
});
//
// var fetcher = new DocFetcher(
// MongoInternals.defaultRemoteCollectionDriver().mongo);
//
// // Test basic operation.
// const fakeOp1 = {};
// const fakeOp2 = {};
var id1 = await collection.insert({x: 1});
var id2 = await collection.insert({y: 2});
var fetcher = new DocFetcher(
MongoInternals.defaultRemoteCollectionDriver().mongo);
// Test basic operation.
const fakeOp1 = {};
const fakeOp2 = {};
// fetcher.fetch(collName, id1, fakeOp1, expect(null, {_id: id1, x: 1}));
// fetcher.fetch(collName, "nonexistent!", fakeOp2, expect(null, null));
//

View File

@@ -1402,7 +1402,6 @@ Object.assign(MongoConnection.prototype, {
return observeDriver.stop();
}
});
self._observeMultiplexers[observeKey] = multiplexer;
}
var observeHandle = new ObserveHandle(multiplexer,
@@ -1466,7 +1465,7 @@ Object.assign(MongoConnection.prototype, {
// This field is only set for use in tests.
multiplexer._observeDriver = observeDriver;
}
self._observeMultiplexers[observeKey] = multiplexer;
// Blocks until the initial adds have been sent.
await multiplexer.addHandleAndSendInitialAdds(observeHandle);

View File

@@ -3286,54 +3286,54 @@ if (Meteor.isServer) {
// the future. (Well, the invocation happened earlier but the use of the
// Future sequences it so that the confirmation only gets read at this point.)
// TODO -> Fix me
// if (Meteor.isClient) {
// testAsyncMulti("mongo-livedata - fence onBeforeFire error", [
// function (test, expect) {
// var self = this;
// self.nonce = Random.id();
// Meteor.call('fenceOnBeforeFireError1', self.nonce, expect(function (err) {
// test.isFalse(err);
// }));
// },
// function (test, expect) {
// var self = this;
// Meteor.call('fenceOnBeforeFireError2', self.nonce, expect(
// function (err, success) {
// test.isFalse(err);
// test.isTrue(success);
// }
// ));
// }
// ]);
// } else {
// var fenceOnBeforeFireErrorCollection = new Mongo.Collection("FOBFE");
// var Future = Npm.require('fibers/future');
// var futuresByNonce = {};
// Meteor.methods({
// fenceOnBeforeFireError1: function (nonce) {
// futuresByNonce[nonce] = new Future;
// var observe = fenceOnBeforeFireErrorCollection.find({nonce: nonce})
// .observeChanges({added: function (){}});
// Meteor.setTimeout(function () {
// fenceOnBeforeFireErrorCollection.insert(
// {nonce: nonce},
// function (err, result) {
// var success = !err && result;
// futuresByNonce[nonce].return(success);
// observe.stop();
// }
// );
// }, 10);
// },
// fenceOnBeforeFireError2: function (nonce) {
// try {
// return futuresByNonce[nonce].wait();
// } finally {
// delete futuresByNonce[nonce];
// }
// }
// });
// }
if (Meteor.isClient) {
testAsyncMulti("mongo-livedata - fence onBeforeFire error", [
function (test, expect) {
var self = this;
self.nonce = Random.id();
Meteor.call('fenceOnBeforeFireError1', self.nonce, expect(function (err) {
test.isFalse(err);
}));
},
function (test, expect) {
var self = this;
Meteor.call('fenceOnBeforeFireError2', self.nonce, expect(
function (err, success) {
test.isFalse(err);
test.isTrue(success);
}
));
}
]);
} else {
var fenceOnBeforeFireErrorCollection = new Mongo.Collection("FOBFE");
var Future = Npm.require('fibers/future');
var futuresByNonce = {};
Meteor.methods({
fenceOnBeforeFireError1: function (nonce) {
futuresByNonce[nonce] = new Future;
var observe = fenceOnBeforeFireErrorCollection.find({nonce: nonce})
.observeChanges({added: function (){}});
Meteor.setTimeout(function () {
fenceOnBeforeFireErrorCollection.insert(
{nonce: nonce},
function (err, result) {
var success = !err && result;
futuresByNonce[nonce].return(success);
observe.stop();
}
);
}, 10);
},
fenceOnBeforeFireError2: function (nonce) {
try {
return futuresByNonce[nonce].wait();
} finally {
delete futuresByNonce[nonce];
}
}
});
}
if (Meteor.isServer) {
Tinytest.addAsync('mongo update/upsert - returns nMatched as numberAffected', async function (test) {
@@ -3366,115 +3366,115 @@ if (Meteor.isServer) {
test.equal(result.numberAffected, 4);
});
Tinytest.addAsync('mongo livedata - update/upsert callback returns nMatched as numberAffected', function (test, onComplete) {
Tinytest.addAsync('mongo livedata - update/upsert callback returns nMatched as numberAffected', async function (test, onComplete) {
var collName = Random.id();
var coll = new Mongo.Collection('update_nmatched'+collName);
Promise.all([{animal: 'cat', legs: 4}, {animal: 'dog', legs: 4}, {animal: 'echidna', legs: 4},{animal: 'platypus', legs: 4}, {animal: 'starfish', legs: 5}]
.map(({animal, legs}) => coll.insert({animal, legs}))).then(() => {
var test1 = function () {
coll.update({legs: 4}, {$set: {category: 'quadruped'}}, function (err, result) {
test.equal(result, 1);
test2();
});
};
await coll.insert({animal: 'cat', legs: 4});
await coll.insert({animal: 'dog', legs: 4});
await coll.insert({animal: 'echidna', legs: 4});
await coll.insert({animal: 'platypus', legs: 4});
await coll.insert({animal: 'starfish', legs: 5});
var test2 = function () {
//Changes only 3 but matched 4 documents
coll.update({legs: 4}, {$set: {category: 'quadruped'}}, {multi: true}, function (err, result) {
test.equal(result, 4);
test3();
});
};
var test1 = async function () {
await coll.update({legs: 4}, {$set: {category: 'quadruped'}}).then(async result => {
test.equal(result, 1);
await test2();
});
};
var test3 = function () {
//Again, changes nothing but returns nModified
coll.update({legs: 4}, {$set: {category: 'quadruped'}}, {multi: true}, function (err, result) {
test.equal(result, 4);
test4();
});
};
var test2 = async function () {
//Changes only 3 but matched 4 documents
await coll.update({legs: 4}, {$set: {category: 'quadruped'}}, {multi: true}).then(async result => {
test.equal(result, 4);
await test3();
});
};
var test4 = function () {
//upsert:true changes nothing, 4 modified
coll.update({legs: 4}, {$set: {category: 'quadruped'}}, {multi: true, upsert:true}, function (err, result) {
test.equal(result, 4);
test5();
});
};
var test3 = async function () {
//Again, changes nothing but returns nModified
await coll.update({legs: 4}, {$set: {category: 'quadruped'}}, {multi: true}).then(async result => {
test.equal(result, 4);
await test4();
});
};
var test5 = function () {
//upsert method works as upsert:true
coll.upsert({legs: 4}, {$set: {category: 'quadruped'}}, {multi: true}, function (err, result) {
test.equal(result.numberAffected, 4);
onComplete();
});
};
var test4 = async function () {
//upsert:true changes nothing, 4 modified
await coll.update({legs: 4}, {$set: {category: 'quadruped'}}, {multi: true, upsert:true}).then(async result => {
test.equal(result, 4);
await test5();
});
};
test1();
});
var test5 = async function () {
//upsert method works as upsert:true
await coll.upsert({legs: 4}, {$set: {category: 'quadruped'}}, {multi: true}).then(async result => {
test.equal(result.numberAffected, 4);
onComplete();
});
};
await test1();
});
}
// if (Meteor.isServer) {
// Tinytest.addAsync("mongo-livedata - transaction", async function (test, onComplete) {
// const { client } = MongoInternals.defaultRemoteCollectionDriver().mongo;
//
// const Collection = new Mongo.Collection(`transaction_test_${test.runId()}`);
// const rawCollection = Collection.rawCollection();
//
// await Collection.insert({ _id: "a" });
// await Collection.insert({ _id: "b" });
//
// let changeCount = 0;
// onComplete();
// return new Promise(resolve => {
// function finalize() {
// observeHandle.stop().then(() => {
// Meteor.clearTimeout(timeout);
// onComplete();
// resolve();
// });
// }
//
// const observeHandle = await Collection.find().observeChanges({
// changed(id, fields) {
// let expectedValue;
//
// if (id === "a") {
// expectedValue = "updated1";
// } else if (id === "b") {
// expectedValue = "updated2";
// }
//
// test.equal(fields.field, expectedValue);
// changeCount += 1;
//
// if (changeCount === 2) {
// finalize();
// }
// }
// });
//
// const timeout = Meteor.setTimeout(() => {
// test.fail("Didn't receive all transaction operations in two seconds.");
// finalize();
// }, 2000);
//
// const session = client.startSession();
// await session.withTransaction(session => {
// let promise = Promise.resolve();
// ["a", "b"].forEach((id, index) => {
// promise = promise.then(() => rawCollection.updateMany(
// { _id: id },
// { $set: { field: `updated${index + 1}` } },
// { session }
// ));
// });
// return promise;
// }).finally(() => {
// session.endSession();
// });
// }).then(() => onComplete());
// });
// }
if (Meteor.isServer) {
Tinytest.addAsync("mongo-livedata - transaction", async function (test) {
const { client } = MongoInternals.defaultRemoteCollectionDriver().mongo;
const Collection = new Mongo.Collection(`transaction_test_${test.runId()}`);
const rawCollection = Collection.rawCollection();
await Collection.insert({ _id: "a" });
await Collection.insert({ _id: "b" });
let changeCount = 0;
return new Promise(async resolve => {
async function finalize() {
await observeHandle.stop();
Meteor.clearTimeout(timeout);
resolve();
}
const observeHandle = await Collection.find().observeChanges({
changed(id, fields) {
let expectedValue;
if (id === "a") {
expectedValue = "updated1";
} else if (id === "b") {
expectedValue = "updated2";
}
test.equal(fields.field, expectedValue);
changeCount += 1;
if (changeCount === 2) {
finalize();
}
}
});
const timeout = Meteor.setTimeout(() => {
test.fail("Didn't receive all transaction operations in two seconds.");
finalize();
}, 2000);
const session = client.startSession();
session.withTransaction(session => {
let promise = Promise.resolve();
["a", "b"].forEach((id, index) => {
promise = promise.then(() => rawCollection.updateMany(
{ _id: id },
{ $set: { field: `updated${index + 1}` } },
{ session }
));
});
return promise;
}).finally(() => {
session.endSession();
});
});
});
}

View File

@@ -15,48 +15,49 @@ _.each ([{added: 'added', forceOrdered: true},
Tinytest.addAsync("observeChanges - single id - basics " + added
+ (forceOrdered ? " force ordered" : ""),
async function (test, onComplete) {
var c = makeCollection();
var counter = 0;
var callbacks = [added, "changed", "removed"];
if (forceOrdered)
callbacks.push("movedBefore");
await withCallbackLogger(test,
callbacks,
Meteor.isServer,
async function (logger) {
var barid = await c.insert({thing: "stuff"});
var fooid = await c.insert({noodles: "good", bacon: "bad", apples: "ok"});
var c = makeCollection();
var counter = 0;
var callbacks = [added, "changed", "removed"];
if (forceOrdered)
callbacks.push("movedBefore");
await withCallbackLogger(test,
callbacks,
Meteor.isServer,
async function (logger) {
var barid = await c.insert({thing: "stuff"});
var fooid = await c.insert({noodles: "good", bacon: "bad", apples: "ok"});
var handle = await c.find(fooid).observeChanges(logger);
if (added === 'added') {
logger.expectResult(added, [fooid, {noodles: "good", bacon: "bad", apples: "ok"}]);
} else {
logger.expectResult(added,
[fooid, {noodles: "good", bacon: "bad", apples: "ok"}, null]);
}
await c.update(fooid, {noodles: "alright", potatoes: "tasty", apples: "ok"});
logger.expectResult("changed",
[fooid, {noodles: "alright", potatoes: "tasty", bacon: undefined}]);
var handle = await c.find(fooid).observeChanges(logger);
if (added === 'added') {
await logger.expectResult(added, [fooid, {noodles: "good", bacon: "bad", apples: "ok"}]);
} else {
await logger.expectResult(added,
[fooid, {noodles: "good", bacon: "bad", apples: "ok"}, null]);
}
await c.update(fooid, {noodles: "alright", potatoes: "tasty", apples: "ok"});
await logger.expectResult("changed",
[fooid, {noodles: "alright", potatoes: "tasty", bacon: undefined}]);
await c.remove(fooid);
logger.expectResult("removed", [fooid]);
await c.remove(fooid);
await logger.expectResult("removed", [fooid]);
await logger.expectNoResult(async () => {
await c.remove(barid);
await c.insert({noodles: "good", bacon: "bad", apples: "ok"});
await logger.expectNoResult(async () => {
await c.remove(barid);
await c.insert({noodles: "good", bacon: "bad", apples: "ok"});
});
await handle.stop();
const badCursor = c.find({}, {fields: {noodles: 1, _id: false}});
await test.throwsAsync(function () {
return badCursor.observeChanges(logger);
});
});
await handle.stop();
const badCursor = c.find({}, {fields: {noodles: 1, _id: false}});
await test.throwsAsync(function () {
return badCursor.observeChanges(logger);
});
});
onComplete();
});
});
Tinytest.addAsync("observeChanges - callback isolation", async function (test) {
Tinytest.addAsync("observeChanges - callback isolation", async function (test, onComplete) {
var c = makeCollection();
await withCallbackLogger(test, ["added", "changed", "removed"], Meteor.isServer, async function (logger) {
var handles = [];
@@ -73,26 +74,29 @@ Tinytest.addAsync("observeChanges - callback isolation", async function (test) {
}));
var fooid = await c.insert({apples: "ok"});
logger.expectResult("added", [fooid, {apples: "ok"}]);
await logger.expectResult("added", [fooid, {apples: "ok"}]);
await c.update(fooid, {apples: "not ok"});
logger.expectResult("changed", [fooid, {apples: "not ok"}]);
await logger.expectResult("changed", [fooid, {apples: "not ok"}]);
test.equal((await c.findOne(fooid)).apples, "not ok");
await Promise.all(handles.map(h => h.stop()));
//await Promise.all(handles.map(h => h.stop())).then(() => onComplete());
await _.each(handles, async function(handle) { await handle.stop(); });
onComplete();
});
});
Tinytest.addAsync("observeChanges - single id - initial adds", async function (test) {
Tinytest.addAsync("observeChanges - single id - initial adds", async function (test, onComplete) {
var c = makeCollection();
await withCallbackLogger(test, ["added", "changed", "removed"], Meteor.isServer, async function (logger) {
var fooid = await c.insert({noodles: "good", bacon: "bad", apples: "ok"});
var handle = await c.find(fooid).observeChanges(logger);
logger.expectResult("added", [fooid, {noodles: "good", bacon: "bad", apples: "ok"}]);
await logger.expectNoResult();
await handle.stop();
var fooid = await c.insert({noodles: "good", bacon: "bad", apples: "ok"});
var handle = await c.find(fooid).observeChanges(logger);
await logger.expectResult("added", [fooid, {noodles: "good", bacon: "bad", apples: "ok"}]);
await logger.expectNoResult();
await handle.stop();
});
onComplete();
});
@@ -103,7 +107,7 @@ Tinytest.addAsync("observeChanges - unordered - initial adds", async function (t
var fooid = await c.insert({noodles: "good", bacon: "bad", apples: "ok"});
var barid = await c.insert({noodles: "good", bacon: "weird", apples: "ok"});
var handle = await c.find().observeChanges(logger);
logger.expectResultUnordered([
await logger.expectResultUnordered([
{callback: "added",
args: [fooid, {noodles: "good", bacon: "bad", apples: "ok"}]},
{callback: "added",
@@ -114,31 +118,32 @@ Tinytest.addAsync("observeChanges - unordered - initial adds", async function (t
});
});
Tinytest.addAsync("observeChanges - unordered - basics", async function (test) {
Tinytest.addAsync("observeChanges - unordered - basics", async function (test, onComplete) {
var c = makeCollection();
await withCallbackLogger(test, ["added", "changed", "removed"], Meteor.isServer, async function (logger) {
var handle = await c.find().observeChanges(logger);
var barid = await c.insert({thing: "stuff"});
logger.expectResultOnly("added", [barid, {thing: "stuff"}]);
withCallbackLogger(test, ["added", "changed", "removed"], Meteor.isServer, async function (logger) {
var handle = await c.find().observeChanges(logger);
var barid = await c.insert({thing: "stuff"});
await logger.expectResultOnly("added", [barid, {thing: "stuff"}]);
var fooid = await c.insert({noodles: "good", bacon: "bad", apples: "ok"});
var fooid = await c.insert({noodles: "good", bacon: "bad", apples: "ok"});
logger.expectResultOnly("added", [fooid, {noodles: "good", bacon: "bad", apples: "ok"}]);
await logger.expectResultOnly("added", [fooid, {noodles: "good", bacon: "bad", apples: "ok"}]);
await c.update(fooid, {noodles: "alright", potatoes: "tasty", apples: "ok"});
await c.update(fooid, {noodles: "alright", potatoes: "tasty", apples: "ok"});
logger.expectResultOnly("changed",
[fooid, {noodles: "alright", potatoes: "tasty", bacon: undefined}]);
await c.remove(fooid);
logger.expectResultOnly("removed", [fooid]);
await c.remove(barid);
logger.expectResultOnly("removed", [barid]);
await c.update(fooid, {noodles: "alright", potatoes: "tasty", apples: "ok"});
await c.update(fooid, {noodles: "alright", potatoes: "tasty", apples: "ok"});
await logger.expectResultOnly("changed",
[fooid, {noodles: "alright", potatoes: "tasty", bacon: undefined}]);
await c.remove(fooid);
await logger.expectResultOnly("removed", [fooid]);
await c.remove(barid);
await logger.expectResultOnly("removed", [barid]);
fooid = await c.insert({noodles: "good", bacon: "bad", apples: "ok"});
fooid = await c.insert({noodles: "good", bacon: "bad", apples: "ok"});
logger.expectResult("added", [fooid, {noodles: "good", bacon: "bad", apples: "ok"}]);
await logger.expectNoResult();
await handle.stop();
await logger.expectResult("added", [fooid, {noodles: "good", bacon: "bad", apples: "ok"}]);
await logger.expectNoResult();
await handle.stop();
onComplete();
});
});
@@ -148,48 +153,50 @@ if (Meteor.isServer) {
await withCallbackLogger(test, ["added", "changed", "removed"], Meteor.isServer, async function (logger) {
var handle = await c.find({}, {fields:{noodles: 1, bacon: 1}}).observeChanges(logger);
var barid = await c.insert({thing: "stuff"});
logger.expectResultOnly("added", [barid, {}]);
await logger.expectResultOnly("added", [barid, {}]);
var fooid = await c.insert({noodles: "good", bacon: "bad", apples: "ok"});
logger.expectResultOnly("added", [fooid, {noodles: "good", bacon: "bad"}]);
await logger.expectResultOnly("added", [fooid, {noodles: "good", bacon: "bad"}]);
await c.update(fooid, {noodles: "alright", potatoes: "tasty", apples: "ok"});
logger.expectResultOnly("changed",
await logger.expectResultOnly("changed",
[fooid, {noodles: "alright", bacon: undefined}]);
await c.update(fooid, {noodles: "alright", potatoes: "meh", apples: "ok"});
await c.remove(fooid);
logger.expectResultOnly("removed", [fooid]);
await logger.expectResultOnly("removed", [fooid]);
await c.remove(barid);
logger.expectResultOnly("removed", [barid]);
await logger.expectResultOnly("removed", [barid]);
fooid = await c.insert({noodles: "good", bacon: "bad"});
logger.expectResult("added", [fooid, {noodles: "good", bacon: "bad"}]);
await logger.expectResult("added", [fooid, {noodles: "good", bacon: "bad"}]);
await logger.expectNoResult();
await handle.stop();
});
onComplete();
});
Tinytest.addAsync("observeChanges - unordered - specific fields + selector on excluded fields", async function (test) {
Tinytest.addAsync("observeChanges - unordered - specific fields + selector on excluded fields", async function (test, onComplete) {
var c = makeCollection();
await withCallbackLogger(test, ["added", "changed", "removed"], Meteor.isServer, async function (logger) {
withCallbackLogger(test, ["added", "changed", "removed"], Meteor.isServer, async function (logger) {
var handle = await c.find({ mac: 1, cheese: 2 },
{fields:{noodles: 1, bacon: 1, eggs: 1}}).observeChanges(logger);
var barid = await c.insert({thing: "stuff", mac: 1, cheese: 2});
logger.expectResultOnly("added", [barid, {}]);
console.log('a1');
await logger.expectResultOnly("added", [barid, {}]);
console.log('a2');
var fooid = await c.insert({noodles: "good", bacon: "bad", apples: "ok", mac: 1, cheese: 2});
logger.expectResultOnly("added", [fooid, {noodles: "good", bacon: "bad"}]);
await logger.expectResultOnly("added", [fooid, {noodles: "good", bacon: "bad"}]);
await c.update(fooid, {noodles: "alright", potatoes: "tasty", apples: "ok", mac: 1, cheese: 2});
logger.expectResultOnly("changed",
await logger.expectResultOnly("changed",
[fooid, {noodles: "alright", bacon: undefined}]);
// Doesn't get update event, since modifies only hidden fields
await logger.expectNoResult(() =>
c.update(fooid, {
await logger.expectNoResult(async () =>
await c.update(fooid, {
noodles: "alright",
potatoes: "meh",
apples: "ok",
@@ -199,15 +206,17 @@ if (Meteor.isServer) {
);
await c.remove(fooid);
logger.expectResultOnly("removed", [fooid]);
await logger.expectResultOnly("removed", [fooid]);
await c.remove(barid);
logger.expectResultOnly("removed", [barid]);
await logger.expectResultOnly("removed", [barid]);
fooid = await c.insert({noodles: "good", bacon: "bad", mac: 1, cheese: 2});
logger.expectResult("added", [fooid, {noodles: "good", bacon: "bad"}]);
await logger.expectResult("added", [fooid, {noodles: "good", bacon: "bad"}]);
await logger.expectNoResult();
await handle.stop();
}).then(() => {
onComplete();
});
});
}
@@ -219,16 +228,16 @@ Tinytest.addAsync("observeChanges - unordered - specific fields + modify on excl
{fields:{noodles: 1, bacon: 1, eggs: 1}}).observeChanges(logger);
var fooid = await c.insert({noodles: "good", bacon: "bad", apples: "ok", mac: 1, cheese: 2});
logger.expectResultOnly("added", [fooid, {noodles: "good", bacon: "bad"}]);
await logger.expectResultOnly("added", [fooid, {noodles: "good", bacon: "bad"}]);
// Noodles go into shadow, mac appears as eggs
await c.update(fooid, {$rename: { noodles: 'shadow', apples: 'eggs' }});
logger.expectResultOnly("changed",
await logger.expectResultOnly("changed",
[fooid, {eggs:"ok", noodles: undefined}]);
await c.remove(fooid);
logger.expectResultOnly("removed", [fooid]);
await logger.expectResultOnly("removed", [fooid]);
await logger.expectNoResult();
await handle.stop();
});
@@ -243,11 +252,11 @@ Tinytest.addAsync(
async function (logger) {
var handle = await c.find({}, {fields: {'type.name': 1}}).observeChanges(logger);
var id = await c.insert({ type: { name: 'foobar' } });
logger.expectResultOnly('added', [id, { type: { name: 'foobar' } }]);
await logger.expectResultOnly('added', [id, { type: { name: 'foobar' } }]);
await c.update(id, { $unset: { type: 1 } });
test.equal(await c.find().fetch(), [{ _id: id }]);
logger.expectResultOnly('changed', [id, { type: undefined }]);
await logger.expectResultOnly('changed', [id, { type: undefined }]);
await handle.stop();
}
@@ -260,23 +269,23 @@ Tinytest.addAsync(
Tinytest.addAsync("observeChanges - unordered - enters and exits result set through change", async function (test) {
var c = makeCollection();
await withCallbackLogger(test, ["added", "changed", "removed"], Meteor.isServer, async function (logger) {
var handle = await c.find({noodles: "good"}).observeChanges(logger);
var barid = await c.insert({thing: "stuff"});
var handle = await c.find({noodles: "good"}).observeChanges(logger);
var barid = await c.insert({thing: "stuff"});
var fooid = await c.insert({noodles: "good", bacon: "bad", apples: "ok"});
logger.expectResultOnly("added", [fooid, {noodles: "good", bacon: "bad", apples: "ok"}]);
var fooid = await c.insert({noodles: "good", bacon: "bad", apples: "ok"});
await logger.expectResultOnly("added", [fooid, {noodles: "good", bacon: "bad", apples: "ok"}]);
await c.update(fooid, {noodles: "alright", potatoes: "tasty", apples: "ok"});
logger.expectResultOnly("removed",
await c.update(fooid, {noodles: "alright", potatoes: "tasty", apples: "ok"});
await logger.expectResultOnly("removed",
[fooid]);
await c.remove(fooid);
await c.remove(barid);
await c.remove(fooid);
await c.remove(barid);
fooid = await c.insert({noodles: "ok", bacon: "bad", apples: "ok"});
await c.update(fooid, {noodles: "good", potatoes: "tasty", apples: "ok"});
logger.expectResult("added", [fooid, {noodles: "good", potatoes: "tasty", apples: "ok"}]);
await logger.expectNoResult();
await handle.stop();
fooid = await c.insert({noodles: "ok", bacon: "bad", apples: "ok"});
await c.update(fooid, {noodles: "good", potatoes: "tasty", apples: "ok"});
await logger.expectResult("added", [fooid, {noodles: "good", potatoes: "tasty", apples: "ok"}]);
await logger.expectNoResult();
await handle.stop();
});
});
@@ -353,14 +362,15 @@ if (Meteor.isServer) {
testAsyncMulti("observeChanges - bad query", [
async function (test, expect) {
var c = makeCollection();
var observeThrows = function () {
return test.throwsAsync(function () {
return c.find({__id: {$in: null}}).observeChanges({
added: function () {
var observeThrows = async function () {
await test.throwsAsync(async function () {
await c.find({__id: {$in: null}}).observeChanges({
added: function added() {
test.fail("added shouldn't be called");
}
});
}, '$in needs an array');
};
if (Meteor.isClient) {
@@ -370,25 +380,30 @@ testAsyncMulti("observeChanges - bad query", [
// Test that if two copies of the same bad observeChanges run in parallel
// and are de-duped, both observeChanges calls will throw.
await Promise.all(['ob1', 'ob2'].map(() => observeThrows()));
await Promise.all([observeThrows(), observeThrows()]).then(() => {
expect();
});
}
]);
if (Meteor.isServer) {
Tinytest.addAsync(
"observeChanges - EnvironmentVariable",
async function (test) {
function (test, onComplete) {
var c = makeCollection();
var environmentVariable = new Meteor.EnvironmentVariable;
await environmentVariable.withValue(true, async function() {
environmentVariable.withValue(true, async function() {
var handle = await c.find({}, { fields: { 'type.name': 1 }}).observeChanges({
added: function() {
added: async function() {
test.isTrue(environmentVariable.get());
handle.stop();
await handle.stop();
onComplete();
}
});
}).then(() => {
c.insert({ type: { name: 'foobar' } });
});
await c.insert({ type: { name: 'foobar' } });
}
);
}

View File

@@ -116,12 +116,10 @@ ObserveMultiplexer = class {
// observeChanges calls) to throw the error.
async queryError(err) {
var self = this;
await this._queue.runTask(async function () {
if (self._ready())
throw Error("can't claim query has an error after it worked!");
await self._stop({fromQueryError: true});
throw err;
});
if (self._ready())
throw Error("can't claim query has an error after it worked!");
await self._stop({fromQueryError: true});
throw err;
}
// Calls "cb" once the effects of all "ready", "addHandleAndSendInitialAdds"

View File

@@ -1,6 +1,6 @@
var OplogCollection = new Mongo.Collection("oplog-" + Random.id());
Tinytest.addAsync("mongo-livedata - oplog - cursorSupported", async function (test) {
Tinytest.addAsync("mongo-livedata - oplog - cursorSupported", async function (test, onComplete) {
var oplogEnabled =
!!MongoInternals.defaultRemoteCollectionDriver().mongo._oplogHandle;
@@ -56,6 +56,7 @@ Tinytest.addAsync("mongo-livedata - oplog - cursorSupported", async function (te
await supported(false, {}, { limit: 5 });
await supported(false, {}, { skip: 2, limit: 5 });
await supported(false, {}, { skip: 2 });
onComplete();
});
process.env.MONGO_OPLOG_URL && testAsyncMulti(

View File

@@ -85,14 +85,15 @@ _.extend(PollingObserveDriver.prototype, {
"mongo-livedata", "observe-drivers-polling", 1);
},
// This is always called through _.throttle (except once at startup).
_unthrottledEnsurePollIsScheduled: function () {
_unthrottledEnsurePollIsScheduled: async function () {
var self = this;
if (self._pollsScheduledButNotStarted > 0)
return;
++self._pollsScheduledButNotStarted;
self._taskQueue.queueTask(function () {
self._pollMongo();
});
//TODO check this change
//await self._taskQueue.queueTask(async function () {
await self._pollMongo();
//});
},
// test-only interface for controlling polling.
@@ -130,7 +131,7 @@ _.extend(PollingObserveDriver.prototype, {
});
},
async _pollMongo() {
async _pollMongo() {
var self = this;
--self._pollsScheduledButNotStarted;
@@ -162,11 +163,10 @@ _.extend(PollingObserveDriver.prototype, {
// successfully. Probably it's a bad selector or something, so we should
// NOT retry. Instead, we should halt the observe (which ends up calling
// `stop` on us).
self._multiplexer.queryError(
await self._multiplexer.queryError(
new Error(
"Exception while polling query " +
JSON.stringify(self._cursorDescription) + ": " + e.message));
return;
}
// getRawObjects can throw if we're having trouble talking to the

View File

@@ -48,7 +48,7 @@ CallbackLogger.prototype._yield = function (arg) {
CallbackLogger.prototype.expectResult = async function (callbackName, args) {
var self = this;
await self._waitForLengthOrTimeout(3);
await self._waitForLengthOrTimeout(10);
if (_.isEmpty(self._log)) {
self._test.fail(["Expected callback " + callbackName + " got none"]);
return;
@@ -58,9 +58,9 @@ CallbackLogger.prototype.expectResult = async function (callbackName, args) {
self._test.equal(result.args, args);
};
CallbackLogger.prototype.expectResultOnly = function (callbackName, args) {
CallbackLogger.prototype.expectResultOnly = async function (callbackName, args) {
var self = this;
self.expectResult(callbackName, args);
await self.expectResult(callbackName, args);
self._expectNoResultImpl();
}