diff --git a/README.md b/README.md index 2cb5d2ae..2644549d 100644 --- a/README.md +++ b/README.md @@ -251,5 +251,35 @@ This architecture will make it very easy for the community to create others pack #### Unit of work -TODO +The work coordinator, responsible for keeping track of which resolvers are being resolved. +The number of parallel resolutions may be limited and configured. + +------------ + +#### Constructor + +UnitOfWork(options) + +Options: + +- `maxConcurrent`: maximum number of concurrent resolvers running (defaults to 5) + +------------ + +Public methods. + +`UnitOfWork#enqueue(resolver)`: Promise + +Enqueues a resolver to be ran. +The promise is fulfilled when the resolver successfully resolved or is rejected if it failed to resolve. + +`UnitOfWork#has(resolver)`: Boolean + +Checks if a resolver is already in the unit of work. + +`UnitOfWork#abort()`: Promise + +Aborts the current work being done. +Clears the queue. Please note that resolvers that are already being resolved can't be aborted. +Returns a promise that is fulfilled when the current resolvers being resolved complete the resolve process. diff --git a/lib/resolve/UnitOfWork.js b/lib/resolve/UnitOfWork.js index cf90354d..f4526f9b 100644 --- a/lib/resolve/UnitOfWork.js +++ b/lib/resolve/UnitOfWork.js @@ -2,197 +2,138 @@ var Q = require('q'); var util = require('util'); var mout = require('mout'); var events = require('events'); -var createError = require('../util/createError'); var UnitOfWork = function (options) { // Ensure options defaults this._options = mout.object.mixIn({ - failFast: true, maxConcurrent: 5 }, options); // Parse some of the options - this._options.failFast = !!this._options.failFast; this._options.maxConcurrent = this._options.maxConcurrent > 0 ? this._options.maxConcurrent : 0; // Initialize some needed properties this._queue = []; this._beingResolved = []; this._beingResolvedEndpoints = {}; - this._resolved = {}; - this._failed = {}; - this._completed = {}; }; util.inherits(UnitOfWork, events.EventEmitter); // ----------------- -UnitOfWork.prototype.enqueue = function (pkg) { - var deferred = Q.defer(), - index; +UnitOfWork.prototype.enqueue = function (resolver) { + var deferred; - // Throw it if already queued - index = this._indexOf(this._queue, pkg); - if (index !== -1) { - throw new Error('Package is already queued'); + if (this.has(resolver)) { + throw new Error('Attempting to enqueue an already enqueued resolver'); } - // Throw if already resolving - index = this._indexOf(this._beingResolved, pkg); - if (index !== -1) { - throw new Error('Package is already being resolved'); - } + deferred = Q.defer(); // Add to the queue this._queue.push({ - pkg: pkg, + resolver: resolver, deferred: deferred }); - this.emit('enqueue', pkg); // Process the queue shortly later so that handlers can be attached to the returned promise - Q.fcall(this._processQueue.bind(this)); + Q.fcall(this.doWork.bind(this)); return deferred.promise; }; -UnitOfWork.prototype.dequeue = function (pkg) { +UnitOfWork.prototype.has = function (resolver) { var index; - // Throw if the package is already is being resolved - index = this._indexOf(this._beingResolved, pkg); + // Check in the queue + index = this._indexOf(this._queue, resolver); if (index !== -1) { - throw new Error('Package is already being resolved'); + return true; } - // Attempt to remove from the queue - index = this._indexOf(this._queue, pkg); + // Check in the being resolved list + index = this._indexOf(this._beingResolved, resolver); if (index !== -1) { - this._queue.splice(index, 1); - this.emit('dequeue', pkg); + return true; } - return this; + return false; }; -UnitOfWork.prototype.getResolved = function (name) { - return name ? this._resolved[name] || [] : this._resolved; -}; +UnitOfWork.prototype.abort = function () { + var promises, + emptyFunc = function () {}; -UnitOfWork.prototype.getFailed = function (name) { - return name ? this._failed[name] || [] : this._failed; + // Empty queue + this._queue = []; + + // Wait for pending resolvers to resolve + promises = this._beingResolved.map(function (entry) { + // Please note that the promise resolution/fail is silenced + return entry.deferred.promise.then(emptyFunc, emptyFunc); + }); + + return Q.all(promises); }; // ----------------- -UnitOfWork.prototype._processQueue = function () { - // If marked to fail all, reject everything - if (this._failAll) { - return this._rejectAll(); - } - +UnitOfWork.prototype._doWork = function () { // Check if the number of allowed packages being resolved reached the maximum if (this._options.maxConcurrent && this._beingResolved.length >= this._options.maxConcurrent) { return; } - // Find candidates for the free spots - var freeSpots = this._options.maxConcurrent ? this._options.maxConcurrent - this._beingResolved.length : -1, - endpoint, - duplicate, + // Find candidates for the free slots + var freeSlots = this._options.maxConcurrent ? this._options.maxConcurrent - this._beingResolved.length : -1, entry, + resolver, x; - for (x = 0; x < this._queue.length && freeSpots; ++x) { + for (x = 0; x < this._queue.length && freeSlots; ++x) { entry = this._queue[x]; - endpoint = entry.pkg.getEndpoint(); + resolver = entry.resolver; - // Skip if there is a package being resolved with the same endpoint - if (this._beingResolvedEndpoints[endpoint]) { - continue; - } - - // Remove from the queue + // Remove from the queue and this._queue.splice(x--, 1); - this.emit('dequeue', entry.pkg); - // Check if the exact same package has been resolved (same endpoint and range) - // If so, we reject the promise with an appropriate error - duplicate = this._findDuplicate(entry.pkg); - if (duplicate) { - entry.deferred.reject(createError('Package with same endpoint and range was already resolved', 'EDUPL', { pkg: duplicate })); - continue; - } - - // Package is ok to resolve // Put it in the being resolved list this._beingResolved.push(entry); - this._beingResolvedEndpoints[endpoint] = true; + freeSlots--; - // Decrement the free spots available - freeSpots--; - - // Resolve the promise to let the package know that it can proceed - this.emit('before_resolve', entry.pkg); - entry.deferred.resolve(this._onPackageDone.bind(this, entry.pkg)); + // Resolve it, waiting for it to be done + this.emit('pre_resolve', resolver); + resolver.resolve().then(this._onResolveSuccess.bind(this, entry), this._onResolveFailure.bind(this, entry)); } }; -UnitOfWork.prototype._rejectAll = function () { - var error, - queue; - - // Reset the queue and being resolved list - queue = this._queue; - this._queue = []; - this._beingResolved = []; - this._beingResolvedEndpoints = {}; - - // Reject every deferred - error = createError('Package rejected to be resolved', 'EFFAST'); - queue.forEach(function (entry) { - entry.deferred.reject(error); - }); -}; - -UnitOfWork.prototype._onPackageDone = function (pkg, err) { - var pkgName = pkg.getName(), - pkgEndpoint = pkg.getEndpoint(), - arr, - index; - - // Ignore if already completed - if (this._completed[pkgEndpoint] && this._completed[pkgEndpoint].indexOf(pkg) !== -1) { - return; - } - - // Add it as completed - arr = this._completed[pkgEndpoint] = this._completed[pkgEndpoint] || []; - arr.push(pkg); +UnitOfWork.prototype._onResolveSuccess = function (entry, result) { + var index; // Remove the package from the being resolved list - index = this._indexOf(this._beingResolved, pkg); + index = this._beingResolved.indexOf(entry); this._beingResolved.splice(index, 1); - delete this._beingResolvedEndpoints[pkg.getEndpoint()]; - // If called with no error then add it as resolved - if (!err) { - arr = this._resolved[pkgName] = this._resolved[pkgName] || []; - arr.push(pkg); - this.emit('resolve', pkg); - // Otherwise, it failed to resolve so we mark it as failed - } else { - arr = this._failed[pkgName] = this._failed[pkgName] || []; - arr.push(pkg); - this.emit('failed', pkg); + entry.deferred.resolve(result); + this.emit('post_resolve', entry.resolver, result); - // If fail fast is enabled, make every other package in the queue to fail - this._failAll = this._options.failFast; - } + // A free spot became available, so let's do some more work + this._doWork(); +}; - // Call process queue in order to allow packages to take over the free spots in the queue - this._processQueue(); +UnitOfWork.prototype._onResolveFailure = function (entry, err) { + var index; + + // Remove the package from the being resolved list + index = this._beingResolved.indexOf(entry); + this._beingResolved.splice(index, 1); + + entry.deferred.reject(err); + this.emit('fail', entry.resolver, err); + + // A free spot became available, so let's do some more work + this._doWork(); }; UnitOfWork.prototype._indexOf = function (arr, pkg) { @@ -201,16 +142,4 @@ UnitOfWork.prototype._indexOf = function (arr, pkg) { }); }; -UnitOfWork.prototype._findDuplicate = function (pkg) { - var arr = this._completed[pkg.getEndpoint()]; - - if (!arr) { - return null; - } - - return mout.array.find(arr, function (item) { - return item.getRange() === pkg.getRange(); - }); -}; - module.exports = UnitOfWork; \ No newline at end of file