Files
socket.io/lib/socket.js
2012-10-21 09:28:30 -07:00

419 lines
9.4 KiB
JavaScript

/**
* Module dependencies.
*/
var util = require('./util')
, transports = require('./transports')
, debug = require('debug')('engine-client:socket')
, EventEmitter = require('./event-emitter');
/**
* Module exports.
*/
module.exports = Socket;
/**
* Socket constructor.
*
* @param {Object} options
* @api public
*/
function Socket (opts) {
if ('string' == typeof opts) {
var uri = util.parseUri(opts);
opts = arguments[1] || {};
opts.host = uri.host;
opts.secure = uri.protocol == 'https' || uri.protocol == 'wss';
opts.port = uri.port || (opts.secure ? 443 : 80);
}
opts = opts || {};
this.secure = null != opts.secure ? opts.secure : (global.location && 'https:' == global.location.protocol);
this.host = opts.host || opts.hostname || 'localhost';
this.port = opts.port || (this.secure ? 443 : 80);
this.query = opts.query || {};
this.query.uid = rnd();
this.upgrade = false !== opts.upgrade;
this.resource = opts.resource || 'default';
this.path = (opts.path || '/engine.io').replace(/\/$/, '');
this.path += '/' + this.resource + '/';
this.forceJSONP = !!opts.forceJSONP;
this.timestampParam = opts.timestampParam || 't';
this.timestampRequests = !!opts.timestampRequests;
this.flashPath = opts.flashPath || '';
this.transports = opts.transports || ['polling', 'websocket', 'flashsocket'];
this.readyState = '';
this.writeBuffer = [];
this.policyPort = opts.policyPort || 843;
this.open();
};
/**
* Inherits from EventEmitter.
*/
util.inherits(Socket, EventEmitter);
/**
* Creates transport of the given type.
*
* @param {String} transport name
* @return {Transport}
* @api private
*/
Socket.prototype.createTransport = function (name) {
debug('creating transport "%s"', name);
var query = clone(this.query);
query.transport = name;
if (this.id) {
query.sid = this.id;
}
var transport = new transports[name]({
host: this.host
, port: this.port
, secure: this.secure
, path: this.path
, query: query
, forceJSONP: this.forceJSONP
, timestampRequests: this.timestampRequests
, timestampParam: this.timestampParam
, flashPath: this.flashPath
, policyPort: this.policyPort
});
return transport;
};
function clone (obj) {
var o = {};
for (var i in obj) {
if (obj.hasOwnProperty(i)) {
o[i] = obj[i];
}
}
return o;
}
/**
* Initializes transport to use and starts probe.
*
* @api private
*/
Socket.prototype.open = function () {
this.readyState = 'opening';
var transport = this.createTransport(this.transports[0]);
transport.open();
this.setTransport(transport);
};
/**
* Sets the current transport. Disables the existing one (if any).
*
* @api private
*/
Socket.prototype.setTransport = function (transport) {
var self = this;
if (this.transport) {
debug('clearing existing transport');
this.transport.removeAllListeners();
}
// set up transport
this.transport = transport;
// set up transport listeners
transport
.on('drain', function () {
self.flush();
})
.on('packet', function (packet) {
self.onPacket(packet);
})
.on('error', function (e) {
self.onError(e);
})
.on('close', function () {
self.onClose('transport close');
});
};
/**
* Probes a transport.
*
* @param {String} transport name
* @api private
*/
Socket.prototype.probe = function (name) {
debug('probing transport "%s"', name);
var transport = this.createTransport(name, { probe: 1 })
, self = this;
transport.once('open', function () {
debug('probe transport "%s" opened', name);
transport.send([{ type: 'ping', data: 'probe' }]);
transport.once('packet', function (msg) {
if ('pong' == msg.type && 'probe' == msg.data) {
debug('probe transport "%s" pong', name);
self.upgrading = true;
self.emit('upgrading', transport);
debug('pausing current transport "%s"', self.transport.name);
self.transport.pause(function () {
if ('closed' == self.readyState || 'closing' == self.readyState) return;
debug('changing transport and sending upgrade packet');
self.emit('upgrade', transport);
self.setTransport(transport);
transport.send([{ type: 'upgrade' }]);
transport = null;
self.upgrading = false;
self.flush();
});
} else {
debug('probe transport "%s" failed', name);
var err = new Error('probe error');
err.transport = transport.name;
self.emit('error', err);
}
});
});
transport.open();
this.once('close', function () {
if (transport) {
debug('socket closed prematurely - aborting probe');
transport.close();
transport = null;
}
});
this.once('upgrading', function (to) {
if (transport && to.name != transport.name) {
debug('"%s" works - aborting "%s"', to.name, transport.name);
transport.close();
transport = null;
}
});
};
/**
* Called when connection is deemed open.
*
* @api public
*/
Socket.prototype.onOpen = function () {
debug('socket open');
this.readyState = 'open';
this.emit('open');
this.onopen && this.onopen.call(this);
this.flush();
if (this.upgrade && this.transport.pause) {
debug('starting upgrade probes');
for (var i = 0, l = this.upgrades.length; i < l; i++) {
this.probe(this.upgrades[i]);
}
}
};
/**
* Handles a packet.
*
* @api private
*/
Socket.prototype.onPacket = function (packet) {
if ('opening' == this.readyState || 'open' == this.readyState) {
debug('socket receive: type "%s", data "%s"', packet.type, packet.data);
// Socket is live - any packet counts
this.emit('heartbeat');
switch (packet.type) {
case 'open':
this.onHandshake(util.parseJSON(packet.data));
break;
case 'pong':
this.ping();
break;
case 'error':
var err = new Error('server error');
err.code = packet.data;
this.emit('error', err);
break;
case 'message':
this.emit('message', packet.data);
var event = { data: packet.data };
event.toString = function () {
return packet.data;
};
this.onmessage && this.onmessage.call(this, event);
break;
}
} else {
debug('packet received with socket readyState "%s"', this.readyState);
}
};
/**
* Called upon handshake completion.
*
* @param {Object} handshake obj
* @api private
*/
Socket.prototype.onHandshake = function (data) {
this.emit('handshake', data);
this.id = data.sid;
this.transport.query.sid = data.sid;
this.upgrades = data.upgrades;
this.pingInterval = data.pingInterval;
this.pingTimeout = data.pingTimeout;
this.onOpen();
this.ping();
// Prolong liveness of socket on heartbeat
this.removeListener('heartbeat', this.onHeartbeat);
this.on('heartbeat', this.onHeartbeat);
};
/**
* Resets ping timeout.
*
* @api private
*/
Socket.prototype.onHeartbeat = function (timeout) {
clearTimeout(this.pingTimeoutTimer);
var self = this;
self.pingTimeoutTimer = setTimeout(function () {
if ('closed' == self.readyState) return;
self.onClose('ping timeout');
}, timeout || (self.pingInterval + self.pingTimeout));
};
/**
* Pings server every `this.pingInterval` and expects response
* within `this.pingTimeout` or closes connection.
*
* @api private
*/
Socket.prototype.ping = function () {
var self = this;
clearTimeout(self.pingIntervalTimer);
self.pingIntervalTimer = setTimeout(function () {
debug('writing ping packet - expecting pong within %sms', self.pingTimeout);
self.sendPacket('ping');
self.onHeartbeat(self.pingTimeout);
}, self.pingInterval);
};
/**
* Flush write buffers.
*
* @api private
*/
Socket.prototype.flush = function () {
if ('closed' != this.readyState && this.transport.writable &&
!this.upgrading && this.writeBuffer.length) {
debug('flushing %d packets in socket', this.writeBuffer.length);
this.transport.send(this.writeBuffer);
this.writeBuffer = [];
}
};
/**
* Sends a message.
*
* @param {String} message.
* @return {Socket} for chaining.
* @api public
*/
Socket.prototype.write =
Socket.prototype.send = function (msg) {
this.sendPacket('message', msg);
return this;
};
/**
* Sends a packet.
*
* @param {String} packet type.
* @param {String} data.
* @api private
*/
Socket.prototype.sendPacket = function (type, data) {
var packet = { type: type, data: data };
this.writeBuffer.push(packet);
this.flush();
};
/**
* Closes the connection.
*
* @api private
*/
Socket.prototype.close = function () {
if ('opening' == this.readyState || 'open' == this.readyState) {
this.onClose('forced close');
debug('socket closing - telling transport to close');
this.transport.close();
}
return this;
};
/**
* Called upon transport error
*
* @api private
*/
Socket.prototype.onError = function (err) {
this.emit('error', err);
this.onClose('transport error', err);
};
/**
* Called upon transport close.
*
* @api private
*/
Socket.prototype.onClose = function (reason, desc) {
if ('closed' != this.readyState) {
debug('socket close with reason: "%s"', reason);
this.readyState = 'closed';
this.emit('close', reason, desc);
this.onclose && this.onclose.call(this);
}
};
/**
* Generates a random uid.
*
* @api private
*/
function rnd () {
return String(Math.random()).substr(5) + String(Math.random()).substr(5);
}