Remove callback and transform mongodb methods in async

This commit is contained in:
Edimar Cardoso
2022-12-22 00:11:17 -03:00
parent 25ab772ceb
commit ddbaf0c3cc
7 changed files with 119 additions and 172 deletions

View File

@@ -345,11 +345,6 @@ export default class LocalCollection {
// XXX atomicity: if multi is true, and one modification fails, do
// we rollback the whole operation, or what?
async update(selector, mod, options) {
// if (! callback && options instanceof Function) {
// callback = options;
// options = null;
// }
if (!options) {
options = {};
}
@@ -486,17 +481,11 @@ export default class LocalCollection {
// A convenience wrapper on update. LocalCollection.upsert(sel, mod) is
// equivalent to LocalCollection.update(sel, mod, {upsert: true,
// _returnObject: true}).
upsert(selector, mod, options, callback) {
if (!callback && typeof options === 'function') {
callback = options;
options = {};
}
async upsert(selector, mod, options) {
return this.update(
selector,
mod,
Object.assign({}, options, {upsert: true, _returnObject: true}),
callback
Object.assign({}, options, {upsert: true, _returnObject: true})
);
}
@@ -739,9 +728,9 @@ LocalCollection._CachingChangeObserver = class _CachingChangeObserver {
DiffSequence.applyChanges(doc, fields);
};
this.applyChange.removed = id => {
this.applyChange.removed = async id => {
if (callbacks.removed) {
callbacks.removed.call(this, id);
await callbacks.removed.call(this, id);
}
this.docs.remove(id);
@@ -1472,7 +1461,7 @@ LocalCollection._observeFromObserveChangesNoFibers = async (cursor, observeCallb
},
removed(id) {
if (observeCallbacks.removed) {
observeCallbacks.removed(transform(this.docs.get(id)));
return observeCallbacks.removed(transform(this.docs.get(id)));
}
},
};

View File

@@ -684,31 +684,17 @@ Object.assign(Mongo.Collection.prototype, {
* @memberof Mongo.Collection
* @instance
* @param {MongoSelector} selector Specifies which documents to remove
* @param {Function} [callback] Optional. If present, called with an error object as its argument.
*/
remove(selector, callback) {
async remove(selector) {
selector = Mongo.Collection._rewriteSelector(selector);
const wrappedCallback = wrapCallback(callback);
if (this._isRemoteCollection()) {
return this._callMutatorMethodAsync('remove', [selector]);
}
// it's my collection. descend into the collection1 object
// and propagate any exception.
try {
// If the user provided a callback and the collection implements this
// operation asynchronously, then queryRet will be undefined, and the
// result will be returned through the callback instead.
return this._collection.remove(selector, wrappedCallback);
} catch (e) {
if (callback) {
callback(e);
return null;
}
throw e;
}
return this._collection.remove(selector);
},
// Determine if this collection is simply a minimongo representation of a real
@@ -728,14 +714,8 @@ Object.assign(Mongo.Collection.prototype, {
* @param {MongoModifier} modifier Specifies how to modify the documents
* @param {Object} [options]
* @param {Boolean} options.multi True to modify all matching documents; false to only modify one of the matching documents (the default).
* @param {Function} [callback] Optional. If present, called with an error object as the first argument and, if no error, the number of affected documents as the second.
*/
upsert(selector, modifier, options, callback) {
if (!callback && typeof options === 'function') {
callback = options;
options = {};
}
async upsert(selector, modifier, options) {
return this.update(
selector,
modifier,
@@ -743,9 +723,7 @@ Object.assign(Mongo.Collection.prototype, {
...options,
_returnObject: true,
upsert: true,
},
callback
);
});
},
// We'll actually design an index API later. For now, we just pass through to

View File

@@ -14,15 +14,14 @@ testAsyncMulti("mongo-livedata - doc fetcher", [
// Test basic operation.
const fakeOp1 = {};
const fakeOp2 = {};
fetcher.fetch(collName, id1, fakeOp1, expect(null, {_id: id1, x: 1}));
fetcher.fetch(collName, "nonexistent!", fakeOp2, expect(null, null));
await fetcher.fetch(collName, id1, fakeOp1).then(() => expect(null, {_id: id1, x: 1}));
await fetcher.fetch(collName, "nonexistent!", fakeOp2, expect(null, null));
var fetched = false;
var fakeOp3 = {};
var expected = {_id: id2, y: 2};
fetcher.fetch(collName, id2, fakeOp3, expect(function (e, d) {
await fetcher.fetch(collName, id2, fakeOp3).then(d => expect(function () {
fetched = true;
test.isFalse(e);
test.equal(d, expected);
}));
// The fetcher yields.
@@ -31,8 +30,7 @@ testAsyncMulti("mongo-livedata - doc fetcher", [
// Now ask for another document with the same op reference. Because a
// fetch for that op is in flight, we will get the other fetch's
// document, not this random document.
fetcher.fetch(collName, Random.id(), fakeOp3, expect(function (e, d) {
test.isFalse(e);
await fetcher.fetch(collName, Random.id(), fakeOp3).then(d => expect(function () {
test.equal(d, expected);
}));
}

View File

@@ -527,23 +527,15 @@ MongoConnection.prototype._update = async function (collection_name, selector, m
// - The id is defined by query or mod we can just add it to the replacement doc
// - The user did not specify any id preference and the id is a Mongo ObjectId,
// then we can just let Mongo generate the id
simulateUpsertWithInsertedId(
collection, mongoSelector, mongoMod, options,
// This callback does not need to be bindEnvironment'ed because
// simulateUpsertWithInsertedId() wraps it and then passes it through
// bindEnvironmentForWrite.
function (error, result) {
// If we got here via a upsert() call, then options._returnObject will
// be set and we should return the whole object. Otherwise, we should
// just return the number of affected docs to match the mongo API.
if (result && ! options._returnObject) {
callback(error, result.numberAffected);
} else {
callback(error, result);
}
}
);
return await simulateUpsertWithInsertedId(collection, mongoSelector, mongoMod, options)
.then(result => {
refresh();
if (result && ! options._returnObject) {
return result.numberAffected;
} else {
return result;
}
});
} else {
if (options.upsert && !knownId && options.insertedId && isModify) {
if (!mongoMod.hasOwnProperty('$setOnInsert')) {

View File

@@ -83,26 +83,13 @@ var compareResults = function (test, skipIds, actual, expected) {
test.equal(actual, expected);
};
var upsert = async function (coll, useUpdate, query, mod, options, callback) {
if (! callback && typeof options === "function") {
callback = options;
options = {};
}
var upsert = async function (coll, useUpdate, query, mod, options) {
if (!useUpdate) {
return await coll.upsert(query, mod, options, callback);
return await coll.upsert(query, mod, options);
}
if (callback) {
await coll.update(query, mod,
_.extend({ upsert: true }, options))
.then(result => {
callback(null, {numberAffected: result});
});
}
return await Promise.resolve(coll.update(query, mod,
_.extend({ upsert: true }, options))).then(r => ({numberAffected: r}));
return coll.update(query, mod, _.extend({ upsert: true }, options))
.then(r => ({numberAffected: r}));
};
var upsertTestMethod = "livedata_upsert_test_method";
@@ -1465,13 +1452,12 @@ _.each( ['STRING'], function(idGeneration) {
var self = this;
self.coll = new Mongo.Collection(self.collectionName, self.collectionOptions);
var obs;
var expectAdd = function (doc) {
var expectAdd = expect(function (doc) {
test.equal(doc.seconds(), 50);
};
var expectRemove = async function (doc) {
});
var expectRemove = expect(function (doc) {
test.equal(doc.seconds(), 50);
await obs.stop();
};
});
const id = await runAndThrowIfNeeded(() => self.coll.insert({d: new Date(1356152390004)}), test, false);
test.isTrue(id);
var cursor = self.coll.find();
@@ -1479,15 +1465,15 @@ _.each( ['STRING'], function(idGeneration) {
added: expectAdd,
removed: expectRemove
});
// test.equal(await cursor.count(), 1);
// test.equal((await cursor.fetch())[0].seconds(), 50);
// test.equal((await self.coll.findOne()).seconds(), 50);
// test.equal((await self.coll.findOne({}, {transform: null})).seconds, undefined);
// test.equal((await self.coll.findOne({}, {
// transform: function (doc) {return {seconds: doc.d.getSeconds()};}
// })).seconds, 50);
test.equal(await cursor.count(), 1);
test.equal((await cursor.fetch())[0].seconds(), 50);
test.equal((await self.coll.findOne()).seconds(), 50);
test.equal((await self.coll.findOne({}, {transform: null})).seconds, undefined);
test.equal((await self.coll.findOne({}, {
transform: function (doc) {return {seconds: doc.d.getSeconds()};}
})).seconds, 50);
await self.coll.remove(id);
expect();
obs.stop();
},
async function (test) {
var self = this;
@@ -1518,7 +1504,7 @@ _.each( ['STRING'], function(idGeneration) {
Meteor.subscribe('c-' + this.collectionName, expect());
}
},
async function (test) {
async function (test, expect) {
var self = this;
self.coll = new Mongo.Collection(this.collectionName, collectionOptions);
const id = await runAndThrowIfNeeded(() => self.coll.insert({}), test);
@@ -1545,7 +1531,7 @@ _.each( ['STRING'], function(idGeneration) {
await Meteor.callAsync('createInsecureCollection', this.collectionName, collectionOptions);
Meteor.subscribe('c-' + this.collectionName, expect());
}
}, async function (test) {
}, async function (test, expect) {
const coll = new Mongo.Collection(this.collectionName, collectionOptions);
const id = await runAndThrowIfNeeded(() => coll.insert({b: bin}), test);
test.isTrue(id);
@@ -1566,7 +1552,7 @@ _.each( ['STRING'], function(idGeneration) {
}
},
async function (test) {
async function (test, expect) {
var self = this;
self.coll = new Mongo.Collection(this.collectionName, collectionOptions);
var docId;
@@ -2263,7 +2249,7 @@ testAsyncMulti('mongo-livedata - specified _id', [
await Meteor.callAsync('createInsecureCollection', this.collectionName);
Meteor.subscribe('c-' + this.collectionName, expect());
}
}, async function (test) {
}, async function (test, expect) {
var coll = new Mongo.Collection(this.collectionName);
const id1 = await runAndThrowIfNeeded(() => coll.insert({ _id: "foo", name: "foo" }), test);
test.equal(id1, "foo");
@@ -2276,6 +2262,7 @@ testAsyncMulti('mongo-livedata - specified _id', [
console.log({id1, id2}, await coll.find({}).fetch());
const doc2 = await coll.findOne();
test.equal(doc2.name, "foo");
expect();
}
]);
@@ -2477,6 +2464,7 @@ testAsyncMulti('mongo-livedata - empty string _id', [
var self = this;
var docs = await self.coll.find().fetch();
test.equal(docs, [{_id: "realid", f: "bar"}]);
expect();
},
async function (test, expect) {
var self = this;
@@ -2484,6 +2472,7 @@ testAsyncMulti('mongo-livedata - empty string _id', [
await self.coll._collection.insert({_id: "", f: "baz"});
test.equal((await self.coll.find().fetch()).length, 2);
}
expect();
}
]);
@@ -3149,6 +3138,7 @@ testAsyncMulti("mongo-livedata - undefined find options", [
skip: undefined
});
test.equal(result, self.doc);
expect();
}
]);
@@ -3197,6 +3187,7 @@ Meteor.isServer && testAsyncMulti("mongo-livedata - observe limit bug", [
return self.coll.remove({toDelete: true});
});
test.equal(_.keys(state), [self.id1]);
expect();
}
]);
@@ -3218,6 +3209,7 @@ Meteor.isServer && testAsyncMulti("mongo-livedata - update with replace forbidde
return c.update(id, { foo3: "bar3", $set: { blah: 1 } });
}, "cannot have both modifier and non-modifier fields");
test.equal(await c.findOne(id), { _id: id, foo2: "bar2" });
expect();
}
]);

View File

@@ -127,9 +127,9 @@ ObserveMultiplexer = class {
// Calls "cb" once the effects of all "ready", "addHandleAndSendInitialAdds"
// and observe callbacks which came before this call have been propagated to
// all handles. "ready" must have already been called on this multiplexer.
onFlush(cb) {
async onFlush(cb) {
var self = this;
return this._queue.queueTask(async function () {
return await this._queue.queueTask(async function () {
if (!self._ready())
throw Error("only call onFlush on a multiplexer that will be ready");
await cb();

View File

@@ -480,78 +480,76 @@ _.extend(OplogObserveDriver.prototype, {
},
_fetchModifiedDocuments: function () {
var self = this;
Meteor._noYieldsAllowed(function () {
self._registerPhaseChange(PHASE.FETCHING);
// Defer, because nothing called from the oplog entry handler may yield,
// but fetch() yields.
Meteor.defer(finishIfNeedToPollQuery(async function () {
while (!self._stopped && !self._needToFetch.empty()) {
if (self._phase === PHASE.QUERYING) {
// While fetching, we decided to go into QUERYING mode, and then we
// saw another oplog entry, so _needToFetch is not empty. But we
// shouldn't fetch these documents until AFTER the query is done.
break;
}
// Being in steady phase here would be surprising.
if (self._phase !== PHASE.FETCHING)
throw new Error("phase in fetchModifiedDocuments: " + self._phase);
self._currentlyFetching = self._needToFetch;
var thisGeneration = ++self._fetchGeneration;
self._needToFetch = new LocalCollection._IdMap;
var waiting = 0;
let promiseResolver = null;
const awaitablePromise = new Promise(r => promiseResolver = r);
// This loop is safe, because _currentlyFetching will not be updated
// during this loop (in fact, it is never mutated).
self._currentlyFetching.forEach(function (op, id) {
waiting++;
self._mongoHandle._docFetcher.fetch(
self._cursorDescription.collectionName, id, op,
finishIfNeedToPollQuery(function (err, doc) {
try {
if (err) {
Meteor._debug("Got exception while fetching documents",
err);
// If we get an error from the fetcher (eg, trouble
// connecting to Mongo), let's just abandon the fetch phase
// altogether and fall back to polling. It's not like we're
// getting live updates anyway.
if (self._phase !== PHASE.QUERYING) {
self._needToPollQuery();
}
} else if (!self._stopped && self._phase === PHASE.FETCHING
&& self._fetchGeneration === thisGeneration) {
// We re-check the generation in case we've had an explicit
// _pollQuery call (eg, in another fiber) which should
// effectively cancel this round of fetches. (_pollQuery
// increments the generation.)
self._handleDoc(id, doc);
}
} finally {
waiting--;
// Because fetch() never calls its callback synchronously,
// this is safe (ie, we won't call fut.return() before the
// forEach is done).
if (waiting === 0)
promiseResolver();
}
}));
});
await awaitablePromise;
// Exit now if we've had a _pollQuery call (here or in another fiber).
if (self._phase === PHASE.QUERYING)
return;
self._currentlyFetching = null;
self._registerPhaseChange(PHASE.FETCHING);
// Defer, because nothing called from the oplog entry handler may yield,
// but fetch() yields.
Meteor.defer(finishIfNeedToPollQuery(async function () {
while (!self._stopped && !self._needToFetch.empty()) {
if (self._phase === PHASE.QUERYING) {
// While fetching, we decided to go into QUERYING mode, and then we
// saw another oplog entry, so _needToFetch is not empty. But we
// shouldn't fetch these documents until AFTER the query is done.
break;
}
// We're done fetching, so we can be steady, unless we've had a
// _pollQuery call (here or in another fiber).
if (self._phase !== PHASE.QUERYING)
await self._beSteady();
}));
});
// Being in steady phase here would be surprising.
if (self._phase !== PHASE.FETCHING)
throw new Error("phase in fetchModifiedDocuments: " + self._phase);
self._currentlyFetching = self._needToFetch;
var thisGeneration = ++self._fetchGeneration;
self._needToFetch = new LocalCollection._IdMap;
var waiting = 0;
let promiseResolver = null;
const awaitablePromise = new Promise(r => promiseResolver = r);
// This loop is safe, because _currentlyFetching will not be updated
// during this loop (in fact, it is never mutated).
self._currentlyFetching.forEach(function (op, id) {
waiting++;
self._mongoHandle._docFetcher.fetch(
self._cursorDescription.collectionName, id, op,
finishIfNeedToPollQuery(function (err, doc) {
try {
if (err) {
Meteor._debug("Got exception while fetching documents",
err);
// If we get an error from the fetcher (eg, trouble
// connecting to Mongo), let's just abandon the fetch phase
// altogether and fall back to polling. It's not like we're
// getting live updates anyway.
if (self._phase !== PHASE.QUERYING) {
self._needToPollQuery();
}
} else if (!self._stopped && self._phase === PHASE.FETCHING
&& self._fetchGeneration === thisGeneration) {
// We re-check the generation in case we've had an explicit
// _pollQuery call (eg, in another fiber) which should
// effectively cancel this round of fetches. (_pollQuery
// increments the generation.)
self._handleDoc(id, doc);
}
} finally {
waiting--;
// Because fetch() never calls its callback synchronously,
// this is safe (ie, we won't call fut.return() before the
// forEach is done).
if (waiting === 0)
promiseResolver();
}
}));
});
await awaitablePromise;
// Exit now if we've had a _pollQuery call (here or in another fiber).
if (self._phase === PHASE.QUERYING)
return;
self._currentlyFetching = null;
}
// We're done fetching, so we can be steady, unless we've had a
// _pollQuery call (here or in another fiber).
if (self._phase !== PHASE.QUERYING)
await self._beSteady();
}));
},
_beSteady: async function () {
var self = this;