/*!
 * socket.io-node
 * Copyright(c) 2011 LearnBoost
 * MIT Licensed
 */

/**
 * Module dependencies.
 */

var parser = require('./parser');

/**
 * Expose the constructor.
 */

exports = module.exports = Transport;

/**
 * Transport constructor.
 *
 * Base object for transports. This file calls, but does not define,
 * `write(data)`, `payload(data)` and `doClose()`; they are expected to be
 * provided elsewhere (presumably by concrete transports -- confirm).
 *
 * @param {Object} mng manager; `log`, `store` and `get(key)` are read from it
 * @param {Object} data only `data.id` is read here
 * @api public
 */

function Transport (mng, data) {
  this.manager = mng;
  this.id = data.id;
  this.paused = true;
  this.disconnected = false;
  this.drained = true;
}

/**
 * Sets the corresponding request object.
 *
 * Write-only accessor: assigning `transport.request = req` logs the request
 * and hands it to `handleRequest`. Defined as enumerable/configurable so it
 * behaves like the legacy `__defineSetter__` it replaces.
 */

Object.defineProperty(Transport.prototype, 'request', {
    set: function (req) {
      this.log.debug('setting request', req.method, req.url);
      this.handleRequest(req);
    }
  , enumerable: true
  , configurable: true
});

/**
 * Access the logger.
 *
 * @api public
 */

Object.defineProperty(Transport.prototype, 'log', {
    get: function () {
      return this.manager.log;
    }
  , enumerable: true
  , configurable: true
});

/**
 * Access the store.
 *
 * @api public
 */

Object.defineProperty(Transport.prototype, 'store', {
    get: function () {
      return this.manager.store;
    }
  , enumerable: true
  , configurable: true
});

/**
 * Handles a request when it's set.
 *
 * The request is always remembered as `this.req`. Only GET requests open the
 * transport: the socket is captured, the `open:<id>` event is published on
 * the store, handlers are installed and `onSocketConnect` is invoked.
 *
 * @param {Object} req request; `method` and `socket` are read
 * @api private
 */

Transport.prototype.handleRequest = function (req) {
  this.req = req;

  if (req.method === 'GET') {
    this.socket = req.socket;
    this.open = true;
    this.drained = true;
    this.log.debug('publishing that', this.id, 'connected');

    var self = this;

    this.store.publish('open:' + this.id, function () {
      // registered only after our own publish completed: a later `open:<id>`
      // event makes this transport close and release its timers and handlers
      self.store.once('open:' + self.id, function () {
        self.log.info('request for existing session connection change');
        self.close();
        self.clearTimeouts();
        self.clearHandlers();
      });

      if (!self.paused) {
        self.subscribe();
      }
    });

    this.setHandlers();
    this.onSocketConnect();
  }
};

/**
 * Called when a connection is first set.
 *
 * Intentionally empty here; acts as a hook.
 *
 * @api private
 */

Transport.prototype.onSocketConnect = function () { };

/**
 * Sets transport handlers
 *
 * Subscribes to the per-client store events (`disconnect-force`,
 * `heartbeat-clear`, `volatile`) and attaches `end`/`close`/`error`/`drain`
 * listeners to the socket. The bound listeners are kept in `this.bound` so
 * that `clearHandlers` can remove exactly the same function objects.
 *
 * @api private
 */

Transport.prototype.setHandlers = function () {
  var self = this;

  this.store.once('disconnect-force:' + this.id, function () {
    self.onForcedDisconnect();
  });

  this.store.on('heartbeat-clear:' + this.id, function () {
    self.clearHeartbeatTimeout();
    self.setHeartbeatInterval();
  });

  this.store.on('volatile:' + this.id, function (packet) {
    self.writeVolatile(packet);
  });

  this.bound = {
      end: this.onSocketEnd.bind(this)
    , close: this.onSocketClose.bind(this)
    , error: this.onSocketError.bind(this)
    , drain: this.onSocketDrain.bind(this)
  };

  this.socket.on('end', this.bound.end);
  this.socket.on('close', this.bound.close);
  this.socket.on('error', this.bound.error);
  this.socket.on('drain', this.bound.drain);
};

/**
 * Removes transport handlers
 *
 * Does nothing when `setHandlers` has not run yet (no `this.bound`), instead
 * of throwing on the missing listener table / socket.
 *
 * @api private
 */

Transport.prototype.clearHandlers = function () {
  if (!this.bound) {
    return;
  }

  this.store.unsubscribe('disconnect-force:' + this.id);
  this.store.unsubscribe('heartbeat-clear:' + this.id);
  this.store.unsubscribe('volatile:' + this.id);

  this.socket.removeListener('end', this.bound.end);
  this.socket.removeListener('close', this.bound.close);
  this.socket.removeListener('error', this.bound.error);
  this.socket.removeListener('drain', this.bound.drain);
};

/**
 * Called when the connection dies
 *
 * NOTE(review): the previous comment here described checking that the socket
 * wasn't swapped before ending (to avoid severing a connection that is not
 * the active one). No such check is performed by this code -- confirm
 * whether one is needed.
 *
 * @api private
 */

Transport.prototype.onSocketEnd = function () {
  this.end(false, 'socket end');
};

/**
 * Called when the connection dies
 *
 * @param {Boolean} error truthy selects the 'socket error' reason,
 *   falsy selects 'socket close'
 * @api private
 */

Transport.prototype.onSocketClose = function (error) {
  this.end(false, error ? 'socket error' : 'socket close');
};

/**
 * Called when the connection has an error.
 *
 * Destroys the socket and runs the close bookkeeping if the transport is
 * open, then logs the error.
 *
 * @param {Error} err
 * @api private
 */

Transport.prototype.onSocketError = function (err) {
  if (this.open) {
    this.socket.destroy();
    this.onClose();
  }

  // fall back to the raw value so a missing error object or stack cannot throw
  this.log.info('socket error ' + (err && err.stack ? err.stack : err));
};

/**
 * Called when the connection is drained.
 *
 * @api private
 */

Transport.prototype.onSocketDrain = function () {
  this.drained = true;
};

/**
 * Called upon a forced disconnection.
 *
 * Sends a `disconnect` packet if the transport is still open, then ends the
 * transport with `forced = true`.
 *
 * @api private
 */

Transport.prototype.onForcedDisconnect = function () {
  if (!this.disconnected) {
    this.log.info('transport end by forced client disconnection');

    if (this.open) {
      this.packet({ type: 'disconnect' });
    }

    this.end(true);
  }
};

/**
 * Sets the close timeout.
 *
 * No-op while a close timeout is already pending. When it fires the
 * transport is ended with reason 'close timeout'. The manager's
 * 'close timeout' setting is multiplied by 1000 to get the delay.
 *
 * @api private
 */

Transport.prototype.setCloseTimeout = function () {
  if (!this.closeTimeout) {
    var self = this;

    this.closeTimeout = setTimeout(function () {
      self.log.debug('fired close timeout for client', self.id);
      self.closeTimeout = null;
      self.end(false, 'close timeout');
    }, this.manager.get('close timeout') * 1000);

    this.log.debug('set close timeout for client', this.id);
  }
};

/**
 * Clears the close timeout.
 *
 * @api private
 */

Transport.prototype.clearCloseTimeout = function () {
  if (this.closeTimeout) {
    clearTimeout(this.closeTimeout);
    this.closeTimeout = null;

    this.log.debug('cleared close timeout for client', this.id);
  }
};

/**
 * Sets the heartbeat timeout
 *
 * No-op while a heartbeat timeout is already pending. When it fires the
 * transport is ended with reason 'heartbeat timeout'. The manager's
 * 'heartbeat timeout' setting is multiplied by 1000 to get the delay.
 *
 * @api private
 */

Transport.prototype.setHeartbeatTimeout = function () {
  if (!this.heartbeatTimeout) {
    var self = this;

    this.heartbeatTimeout = setTimeout(function () {
      self.log.debug('fired heartbeat timeout for client', self.id);
      self.heartbeatTimeout = null;
      self.end(false, 'heartbeat timeout');
    }, this.manager.get('heartbeat timeout') * 1000);

    this.log.debug('set heartbeat timeout for client', this.id);
  }
};

/**
 * Clears the heartbeat timeout
 *
 * @api private
 */

Transport.prototype.clearHeartbeatTimeout = function () {
  if (this.heartbeatTimeout) {
    clearTimeout(this.heartbeatTimeout);
    this.heartbeatTimeout = null;

    this.log.debug('cleared heartbeat timeout for client', this.id);
  }
};

/**
 * Sets the heartbeat interval. To be called when a connection opens and when
 * a heartbeat is received.
 *
 * No-op while a heartbeat timeout is pending or while an interval timer is
 * already scheduled; without the second check a repeated call would overwrite
 * the stored handle, leaving the earlier timer impossible to clear.
 *
 * @api private
 */

Transport.prototype.setHeartbeatInterval = function () {
  if (!this.heartbeatTimeout && !this.heartbeatInterval) {
    var self = this;

    this.heartbeatInterval = setTimeout(function () {
      // the timer has fired: drop the handle so the next interval can be set
      self.heartbeatInterval = null;
      self.heartbeat();
    }, this.manager.get('heartbeat interval') * 1000);

    this.log.debug('set heartbeat interval for client', this.id);
  }
};

/**
 * Clears all timeouts.
 *
 * @api private
 */

Transport.prototype.clearTimeouts = function () {
  this.clearCloseTimeout();
  this.clearHeartbeatTimeout();
  this.clearHeartbeatInterval();
};

/**
 * Sends a heartbeat
 *
 * Only when the transport is open: writes a `heartbeat` packet and starts
 * the heartbeat timeout.
 *
 * @return {Transport} for chaining
 * @api private
 */

Transport.prototype.heartbeat = function () {
  if (this.open) {
    this.log.debug('emitting heartbeat for client', this.id);
    this.packet({ type: 'heartbeat' });
    this.setHeartbeatTimeout();
  }

  return this;
};

/**
 * Handles a message.
 *
 * Heartbeat packets and disconnect packets for the empty endpoint are
 * reported to the store; every other packet is passed to `store.message`,
 * preceded by an automatic `ack` when the packet has an id and its `ack`
 * field is not 'data'.
 *
 * @param {Object} packet object
 * @api private
 */

Transport.prototype.onMessage = function (packet) {
  if ('heartbeat' === packet.type) {
    this.log.debug('got heartbeat packet');
    this.store.heartbeat(this.id);
  } else if ('disconnect' === packet.type && packet.endpoint === '') {
    this.log.debug('got disconnection packet');
    this.store.disconnect(this.id, true);
  } else {
    this.log.debug('got packet');

    if (packet.id && packet.ack !== 'data') {
      this.log.debug('acknowledging packet automatically');

      this.store.client(this.id).publish(parser.encodePacket({
          type: 'ack'
        , ackId: packet.id
        , endpoint: packet.endpoint || ''
      }));
    }

    this.store.message(this.id, packet);
  }
};

/**
 * Clears the heartbeat interval
 *
 * @api private
 */

Transport.prototype.clearHeartbeatInterval = function () {
  if (this.heartbeatInterval) {
    clearTimeout(this.heartbeatInterval);
    this.heartbeatInterval = null;

    this.log.debug('cleared heartbeat interval for client', this.id);
  }
};

/**
 * Finishes the connection and makes sure client doesn't reopen
 *
 * Writes a `disconnect` packet and then ends the transport.
 *
 * @param {String} reason passed through to `end`
 * @return {Transport} for chaining
 * @api private
 */

Transport.prototype.disconnect = function (reason) {
  this.packet({ type: 'disconnect' });
  this.end(false, reason);

  return this;
};

/**
 * Closes the connection.
 *
 * Only acts while open: calls `doClose()` (not defined in this file) and
 * then `onClose()`.
 *
 * @api private
 */

Transport.prototype.close = function () {
  if (this.open) {
    this.doClose();
    this.onClose();
  }
};

/**
 * Called upon a connection close.
 *
 * Starts the close timeout, unsubscribes the client and marks the transport
 * as no longer open. Does nothing if it is not open.
 *
 * @api private
 */

Transport.prototype.onClose = function () {
  if (this.open) {
    this.setCloseTimeout();
    this.unsubscribe();
    this.open = false;
  }
};

/**
 * Cleans up the connection, considers the client disconnected.
 *
 * Runs at most once per transport (guarded by `this.disconnected`).
 *
 * @param {Boolean} forced when truthy, `store.disconnect` is not called
 * @param {String} reason forwarded to `store.disconnect`
 * @api private
 */

Transport.prototype.end = function (forced, reason) {
  if (!this.disconnected) {
    this.log.info('ending socket');

    this.close();
    this.clearTimeouts();

    if (!forced) {
      this.store.disconnect(this.id, false, reason);
    }

    this.disconnected = true;
  }
};

/**
 * Signals that the transport can start flushing buffers.
 *
 * @return {Transport} for chaining
 * @api public
 */

Transport.prototype.resume = function () {
  if (!this.disconnected) {
    this.paused = false;
    this.setHeartbeatInterval();
    this.subscribe();
  }

  return this;
};

/**
 * Signals that the transport should pause and buffer data.
 *
 * Only sets the `paused` flag.
 *
 * @return {Transport} for chaining
 * @api public
 */

Transport.prototype.pause = function () {
  this.paused = true;
  return this;
};

/**
 * Writes an error packet with the specified reason and advice.
 *
 * After writing the packet, logs a warning and ends the transport with
 * reason 'error'.
 *
 * @param {Number} reason
 * @param {Number} advice
 * @api public
 */

Transport.prototype.error = function (reason, advice) {
  this.packet({
      type: 'error'
    , reason: reason
    , advice: advice
  });

  this.log.warn(reason, advice ? ('client should ' + advice) : '');
  this.end(false, 'error');
};

/**
 * Write a packet.
 *
 * Encodes the packet with the parser and passes the result to `write()`
 * (not defined in this file).
 *
 * @param {Object} obj packet object
 * @return whatever `write` returns
 * @api public
 */

Transport.prototype.packet = function (obj) {
  return this.write(parser.encodePacket(obj));
};

/**
 * Subscribe client.
 *
 * No-op when already subscribed.
 *
 * @api private
 */

Transport.prototype.subscribe = function () {
  if (!this.subscribed) {
    this.log.debug('subscribing', this.id);

    var self = this;

    // subscribe to buffered + normal messages
    this.store.client(this.id).consume(function (payload, packet) {
      if (payload) {
        self.payload(payload);
      } else {
        self.write(packet);
      }
    });

    this.subscribed = true;
  }
};

/**
 * Unsubscribe client.
 *
 * Pauses the store client for this id and clears the `subscribed` flag.
 *
 * @api private
 */

Transport.prototype.unsubscribe = function () {
  this.log.info('unsubscribing', this.id);

  this.store.client(this.id).pause();
  this.subscribed = false;
};

/**
 * Writes a volatile message.
 *
 * The message is written only when the transport is open and drained;
 * otherwise it is dropped with a debug log entry.
 *
 * @param {String} msg message handed to `write()`
 * @api private
 */

Transport.prototype.writeVolatile = function (msg) {
  if (this.open) {
    if (this.drained) {
      this.write(msg);
    } else {
      this.log.debug('ignoring volatile packet, buffer not drained');
    }
  } else {
    this.log.debug('ignoring volatile packet, transport not open');
  }
};