Change UnitOfWork to Worker.

This commit is contained in:
André Cruz
2013-04-19 19:05:33 +01:00
parent f100982b04
commit b968b4a9a4
6 changed files with 200 additions and 246 deletions

View File

@@ -1,144 +0,0 @@
var Q = require('q');
var util = require('util');
var mout = require('mout');
var events = require('events');
var UnitOfWork = function (options) {
// Ensure options defaults
this._options = mout.object.mixIn({
maxConcurrent: 5
}, options);
// Parse some of the options
this._options.maxConcurrent = this._options.maxConcurrent > 0 ? this._options.maxConcurrent : 0;
// Initialize some needed properties
this._queue = [];
this._beingResolved = [];
};
util.inherits(UnitOfWork, events.EventEmitter);
// -----------------
UnitOfWork.prototype.enqueue = function (resolver) {
var deferred;
if (this.has(resolver)) {
throw new Error('Attempting to enqueue an already enqueued resolver');
}
deferred = Q.defer();
// Add to the queue
this._queue.push({
resolver: resolver,
deferred: deferred
});
// Process the queue shortly later so that handlers can be attached to the returned promise
Q.fcall(this.doWork.bind(this));
return deferred.promise;
};
UnitOfWork.prototype.has = function (resolver) {
var index;
// Check in the queue
index = this._indexOf(this._queue, resolver);
if (index !== -1) {
return true;
}
// Check in the being resolved list
index = this._indexOf(this._beingResolved, resolver);
if (index !== -1) {
return true;
}
return false;
};
UnitOfWork.prototype.abort = function () {
var promises,
emptyFunc = function () {};
// Empty queue
this._queue = [];
// Wait for pending resolvers to resolve
promises = this._beingResolved.map(function (entry) {
// Please note that the promise resolution/fail is silenced
return entry.deferred.promise.then(emptyFunc, emptyFunc);
});
return Q.all(promises);
};
// -----------------
UnitOfWork.prototype._doWork = function () {
// Check if the number of allowed packages being resolved reached the maximum
if (this._options.maxConcurrent && this._beingResolved.length >= this._options.maxConcurrent) {
return;
}
// Find candidates for the free slots
var freeSlots = this._options.maxConcurrent ? this._options.maxConcurrent - this._beingResolved.length : -1,
entry,
resolver,
x;
for (x = 0; x < this._queue.length && freeSlots; ++x) {
entry = this._queue[x];
resolver = entry.resolver;
// Remove from the queue and
this._queue.splice(x--, 1);
// Put it in the being resolved list
this._beingResolved.push(entry);
freeSlots--;
// Resolve it, waiting for it to be done
this.emit('pre_resolve', resolver);
resolver.resolve().then(this._onResolveSuccess.bind(this, entry), this._onResolveFailure.bind(this, entry));
}
};
UnitOfWork.prototype._onResolveSuccess = function (entry, result) {
var index;
// Remove the package from the being resolved list
index = this._beingResolved.indexOf(entry);
this._beingResolved.splice(index, 1);
entry.deferred.resolve(result);
this.emit('post_resolve', entry.resolver, result);
// A free spot became available, so let's do some more work
this._doWork();
};
UnitOfWork.prototype._onResolveFailure = function (entry, err) {
var index;
// Remove the package from the being resolved list
index = this._beingResolved.indexOf(entry);
this._beingResolved.splice(index, 1);
entry.deferred.reject(err);
this.emit('fail', entry.resolver, err);
// A free spot became available, so let's do some more work
this._doWork();
};
UnitOfWork.prototype._indexOf = function (arr, pkg) {
return mout.array.findIndex(arr, function (item) {
return item.pkg === pkg;
});
};
module.exports = UnitOfWork;

150
lib/resolve/Worker.js Normal file
View File

@@ -0,0 +1,150 @@
var Q = require('q');
var util = require('util');
var events = require('events');
var mout = require('mout');
var Worker = function (defaultConcurrency, types) {
this._defaultConcurrency = defaultConcurrency != null ? defaultConcurrency : 10;
// Initialize some needed properties
this._queue = {};
this._slots = types || {};
this._executing = [];
};
util.inherits(Worker, events.EventEmitter);
// -----------------
Worker.prototype.enqueue = function (func, type) {
var deferred = Q.defer(),
types,
entry;
type = type || '';
types = Array.isArray(type) ? type : [type];
entry = {
func: func,
types: types,
deferred: deferred
};
// Add the entry to all the types queues
types.forEach(function (type) {
this._queue[type] = this._queue[type] || [];
this._queue.push(entry);
}, this);
// Process the entry shortly later so that handlers can be attached to the returned promise
Q.fcall(this._processEntry.bind(this, entry));
return deferred.promise;
};
Worker.prototype.abort = function (type) {
var promises = [],
types;
type = type || '';
types = Array.isArray(type) ? type : [type];
// Empty the queue of each specified types
types.forEach(function (type) {
this._queue[type] = [];
}, this);
// Wait for all pending functions of the specified type to finish
promises = this._executing.map(function (entry) {
return entry.deferred.promise;
});
return Q.allResolved(promises)
.then(function () {}); // Resolve with no value
};
// -----------------
Worker.prottoype._processQueue = function (type) {
var queue = this._queue[type],
length = queue ? queue.length : 0,
x;
for (x = 0; x < length; x += 1) {
if (!this._processEntry(this._queue[x])) {
break;
}
}
};
Worker.prototype._processEntry = function (entry) {
var allFree = entry.types.every(this._hasSlot, this);
// If there is a free slot for every tag
if (allFree) {
// Foreach type
entry.types.forEach(function (type) {
// Remove entry from the queue
mout.utils.remove(this._queue[type], entry);
// Take slot
this._takeSlot(type);
}, this);
// Execute the function
this._executing.push(entry);
entry.func().then(
this._onResolve.bind(this, entry, true),
this._onResolve.bind(this, entry, false)
);
}
return allFree;
};
Worker.prototype._onResolve = function (entry, ok, result) {
// Resolve/reject the deferred based on sucess/error of the promise
if (ok) {
entry.deferred.resolve(result);
} else {
entry.deferred.reject(result);
}
// Remove it from the executing list
mout.array.remove(this._executing, entry);
// Free up slots for every type
entry.types.forEach(this._freeSlot, this);
// Find candidates for the free slots of each type
entry.types.forEach(this._processQueue, this);
};
Worker.prototype._hasSlot = function (type) {
var freeSlots = this._slots[type];
if (freeSlots == null) {
freeSlots = this._defaultConcurrency;
}
return freeSlots > 0;
};
Worker.prototype._takeSlot = function (type) {
if (this._slots[type] == null) {
this._slots[type] = this._defaultConcurrency;
} else if (!this._slots[type]) {
throw new Error('No free slots');
}
// Decrement the free slots
--this._slots[type];
};
Worker.prototype._freeSlot = function (type) {
if (this._slots[type] != null) {
++this._slots[type];
}
};
module.exports = Worker;