mirror of
https://github.com/socketio/socket.io.git
synced 2026-04-30 03:00:39 -04:00
This method does what engine.attach use to do but is now on the instantiated server instance. This makes it possible to create engine.io servers without yet attaching them to any active http server. When creating a project which will server static files/pages alongside engine.io on the same domain/process this makes it easier to separate logic into files without having to pass around the http server.
376 lines
8.6 KiB
JavaScript
376 lines
8.6 KiB
JavaScript
|
|
/**
|
|
* Module dependencies.
|
|
*/
|
|
|
|
var qs = require('querystring')
|
|
, parse = require('url').parse
|
|
, readFileSync = require('fs').readFileSync
|
|
, crypto = require('crypto')
|
|
, base64id = require('base64id')
|
|
, transports = require('./transports')
|
|
, EventEmitter = require('events').EventEmitter
|
|
, Socket = require('./socket')
|
|
, WebSocketServer = require('ws').Server
|
|
, debug = require('debug')('engine');
|
|
|
|
/**
|
|
* Module exports.
|
|
*/
|
|
|
|
module.exports = Server;
|
|
|
|
/**
|
|
* Server constructor.
|
|
*
|
|
* @param {Object} options
|
|
* @api public
|
|
*/
|
|
|
|
function Server(opts){
|
|
this.clients = {};
|
|
this.clientsCount = 0;
|
|
|
|
opts = opts || {};
|
|
this.pingTimeout = opts.pingTimeout || 60000;
|
|
this.pingInterval = opts.pingInterval || 25000;
|
|
this.upgradeTimeout = opts.upgradeTimeout || 10000;
|
|
this.maxHttpBufferSize = opts.maxHttpBufferSize || 10E7;
|
|
this.transports = opts.transports || Object.keys(transports);
|
|
this.allowUpgrades = false !== opts.allowUpgrades;
|
|
this.allowRequest = opts.allowRequest;
|
|
this.cookie = false !== opts.cookie ? (opts.cookie || 'io') : false;
|
|
|
|
// initialize websocket server
|
|
if (~this.transports.indexOf('websocket')) {
|
|
this.ws = new WebSocketServer({ noServer: true, clientTracking: false });
|
|
}
|
|
};
|
|
|
|
/**
|
|
* Protocol errors mappings.
|
|
*/
|
|
|
|
Server.errors = {
|
|
UNKNOWN_TRANSPORT: 0,
|
|
UNKNOWN_SID: 1,
|
|
BAD_HANDSHAKE_METHOD: 2,
|
|
BAD_REQUEST: 3
|
|
};
|
|
|
|
Server.errorMessages = {
|
|
0: 'Transport unknown',
|
|
1: 'Session ID unknown',
|
|
2: 'Bad handshake method',
|
|
3: 'Bad request'
|
|
};
|
|
|
|
/**
|
|
* Inherits from EventEmitter.
|
|
*/
|
|
|
|
Server.prototype.__proto__ = EventEmitter.prototype;
|
|
|
|
/**
|
|
* Hash of open clients.
|
|
*
|
|
* @api public
|
|
*/
|
|
|
|
Server.prototype.clients;
|
|
|
|
/**
|
|
* Returns a list of available transports for upgrade given a certain transport.
|
|
*
|
|
* @return {Array}
|
|
* @api public
|
|
*/
|
|
|
|
Server.prototype.upgrades = function(transport){
|
|
if (!this.allowUpgrades) return [];
|
|
return transports[transport].upgradesTo || [];
|
|
};
|
|
|
|
/**
|
|
* Verifies a request.
|
|
*
|
|
* @param {http.ServerRequest}
|
|
* @return {Boolean} whether the request is valid
|
|
* @api private
|
|
*/
|
|
|
|
Server.prototype.verify = function(req, upgrade, fn){
|
|
// transport check
|
|
var transport = req._query.transport;
|
|
if (!~this.transports.indexOf(transport)) {
|
|
debug('unknown transport "%s"', transport);
|
|
return fn(Server.errors.UNKNOWN_TRANSPORT, false);
|
|
}
|
|
|
|
// sid check
|
|
var sid = req._query.sid;
|
|
if (sid) {
|
|
if (!this.clients.hasOwnProperty(sid))
|
|
return fn(Server.errors.UNKNOWN_SID, false);
|
|
if (!upgrade && this.clients[sid].transport.name !== transport) {
|
|
debug('bad request: unexpected transport without upgrade');
|
|
return fn(Server.errors.BAD_REQUEST, false);
|
|
}
|
|
} else {
|
|
// handshake is GET only
|
|
if ('GET' != req.method) return fn(Server.errors.BAD_HANDSHAKE_METHOD, false);
|
|
if (!this.allowRequest) return fn(null, true);
|
|
return this.allowRequest(req, fn);
|
|
}
|
|
|
|
fn(null, true);
|
|
};
|
|
|
|
/**
|
|
* Prepares a request by processing the query string.
|
|
*
|
|
* @api private
|
|
*/
|
|
|
|
Server.prototype.prepare = function(req){
|
|
// try to leverage pre-existing `req._query` (e.g: from connect)
|
|
if (!req._query) {
|
|
req._query = ~req.url.indexOf('?') ? qs.parse(parse(req.url).query) : {};
|
|
}
|
|
};
|
|
|
|
/**
|
|
* Closes all clients.
|
|
*
|
|
* @api public
|
|
*/
|
|
|
|
Server.prototype.close = function(){
|
|
debug('closing all open clients');
|
|
for (var i in this.clients) {
|
|
this.clients[i].close();
|
|
}
|
|
return this;
|
|
};
|
|
|
|
/**
|
|
* Handles an Engine.IO HTTP request.
|
|
*
|
|
* @param {http.ServerRequest} request
|
|
* @param {http.ServerResponse|http.OutgoingMessage} response
|
|
* @api public
|
|
*/
|
|
|
|
Server.prototype.handleRequest = function(req, res){
|
|
debug('handling "%s" http request "%s"', req.method, req.url);
|
|
this.prepare(req);
|
|
req.res = res;
|
|
|
|
var self = this;
|
|
this.verify(req, false, function(err, success) {
|
|
if (!success) {
|
|
sendErrorMessage(res, err);
|
|
return;
|
|
}
|
|
|
|
if (req._query.sid) {
|
|
debug('setting new request for existing client');
|
|
self.clients[req._query.sid].transport.onRequest(req);
|
|
} else {
|
|
self.handshake(req._query.transport, req);
|
|
}
|
|
});
|
|
};
|
|
|
|
/**
|
|
* Sends an Engine.IO Error Message
|
|
*
|
|
* @param {http.ServerResponse} response
|
|
* @param {code} error code
|
|
* @api private
|
|
*/
|
|
|
|
function sendErrorMessage(res, code) {
|
|
res.writeHead(400, { 'Content-Type': 'application/json' });
|
|
res.end(JSON.stringify({
|
|
code: code,
|
|
message: Server.errorMessages[code]
|
|
}));
|
|
}
|
|
|
|
/**
|
|
* Handshakes a new client.
|
|
*
|
|
* @param {String} transport name
|
|
* @param {Object} request object
|
|
* @api private
|
|
*/
|
|
|
|
Server.prototype.handshake = function(transport, req){
|
|
var id = base64id.generateId();
|
|
|
|
debug('handshaking client "%s"', id);
|
|
|
|
var transportName = transport;
|
|
try {
|
|
var transport = new transports[transport](req);
|
|
if ('polling' == transportName) {
|
|
transport.maxHttpBufferSize = this.maxHttpBufferSize;
|
|
}
|
|
|
|
if (req._query && req._query.b64) {
|
|
transport.supportsBinary = false;
|
|
} else {
|
|
transport.supportsBinary = true;
|
|
}
|
|
}
|
|
catch (e) {
|
|
sendErrorMessage(req.res, Server.errors.BAD_REQUEST);
|
|
return;
|
|
}
|
|
var socket = new Socket(id, this, transport, req);
|
|
var self = this;
|
|
|
|
if (false !== this.cookie) {
|
|
transport.on('headers', function(headers){
|
|
headers['Set-Cookie'] = self.cookie + '=' + id;
|
|
});
|
|
}
|
|
|
|
transport.onRequest(req);
|
|
|
|
this.clients[id] = socket;
|
|
this.clientsCount++;
|
|
|
|
socket.once('close', function(){
|
|
delete self.clients[id];
|
|
self.clientsCount--;
|
|
});
|
|
|
|
this.emit('connection', socket);
|
|
};
|
|
|
|
/**
|
|
* Handles an Engine.IO HTTP Upgrade.
|
|
*
|
|
* @api public
|
|
*/
|
|
|
|
Server.prototype.handleUpgrade = function(req, socket, head){
|
|
this.prepare(req);
|
|
|
|
var self = this;
|
|
this.verify(req, true, function(err, success) {
|
|
if (!success) {
|
|
socket.end();
|
|
return;
|
|
}
|
|
|
|
// delegate to ws
|
|
self.ws.handleUpgrade(req, socket, head, function(conn){
|
|
self.onWebSocket(req, conn);
|
|
});
|
|
});
|
|
};
|
|
|
|
/**
|
|
* Called upon a ws.io connection.
|
|
*
|
|
* @param {ws.Socket} websocket
|
|
* @api private
|
|
*/
|
|
|
|
Server.prototype.onWebSocket = function(req, socket){
|
|
if (!transports[req._query.transport].prototype.handlesUpgrades) {
|
|
debug('transport doesnt handle upgraded requests');
|
|
socket.close();
|
|
return;
|
|
}
|
|
|
|
// get client id
|
|
var id = req._query.sid;
|
|
|
|
// keep a reference to the ws.Socket
|
|
req.websocket = socket;
|
|
|
|
if (id) {
|
|
if (!this.clients[id]) {
|
|
debug('upgrade attempt for closed client');
|
|
socket.close();
|
|
} else if (this.clients[id].upgraded) {
|
|
debug('transport had already been upgraded');
|
|
socket.close();
|
|
} else {
|
|
debug('upgrading existing transport');
|
|
var transport = new transports[req._query.transport](req);
|
|
if (req._query && req._query.b64) {
|
|
transport.supportsBinary = false;
|
|
} else {
|
|
transport.supportsBinary = true;
|
|
}
|
|
this.clients[id].maybeUpgrade(transport);
|
|
}
|
|
} else {
|
|
this.handshake(req._query.transport, req);
|
|
}
|
|
};
|
|
|
|
/**
|
|
* Captures upgrade requests for a http.Server.
|
|
*
|
|
* @param {http.Server} server
|
|
* @param {Object} options
|
|
* @api public
|
|
*/
|
|
|
|
Server.prototype.attach = function(server, options){
|
|
var self = this;
|
|
var options = options || {};
|
|
var path = (options.path || '/engine.io').replace(/\/$/, '');
|
|
|
|
var destroyUpgrade = (options.destroyUpgrade !== undefined) ? options.destroyUpgrade : true;
|
|
var destroyUpgradeTimeout = options.destroyUpgradeTimeout || 1000;
|
|
|
|
// normalize path
|
|
path += '/';
|
|
|
|
function check (req) {
|
|
return path == req.url.substr(0, path.length);
|
|
}
|
|
|
|
// cache and clean up listeners
|
|
var listeners = server.listeners('request').slice(0);
|
|
server.removeAllListeners('request');
|
|
server.on('close', self.close.bind(self));
|
|
|
|
// add request handler
|
|
server.on('request', function(req, res){
|
|
if (check(req)) {
|
|
debug('intercepting request for path "%s"', path);
|
|
self.handleRequest(req, res);
|
|
} else {
|
|
for (var i = 0, l = listeners.length; i < l; i++) {
|
|
listeners[i].call(server, req, res);
|
|
}
|
|
}
|
|
});
|
|
|
|
if(~self.transports.indexOf('websocket')) {
|
|
server.on('upgrade', function (req, socket, head) {
|
|
if (check(req)) {
|
|
self.handleUpgrade(req, socket, head);
|
|
} else if (false !== options.destroyUpgrade) {
|
|
// default node behavior is to disconnect when no handlers
|
|
// but by adding a handler, we prevent that
|
|
// and if no eio thing handles the upgrade
|
|
// then the socket needs to die!
|
|
setTimeout(function() {
|
|
if (socket.writable && socket.bytesWritten <= 0) {
|
|
return socket.end();
|
|
}
|
|
}, options.destroyUpgradeTimeout);
|
|
}
|
|
});
|
|
}
|
|
};
|