Refactor mongo packages

- Using isomorphic async methods with the suffix Async;
- Changing tests to use async methods on client and server
- Fix tests
This commit is contained in:
Edimar Cardoso
2023-01-25 16:52:07 -03:00
parent 1cdaf40aa5
commit 565d2755aa
21 changed files with 99 additions and 96 deletions

View File

@@ -261,7 +261,7 @@ CollectionPrototype._validatedInsertAsync = function (userId, doc,
if (generatedId !== null)
doc._id = generatedId;
self._collection.insertAsync.call(self._collection, doc);
return self._collection.insertAsync.call(self._collection, doc);
};
// Simulate a mongo `update` operation while validating that the access
@@ -426,7 +426,7 @@ CollectionPrototype._callMutatorMethodAsync = async function _callMutatorMethod(
}
const mutatorMethodName = this._prefix + name;
return await this._connection.applyAsync(mutatorMethodName, args, { returnStubValue: true , throwStubExceptions: true });
return await this._connection.applyAsync(mutatorMethodName, args, { returnStubValue: true , isFromCallAsync: true });
}
function transformDoc(validator, doc) {

View File

@@ -45,10 +45,10 @@ const clientVersions =
Autoupdate._clientVersions = // Used by a self-test and hot-module-replacement
new ClientVersions();
Meteor.connection.registerStore(
(async () => await Meteor.connection.registerStore(
"meteor_autoupdate_clientVersions",
clientVersions.createStore()
);
));
Autoupdate.newClientAvailable = function () {
return clientVersions.newClientAvailable(

View File

@@ -13,10 +13,10 @@ const clientVersions = new ClientVersions();
// Used by hot-module-replacement
Autoupdate._clientVersions = clientVersions;
Meteor.connection.registerStore(
(async () => await Meteor.connection.registerStore(
"meteor_autoupdate_clientVersions",
clientVersions.createStore()
);
));
Autoupdate.newClientAvailable = function () {
return clientVersions.newClientAvailable(

View File

@@ -282,7 +282,7 @@ export class Connection {
// 'name' is the name of the data on the wire that should go in the
// store. 'wrappedStore' should be an object with methods beginUpdate, update,
// endUpdate, saveOriginals, retrieveOriginals. see Collection for an example.
registerStore(name, wrappedStore) {
async registerStore(name, wrappedStore) {
const self = this;
if (name in self._stores) return false;
@@ -310,11 +310,11 @@ export class Connection {
const queued = self._updatesForUnknownStores[name];
if (Array.isArray(queued)) {
store.beginUpdate(queued.length, false);
await store.beginUpdate(queued.length, false);
queued.forEach(msg => {
store.update(msg);
});
store.endUpdate();
await store.endUpdate();
delete self._updatesForUnknownStores[name];
}
@@ -646,7 +646,7 @@ export class Connection {
isFromCallAsync: stubOptions.isFromCallAsync,
})
) {
this._saveOriginals();
await this._saveOriginals();
}
try {
/*
@@ -915,9 +915,9 @@ export class Connection {
// Before calling a method stub, prepare all stores to track changes and allow
// _retrieveAndStoreOriginals to get the original versions of changed
// documents.
_saveOriginals() {
async _saveOriginals() {
if (! this._waitingForQuiescence()) {
this._flushBufferedWrites();
await this._flushBufferedWrites();
}
Object.values(this._stores).forEach((store) => {
@@ -1201,7 +1201,7 @@ export class Connection {
}
}
_livedata_data(msg) {
async _livedata_data(msg) {
const self = this;
if (self._waitingForQuiescence()) {
@@ -1254,7 +1254,7 @@ export class Connection {
msg.msg === "removed";
if (self._bufferedWritesInterval === 0 || ! standardWrite) {
self._flushBufferedWrites();
await self._flushBufferedWrites();
return;
}
@@ -1262,7 +1262,7 @@ export class Connection {
self._bufferedWritesFlushAt =
new Date().valueOf() + self._bufferedWritesMaxAge;
} else if (self._bufferedWritesFlushAt < new Date().valueOf()) {
self._flushBufferedWrites();
await self._flushBufferedWrites();
return;
}
@@ -1275,7 +1275,7 @@ export class Connection {
);
}
_flushBufferedWrites() {
async _flushBufferedWrites() {
const self = this;
if (self._bufferedWritesFlushHandle) {
clearTimeout(self._bufferedWritesFlushHandle);
@@ -1288,32 +1288,31 @@ export class Connection {
// will exit cleanly.
const writes = self._bufferedWrites;
self._bufferedWrites = Object.create(null);
self._performWrites(writes);
await self._performWrites(writes);
}
_performWrites(updates) {
async _performWrites(updates) {
const self = this;
if (self._resetStores || ! isEmpty(updates)) {
// Begin a transactional update of each store.
Object.entries(self._stores).forEach(([storeName, store]) => {
store.beginUpdate(
for (const [storeName, store] of Object.entries(self._stores) ) {
await store.beginUpdate(
hasOwn.call(updates, storeName)
? updates[storeName].length
: 0,
self._resetStores
);
});
}
self._resetStores = false;
Object.entries(updates).forEach(([storeName, updateMessages]) => {
for (const [storeName, updateMessages] of Object.entries(updates) ) {
const store = self._stores[storeName];
if (store) {
updateMessages.forEach(updateMessage => {
store.update(updateMessage);
});
for (const updateMessage of updateMessages) {
await store.update(updateMessage);
}
} else {
// Nobody's listening for this data. Queue it up until
// someone wants it.
@@ -1328,12 +1327,12 @@ export class Connection {
updates[storeName].push(...updateMessages);
}
});
}
// End update transaction.
Object.values(self._stores).forEach((store) => {
store.endUpdate();
});
for (const store of Object.values(self._stores) ) {
await store.endUpdate();
};
}
self._runAfterUpdateCallbacks();
@@ -1544,12 +1543,12 @@ export class Connection {
}
}
_livedata_nosub(msg) {
async _livedata_nosub(msg) {
const self = this;
// First pass it through _livedata_data, which only uses it to help get
// towards quiescence.
self._livedata_data(msg);
await self._livedata_data(msg);
// Do the rest of our processing immediately, with no
// buffering-until-quiescence.
@@ -1587,14 +1586,14 @@ export class Connection {
}
}
_livedata_result(msg) {
async _livedata_result(msg) {
// id, result or error. error has error (code), reason, details
const self = this;
// Lets make sure there are no buffered writes before returning result.
if (! isEmpty(self._bufferedWrites)) {
self._flushBufferedWrites();
await self._flushBufferedWrites();
}
// find the outstanding request
@@ -1735,7 +1734,7 @@ export class Connection {
}
}
onMessage(raw_msg) {
async onMessage(raw_msg) {
let msg;
try {
msg = DDPCommon.parseDDP(raw_msg);
@@ -1780,11 +1779,11 @@ export class Connection {
} else if (
['added', 'changed', 'removed', 'ready', 'updated'].includes(msg.msg)
) {
this._livedata_data(msg);
await this._livedata_data(msg);
} else if (msg.msg === 'nosub') {
this._livedata_nosub(msg);
await this._livedata_nosub(msg);
} else if (msg.msg === 'result') {
this._livedata_result(msg);
await this._livedata_result(msg);
} else if (msg.msg === 'error') {
this._livedata_error(msg);
} else {

View File

@@ -67,6 +67,8 @@ DDPCommon.MethodInvocation = class MethodInvocation {
// This is set by RandomStream.get; and holds the random stream state
this.randomStream = null;
this.fence = options.fence;
}
/**

View File

@@ -49,6 +49,14 @@ var SessionDocumentView = function () {
DDPServer._SessionDocumentView = SessionDocumentView;
DDPServer._getCurrentFence = function () {
let currentInvocation = this._CurrentWriteFence.get();
if (currentInvocation) {
return currentInvocation;
}
currentInvocation = DDP._CurrentPublicationInvocation.get();
return currentInvocation ? currentInvocation.fence : undefined;
};
_.extend(SessionDocumentView.prototype, {
@@ -682,7 +690,7 @@ Object.assign(Session.prototype, {
self._stopSubscription(msg.id);
},
method: function (msg, unblock) {
method: async function (msg, unblock) {
var self = this;
// Reject malformed messages.
@@ -709,8 +717,7 @@ Object.assign(Session.prototype, {
// example, because the method waits for them) their
// writes will be included in the fence.
fence.retire();
self.send({
msg: 'updated', methods: [msg.id]});
self.send({msg: 'updated', methods: [msg.id]});
});
// Find the handler
@@ -733,7 +740,8 @@ Object.assign(Session.prototype, {
setUserId: setUserId,
unblock: unblock,
connection: self.connectionHandle,
randomSeed: randomSeed
randomSeed: randomSeed,
fence,
});
const promise = new Promise((resolve, reject) => {
@@ -777,7 +785,6 @@ Object.assign(Session.prototype, {
keyName: 'getCurrentMethodInvocationResult',
}
);
resolve(
DDPServer._CurrentWriteFence.withValue(
fence,
@@ -790,8 +797,8 @@ Object.assign(Session.prototype, {
);
});
function finish() {
fence.arm();
async function finish() {
await fence.arm();
unblock();
}
@@ -799,15 +806,14 @@ Object.assign(Session.prototype, {
msg: "result",
id: msg.id
};
promise.then(result => {
finish();
await promise.then(async result => {
await finish();
if (result !== undefined) {
payload.result = result;
}
self.send(payload);
}, (exception) => {
finish();
}, async (exception) => {
await finish();
payload.error = wrapInternalException(
exception,
`while invoking method '${msg.method}'`
@@ -1791,7 +1797,7 @@ Object.assign(Server.prototype, {
applyAsync: function (name, args, options) {
// Run the handler
var handler = this.method_handlers[name];
console.log({name});
if (! handler) {
return Promise.reject(
new Meteor.Error(404, `Method '${name}' not found`)

View File

@@ -34,16 +34,16 @@ DDPServer._WriteFence = class {
await this._maybeFire();
};
const self = this;
return {
committed: Meteor._isFibersEnabled ? () => Promise.await(_committedFn.apply(self)) : _committedFn,
committed: _committedFn,
};
}
// Arm the fence. Once the fence is armed, and there are no more
// uncommitted writes, it will activate.
arm() {
if (this === DDPServer._CurrentWriteFence.get())
if (this === DDPServer._getCurrentFence())
throw Error("Can't arm the current fence");
this.armed = true;
return Meteor._isFibersEnabled ? Promise.await(this._maybeFire()) : this._maybeFire();
@@ -101,8 +101,10 @@ DDPServer._WriteFence = class {
if (!this.outstanding_writes) {
this.fired = true;
while (this.completion_callbacks.length > 0) {
const cb = this.completion_callbacks.shift();
const callbacks = this.completion_callbacks || [];
this.completion_callbacks = [];
while (callbacks.length > 0) {
const cb = callbacks.shift();
await invokeCallback(cb);
}
}

View File

@@ -92,7 +92,7 @@ SQp.flush = function () {
self.runTask(function () {});
};
SQp.drain = async function () {
SQp.drain = function () {
var self = this;
if (!self.safeToRunTask()) {
return;

View File

@@ -150,22 +150,18 @@ export default class LocalCollection {
}
}
});
queriesToRecompute.forEach(qid => {
if (this.queries[qid]) {
this._recomputeResults(this.queries[qid]);
}
});
// TODO -> Check here.
Promise.resolve(this._observeQueue.drain()).then(() => {
// Defer because the caller likely doesn't expect the callback to be run
// immediately.
if (callback) {
(async () => callback(null, id))();
}
});
this._observeQueue.drain();
// Defer because the caller likely doesn't expect the callback to be run
// immediately.
if (callback) {
(async () => callback(null, id))();
}
return id;
}
@@ -282,7 +278,7 @@ export default class LocalCollection {
// notifications to bring them to the current state of the
// database. Note that this is not just replaying all the changes that
// happened during the pause, it is a smarter 'coalesced' diff.
resumeObservers() {
async resumeObservers() {
// No-op if not paused.
if (!this.paused) {
return;
@@ -316,7 +312,7 @@ export default class LocalCollection {
query.resultsSnapshot = null;
});
this._observeQueue.drain();
await this._observeQueue.drain();
}
retrieveOriginals() {

View File

@@ -3360,7 +3360,7 @@ Tinytest.add('minimongo - objectid', test => {
test.equal(randomOid, new MongoID.ObjectID(randomOid.valueOf()));
});
Tinytest.add('minimongo - pause', test => {
Tinytest.addAsync('minimongo - pause', async test => {
const operations = [];
const cbs = log_callbacks(operations);
@@ -3378,7 +3378,7 @@ Tinytest.add('minimongo - pause', test => {
c.insert({_id: 1, a: 1});
test.length(operations, 0);
c.resumeObservers();
await c.resumeObservers();
test.length(operations, 0);

View File

@@ -152,7 +152,7 @@ Mongo.Collection = function Collection(name, options) {
};
Object.assign(Mongo.Collection.prototype, {
_maybeSetUpReplication(name, { _suppressSameNameError = false }) {
async _maybeSetUpReplication(name, { _suppressSameNameError = false }) {
const self = this;
if (!(self._connection && self._connection.registerStore)) {
return;
@@ -161,7 +161,7 @@ Object.assign(Mongo.Collection.prototype, {
// OK, we're going to be a slave, replicating some remote
// database, except possibly with some temporary divergence while
// we have unacknowledged RPC's.
const ok = self._connection.registerStore(name, {
const ok = await self._connection.registerStore(name, {
// Called at the beginning of a batch of updates. batchSize is the number
// of update calls to expect.
//
@@ -172,7 +172,7 @@ Object.assign(Mongo.Collection.prototype, {
// message, and then we can either directly apply it at endUpdate time if
// it was the only update, or do pauseObservers/apply/apply at the next
// updateAsync() if there's another one.
beginUpdate(batchSize, reset) {
async beginUpdate(batchSize, reset) {
// pause observers so users don't see flicker when updating several
// objects at once (including the post-reconnect reset-and-reapply
// stage), and so that a re-sorting of a query can take advantage of the
@@ -180,7 +180,7 @@ Object.assign(Mongo.Collection.prototype, {
// time.
if (batchSize > 1 || reset) self._collection.pauseObservers();
if (reset) self._collection.removeAsync({});
if (reset) await self._collection.removeAsync({});
},
// Apply an update.
@@ -275,8 +275,8 @@ Object.assign(Mongo.Collection.prototype, {
},
// Called at the end of a batch of updates.
endUpdate() {
self._collection.resumeObservers();
async endUpdate() {
await self._collection.resumeObservers();
},
// Called around method stub invocations to capture the original versions

View File

@@ -40,7 +40,6 @@ export class DocFetcher {
collectionName, {_id: id}) || null;
// Return doc to all relevant callbacks. Note that this array can
// continue to grow during callback excecution.
console.log({doc});
emitter.emit('data', doc);
} catch (e) {
emitter.emit('error', e);

View File

@@ -19,6 +19,7 @@ import {
ASYNC_CURSOR_METHODS,
getAsyncMethodName
} from "meteor/minimongo/constants";
import { Meteor } from "meteor/meteor";
MongoInternals = {};
@@ -262,7 +263,7 @@ MongoConnection.prototype.createCappedCollectionAsync = async function (
// after the observer notifiers have added themselves to the write
// fence), you should call 'committed()' on the object returned.
MongoConnection.prototype._maybeBeginWrite = function () {
var fence = DDPServer._CurrentWriteFence.get();
const fence = DDPServer._getCurrentFence();
if (fence) {
return fence.beginWrite();
} else {

View File

@@ -1540,7 +1540,7 @@ _.each( ['STRING'], function(idGeneration) {
async function (test, expect) {
this.collectionName = Random.id();
if (Meteor.isClient) {
await Meteor.call('createInsecureCollection', this.collectionName, collectionOptions);
await Meteor.callAsync('createInsecureCollection', this.collectionName, collectionOptions);
Meteor.subscribe('c-' + this.collectionName, expect());
}
},
@@ -1556,6 +1556,7 @@ _.each( ['STRING'], function(idGeneration) {
test.isTrue(id);
docId = id;
self.docId = docId;
var cursor = self.coll.find();
test.equal(await cursor.count(), 1);
var inColl = await self.coll.findOneAsync();
@@ -1567,15 +1568,15 @@ _.each( ['STRING'], function(idGeneration) {
async function (test, expect) {
var self = this;
await self.coll.insertAsync(new Dog("rover", "orange")).catch(expect(function (err) {
test.isTrue(err);
test.isTrue(err);
}));
},
async function (test) {
async function (test, expect) {
var self = this;
await self.coll.updateAsync(self.docId, new Dog("rover", "orange")).catch(err => {
await self.coll.updateAsync(self.docId, new Dog("rover", "orange")).catch(expect(function(err) {
test.isTrue(err);
});
}));
}
]);
@@ -2107,6 +2108,7 @@ if (Meteor.isServer) {
let err;
try {
await Meteor.callAsync(upsertTestMethod, run, useUpdate, collectionOptions);
console.log('xxxx');
} catch (e) {
err = e;
}
@@ -3281,10 +3283,10 @@ if (Meteor.isClient) {
var futuresByNonce = {};
Meteor.methods({
fenceOnBeforeFireError1: function (nonce) {
fenceOnBeforeFireError1: async function (nonce) {
let resolve;
futuresByNonce[nonce] = new Promise(r => resolve = r);
var observe = fenceOnBeforeFireErrorCollection.find({nonce: nonce})
var observe = await fenceOnBeforeFireErrorCollection.find({nonce: nonce})
.observeChanges({added: function (){}});
Meteor.setTimeout(async function () {
await fenceOnBeforeFireErrorCollection.insertAsync({nonce: nonce})

View File

@@ -17,7 +17,6 @@ _.each ([
Tinytest.addAsync("observeChanges - single id - basics " + added
+ (forceOrdered ? " force ordered" : ""),
async function (test, onComplete) {
console.log({added});
var c = makeCollection();
var counter = 0;
var callbacks = [added, "changed", "removed"];

View File

@@ -95,7 +95,7 @@ ObserveMultiplexer = class {
// adds have been processed. Does not block.
async ready() {
const self = this;
await this._queue.runTask(function () {
this._queue.queueTask(function () {
if (self._ready())
throw Error("can't make ObserveMultiplex ready twice!");
@@ -129,7 +129,7 @@ ObserveMultiplexer = class {
// all handles. "ready" must have already been called on this multiplexer.
async onFlush(cb) {
var self = this;
await this._queue.runTask(async function () {
await this._queue.queueTask(async function () {
if (!self._ready())
throw Error("only call onFlush on a multiplexer that will be ready");
await cb();
@@ -146,7 +146,7 @@ ObserveMultiplexer = class {
}
async _applyCallback(callbackName, args) {
const self = this;
await this._queue.runTask(async function () {
await this._queue.queueTask(async function () {
// If we stopped in the meantime, do nothing.
if (!self._handles)
return;

View File

@@ -140,7 +140,7 @@ OplogObserveDriver = function (options) {
self._stopHandles.push(listenAll(
self._cursorDescription, function () {
// If we're not in a pre-fire write fence, we don't have to do anything.
var fence = DDPServer._CurrentWriteFence.get();
var fence = DDPServer._getCurrentFence();
if (!fence || fence.fired)
return;

View File

@@ -3,7 +3,6 @@ var OplogCollection = new Mongo.Collection("oplog-" + Random.id());
Tinytest.addAsync("mongo-livedata - oplog - cursorSupported", async function (test, onComplete) {
var oplogEnabled =
!!MongoInternals.defaultRemoteCollectionDriver().mongo._oplogHandle;
console.log({OplogCollection});
// var supported = async function (expected, selector, options) {
// var cursor = OplogCollection.find(selector, options);
// var handle = await cursor.observeChanges({

View File

@@ -42,7 +42,7 @@ PollingObserveDriver = function (options) {
// When someone does a transaction that might affect us, schedule a poll
// of the database. If that transaction happens inside of a write fence,
// block the fence until we've polled and notified observers.
var fence = DDPServer._CurrentWriteFence.get();
var fence = DDPServer._getCurrentFence();
if (fence)
self._pendingWrites.push(fence.beginWrite());
// Ensure a poll is scheduled... but if we already know that one is,

View File

@@ -263,12 +263,10 @@ export class TestCaseResults {
const predicate = this._guessPredicate(expected);
try {
console.log('XXXXXXXXXXXXX', message);
await f();
} catch (exception) {
actual = exception;
}
console.log('XXXXXXXXXXXXX', message);
this._assertActual(actual, predicate, message);
}

View File

@@ -895,7 +895,7 @@ exports.loggedInUsername = function () {
return loggedIn(data) ? currentUsername(data) : false;
};
exports.getAccountsConfiguration = function (conn) {
exports.getAccountsConfiguration = async function (conn) {
// Subscribe to the package server's service configurations so that we
// can get the OAuth client ID to kick off the OAuth flow.
var accountsConfiguration = null;