Fix bugs in the Worker, add tests.

This commit is contained in:
André Cruz
2013-04-20 01:39:39 +01:00
parent 05d974578d
commit 99af52ac2f
4 changed files with 286 additions and 441 deletions

View File

@@ -1,423 +0,0 @@
var expect = require('expect.js');
var Package = require('../lib/core/Package.js');
var UnitOfWork = require('../lib/core/UnitOfWork');
describe('UnitOfWork', function () {
describe('.enqueue', function () {
it('return a promise', function () {
var pkg = new Package('foo'),
unitOfWork = new UnitOfWork(),
promise;
promise = unitOfWork.enqueue(pkg);
expect(promise.then).to.be.a('function');
});
it('should resolve the promise with a callback that should be called once the package is done resolving', function (done) {
var pkg = new Package('foo'),
unitOfWork = new UnitOfWork();
unitOfWork.enqueue(pkg)
.then(function (cb) {
expect(cb).to.be.a('function');
cb();
done();
}, done);
});
it('should fire the "enqueue" event', function () {
var pkg = new Package('foo'),
unitOfWork = new UnitOfWork(),
fired = false;
unitOfWork.on('enqueue', function (pkg) {
expect(pkg).to.be.an(Package);
fired = true;
});
unitOfWork.enqueue(pkg);
expect(fired).to.be(true);
});
it('should throw if the package is already queued', function () {
var pkg = new Package('foo'),
unitOfWork = new UnitOfWork();
unitOfWork.enqueue(pkg);
expect(function () {
unitOfWork.enqueue(pkg);
}).to.throwException(/already queued/);
});
it('should throw if the package is already being resolved', function (done) {
var pkg = new Package('foo'),
unitOfWork = new UnitOfWork();
unitOfWork.enqueue(pkg);
setTimeout(function () {
expect(function () {
unitOfWork.enqueue(pkg);
}).to.throwException(/already being resolved/);
done();
}, 500);
});
});
describe('.dequeue', function () {
it('should dequeue a package', function (done) {
var pkg = new Package('foo'),
unitOfWork = new UnitOfWork(),
promise,
error,
timeout;
promise = unitOfWork.enqueue(pkg);
unitOfWork.dequeue(pkg);
error = function () {
clearTimeout(timeout);
done(new Error('Package was not dequeued'));
};
promise.then(error, error);
timeout = setTimeout(done, 500);
});
it('should fire the "dequeue" event if the package was really dequeued', function () {
var pkg = new Package('foo'),
unitOfWork = new UnitOfWork(),
fired = false;
unitOfWork.on('dequeue', function () {
fired = true;
});
unitOfWork.enqueue(pkg);
unitOfWork.dequeue(pkg);
expect(fired).to.be(true);
});
it('should not fire the "dequeue" event if the package is not queued', function () {
var pkg = new Package('foo'),
unitOfWork = new UnitOfWork(),
fired = false;
unitOfWork.on('dequeue', function () {
fired = true;
});
unitOfWork.dequeue(pkg);
expect(fired).to.be(false);
});
});
describe('.getResolved()', function () {
it('should always return a valid array/object', function () {
var unitOfWork = new UnitOfWork();
expect(unitOfWork.getResolved('foo')).to.eql([]);
expect(unitOfWork.getResolved()).to.eql({});
});
it('should return resolved packages of a specific name', function (done) {
var unitOfWork = new UnitOfWork(),
pkg1 = new Package('foo', { name: 'foo' }),
pkg2 = new Package('bar', { name: 'bar' }),
pkg3 = new Package('foo', { name: 'foo', range: '~0.0.1' }),
pkg4 = new Package('bar', { name: 'bar', range: '~0.0.1' }),
arr;
function ok(cb, time) {
return function (cb) {
setTimeout(cb, time);
};
}
unitOfWork.enqueue(pkg1).then(ok(50));
unitOfWork.enqueue(pkg2).then(ok(50));
unitOfWork.enqueue(pkg3).then(ok(100));
unitOfWork.enqueue(pkg4).then(ok(100));
setTimeout(function () {
arr = unitOfWork.getResolved('foo');
expect(arr.length).to.be(2);
expect(arr[0]).to.equal(pkg1);
expect(arr[1]).to.equal(pkg3);
arr = unitOfWork.getResolved('bar');
expect(arr.length).to.be(2);
expect(arr[0]).to.equal(pkg2);
expect(arr[1]).to.equal(pkg4);
done();
}, 500);
});
it('should return all resolved packages', function (done) {
var unitOfWork = new UnitOfWork({ failFast: false }),
pkg1 = new Package('foo', { name: 'foo' }),
pkg2 = new Package('bar', { name: 'bar' }),
pkg3 = new Package('foo', { name: 'foo', range: '~0.0.1' }),
pkg4 = new Package('bar', { name: 'bar', range: '~0.0.1' }),
obj;
function ok(cb, time) {
return function (cb) {
setTimeout(cb, time);
};
}
unitOfWork.enqueue(pkg1).then(ok(50));
unitOfWork.enqueue(pkg2).then(ok(50));
unitOfWork.enqueue(pkg3).then(ok(100));
unitOfWork.enqueue(pkg4).then(ok(100));
setTimeout(function () {
obj = unitOfWork.getResolved();
expect(Object.keys(obj)).to.eql(['foo', 'bar']);
expect(obj.foo).to.equal(unitOfWork.getResolved('foo'));
expect(obj.bar).to.equal(unitOfWork.getResolved('bar'));
done();
}, 500);
});
});
describe('.getFailed()', function () {
it('should always return a valid array/object', function () {
var unitOfWork = new UnitOfWork();
expect(unitOfWork.getFailed('foo')).to.eql([]);
expect(unitOfWork.getFailed()).to.eql({});
});
it('should return failed packages of a specific name', function (done) {
var unitOfWork = new UnitOfWork({ failFast: false }),
pkg1 = new Package('foo', { name: 'foo' }),
pkg2 = new Package('bar', { name: 'bar' }),
pkg3 = new Package('foo', { name: 'foo', range: '~0.0.1' }),
pkg4 = new Package('bar', { name: 'bar', range: '~0.0.1' }),
arr;
function error(cb, time) {
return function (cb) {
setTimeout(cb.bind(cb, new Error('some error')), time);
};
}
unitOfWork.enqueue(pkg1).then(error(50));
unitOfWork.enqueue(pkg2).then(error(50));
unitOfWork.enqueue(pkg3).then(error(100));
unitOfWork.enqueue(pkg4).then(error(100));
setTimeout(function () {
arr = unitOfWork.getFailed('foo');
expect(arr.length).to.be(2);
expect(arr[0]).to.equal(pkg1);
expect(arr[1]).to.equal(pkg3);
arr = unitOfWork.getFailed('bar');
expect(arr.length).to.be(2);
expect(arr[0]).to.equal(pkg2);
expect(arr[1]).to.equal(pkg4);
done();
}, 500);
});
it('should return all failed packages', function (done) {
var unitOfWork = new UnitOfWork({ failFast: false }),
pkg1 = new Package('foo', { name: 'foo' }),
pkg2 = new Package('bar', { name: 'bar' }),
pkg3 = new Package('foo', { name: 'foo', range: '~0.0.1' }),
pkg4 = new Package('bar', { name: 'bar', range: '~0.0.1' }),
obj;
function error(cb, time) {
return function (cb) {
setTimeout(cb.bind(cb, new Error('some error')), time);
};
}
unitOfWork.enqueue(pkg1).then(error(50));
unitOfWork.enqueue(pkg2).then(error(50));
unitOfWork.enqueue(pkg3).then(error(100));
unitOfWork.enqueue(pkg4).then(error(100));
setTimeout(function () {
obj = unitOfWork.getFailed();
expect(Object.keys(obj)).to.eql(['foo', 'bar']);
expect(obj.foo).to.equal(unitOfWork.getFailed('foo'));
expect(obj.bar).to.equal(unitOfWork.getFailed('bar'));
done();
}, 500);
});
});
describe('general stuff', function () {
it('should let only allow "maxConcurrent" packages to resolve at the same time', function (done) {
var unitOfWork = new UnitOfWork({ maxConcurrent: 2 }),
nrBeingResolved = 0,
pkg1 = new Package('foo'),
pkg2 = new Package('bar'),
pkg3 = new Package('baz'),
timeout;
unitOfWork.enqueue(pkg1).then(function () { nrBeingResolved++; });
unitOfWork.enqueue(pkg2).then(function () { nrBeingResolved++; });
unitOfWork.enqueue(pkg3).then(function () {
clearTimeout(timeout);
done(new Error('Maximum concurrent packages not being accounted correctly'));
});
timeout = setTimeout(function () {
expect(nrBeingResolved).to.equal(2);
done();
}, 500);
});
it('should let every package to resolve if the "maxConcurrent" option is less or equal than 0', function (done) {
var unitOfWork = new UnitOfWork({ maxConcurrent: 0 }),
unitOfWork2 = new UnitOfWork({ maxConcurrent: -1 }),
nrBeingResolved = 0,
pkg1 = new Package('foo1'),
pkg2 = new Package('bar1'),
pkg3 = new Package('baz1'),
pkg4 = new Package('foo2'),
pkg5 = new Package('bar2'),
pkg6 = new Package('baz2'),
timeout;
function increment() {
nrBeingResolved++;
}
unitOfWork.enqueue(pkg1).then(increment);
unitOfWork.enqueue(pkg2).then(increment);
unitOfWork.enqueue(pkg3).then(increment);
unitOfWork.enqueue(pkg4).then(increment);
unitOfWork.enqueue(pkg5).then(increment);
unitOfWork.enqueue(pkg6).then(increment);
unitOfWork2.enqueue(pkg1).then(increment);
unitOfWork2.enqueue(pkg2).then(increment);
unitOfWork2.enqueue(pkg3).then(increment);
unitOfWork2.enqueue(pkg4).then(increment);
unitOfWork2.enqueue(pkg5).then(increment);
unitOfWork2.enqueue(pkg6).then(increment);
timeout = setTimeout(function () {
expect(nrBeingResolved).to.equal(12);
done();
}, 500);
});
it('should prevent packages with same endpoint from being resolved at the same time', function (done) {
var unitOfWork = new UnitOfWork({ maxConcurrent: 2 }),
resolving = [],
pkg1 = new Package('foo'),
pkg2 = new Package('foo'),
pkg3 = new Package('baz');
unitOfWork.enqueue(pkg1).then(function () { resolving.push('foo'); });
unitOfWork.enqueue(pkg2).then(function () { resolving.push('foo'); });
unitOfWork.enqueue(pkg3).then(function () { resolving.push('baz'); });
setTimeout(function () {
expect(resolving).to.eql(['foo', 'baz']);
done();
}, 500);
});
it('should reject promises when a package with same endpoint and range was already resolved', function (done) {
var unitOfWork = new UnitOfWork(),
pkg1 = new Package('foo', { range: '~0.1.1' }),
pkg2 = new Package('foo', { range: '~0.1.1' });
unitOfWork.enqueue(pkg1).then(function (cb) {
setTimeout(cb, 200);
});
unitOfWork.enqueue(pkg2).then(function () {
done(new Error('Should have detected a duplicate'));
}, function (err) {
expect(err.code).to.equal('EDUPL');
expect(err.pkg).to.equal(pkg1);
done();
});
});
it('should fire "dequeue", "before_resolve", "resolve" and "failed" during the resolve process', function (done) {
var unitOfWork = new UnitOfWork(),
events = [],
pkg1 = new Package('foo'),
pkg2 = new Package('bar');
unitOfWork.on('dequeue', function (pkg) {
events.push('dequeue', pkg.getEndpoint());
});
unitOfWork.on('before_resolve', function (pkg) {
events.push('before_resolve', pkg.getEndpoint());
});
unitOfWork.on('resolve', function (pkg) {
events.push('resolve', pkg.getEndpoint());
});
unitOfWork.on('failed', function (pkg) {
events.push('failed', pkg.getEndpoint());
});
unitOfWork.enqueue(pkg1).then(function (cb) {
setTimeout(cb, 100);
});
unitOfWork.enqueue(pkg2).then(function (cb) {
setTimeout(cb.bind(cb, new Error('some error')), 200);
});
setTimeout(function () {
expect(events).to.eql([
'dequeue', pkg1.getEndpoint(),
'before_resolve', pkg1.getEndpoint(),
'dequeue', pkg2.getEndpoint(),
'before_resolve', pkg2.getEndpoint(),
'resolve', pkg1.getEndpoint(),
'failed', pkg2.getEndpoint()
]);
done();
}, 500);
});
it('should fail fast by default', function (done) {
var unitOfWork = new UnitOfWork({ maxConcurrent: 1 }),
pkg1 = new Package('foo', { name: 'foo' }),
pkg2 = new Package('bar', { name: 'bar' });
unitOfWork.enqueue(pkg1).then(function (cb) {
setTimeout(cb.bind(cb, new Error('some error')), 200);
});
unitOfWork.enqueue(pkg2).then(function () {
done(new Error('Should have failed fast'));
}, function (err) {
expect(err.code).to.equal('EFFAST');
done();
});
});
it('should not fail fast if the "failFast" option is false', function (done) {
var unitOfWork = new UnitOfWork({ maxConcurrent: 1, failFast: false }),
pkg1 = new Package('foo', { name: 'foo' }),
pkg2 = new Package('bar', { name: 'bar' });
unitOfWork.enqueue(pkg1).then(function (cb) {
setTimeout(cb.bind(cb, new Error('some error')), 200);
});
unitOfWork.enqueue(pkg2).then(function () {
done();
}, function (err) {
done(err.code === 'EFFAST' ? new Error('Should not have failed fast') : null);
});
});
});
});

267
test/worker.js Normal file
View File

@@ -0,0 +1,267 @@
var expect = require('expect.js');
var Q = require('Q');
var Worker = require('../lib/resolve/Worker');
describe('Worker', function () {
var timeout;
afterEach(function () {
if (timeout) {
clearTimeout(timeout);
timeout = null;
}
});
describe('.enqueue', function () {
it('return a promise', function () {
var worker = new Worker(),
promise;
promise = worker.enqueue(function () { return Q.resolve('foo'); });
expect(promise.then).to.be.a('function');
});
it('should call the function and resolve', function (done) {
var worker = new Worker();
worker.enqueue(function () { return Q.resolve('foo'); })
.then(function (ret) {
expect(ret).to.equal('foo');
done();
});
});
it('should work with functions that return values syncronously', function (done) {
var worker = new Worker();
worker.enqueue(function () { return 'foo'; })
.then(function (ret) {
expect(ret).to.equal('foo');
done();
});
});
it('should assume the default concurrency when a type is not specified', function (done) {
var worker = new Worker(1),
calls = 0;
worker.enqueue(function () { calls++; return Q.defer().promise; });
worker.enqueue(function () { done(new Error('Should not be called!')); });
timeout = setTimeout(function () {
expect(calls).to.equal(1);
done();
}, 100);
});
it('should assume the default concurrency when a type is not known', function (done) {
var worker = new Worker(1),
calls = 0;
worker.enqueue(function () { calls++; return Q.defer().promise; }, 'foo_type');
worker.enqueue(function () { done(new Error('Should not be called!')); }, 'foo_type');
timeout = setTimeout(function () {
expect(calls).to.equal(1);
done();
}, 100);
});
it('should have different slots when type is not passed or is not known', function (done) {
var worker = new Worker(1),
calls = 0;
worker.enqueue(function () { calls++; return Q.defer().promise; });
worker.enqueue(function () { calls++; return Q.defer().promise; }, 'foo_type');
worker.enqueue(function () { done(new Error('Should not be called!')); });
worker.enqueue(function () { done(new Error('Should not be called!')); }, 'foo_type');
timeout = setTimeout(function () {
expect(calls).to.equal(2);
done();
}, 100);
});
it('should use the configured concurrency for the type', function (done) {
var worker = new Worker(1, {
foo: 2,
bar: 3
}),
calls = {
def: 0,
foo: 0,
bar: 0
};
worker.enqueue(function () { calls.def++; return Q.defer().promise; });
worker.enqueue(function () { done(new Error('Should not be called!')); });
worker.enqueue(function () { calls.foo++; return Q.defer().promise; }, 'foo');
worker.enqueue(function () { calls.foo++; return Q.defer().promise; }, 'foo');
worker.enqueue(function () { calls.bar++; return Q.defer().promise; }, 'bar');
worker.enqueue(function () { calls.bar++; return Q.defer().promise; }, 'bar');
worker.enqueue(function () { calls.bar++; return Q.defer().promise; }, 'bar');
worker.enqueue(function () { done(new Error('Should not be called!')); }, 'bar');
timeout = setTimeout(function () {
expect(calls.def).to.equal(1);
expect(calls.foo).to.equal(2);
expect(calls.bar).to.equal(3);
done();
}, 100);
});
});
describe('.abort', function () {
it('should clear the whole queue', function (done) {
var worker = new Worker(1, {
foo: 2
}),
calls = 0;
worker.enqueue(function () { calls++; return Q.resolve(); });
worker.enqueue(function () { done(new Error('Should not be called!')); });
worker.enqueue(function () { calls++; return Q.resolve(); }, 'foo');
worker.enqueue(function () { calls++; return Q.resolve(); }, 'foo');
worker.enqueue(function () { done(new Error('Should not be called!')); }, 'foo');
worker.abort();
worker.enqueue(function () { calls++; return Q.resolve(); }, 'foo');
timeout = setTimeout(function () {
expect(calls).to.equal(4);
done();
}, 100);
});
it('should wait for currently running functions to finish', function (done) {
var worker = new Worker(1, {
foo: 2
}),
calls = [];
worker.enqueue(function () { calls.push(1); return Q.resolve(); });
worker.enqueue(function () { calls.push(2); return Q.resolve(); });
worker.enqueue(function () {
var deferred = Q.defer();
setTimeout(function () {
calls.push(3);
deferred.resolve();
}, 100);
return deferred.promise;
}, 'foo');
timeout = setTimeout(function () {
worker.abort().then(function () {
expect(calls).to.eql([1, 2, 3]);
done();
});
}, 30);
});
});
describe('scheduler', function () {
it('should start remaining tasks when one ends', function (done) {
var worker = new Worker(1, {
foo: 2
}),
calls = 0;
worker.enqueue(function () { calls++; return Q.resolve(); });
worker.enqueue(function () { calls++; return Q.resolve(); }, 'foo');
worker.enqueue(function () { calls++; return Q.resolve(); }, 'foo');
worker.enqueue(function () { calls++; return Q.resolve(); });
worker.enqueue(function () { calls++; return Q.resolve(); }, 'foo');
timeout = setTimeout(function () {
expect(calls).to.equal(5);
done();
}, 100);
});
it('should respect the enqueue order', function (done) {
var worker = new Worker(1),
defCalls = [],
fooCalls = [];
worker.enqueue(function () {
defCalls.push(1);
return Q.resolve();
});
worker.enqueue(function () {
defCalls.push(2);
return Q.resolve();
});
worker.enqueue(function () {
defCalls.push(3);
return Q.resolve();
});
worker.enqueue(function () {
fooCalls.push(1);
return Q.resolve();
}, 'foo');
worker.enqueue(function () {
fooCalls.push(2);
return Q.resolve();
}, 'foo');
worker.enqueue(function () {
fooCalls.push(3);
return Q.resolve();
}, 'foo');
timeout = setTimeout(function () {
expect(defCalls).to.eql([1, 2, 3]);
expect(fooCalls).to.eql([1, 2, 3]);
done();
}, 100);
});
it('should wait for one slot in every type on a multi-type function', function (done) {
var worker = new Worker(1, {
foo: 1,
bar: 2
}),
calls = 0;
worker.enqueue(function () { return Q.defer().promise; }, 'foo');
worker.enqueue(function () { return Q.defer().promise; }, 'bar');
worker.enqueue(function () { calls++; return Q.resolve(); }, 'bar');
worker.enqueue(function () { done(new Error('Should not be called!')); }, ['foo', 'bar']);
worker.enqueue(function () { calls++; return Q.resolve(); }, 'bar');
worker.enqueue(function () { done(new Error('Should not be called!')); }, 'foo');
timeout = setTimeout(function () {
expect(calls).to.equal(2);
done();
}, 100);
});
it('should free all type slots when finished running a function', function (done) {
var worker = new Worker(1, {
foo: 1,
bar: 2
}),
calls = 0;
worker.enqueue(function () { return Q.defer().promise; }, 'bar');
worker.enqueue(function () { calls++; return Q.resolve(); }, ['foo', 'bar']);
worker.enqueue(function () { calls++; return Q.resolve(); }, 'foo');
worker.enqueue(function () { calls++; return Q.resolve(); }, 'bar');
timeout = setTimeout(function () {
expect(calls).to.equal(3);
done();
}, 100);
});
});
});