Compare commits

...

7 Commits

Author SHA1 Message Date
Timothy J Fontaine
c20d028652 http: sockets close transitions via _socketClose 2015-03-24 14:09:17 -07:00
Timothy J Fontaine
ea56a3690e tlswrap: don't emit secure twice 2015-03-24 14:09:17 -07:00
Timothy J Fontaine
c5a0360072 agent: add warning comment about interface 2015-03-24 14:09:17 -07:00
Timothy J Fontaine
f840636e0a streams: resume/pause should not fire after end 2015-03-24 14:09:16 -07:00
Timothy J Fontaine
13f2e7c117 net: reinitialize duplex on reconnect 2015-03-24 14:09:16 -07:00
Timothy J Fontaine
f201331754 net: add destroy event 2015-03-24 14:09:16 -07:00
Timothy J Fontaine
fa79367e98 events: add StrictEE 2015-03-24 14:09:16 -07:00
13 changed files with 424 additions and 19 deletions

View File

@@ -63,6 +63,8 @@ function Agent(options) {
var name = self.getName(options);
debug('agent.on(free)', name);
// This seems an absurd check, we should not be firing this event after
// it has been closed/destroyed
if (!socket.destroyed &&
self.requests[name] && self.requests[name].length) {
self.requests[name].shift().onSocket(socket);

View File

@@ -42,6 +42,29 @@ var Agent = require('_http_agent');
function ClientRequest(options, cb) {
var self = this;
OutgoingMessage.call(self);
this.strictEERegister({
connect: {
maxCount: 1,
after: ['socket'],
},
close: {
maxCount: 1,
after: ['error', 'upgrade', 'aborted', 'finish', 'end'],
},
error: {
maxCount: 1,
notAfter: ['close'],
},
continue: {
maxCount: 1,
notAfter: ['error', 'close'],
after: ['socket'],
},
response: {
notAfter: ['error', 'close'],
maxCount: 1,
},
}, true);
if (util.isString(options)) {
options = url.parse(options);

View File

@@ -38,6 +38,15 @@ exports.readStop = readStop;
/* Abstract base class for ServerRequest and ClientResponse. */
function IncomingMessage(socket) {
Stream.Readable.call(this);
this.strictEERegister({
aborted: {
maxCount: 1,
notAfter: ['error', 'close', 'end'],
},
timeout: {
notAfter: ['error', 'close'],
},
});
// XXX This implementation is kind of all over the place
// When the parser emits body chunks, they go in this list.

View File

@@ -62,7 +62,18 @@ utcDate._onTimeout = function() {
function OutgoingMessage() {
Stream.call(this);
Stream.Writable.call(this);
this.strictEERegister({
socket: {
notAfter: ['close', 'error', 'finish'],
},
upgrade: {
maxCount: 1,
},
timeout: {
notAfter: ['error', 'close'],
},
});
this.output = [];
this.outputEncodings = [];
@@ -90,7 +101,7 @@ function OutgoingMessage() {
this._headers = null;
this._headerNames = {};
}
util.inherits(OutgoingMessage, Stream);
util.inherits(OutgoingMessage, Stream.Writable);
exports.OutgoingMessage = OutgoingMessage;

View File

@@ -100,6 +100,15 @@ var STATUS_CODES = exports.STATUS_CODES = {
function ServerResponse(req) {
OutgoingMessage.call(this);
this.strictEERegister({
close: {
maxCount: 1,
after: ['abort', 'error', '_socketClose'],
},
_socketClose: {
maxCount: 1,
},
}, true);
if (req.method === 'HEAD') this._hasBody = false;
@@ -143,7 +152,10 @@ function onServerResponseClose() {
// Ergo, we need to deal with stale 'close' events and handle the case
// where the ServerResponse object has already been deconstructed.
// Fortunately, that requires only a single if check. :-)
if (this._httpMessage) this._httpMessage.emit('close');
if (this._httpMessage) {
this._httpMessage.emit('_socketClose');
this._httpMessage.emit('close');
}
}
ServerResponse.prototype.assignSocket = function(socket) {
@@ -238,6 +250,27 @@ function Server(requestListener) {
if (!(this instanceof Server)) return new Server(requestListener);
net.Server.call(this, { allowHalfOpen: true });
this.strictEERegister({
checkContinue: {
notAfter: ['close', 'error'],
after: ['listening', 'connection'],
},
request: {
notAfter: ['close', 'error'],
},
socket: {
notAfter: ['close', 'error'],
},
timeout: {
after: ['listening', 'connection'],
notAfter: ['error', 'close'],
},
upgrade: {
after: ['listening', 'connection'],
notAfter: ['close', 'error'],
},
});
if (requestListener) {
this.addListener('request', requestListener);
}

View File

@@ -22,7 +22,7 @@
module.exports = Readable;
Readable.ReadableState = ReadableState;
var EE = require('events').EventEmitter;
var EE = require('events').StrictEE;
var Stream = require('stream');
var Buffer = require('buffer').Buffer;
var util = require('util');
@@ -106,6 +106,24 @@ function Readable(options) {
this.readable = true;
Stream.call(this);
this.strictEERegister({
data: {
notAfter: ['close', 'end', 'error'],
},
end: {
maxCount: 1,
notAfter: ['error', 'close'],
},
pause: {
notAfter: ['end', 'error', 'close'],
},
readable: {
notAfter: ['end', 'error', 'close'],
},
resume: {
notAfter: ['end', 'error', 'close'],
},
}, true);
}
// Manually shove something into the read() buffer.
@@ -720,6 +738,9 @@ function resume(stream, state) {
}
function resume_(stream, state) {
if (state.ended)
return;
if (!state.reading) {
debug('resume read 0');
stream.read(0);
@@ -734,7 +755,8 @@ function resume_(stream, state) {
Readable.prototype.pause = function() {
debug('call pause flowing=%j', this._readableState.flowing);
if (false !== this._readableState.flowing) {
if (false !== this._readableState.flowing &&
!this._readableState.ended) {
debug('pause');
this._readableState.flowing = false;
this.emit('pause');

View File

@@ -155,6 +155,27 @@ function Writable(options) {
this.writable = true;
Stream.call(this);
this.strictEERegister({
drain: {
notAfter: ['finish', 'close', 'error'],
},
finish: {
maxCount: 1,
after: ['prefinish'],
notAfter: ['close', 'error'],
},
pipe: {
notAfter: ['close', 'error', 'finish'],
},
prefinish: {
maxCount: 1,
notAfter: ['close', 'error', 'finish'],
},
unpipe: {
after: ['pipe'],
notAfter: ['close', 'error'],
},
}, true);
}
// Otherwise people can pipe Writable streams, which is just wrong.

View File

@@ -235,6 +235,18 @@ function TLSSocket(socket, options) {
readable: false,
writable: false
});
this.strictEERegister({
secure: {
maxCount: 1,
},
secureConnect: {
maxCount: 1,
after: ['connect', 'secure'],
},
_tlsError: {
maxCount: 1,
},
});
if (socket) {
this._parent = socket;
@@ -453,6 +465,9 @@ TLSSocket.prototype._finishInit = function() {
this.servername = this.ssl.getServername();
}
if (this._secureEstablished)
return;
debug('secure established');
this._secureEstablished = true;
if (this._tlsOptions.handshakeTimeout > 0)
@@ -674,6 +689,12 @@ function Server(/* [options], listener */) {
}
});
});
this.strictEERegister({
secureConnection: {
after: ['listening', 'connection'],
notAfter: ['close', 'error'],
},
});
if (listener) {
this.on('secureConnection', listener);

View File

@@ -19,7 +19,7 @@
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
var domain;
var assert, domain;
var util = require('util');
function EventEmitter() {
@@ -315,3 +315,163 @@ EventEmitter.listenerCount = function(emitter, type) {
ret = emitter._events[type].length;
return ret;
};
// StrictEE
var strictID = 0;
var expectedEvents = {};
function StrictEE(rules, override) {
if (EventEmitter.TrackMinCount === undefined) {
EventEmitter.TrackMinCount = process.NODE_STRICT_EVENTS || false;
process.on('exit', function StrictEEOnExit() {
if (EventEmitter.TrackMinCount && Object.keys(expectedEvents).length > 0) {
(function MinCountAssertionFailure(ee) {
process.abort();
})(expectedEvents);
}
});
}
if (EventEmitter.RequireEventDef === undefined) {
EventEmitter.RequireEventDef = process.NODE_REQUIRE_EVENTS || true;
}
if (assert === undefined)
assert = require('assert');
if (!(this instanceof StrictEE))
return new StrictEE(rules);
this._strictID = undefined;
this.strictEERegister(rules, override);
EventEmitter.call(this);
}
util.inherits(StrictEE, EventEmitter);
EventEmitter.StrictEE = StrictEE;
EventEmitter.TrackMinCount = undefined;
EventEmitter.RequireEventDef = undefined;
StrictEE.prototype.strictEERegister = function strictRegister(rules, override) {
var oldRules = this._strictEERules = this._strictEERules || {};
this._strictEvents = this._strictEvents || {};
var i, e;
for (i in rules) {
if (!override) {
var r = oldRules[i];
assert.strictEqual(r, undefined,
util.format('Event "%s" already registered, %j', i, r));
}
e = rules[i];
if (util.isArray(e.notAfter) && e.notAfter.indexOf('*') > -1) {
assert.strictEqual(e.notAfter.length, 1,
util.format(
'Event "%s" notAfter wild card must only have one entry, not %j',
i, e.notAfter));
}
if (EventEmitter.TrackMinCount && util.isNumber(e.minCount)) {
if (!this._strictID)
this._strictID = ++strictID;
var ee = expectedEvents[strictID] = expectedEvents[strictID] || { __: this };
ee[i] = e.minCount;
}
}
this._strictEERules = util._extend(oldRules, rules);
};
StrictEE.prototype.emit = function strictEmit(type) {
var strictRule = this._strictEERules[type];
if (!strictRule) {
if (EventEmitter.RequireEventDef)
assert.ok(false, util.format('Event "%s" was not defined, fired %j',
type, Object.keys(this._strictEvents)));
else
return EventEmitter.prototype.emit.apply(this, arguments);
}
var keys;
if (util.isArray(strictRule.notAfter) && strictRule.notAfter.length === 1 &&
strictRule.notAfter[0] === '*') {
keys = Object.keys(this._strictEvents);
assert.strictEqual(keys.length, 0,
util.format('Event "%s" must come first but came after %j', type, keys));
}
var strictEvent = this._strictEvents[type] || {};
strictEvent.count = 1 + (strictEvent.count || 0);
if (util.isNumber(strictRule.maxCount)) {
assert.ok(strictEvent.count <= strictRule.maxCount,
util.format('Event "%s" fired %d times, more than %d',
type, strictEvent.count, strictRule.maxCount));
}
if (util.isNumber(strictRule.minCount)) {
assert.ok(this._strictID, 'Object must have a strictID');
var oee = expectedEvents[this._strictID];
if (oee) {
var ee = oee[type];
if (ee) {
if (--ee === 0)
delete oee[type];
}
if (Object.keys(oee) === 1)
delete expectedEvents[this._strictID];
}
}
var found, i, e;
if (util.isArray(strictRule.notAfter)) {
found = false;
for (i in strictRule.notAfter) {
if (i === '*') continue;
e = strictRule.notAfter[i];
if (this._strictEvents[e]) {
found = e;
break;
}
}
assert.strictEqual(found, false,
util.format('Event "%s" must **not** fire after "%s", fired %j',
type, found, Object.keys(this._strictEvents)));
}
if (util.isArray(strictRule.after)) {
found = false;
for (i in strictRule.after) {
e = strictRule.after[i];
if (this._strictEvents[e]) {
found = true;
break;
}
}
console.error(this);
assert.strictEqual(found, true,
util.format('Event "%s" must fire after at least one of %j, fired %j',
type, strictRule.after, Object.keys(this._strictEvents)));
}
this._strictEvents[type] = strictEvent;
return EventEmitter.prototype.emit.apply(this, arguments);
};

View File

@@ -1567,6 +1567,12 @@ function ReadStream(path, options) {
}, options || {});
Readable.call(this, options);
this.strictEERegister({
open: {
maxCount: 1,
notAfter: ['close', 'error'],
},
});
this.path = path;
this.fd = options.hasOwnProperty('fd') ? options.fd : null;
@@ -1733,6 +1739,12 @@ function WriteStream(path, options) {
options = options || {};
Writable.call(this, options);
this.strictEERegister({
open: {
maxCount: 1,
notAfter: ['error', 'close'],
},
});
this.path = path;
this.fd = null;
@@ -1814,7 +1826,13 @@ WriteStream.prototype.destroySoon = WriteStream.prototype.end;
// SyncWriteStream is internal. DO NOT USE.
// Temporary hack for process.stdout and process.stderr when piped to files.
function SyncWriteStream(fd, options) {
Stream.call(this);
Stream.Writable.call(this);
this.strictEERegister({
open: {
maxOpen: 1,
notAfter: ['error', 'close'],
},
});
options = options || {};
@@ -1825,7 +1843,7 @@ function SyncWriteStream(fd, options) {
options.autoClose : true;
}
util.inherits(SyncWriteStream, Stream);
util.inherits(SyncWriteStream, Stream.Writable);
// Export

View File

@@ -34,6 +34,26 @@ function Server(opts, requestListener) {
}
tls.Server.call(this, opts, http._connectionListener);
this.strictEERegister({
checkContinue: {
notAfter: ['close', 'error'],
after: ['listening', 'connection'],
},
request: {
notAfter: ['close', 'error'],
},
socket: {
notAfter: ['close', 'error'],
},
timeout: {
after: ['listening', 'connection'],
notAfter: ['error', 'close'],
},
upgrade: {
after: ['listening', 'connection'],
notAfter: ['close', 'error'],
},
});
this.httpAllowHalfOpen = false;

View File

@@ -148,6 +148,7 @@ function Socket(options) {
else if (util.isUndefined(options))
options = {};
this._originalOptions = options;
stream.Duplex.call(this, options);
if (options.handle) {
@@ -197,6 +198,38 @@ function Socket(options) {
this.read(0);
}
}
this.strictEERegister({
agentRemove: {
notAfter: ['close', 'error'],
},
// aborted comes from http
connect: {
maxCount: 1,
notAfter: ['data', 'error', 'end', 'finish', 'close'],
},
destroy: {
//maxCount: 1,
notAfter: ['close'],
},
// XXX
// used by httpAgent, and inexplicably we can't actually say it doesn't
// happen after any of the events you might consider being final.
free: {
//notAfter: ['error', 'finish', 'end', 'close'],
},
lookup: {
maxCount: 1,
notAfter: ['connect', 'data', 'error', 'end', 'finish', 'close'],
},
timeout: {
notAfter: ['error', 'end', 'close'],
},
_socketEnd: {
maxCount: 1,
notAfter: ['error', 'end', 'close'],
},
});
}
util.inherits(Socket, stream.Duplex);
@@ -492,6 +525,7 @@ Socket.prototype._destroy = function(exception, cb) {
// to make it re-entrance safe in case Socket.prototype.destroy()
// is called within callbacks
this.destroyed = true;
this.emit('destroy', exception);
fireErrorCallbacks();
if (this.server) {
@@ -856,16 +890,12 @@ Socket.prototype.connect = function(options, cb) {
return Socket.prototype.connect.apply(this, args);
}
// XXX this logic is suspect
if (this.destroyed) {
this._readableState.reading = false;
this._readableState.ended = false;
this._readableState.endEmitted = false;
this._writableState.ended = false;
this._writableState.ending = false;
this._writableState.finished = false;
this._writableState.errorEmitted = false;
stream.Duplex.call(this, this._originalOptions);
this.destroyed = false;
this._handle = null;
this._strictEvents = {};
}
var self = this;
@@ -1004,7 +1034,32 @@ function afterConnect(status, handle, req, readable, writable) {
function Server(/* [ options, ] listener */) {
if (!(this instanceof Server)) return new Server(arguments[0], arguments[1]);
events.EventEmitter.call(this);
events.StrictEE.call(this, {
clientError: {
after: ['listening'],
notAfter: ['error', 'close'],
},
close: {
after: ['listening', 'error'],
maxCount: 1,
},
connect: {
after: ['listening'],
notAfter: ['error', 'close'],
},
connection: {
after: ['listening'],
notAfter: ['close', 'error'],
},
error: {
notAfter: ['close'],
maxCount: 1,
},
listening: {
notAfter: ['close', 'error'],
maxCount: 1,
},
});
var self = this;
@@ -1044,7 +1099,7 @@ function Server(/* [ options, ] listener */) {
this.allowHalfOpen = options.allowHalfOpen || false;
this.pauseOnConnect = !!options.pauseOnConnect;
}
util.inherits(Server, events.EventEmitter);
util.inherits(Server, events.StrictEE);
exports.Server = Server;

View File

@@ -21,7 +21,7 @@
module.exports = Stream;
var EE = require('events').EventEmitter;
var EE = require('events').StrictEE;
var util = require('util');
util.inherits(Stream, EE);
@@ -40,7 +40,17 @@ Stream.Stream = Stream;
// part of this class) is overridden in the Readable class.
function Stream() {
EE.call(this);
EE.call(this, {
close: {
maxCount: 1,
//aborted comes from http semantics
after: ['error', 'end', 'finish', 'destroy', 'aborted'],
},
error: {
maxCount: 1,
notAfter: ['close', 'end', 'finish'],
},
}, true);
}
Stream.prototype.pipe = function(dest, options) {