Merge pull request #12593 from matheusccastroo/wrappers-for-async-migration

Wrappers to help in the async migration
This commit is contained in:
Gabriel Grubba
2023-04-20 11:55:21 -03:00
committed by GitHub
9 changed files with 136 additions and 32 deletions

View File

@@ -190,7 +190,7 @@ export class AccountsServer extends AccountsCommon {
throw new Error("Can only call onCreateUser once");
}
this._onCreateUserHook = func;
this._onCreateUserHook = Meteor.wrapFn(func);
}
/**
@@ -564,7 +564,7 @@ export class AccountsServer extends AccountsCommon {
this._loginHandlers.push({
name: name,
handler: handler
handler: Meteor.wrapFn(handler)
});
};

View File

@@ -49,6 +49,11 @@ export class Hook {
this.bindEnvironment = false;
}
this.wrapAsync = true;
if (options.wrapAsync === false) {
this.wrapAsync = false;
}
if (options.exceptionHandler) {
this.exceptionHandler = options.exceptionHandler;
} else if (options.debugPrintExceptions) {
@@ -73,6 +78,10 @@ export class Hook {
callback = dontBindEnvironment(callback, exceptionHandler);
}
if (this.wrapAsync) {
callback = Meteor.wrapFn(callback);
}
const id = this.nextCallbackId++;
this.callbacks[id] = callback;

View File

@@ -3,6 +3,7 @@
var Fiber = Npm.require('fibers');
var nextSlot = 0;
var callAsyncMethodRunning = false;
Meteor._nodeCodeMustBeInFiber = function () {
if (!Fiber.current) {
@@ -75,7 +76,7 @@ EVp.withValue = function (value, func) {
var saved = currentValues[this.slot];
try {
currentValues[this.slot] = value;
return func();
return Meteor.wrapFn(func)();
} finally {
currentValues[this.slot] = saved;
}
@@ -106,6 +107,15 @@ EVp._setNewContextAndGetCurrent = function (value) {
return saved;
};
EVp._isCallAsyncMethodRunning = function () {
return callAsyncMethodRunning;
};
EVp._setCallAsyncMethodRunning = function (value) {
callAsyncMethodRunning = value;
};
// Meteor application code is always supposed to be run inside a
// fiber. bindEnvironment ensures that the function it wraps is run from
// inside a fiber and ensures it sees the values of Meteor environment

View File

@@ -163,6 +163,25 @@ Meteor.wrapAsync = function (fn, context) {
};
};
Meteor.wrapFn = function (fn) {
if (!fn || typeof fn !== 'function') {
throw new Meteor.Error("Expected to receive function to wrap");
}
if (Meteor.isClient) {
return fn;
}
return function() {
var ret = fn.apply(this, arguments);
if (ret && typeof ret.then === 'function') {
return Promise.await(ret);
}
return ret;
}
};
// Sets child's prototype to a new object whose prototype is parent's
// prototype. Used as:
// Meteor._inherits(ClassB, ClassA).

View File

@@ -1,4 +1,5 @@
Meteor.startup = function startup(callback) {
callback = Meteor.wrapFn(callback);
if (process.env.METEOR_PROFILE) {
// Create a temporary error to capture the current stack trace.
var error = new Error("Meteor.startup");

View File

@@ -834,19 +834,35 @@ MongoConnection.prototype.find = function (collectionName, selector, options) {
self, new CursorDescription(collectionName, selector, options));
};
MongoConnection.prototype.findOne = function (collection_name, selector,
options) {
MongoConnection.prototype.findOneAsync = async function (collection_name, selector,
options) {
var self = this;
if (arguments.length === 1)
selector = {};
options = options || {};
options.limit = 1;
return (await self.find(collection_name, selector, options).fetchAsync())[0];
};
MongoConnection.prototype.findOne = function (collection_name, selector,
options) {
var self = this;
// [FIBERS]
// TODO: Remove this when 3.0 is released.
warnUsingOldApi("findOne");
options = options || {};
options.limit = 1;
return self.find(collection_name, selector, options).fetch()[0];
return Future.fromPromise(self.findOneAsync(collection_name, selector, options)).wait();
};
MongoConnection.prototype.createIndexAsync = function (collectionName, index,
options) {
var self = this;
// We expect this function to be called at startup, not from within a method,
// so we don't interact with the write fence.
var collection = self.rawCollection(collectionName);
return collection.createIndex(index, options);
};
// We'll actually design an index API later. For now, we just pass through to
@@ -854,17 +870,11 @@ MongoConnection.prototype.findOne = function (collection_name, selector,
MongoConnection.prototype.createIndex = function (collectionName, index,
options) {
var self = this;
// [FIBERS]
// TODO: Remove this when 3.0 is released.
warnUsingOldApi("createIndex");
// We expect this function to be called at startup, not from within a method,
// so we don't interact with the write fence.
var collection = self.rawCollection(collectionName);
var future = new Future;
var indexName = collection.createIndex(index, options, future.resolver());
future.wait();
return Future.fromPromise(self.createIndexAsync(collectionName, index, options));
};
MongoConnection.prototype.countDocuments = function (collectionName, ...args) {
@@ -1209,6 +1219,7 @@ _.extend(SynchronousCursor.prototype, {
forEach: function (callback, thisArg) {
var self = this;
const wrappedFn = Meteor.wrapFn(callback);
// Get back to the beginning.
self._rewind();
@@ -1220,16 +1231,17 @@ _.extend(SynchronousCursor.prototype, {
while (true) {
var doc = self._nextObject();
if (!doc) return;
callback.call(thisArg, doc, index++, self._selfForIteration);
wrappedFn.call(thisArg, doc, index++, self._selfForIteration);
}
},
// XXX Allow overlapping callback executions if callback yields.
map: function (callback, thisArg) {
var self = this;
const wrappedFn = Meteor.wrapFn(callback);
var res = [];
self.forEach(function (doc, index) {
res.push(callback.call(thisArg, doc, index, self._selfForIteration));
res.push(wrappedFn.call(thisArg, doc, index, self._selfForIteration));
});
return res;
},

View File

@@ -1,3 +1,8 @@
import {
ASYNC_COLLECTION_METHODS,
getAsyncMethodName
} from "meteor/minimongo/constants";
MongoInternals.RemoteCollectionDriver = function (
mongo_url, options) {
var self = this;
@@ -28,6 +33,16 @@ Object.assign(MongoInternals.RemoteCollectionDriver.prototype, {
REMOTE_COLLECTION_METHODS.forEach(
function (m) {
ret[m] = _.bind(self.mongo[m], self.mongo, name);
if (!ASYNC_COLLECTION_METHODS.includes(m)) return;
const asyncMethodName = getAsyncMethodName(m);
ret[asyncMethodName] = function (...args) {
try {
return Promise.resolve(ret[m](...args));
} catch (error) {
return Promise.reject(error);
}
}
});
return ret;
}

View File

@@ -2040,11 +2040,6 @@ class JsImage {
assetPath = files.convertToStandardPath(assetPath);
var promise;
if (! callback) {
if (! Fiber.current) {
throw new Error("The synchronous Assets API can " +
"only be called from within a Fiber.");
}
promise = new Promise(function (resolve, reject) {
callback = function (err, res) {
err ? reject(err) : resolve(res);
@@ -2069,7 +2064,7 @@ class JsImage {
}
if (promise) {
return promise.await();
return promise;
}
};
@@ -2226,7 +2221,20 @@ class JsImage {
* @param {Function} [asyncCallback] Optional callback, which is called asynchronously with the error or result after the function is complete. If not provided, the function runs synchronously.
*/
getText: function (assetPath, callback) {
return getAsset(item.assets, assetPath, "utf8", callback);
const result = getAsset(item.assets, assetPath, "utf8", callback);
if (!callback) {
if (!Fiber.current) {
throw new Error("The synchronous Assets API can " +
"only be called from within a Fiber.");
}
return Promise.await(result);
}
},
getTextAsync: function (assetPath) {
return getAsset(item.assets, assetPath, "utf8");
},
/**
@@ -2237,7 +2245,20 @@ class JsImage {
* @param {Function} [asyncCallback] Optional callback, which is called asynchronously with the error or result after the function is complete. If not provided, the function runs synchronously.
*/
getBinary: function (assetPath, callback) {
return getAsset(item.assets, assetPath, undefined, callback);
const result = getAsset(item.assets, assetPath, undefined, callback);
if (!callback) {
if (!Fiber.current) {
throw new Error("The synchronous Assets API can " +
"only be called from within a Fiber.");
}
return Promise.await(result);
}
},
getBinaryAsync: function (assetPath) {
return getAsset(item.assets, assetPath, undefined);
}
}
}, bindings || {});

View File

@@ -315,10 +315,14 @@ var loadServerBundles = Profile("Load server bundles", function () {
};
var getAsset = function (assetPath, encoding, callback) {
var fut;
var promiseResolver, promise;
if (! callback) {
fut = new Future();
callback = fut.resolver();
promise = new Promise((resolve, reject) => {
promiseResolver = function (error, result) {
error ? reject(error) : resolve(result);
}
});
callback = promiseResolver;
}
// This assumes that we've already loaded the meteor package, so meteor
// itself can't call Assets.get*. (We could change this function so that
@@ -347,16 +351,29 @@ var loadServerBundles = Profile("Load server bundles", function () {
var filePath = path.join(serverDir, fileInfo.assets[assetPath]);
fs.readFile(files.convertToOSPath(filePath), encoding, _callback);
}
if (fut)
return fut.wait();
if (promise)
return promise;
};
var Assets = {
getText: function (assetPath, callback) {
return getAsset(assetPath, "utf8", callback);
const result = getAsset(assetPath, "utf8", callback);
if (!callback) {
return Future.fromPromise(result).wait();
}
},
getTextAsync: function (assetPath) {
return getAsset(assetPath, "utf8");
},
getBinary: function (assetPath, callback) {
return getAsset(assetPath, undefined, callback);
const result = getAsset(assetPath, undefined, callback);
if (!callback) {
return Future.fromPromise(result).wait();
}
},
getBinaryAsync: function (assetPath) {
return getAsset(assetPath, undefined);
},
/**
* @summary Get the absolute path to the static server asset. Note that assets are read-only.