Fix Mongo Async tests

This commit is contained in:
Edimar Cardoso
2022-12-19 14:17:18 -03:00
parent 0779997245
commit 6587c38710
6 changed files with 129 additions and 110 deletions

View File

@@ -682,7 +682,7 @@ export class Connection {
options = Object.create(null);
}
options = options || Object.create(null);
console.log({name, callback, options});
if (callback) {
// XXX would it be better form to do the binding in stream.on,
// or caller, instead of here?

View File

@@ -1791,6 +1791,7 @@ Object.assign(Server.prototype, {
applyAsync: function (name, args, options) {
// Run the handler
var handler = this.method_handlers[name];
console.log({name});
if (! handler) {
return Promise.reject(
new Meteor.Error(404, `Method '${name}' not found`)

View File

@@ -219,6 +219,7 @@ Object.assign(Mongo.Collection.prototype, {
// Is this a "replace the whole doc" message coming from the quiescence
// of method writes to an object? (Note that 'undefined' is a valid
// value meaning "remove it".)
console.log({msg});
if (msg.msg === 'replace') {
var replace = msg.replace;
if (!replace) {
@@ -575,7 +576,7 @@ Object.assign(Mongo.Collection.prototype, {
} else {
// If we don't have the callback, we assume the user is using the promise.
// We can't just pass this._collection.insert to the promisify because it would lose the context.
result = Meteor.promisify((cb) => this._collection.insert(doc, cb))();
result = this._collection.insert(doc);
}
return chooseReturnValueFromCollectionResult(result);
@@ -615,7 +616,7 @@ Object.assign(Mongo.Collection.prototype, {
* @param {Array} options.arrayFilters Optional. Used in combination with MongoDB [filtered positional operator](https://docs.mongodb.com/manual/reference/operator/update/positional-filtered/) to specify which elements to modify in an array field.
* @param {Function} [callback] Optional. If present, called with an error object as the first argument and, if no error, the number of affected documents as the second.
*/
update(selector, modifier, ...optionsAndCallback) {
async update(selector, modifier, ...optionsAndCallback) {
const callback = popCallbackFromArgs(optionsAndCallback);
// We've already popped off the callback, so we are left with an array
@@ -654,23 +655,26 @@ Object.assign(Mongo.Collection.prototype, {
// it's my collection. descend into the collection object
// and propagate any exception.
try {
// If the user provided a callback and the collection implements this
// operation asynchronously, then queryRet will be undefined, and the
// result will be returned through the callback instead.
//console.log({callback, options, selector, modifier, coll: this._collection});
return this._collection.update(
selector,
modifier,
options,
wrappedCallback
);
} catch (e) {
if (callback) {
callback(e);
return null;
}
throw e;
}
options
).then(result => {
if (callback) {
callback(null, result);
}
return result;
}).catch (e => {
if (callback) {
callback(e);
return null;
}
throw e;
});
},
/**
@@ -906,6 +910,6 @@ function popCallbackFromArgs(args) {
ASYNC_COLLECTION_METHODS.forEach(methodName => {
const methodNameAsync = getAsyncMethodName(methodName);
Mongo.Collection.prototype[methodNameAsync] = function(...args) {
return Promise.resolve(this[methodName](...args));
return this[methodName](...args);
};
});

View File

@@ -324,7 +324,7 @@ var bindEnvironmentForWrite = function (callback) {
return Meteor.bindEnvironment(callback, "Mongo write");
};
MongoConnection.prototype._insert = function (collection_name, document,
MongoConnection.prototype._insert = async function (collection_name, document,
callback) {
var self = this;
@@ -353,22 +353,20 @@ MongoConnection.prototype._insert = function (collection_name, document,
Meteor.refresh({collection: collection_name, id: document._id });
};
callback = bindEnvironmentForWrite(writeCallback(write, refresh, callback));
try {
var collection = self.rawCollection(collection_name);
collection.insertOne(
replaceTypes(document, replaceMeteorAtomWithMongo),
{
safe: true,
}
).then(({insertedId}) => {
callback(null, insertedId);
}).catch((e) => {
callback(e, null);
});
} catch (err) {
var collection = self.rawCollection(collection_name);
return collection.insertOne(
replaceTypes(document, replaceMeteorAtomWithMongo),
{
safe: true,
}
).then(({insertedId}) => {
callback(null, insertedId);
return insertedId;
}).catch((e) => {
callback(e, null);
write.committed();
throw err;
}
throw e;
});
};
// Cause queries that may be affected by the selector to poll in this write
@@ -389,7 +387,7 @@ MongoConnection.prototype._refresh = function (collectionName, selector) {
}
};
MongoConnection.prototype._remove = function (collection_name, selector,
MongoConnection.prototype._remove = async function (collection_name, selector,
callback) {
var self = this;
@@ -407,9 +405,20 @@ MongoConnection.prototype._remove = function (collection_name, selector,
var refresh = function () {
self._refresh(collection_name, selector);
};
callback = bindEnvironmentForWrite(writeCallback(write, refresh, callback));
try {
return new Promise((resolve, reject) => {
if (!callback) {
callback = (err, result) => {
if (err) {
reject(err);
} else {
resolve(result);
}
};
}
callback = bindEnvironmentForWrite(writeCallback(write, refresh, callback));
var collection = self.rawCollection(collection_name);
collection
.deleteMany(replaceTypes(selector, replaceMeteorAtomWithMongo), {
@@ -418,12 +427,10 @@ MongoConnection.prototype._remove = function (collection_name, selector,
.then(({ deletedCount }) => {
callback(null, transformResult({ result : {modifiedCount : deletedCount} }).numberAffected);
}).catch((err) => {
write.committed();
callback(err);
});
} catch (err) {
write.committed();
throw err;
}
});
};
MongoConnection.prototype._dropCollection = function (collectionName, cb) {
@@ -464,7 +471,7 @@ MongoConnection.prototype._dropDatabase = function (cb) {
}
};
MongoConnection.prototype._update = function (collection_name, selector, mod,
MongoConnection.prototype._update = async function (collection_name, selector, mod,
options, callback) {
var self = this;
@@ -517,8 +524,17 @@ MongoConnection.prototype._update = function (collection_name, selector, mod,
var refresh = function () {
self._refresh(collection_name, selector);
};
callback = writeCallback(write, refresh, callback);
try {
return new Promise((resolve, reject) => {
if (!callback) {
callback = (err, result) => {
if (err) {
reject(err);
} else {
resolve(result);
}
};
}
callback = writeCallback(write, refresh, callback);
var collection = self.rawCollection(collection_name);
var mongoOpts = {safe: true};
// Add support for filtered positional operator
@@ -565,7 +581,6 @@ MongoConnection.prototype._update = function (collection_name, selector, mod,
}
}
}
if (options.upsert &&
! isModify &&
! knownId &&
@@ -599,7 +614,6 @@ MongoConnection.prototype._update = function (collection_name, selector, mod,
}
);
} else {
if (options.upsert && !knownId && options.insertedId && isModify) {
if (!mongoMod.hasOwnProperty('$setOnInsert')) {
mongoMod.$setOnInsert = {};
@@ -620,6 +634,7 @@ MongoConnection.prototype._update = function (collection_name, selector, mod,
bindEnvironmentForWrite(function (err = null, result) {
if (! err) {
var meteorResult = transformResult({result});
console.log({meteorResult});
if (meteorResult && options._returnObject) {
// If this was an upsert() call, and we ended up
// inserting a new doc and we know its id, then
@@ -641,10 +656,10 @@ MongoConnection.prototype._update = function (collection_name, selector, mod,
}
}));
}
} catch (e) {
}).catch(e => {
write.committed();
throw e;
}
});
};
var transformResult = function (driverResult) {
@@ -784,6 +799,7 @@ var simulateUpsertWithInsertedId = function (collection, selector, mod,
_.each(["insert", "update", "remove", "dropCollection", "dropDatabase"], function (method) {
MongoConnection.prototype[method] = function (/* arguments */) {
var self = this;
//return self[`_${method}`](...arguments);
return Meteor.promisify(self[`_${method}`]).apply(self, arguments);
};
});
@@ -791,7 +807,7 @@ _.each(["insert", "update", "remove", "dropCollection", "dropDatabase"], functio
// XXX MongoConnection.upsert() does not return the id of the inserted document
// unless you set it explicitly in the selector or modifier (as a replacement
// doc).
MongoConnection.prototype.upsert = function (collectionName, selector, mod,
MongoConnection.prototype.upsert = async function (collectionName, selector, mod,
options, callback) {
var self = this;
if (typeof options === "function" && ! callback) {

View File

@@ -94,16 +94,14 @@ var upsert = async function (coll, useUpdate, query, mod, options, callback) {
}
if (callback) {
return await coll.update(query, mod,
_.extend({ upsert: true }, options),
function (err, result) {
callback(err, ! err && {
numberAffected: result
});
await coll.update(query, mod,
_.extend({ upsert: true }, options))
.then(result => {
callback(null, {numberAffected: result});
});
}
return Promise.resolve(coll.update(query, mod,
return await Promise.resolve(coll.update(query, mod,
_.extend({ upsert: true }, options))).then(r => ({numberAffected: r}));
};
@@ -1578,9 +1576,9 @@ _.each( ['STRING'], function(idGeneration) {
inColl && test.isNull(inColl.d.color);
},
function (test, expect) {
async function (test, expect) {
var self = this;
self.coll.insert(new Dog("rover", "orange")).then(id => {
await self.coll.insertAsync(new Dog("rover", "orange")).then(id => {
expect(function () {
test.isFalse(id);
});
@@ -1591,16 +1589,18 @@ _.each( ['STRING'], function(idGeneration) {
});
},
function (test, expect) {
async function (test, expect) {
var self = this;
self.coll.update(
await self.coll.update(
self.docId, new Dog("rover", "orange")).then(id => {
console.log(id);
console.log({id});
expect();
}).catch(err => {
expect(function () {
test.isTrue(err);
});
console.log({err});
test.isTrue(err);
expect();
});
//console.log({expect});
}
]);
@@ -1731,7 +1731,7 @@ _.each( ['STRING'], function(idGeneration) {
_.each(Meteor.isServer ? [true, false] : [true], function (minimongo) {
_.each([true, false], function (useUpdate) {
_.each([true, false], function (useDirectCollection) {
Tinytest.addAsync("mongo-livedata - " + (useUpdate ? "update " : "") + "upsert" + (minimongo ? " minimongo" : "") + (useDirectCollection ? " direct collection " : "") + ", " + idGeneration, async function (test) {
Tinytest.addAsync("mongo-livedata - " + (useUpdate ? "update " : "") + "upsert" + (minimongo ? " minimongo" : "") + (useDirectCollection ? " direct collection " : "") + ", " + idGeneration, async function (test, onComplete) {
var run = test.runId();
var options = collectionOptions;
// We don't get ids back when we use update() to upsert, or when we are
@@ -1938,11 +1938,11 @@ if (Meteor.isServer) {
test.equal(result1.insertedId, 'foo');
}
compareResults(test, useUpdate, await coll.find().fetch(), [{foo: 'bar', _id: 'foo'}]);
upsert(coll, useUpdate, {_id: 'foo'}, {foo: 'baz'}, next2);
await upsert(coll, useUpdate, {_id: 'foo'}, {foo: 'baz'}, next2);
};
if (! useNetwork) {
next0();
await next0();
}
var t1, t2, result2;
@@ -1959,7 +1959,7 @@ if (Meteor.isServer) {
t1 = new Mongo.ObjectID();
t2 = new Mongo.ObjectID();
upsert(coll, useUpdate, {_id: t1}, {_id: t1, foo: 'bar'}, next3);
await upsert(coll, useUpdate, {_id: t1}, {_id: t1, foo: 'bar'}, next3);
};
var result3;
@@ -1972,7 +1972,7 @@ if (Meteor.isServer) {
}
compareResults(test, useUpdate, await coll.find().fetch(), [{_id: t1, foo: 'bar'}]);
upsert(coll, useUpdate, {_id: t1}, {foo: t2}, next4);
await upsert(coll, useUpdate, {_id: t1}, {foo: t2}, next4);
};
var next4 = async function (err, result4) {
@@ -1984,7 +1984,7 @@ if (Meteor.isServer) {
await coll.remove({_id: t1});
// Test modification by upsert
upsert(coll, useUpdate, {_id: 'David'}, {$set: {foo: 1}}, next5);
await upsert(coll, useUpdate, {_id: 'David'}, {$set: {foo: 1}}, next5);
};
var result5;
@@ -2003,10 +2003,10 @@ if (Meteor.isServer) {
// The stub throws an exception about the invalid modifier, which
// livedata logs (so we suppress it).
Meteor._suppress_log(1);
upsert(coll, useUpdate, {_id: 'David'}, {$blah: {foo: 2}}, function (err) {
await upsert(coll, useUpdate, {_id: 'David'}, {$blah: {foo: 2}}, async function (err) {
if (! (Meteor.isClient && useDirectCollection))
test.isTrue(err);
upsert(coll, useUpdate, {_id: 'David'}, {$set: {foo: 2}}, next6);
await upsert(coll, useUpdate, {_id: 'David'}, {$set: {foo: 2}}, next6);
});
} else {
// XXX skip this test for now for LocalCollection; the fact that
@@ -2014,7 +2014,7 @@ if (Meteor.isServer) {
// Meteor.defer, which means the exception just gets
// logged. Something should be done about this at some point? Maybe
// LocalCollection callbacks don't really have to be deferred.
upsert(coll, useUpdate, {_id: 'David'}, {$set: {foo: 2}}, next6);
await upsert(coll, useUpdate, {_id: 'David'}, {$set: {foo: 2}}, next6);
}
};
@@ -2033,7 +2033,7 @@ if (Meteor.isServer) {
// multi update by upsert.
// We can't actually update multiple documents since we have to do it by
// id, but at least make sure the multi flag doesn't mess anything up.
upsert(coll, useUpdate, {_id: 'Emily'},
await upsert(coll, useUpdate, {_id: 'Emily'},
{$set: {bar: 7},
$setOnInsert: {name: 'Fred', foo: 2}},
{multi: true}, next7);
@@ -2049,7 +2049,7 @@ if (Meteor.isServer) {
{_id: 'Emily', foo: 2, bar: 7}]);
// insert by multi upsert
upsert(coll, useUpdate, {_id: 'Fred'},
await upsert(coll, useUpdate, {_id: 'Fred'},
{$set: {bar: 7},
$setOnInsert: {name: 'Fred', foo: 2}},
{multi: true}, next8);
@@ -2777,7 +2777,7 @@ if (Meteor.isServer) {
}
// This is a VERY white-box test.
Meteor.isServer && Tinytest.addAsync("mongo-livedata - oplog - _disableOplog", async function (test) {
Meteor.isServer && Tinytest.addAsync("mongo-livedata - oplog - _disableOplog", async function (test, onComplete) {
var collName = Random.id();
var coll = new Mongo.Collection(collName);
if (MongoInternals.defaultRemoteCollectionDriver().mongo._oplogHandle) {
@@ -2790,6 +2790,7 @@ Meteor.isServer && Tinytest.addAsync("mongo-livedata - oplog - _disableOplog", a
.observeChanges({added: function () {}});
test.isFalse(observeWithoutOplog._multiplexer._observeDriver._usesOplog);
await observeWithoutOplog.stop();
onComplete();
});
Meteor.isServer && Tinytest.addAsync("mongo-livedata - oplog - include selector fields", async function (test) {

View File

@@ -88,7 +88,7 @@ _.extend(ExpectationManager.prototype, {
done: function () {
var self = this;
self.closed = true;
self._check_complete();
return self._check_complete();
},
cancel: function () {
@@ -104,7 +104,7 @@ _.extend(ExpectationManager.prototype, {
var self = this;
if (!self.outstanding && self.closed && !self.dead) {
self.dead = true;
self.onComplete();
return self.onComplete();
}
}
});
@@ -114,53 +114,52 @@ testAsyncMulti = function (name, funcs, { isOnly = false } = {}) {
var timeout = 180000;
const addFunction = isOnly ? Tinytest.onlyAsync : Tinytest.addAsync;
addFunction(name, function (test, onComplete) {
addFunction(name, async function (test, onComplete) {
var remaining = _.clone(funcs);
var context = {};
var i = 0;
var runNext = function () {
var runNext = async function () {
var func = remaining.shift();
if (!func) {
delete test.extraDetails.asyncBlock;
onComplete();
}
else {
var em = new ExpectationManager(test, function () {
Meteor.clearTimeout(timer);
runNext();
});
var em = new ExpectationManager(test, function () {
Meteor.clearTimeout(timer);
return runNext();
});
var timer = Meteor.setTimeout(function () {
if (em.cancel()) {
test.fail({type: "timeout", message: "Async batch timed out"});
onComplete();
}
return;
}, timeout);
var timer = Meteor.setTimeout(async function () {
if (em.cancel()) {
test.fail({type: "timeout", message: "Async batch timed out"});
onComplete();
}
return;
}, timeout);
test.extraDetails.asyncBlock = i++;
test.extraDetails.asyncBlock = i++;
new Promise(resolve => {
const result = func.apply(context, [test, _.bind(em.expect, em)]);
if (result && typeof result.then === "function") {
return result.then((r) => resolve(r))
}
return resolve(result);
}).then(() => {
em.done();
}, exception => {
if (em.cancel()) {
test.exception(exception);
// Because we called test.exception, we're not to call onComplete.
}
Meteor.clearTimeout(timer);
});
}
await new Promise(resolve => {
const result = func.apply(context, [test, _.bind(em.expect, em)]);
if (result && typeof result.then === "function") {
//console.log({result});
return result.then((r) => resolve(r));
}
return resolve(result);
}).then(() => {
return em.done();
}).catch(exception => {
if (em.cancel()) {
test.exception(exception);
// Because we called test.exception, we're not to call onComplete.
}
Meteor.clearTimeout(timer);
runNext();
});
};
runNext();
await runNext();
});
};
@@ -212,8 +211,6 @@ runAndThrowIfNeeded = async (fn, test, shouldErrorOut) => {
} catch (e) {
err = e;
}
test[shouldErrorOut ? "isTrue" : "isFalse"](err);
return result;
};