mirror of
https://github.com/bower/bower.git
synced 2026-02-12 06:55:04 -05:00
Updated UoW doc and implementation according to the final architecture.
This commit is contained in:
@@ -2,197 +2,138 @@ var Q = require('q');
|
||||
var util = require('util');
|
||||
var mout = require('mout');
|
||||
var events = require('events');
|
||||
var createError = require('../util/createError');
|
||||
|
||||
var UnitOfWork = function (options) {
|
||||
// Ensure options defaults
|
||||
this._options = mout.object.mixIn({
|
||||
failFast: true,
|
||||
maxConcurrent: 5
|
||||
}, options);
|
||||
|
||||
// Parse some of the options
|
||||
this._options.failFast = !!this._options.failFast;
|
||||
this._options.maxConcurrent = this._options.maxConcurrent > 0 ? this._options.maxConcurrent : 0;
|
||||
|
||||
// Initialize some needed properties
|
||||
this._queue = [];
|
||||
this._beingResolved = [];
|
||||
this._beingResolvedEndpoints = {};
|
||||
this._resolved = {};
|
||||
this._failed = {};
|
||||
this._completed = {};
|
||||
};
|
||||
|
||||
util.inherits(UnitOfWork, events.EventEmitter);
|
||||
|
||||
// -----------------
|
||||
|
||||
UnitOfWork.prototype.enqueue = function (pkg) {
|
||||
var deferred = Q.defer(),
|
||||
index;
|
||||
UnitOfWork.prototype.enqueue = function (resolver) {
|
||||
var deferred;
|
||||
|
||||
// Throw it if already queued
|
||||
index = this._indexOf(this._queue, pkg);
|
||||
if (index !== -1) {
|
||||
throw new Error('Package is already queued');
|
||||
if (this.has(resolver)) {
|
||||
throw new Error('Attempting to enqueue an already enqueued resolver');
|
||||
}
|
||||
|
||||
// Throw if already resolving
|
||||
index = this._indexOf(this._beingResolved, pkg);
|
||||
if (index !== -1) {
|
||||
throw new Error('Package is already being resolved');
|
||||
}
|
||||
deferred = Q.defer();
|
||||
|
||||
// Add to the queue
|
||||
this._queue.push({
|
||||
pkg: pkg,
|
||||
resolver: resolver,
|
||||
deferred: deferred
|
||||
});
|
||||
this.emit('enqueue', pkg);
|
||||
|
||||
// Process the queue shortly later so that handlers can be attached to the returned promise
|
||||
Q.fcall(this._processQueue.bind(this));
|
||||
Q.fcall(this.doWork.bind(this));
|
||||
|
||||
return deferred.promise;
|
||||
};
|
||||
|
||||
UnitOfWork.prototype.dequeue = function (pkg) {
|
||||
UnitOfWork.prototype.has = function (resolver) {
|
||||
var index;
|
||||
|
||||
// Throw if the package is already is being resolved
|
||||
index = this._indexOf(this._beingResolved, pkg);
|
||||
// Check in the queue
|
||||
index = this._indexOf(this._queue, resolver);
|
||||
if (index !== -1) {
|
||||
throw new Error('Package is already being resolved');
|
||||
return true;
|
||||
}
|
||||
|
||||
// Attempt to remove from the queue
|
||||
index = this._indexOf(this._queue, pkg);
|
||||
// Check in the being resolved list
|
||||
index = this._indexOf(this._beingResolved, resolver);
|
||||
if (index !== -1) {
|
||||
this._queue.splice(index, 1);
|
||||
this.emit('dequeue', pkg);
|
||||
return true;
|
||||
}
|
||||
|
||||
return this;
|
||||
return false;
|
||||
};
|
||||
|
||||
UnitOfWork.prototype.getResolved = function (name) {
|
||||
return name ? this._resolved[name] || [] : this._resolved;
|
||||
};
|
||||
UnitOfWork.prototype.abort = function () {
|
||||
var promises,
|
||||
emptyFunc = function () {};
|
||||
|
||||
UnitOfWork.prototype.getFailed = function (name) {
|
||||
return name ? this._failed[name] || [] : this._failed;
|
||||
// Empty queue
|
||||
this._queue = [];
|
||||
|
||||
// Wait for pending resolvers to resolve
|
||||
promises = this._beingResolved.map(function (entry) {
|
||||
// Please note that the promise resolution/fail is silenced
|
||||
return entry.deferred.promise.then(emptyFunc, emptyFunc);
|
||||
});
|
||||
|
||||
return Q.all(promises);
|
||||
};
|
||||
|
||||
// -----------------
|
||||
|
||||
UnitOfWork.prototype._processQueue = function () {
|
||||
// If marked to fail all, reject everything
|
||||
if (this._failAll) {
|
||||
return this._rejectAll();
|
||||
}
|
||||
|
||||
UnitOfWork.prototype._doWork = function () {
|
||||
// Check if the number of allowed packages being resolved reached the maximum
|
||||
if (this._options.maxConcurrent && this._beingResolved.length >= this._options.maxConcurrent) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Find candidates for the free spots
|
||||
var freeSpots = this._options.maxConcurrent ? this._options.maxConcurrent - this._beingResolved.length : -1,
|
||||
endpoint,
|
||||
duplicate,
|
||||
// Find candidates for the free slots
|
||||
var freeSlots = this._options.maxConcurrent ? this._options.maxConcurrent - this._beingResolved.length : -1,
|
||||
entry,
|
||||
resolver,
|
||||
x;
|
||||
|
||||
for (x = 0; x < this._queue.length && freeSpots; ++x) {
|
||||
for (x = 0; x < this._queue.length && freeSlots; ++x) {
|
||||
entry = this._queue[x];
|
||||
endpoint = entry.pkg.getEndpoint();
|
||||
resolver = entry.resolver;
|
||||
|
||||
// Skip if there is a package being resolved with the same endpoint
|
||||
if (this._beingResolvedEndpoints[endpoint]) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Remove from the queue
|
||||
// Remove from the queue and
|
||||
this._queue.splice(x--, 1);
|
||||
this.emit('dequeue', entry.pkg);
|
||||
|
||||
// Check if the exact same package has been resolved (same endpoint and range)
|
||||
// If so, we reject the promise with an appropriate error
|
||||
duplicate = this._findDuplicate(entry.pkg);
|
||||
if (duplicate) {
|
||||
entry.deferred.reject(createError('Package with same endpoint and range was already resolved', 'EDUPL', { pkg: duplicate }));
|
||||
continue;
|
||||
}
|
||||
|
||||
// Package is ok to resolve
|
||||
// Put it in the being resolved list
|
||||
this._beingResolved.push(entry);
|
||||
this._beingResolvedEndpoints[endpoint] = true;
|
||||
freeSlots--;
|
||||
|
||||
// Decrement the free spots available
|
||||
freeSpots--;
|
||||
|
||||
// Resolve the promise to let the package know that it can proceed
|
||||
this.emit('before_resolve', entry.pkg);
|
||||
entry.deferred.resolve(this._onPackageDone.bind(this, entry.pkg));
|
||||
// Resolve it, waiting for it to be done
|
||||
this.emit('pre_resolve', resolver);
|
||||
resolver.resolve().then(this._onResolveSuccess.bind(this, entry), this._onResolveFailure.bind(this, entry));
|
||||
}
|
||||
};
|
||||
|
||||
UnitOfWork.prototype._rejectAll = function () {
|
||||
var error,
|
||||
queue;
|
||||
|
||||
// Reset the queue and being resolved list
|
||||
queue = this._queue;
|
||||
this._queue = [];
|
||||
this._beingResolved = [];
|
||||
this._beingResolvedEndpoints = {};
|
||||
|
||||
// Reject every deferred
|
||||
error = createError('Package rejected to be resolved', 'EFFAST');
|
||||
queue.forEach(function (entry) {
|
||||
entry.deferred.reject(error);
|
||||
});
|
||||
};
|
||||
|
||||
UnitOfWork.prototype._onPackageDone = function (pkg, err) {
|
||||
var pkgName = pkg.getName(),
|
||||
pkgEndpoint = pkg.getEndpoint(),
|
||||
arr,
|
||||
index;
|
||||
|
||||
// Ignore if already completed
|
||||
if (this._completed[pkgEndpoint] && this._completed[pkgEndpoint].indexOf(pkg) !== -1) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Add it as completed
|
||||
arr = this._completed[pkgEndpoint] = this._completed[pkgEndpoint] || [];
|
||||
arr.push(pkg);
|
||||
UnitOfWork.prototype._onResolveSuccess = function (entry, result) {
|
||||
var index;
|
||||
|
||||
// Remove the package from the being resolved list
|
||||
index = this._indexOf(this._beingResolved, pkg);
|
||||
index = this._beingResolved.indexOf(entry);
|
||||
this._beingResolved.splice(index, 1);
|
||||
delete this._beingResolvedEndpoints[pkg.getEndpoint()];
|
||||
|
||||
// If called with no error then add it as resolved
|
||||
if (!err) {
|
||||
arr = this._resolved[pkgName] = this._resolved[pkgName] || [];
|
||||
arr.push(pkg);
|
||||
this.emit('resolve', pkg);
|
||||
// Otherwise, it failed to resolve so we mark it as failed
|
||||
} else {
|
||||
arr = this._failed[pkgName] = this._failed[pkgName] || [];
|
||||
arr.push(pkg);
|
||||
this.emit('failed', pkg);
|
||||
entry.deferred.resolve(result);
|
||||
this.emit('post_resolve', entry.resolver, result);
|
||||
|
||||
// If fail fast is enabled, make every other package in the queue to fail
|
||||
this._failAll = this._options.failFast;
|
||||
}
|
||||
// A free spot became available, so let's do some more work
|
||||
this._doWork();
|
||||
};
|
||||
|
||||
// Call process queue in order to allow packages to take over the free spots in the queue
|
||||
this._processQueue();
|
||||
UnitOfWork.prototype._onResolveFailure = function (entry, err) {
|
||||
var index;
|
||||
|
||||
// Remove the package from the being resolved list
|
||||
index = this._beingResolved.indexOf(entry);
|
||||
this._beingResolved.splice(index, 1);
|
||||
|
||||
entry.deferred.reject(err);
|
||||
this.emit('fail', entry.resolver, err);
|
||||
|
||||
// A free spot became available, so let's do some more work
|
||||
this._doWork();
|
||||
};
|
||||
|
||||
UnitOfWork.prototype._indexOf = function (arr, pkg) {
|
||||
@@ -201,16 +142,4 @@ UnitOfWork.prototype._indexOf = function (arr, pkg) {
|
||||
});
|
||||
};
|
||||
|
||||
UnitOfWork.prototype._findDuplicate = function (pkg) {
|
||||
var arr = this._completed[pkg.getEndpoint()];
|
||||
|
||||
if (!arr) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return mout.array.find(arr, function (item) {
|
||||
return item.getRange() === pkg.getRange();
|
||||
});
|
||||
};
|
||||
|
||||
module.exports = UnitOfWork;
|
||||
Reference in New Issue
Block a user