Files
socket.io/lib/socket.js
2020-01-14 22:58:35 +01:00

747 lines
19 KiB
JavaScript

const transports = require("./transports/index");
const Emitter = require("component-emitter");
const debug = require("debug")("engine.io-client:socket");
const index = require("indexof");
const parser = require("engine.io-parser");
const parseuri = require("parseuri");
const parseqs = require("parseqs");
class Socket extends Emitter {
/**
* Socket constructor.
*
* @param {String|Object} uri or options
* @param {Object} options
* @api public
*/
constructor(uri, opts = {}) {
super();
if (uri && "object" === typeof uri) {
opts = uri;
uri = null;
}
if (uri) {
uri = parseuri(uri);
opts.hostname = uri.host;
opts.secure = uri.protocol === "https" || uri.protocol === "wss";
opts.port = uri.port;
if (uri.query) opts.query = uri.query;
} else if (opts.host) {
opts.hostname = parseuri(opts.host).host;
}
this.secure =
null != opts.secure
? opts.secure
: typeof location !== "undefined" && "https:" === location.protocol;
if (opts.hostname && !opts.port) {
// if no port is specified manually, use the protocol default
opts.port = this.secure ? "443" : "80";
}
this.agent = opts.agent || false;
this.hostname =
opts.hostname ||
(typeof location !== "undefined" ? location.hostname : "localhost");
this.port =
opts.port ||
(typeof location !== "undefined" && location.port
? location.port
: this.secure
? 443
: 80);
this.query = opts.query || {};
if ("string" === typeof this.query) this.query = parseqs.decode(this.query);
this.upgrade = false !== opts.upgrade;
this.path = (opts.path || "/engine.io").replace(/\/$/, "") + "/";
this.forceJSONP = !!opts.forceJSONP;
this.jsonp = false !== opts.jsonp;
this.forceBase64 = !!opts.forceBase64;
this.enablesXDR = !!opts.enablesXDR;
this.withCredentials = false !== opts.withCredentials;
this.timestampParam = opts.timestampParam || "t";
this.timestampRequests = opts.timestampRequests;
this.transports = opts.transports || ["polling", "websocket"];
this.transportOptions = opts.transportOptions || {};
this.readyState = "";
this.writeBuffer = [];
this.prevBufferLen = 0;
this.policyPort = opts.policyPort || 843;
this.rememberUpgrade = opts.rememberUpgrade || false;
this.binaryType = null;
this.onlyBinaryUpgrades = opts.onlyBinaryUpgrades;
this.perMessageDeflate =
false !== opts.perMessageDeflate ? opts.perMessageDeflate || {} : false;
if (true === this.perMessageDeflate) this.perMessageDeflate = {};
if (this.perMessageDeflate && null == this.perMessageDeflate.threshold) {
this.perMessageDeflate.threshold = 1024;
}
// SSL options for Node.js client
this.pfx = opts.pfx || null;
this.key = opts.key || null;
this.passphrase = opts.passphrase || null;
this.cert = opts.cert || null;
this.ca = opts.ca || null;
this.ciphers = opts.ciphers || null;
this.rejectUnauthorized =
opts.rejectUnauthorized === undefined ? true : opts.rejectUnauthorized;
this.forceNode = !!opts.forceNode;
// detect ReactNative environment
this.isReactNative =
typeof navigator !== "undefined" &&
typeof navigator.product === "string" &&
navigator.product.toLowerCase() === "reactnative";
// other options for Node.js or ReactNative client
if (typeof self === "undefined" || this.isReactNative) {
if (opts.extraHeaders && Object.keys(opts.extraHeaders).length > 0) {
this.extraHeaders = opts.extraHeaders;
}
if (opts.localAddress) {
this.localAddress = opts.localAddress;
}
}
// set on handshake
this.id = null;
this.upgrades = null;
this.pingInterval = null;
this.pingTimeout = null;
// set on heartbeat
this.pingIntervalTimer = null;
this.pingTimeoutTimer = null;
this.open();
}
/**
* Creates transport of the given type.
*
* @param {String} transport name
* @return {Transport}
* @api private
*/
createTransport(name) {
debug('creating transport "%s"', name);
const query = clone(this.query);
// append engine.io protocol identifier
query.EIO = parser.protocol;
// transport name
query.transport = name;
// per-transport options
const options = this.transportOptions[name] || {};
// session id if we already have one
if (this.id) query.sid = this.id;
const transport = new transports[name]({
query: query,
socket: this,
agent: options.agent || this.agent,
hostname: options.hostname || this.hostname,
port: options.port || this.port,
secure: options.secure || this.secure,
path: options.path || this.path,
forceJSONP: options.forceJSONP || this.forceJSONP,
jsonp: options.jsonp || this.jsonp,
forceBase64: options.forceBase64 || this.forceBase64,
enablesXDR: options.enablesXDR || this.enablesXDR,
withCredentials: options.withCredentials || this.withCredentials,
timestampRequests: options.timestampRequests || this.timestampRequests,
timestampParam: options.timestampParam || this.timestampParam,
policyPort: options.policyPort || this.policyPort,
pfx: options.pfx || this.pfx,
key: options.key || this.key,
passphrase: options.passphrase || this.passphrase,
cert: options.cert || this.cert,
ca: options.ca || this.ca,
ciphers: options.ciphers || this.ciphers,
rejectUnauthorized: options.rejectUnauthorized || this.rejectUnauthorized,
perMessageDeflate: options.perMessageDeflate || this.perMessageDeflate,
extraHeaders: options.extraHeaders || this.extraHeaders,
forceNode: options.forceNode || this.forceNode,
localAddress: options.localAddress || this.localAddress,
requestTimeout: options.requestTimeout || this.requestTimeout,
protocols: options.protocols || void 0,
isReactNative: this.isReactNative
});
return transport;
}
/**
* Initializes transport to use and starts probe.
*
* @api private
*/
open() {
let transport;
if (
this.rememberUpgrade &&
Socket.priorWebsocketSuccess &&
this.transports.indexOf("websocket") !== -1
) {
transport = "websocket";
} else if (0 === this.transports.length) {
// Emit error on next tick so it can be listened to
const self = this;
setTimeout(function() {
self.emit("error", "No transports available");
}, 0);
return;
} else {
transport = this.transports[0];
}
this.readyState = "opening";
// Retry with the next transport if the transport is disabled (jsonp: false)
try {
transport = this.createTransport(transport);
} catch (e) {
this.transports.shift();
this.open();
return;
}
transport.open();
this.setTransport(transport);
}
/**
* Sets the current transport. Disables the existing one (if any).
*
* @api private
*/
setTransport(transport) {
debug("setting transport %s", transport.name);
const self = this;
if (this.transport) {
debug("clearing existing transport %s", this.transport.name);
this.transport.removeAllListeners();
}
// set up transport
this.transport = transport;
// set up transport listeners
transport
.on("drain", function() {
self.onDrain();
})
.on("packet", function(packet) {
self.onPacket(packet);
})
.on("error", function(e) {
self.onError(e);
})
.on("close", function() {
self.onClose("transport close");
});
}
/**
* Probes a transport.
*
* @param {String} transport name
* @api private
*/
probe(name) {
debug('probing transport "%s"', name);
let transport = this.createTransport(name, { probe: 1 });
let failed = false;
const self = this;
Socket.priorWebsocketSuccess = false;
function onTransportOpen() {
if (self.onlyBinaryUpgrades) {
const upgradeLosesBinary =
!this.supportsBinary && self.transport.supportsBinary;
failed = failed || upgradeLosesBinary;
}
if (failed) return;
debug('probe transport "%s" opened', name);
transport.send([{ type: "ping", data: "probe" }]);
transport.once("packet", function(msg) {
if (failed) return;
if ("pong" === msg.type && "probe" === msg.data) {
debug('probe transport "%s" pong', name);
self.upgrading = true;
self.emit("upgrading", transport);
if (!transport) return;
Socket.priorWebsocketSuccess = "websocket" === transport.name;
debug('pausing current transport "%s"', self.transport.name);
self.transport.pause(function() {
if (failed) return;
if ("closed" === self.readyState) return;
debug("changing transport and sending upgrade packet");
cleanup();
self.setTransport(transport);
transport.send([{ type: "upgrade" }]);
self.emit("upgrade", transport);
transport = null;
self.upgrading = false;
self.flush();
});
} else {
debug('probe transport "%s" failed', name);
const err = new Error("probe error");
err.transport = transport.name;
self.emit("upgradeError", err);
}
});
}
function freezeTransport() {
if (failed) return;
// Any callback called by transport should be ignored since now
failed = true;
cleanup();
transport.close();
transport = null;
}
// Handle any error that happens while probing
function onerror(err) {
const error = new Error("probe error: " + err);
error.transport = transport.name;
freezeTransport();
debug('probe transport "%s" failed because of error: %s', name, err);
self.emit("upgradeError", error);
}
function onTransportClose() {
onerror("transport closed");
}
// When the socket is closed while we're probing
function onclose() {
onerror("socket closed");
}
// When the socket is upgraded while we're probing
function onupgrade(to) {
if (transport && to.name !== transport.name) {
debug('"%s" works - aborting "%s"', to.name, transport.name);
freezeTransport();
}
}
// Remove all listeners on the transport and on self
function cleanup() {
transport.removeListener("open", onTransportOpen);
transport.removeListener("error", onerror);
transport.removeListener("close", onTransportClose);
self.removeListener("close", onclose);
self.removeListener("upgrading", onupgrade);
}
transport.once("open", onTransportOpen);
transport.once("error", onerror);
transport.once("close", onTransportClose);
this.once("close", onclose);
this.once("upgrading", onupgrade);
transport.open();
}
/**
* Called when connection is deemed open.
*
* @api public
*/
onOpen() {
debug("socket open");
this.readyState = "open";
Socket.priorWebsocketSuccess = "websocket" === this.transport.name;
this.emit("open");
this.flush();
// we check for `readyState` in case an `open`
// listener already closed the socket
if ("open" === this.readyState && this.upgrade && this.transport.pause) {
debug("starting upgrade probes");
let i = 0;
const l = this.upgrades.length;
for (; i < l; i++) {
this.probe(this.upgrades[i]);
}
}
}
/**
* Handles a packet.
*
* @api private
*/
onPacket(packet) {
if (
"opening" === this.readyState ||
"open" === this.readyState ||
"closing" === this.readyState
) {
debug('socket receive: type "%s", data "%s"', packet.type, packet.data);
this.emit("packet", packet);
// Socket is live - any packet counts
this.emit("heartbeat");
switch (packet.type) {
case "open":
this.onHandshake(JSON.parse(packet.data));
break;
case "pong":
this.setPing();
this.emit("pong");
break;
case "error":
const err = new Error("server error");
err.code = packet.data;
this.onError(err);
break;
case "message":
this.emit("data", packet.data);
this.emit("message", packet.data);
break;
}
} else {
debug('packet received with socket readyState "%s"', this.readyState);
}
}
/**
* Called upon handshake completion.
*
* @param {Object} handshake obj
* @api private
*/
onHandshake(data) {
this.emit("handshake", data);
this.id = data.sid;
this.transport.query.sid = data.sid;
this.upgrades = this.filterUpgrades(data.upgrades);
this.pingInterval = data.pingInterval;
this.pingTimeout = data.pingTimeout;
this.onOpen();
// In case open handler closes socket
if ("closed" === this.readyState) return;
this.setPing();
// Prolong liveness of socket on heartbeat
this.removeListener("heartbeat", this.onHeartbeat);
this.on("heartbeat", this.onHeartbeat);
}
/**
* Resets ping timeout.
*
* @api private
*/
onHeartbeat(timeout) {
clearTimeout(this.pingTimeoutTimer);
const self = this;
self.pingTimeoutTimer = setTimeout(function() {
if ("closed" === self.readyState) return;
self.onClose("ping timeout");
}, timeout || self.pingInterval + self.pingTimeout);
}
/**
* Pings server every `this.pingInterval` and expects response
* within `this.pingTimeout` or closes connection.
*
* @api private
*/
setPing() {
const self = this;
clearTimeout(self.pingIntervalTimer);
self.pingIntervalTimer = setTimeout(function() {
debug(
"writing ping packet - expecting pong within %sms",
self.pingTimeout
);
self.ping();
self.onHeartbeat(self.pingTimeout);
}, self.pingInterval);
}
/**
* Sends a ping packet.
*
* @api private
*/
ping() {
const self = this;
this.sendPacket("ping", function() {
self.emit("ping");
});
}
/**
* Called on `drain` event
*
* @api private
*/
onDrain() {
this.writeBuffer.splice(0, this.prevBufferLen);
// setting prevBufferLen = 0 is very important
// for example, when upgrading, upgrade packet is sent over,
// and a nonzero prevBufferLen could cause problems on `drain`
this.prevBufferLen = 0;
if (0 === this.writeBuffer.length) {
this.emit("drain");
} else {
this.flush();
}
}
/**
* Flush write buffers.
*
* @api private
*/
flush() {
if (
"closed" !== this.readyState &&
this.transport.writable &&
!this.upgrading &&
this.writeBuffer.length
) {
debug("flushing %d packets in socket", this.writeBuffer.length);
this.transport.send(this.writeBuffer);
// keep track of current length of writeBuffer
// splice writeBuffer and callbackBuffer on `drain`
this.prevBufferLen = this.writeBuffer.length;
this.emit("flush");
}
}
/**
* Sends a message.
*
* @param {String} message.
* @param {Function} callback function.
* @param {Object} options.
* @return {Socket} for chaining.
* @api public
*/
write(msg, options, fn) {
this.sendPacket("message", msg, options, fn);
return this;
}
send(msg, options, fn) {
this.sendPacket("message", msg, options, fn);
return this;
}
/**
* Sends a packet.
*
* @param {String} packet type.
* @param {String} data.
* @param {Object} options.
* @param {Function} callback function.
* @api private
*/
sendPacket(type, data, options, fn) {
if ("function" === typeof data) {
fn = data;
data = undefined;
}
if ("function" === typeof options) {
fn = options;
options = null;
}
if ("closing" === this.readyState || "closed" === this.readyState) {
return;
}
options = options || {};
options.compress = false !== options.compress;
const packet = {
type: type,
data: data,
options: options
};
this.emit("packetCreate", packet);
this.writeBuffer.push(packet);
if (fn) this.once("flush", fn);
this.flush();
}
/**
* Closes the connection.
*
* @api private
*/
close() {
const self = this;
if ("opening" === this.readyState || "open" === this.readyState) {
this.readyState = "closing";
if (this.writeBuffer.length) {
this.once("drain", function() {
if (this.upgrading) {
waitForUpgrade();
} else {
close();
}
});
} else if (this.upgrading) {
waitForUpgrade();
} else {
close();
}
}
function close() {
self.onClose("forced close");
debug("socket closing - telling transport to close");
self.transport.close();
}
function cleanupAndClose() {
self.removeListener("upgrade", cleanupAndClose);
self.removeListener("upgradeError", cleanupAndClose);
close();
}
function waitForUpgrade() {
// wait for upgrade to finish since we can't send packets while pausing a transport
self.once("upgrade", cleanupAndClose);
self.once("upgradeError", cleanupAndClose);
}
return this;
}
/**
* Called upon transport error
*
* @api private
*/
onError(err) {
debug("socket error %j", err);
Socket.priorWebsocketSuccess = false;
this.emit("error", err);
this.onClose("transport error", err);
}
/**
* Called upon transport close.
*
* @api private
*/
onClose(reason, desc) {
if (
"opening" === this.readyState ||
"open" === this.readyState ||
"closing" === this.readyState
) {
debug('socket close with reason: "%s"', reason);
const self = this;
// clear timers
clearTimeout(this.pingIntervalTimer);
clearTimeout(this.pingTimeoutTimer);
// stop event from firing again for transport
this.transport.removeAllListeners("close");
// ensure transport won't stay open
this.transport.close();
// ignore further transport communication
this.transport.removeAllListeners();
// set ready state
this.readyState = "closed";
// clear session id
this.id = null;
// emit close event
this.emit("close", reason, desc);
// clean buffers after, so users can still
// grab the buffers on `close` event
self.writeBuffer = [];
self.prevBufferLen = 0;
}
}
/**
* Filters upgrades, returning only those matching client transports.
*
* @param {Array} server upgrades
* @api private
*
*/
filterUpgrades(upgrades) {
const filteredUpgrades = [];
let i = 0;
const j = upgrades.length;
for (; i < j; i++) {
if (~index(this.transports, upgrades[i]))
filteredUpgrades.push(upgrades[i]);
}
return filteredUpgrades;
}
}
Socket.priorWebsocketSuccess = false;
/**
* Protocol version.
*
* @api public
*/
Socket.protocol = parser.protocol; // this is an int
function clone(obj) {
const o = {};
for (let i in obj) {
if (obj.hasOwnProperty(i)) {
o[i] = obj[i];
}
}
return o;
}
module.exports = Socket;