Files
socket.io/lib/socket.js
Damien Arrachequesne 2f5c948abe fix(react-native): restrict the list of options for the WebSocket object
Only 'headers' and 'localAddress' options are supported by the
WebSocket implementation in React Native.

The following message was printed to the console:

> Unrecognized WebSocket connection option(s) `agent`, `perMessageDeflate`, `pfx`, `key`, `passphrase`, `cert`, `ca`, `ciphers`, `rejectUnauthorized`. Did you mean to put these under `headers`?

Reference: https://reactnative.dev/docs/network.html#websocket-support
2020-05-25 07:34:17 +02:00

667 lines
16 KiB
JavaScript

const transports = require("./transports/index");
const Emitter = require("component-emitter");
const debug = require("debug")("engine.io-client:socket");
const parser = require("engine.io-parser");
const parseuri = require("parseuri");
const parseqs = require("parseqs");
class Socket extends Emitter {
  /**
   * Socket constructor. Resolves hostname / port / secure from the arguments
   * (falling back to the global `location` when it exists), merges `opts`
   * over the default options and immediately calls `open()`.
   *
   * NOTE: the `opts` object supplied by the caller is written to: `hostname`,
   * `secure`, `port` and `query` may be set on it.
   *
   * @param {String|Object} uri - uri string, or the options object
   * @param {Object} opts - options (replaced by `uri` when `uri` is an object)
   * @api public
   */
  constructor(uri, opts = {}) {
    super();

    // allow `new Socket(opts)`
    if (uri && "object" === typeof uri) {
      opts = uri;
      uri = null;
    }

    if (uri) {
      uri = parseuri(uri);
      opts.hostname = uri.host;
      opts.secure = uri.protocol === "https" || uri.protocol === "wss";
      opts.port = uri.port;
      if (uri.query) opts.query = uri.query;
    } else if (opts.host) {
      opts.hostname = parseuri(opts.host).host;
    }

    // an explicit `secure` option wins; otherwise follow the page protocol
    this.secure =
      null != opts.secure
        ? opts.secure
        : typeof location !== "undefined" && "https:" === location.protocol;

    if (opts.hostname && !opts.port) {
      // if no port is specified manually, use the protocol default
      opts.port = this.secure ? "443" : "80";
    }

    this.hostname =
      opts.hostname ||
      (typeof location !== "undefined" ? location.hostname : "localhost");
    this.port =
      opts.port ||
      (typeof location !== "undefined" && location.port
        ? location.port
        : this.secure
        ? 443
        : 80);

    this.transports = opts.transports || ["polling", "websocket"];
    this.readyState = "";
    this.writeBuffer = [];
    this.prevBufferLen = 0;

    // caller-supplied options override the defaults below
    this.opts = Object.assign(
      {
        path: "/engine.io",
        agent: false,
        upgrade: true,
        jsonp: true,
        timestampParam: "t",
        policyPort: 843,
        rememberUpgrade: false,
        rejectUnauthorized: true,
        perMessageDeflate: {
          threshold: 1024
        },
        transportOptions: {}
      },
      opts
    );

    // normalize the path so that it ends with exactly one trailing slash
    this.opts.path = this.opts.path.replace(/\/$/, "") + "/";

    if (typeof this.opts.query === "string") {
      this.opts.query = parseqs.decode(this.opts.query);
    }

    // set on handshake
    this.id = null;
    this.upgrades = null;
    this.pingInterval = null;
    this.pingTimeout = null;

    // set on heartbeat
    this.pingTimeoutTimer = null;

    this.open();
  }

  /**
   * Creates transport of the given type.
   *
   * Option precedence, lowest to highest: the common options (`this.opts`),
   * the per-transport options (`this.opts.transportOptions[name]`), then the
   * fields computed by the socket (query, socket, hostname, secure, port).
   *
   * @param {String} name - transport name
   * @return {Transport}
   * @api private
   */
  createTransport(name) {
    debug('creating transport "%s"', name);
    const query = clone(this.opts.query);

    // append engine.io protocol identifier
    query.EIO = parser.protocol;

    // transport name
    query.transport = name;

    // session id if we already have one
    if (this.id) query.sid = this.id;

    // per-transport options must be merged AFTER the common options,
    // otherwise every key that has a common/default value silently wins
    const opts = Object.assign(
      {},
      this.opts,
      this.opts.transportOptions[name],
      {
        query,
        socket: this,
        hostname: this.hostname,
        secure: this.secure,
        port: this.port
      }
    );

    debug("options: %j", opts);

    return new transports[name](opts);
  }

  /**
   * Initializes transport to use and starts probe.
   *
   * Picks "websocket" directly when `rememberUpgrade` is set, a previous
   * websocket connection succeeded and websocket is an allowed transport;
   * otherwise picks the first allowed transport. When the transport cannot
   * be created it is removed from `this.transports` and the next candidate
   * is tried. With no candidate left, an "error" event is emitted
   * asynchronously.
   *
   * @api private
   */
  open() {
    let name;
    let index;
    if (
      this.opts.rememberUpgrade &&
      Socket.priorWebsocketSuccess &&
      this.transports.indexOf("websocket") !== -1
    ) {
      name = "websocket";
      index = this.transports.indexOf("websocket");
    } else if (0 === this.transports.length) {
      // Emit error on next tick so it can be listened to
      setTimeout(() => {
        this.emit("error", "No transports available");
      }, 0);
      return;
    } else {
      name = this.transports[0];
      index = 0;
    }
    this.readyState = "opening";

    // Retry with the next transport if the transport is disabled (jsonp: false)
    let transport;
    try {
      transport = this.createTransport(name);
    } catch (e) {
      debug("error while creating transport: %s", e);
      // remove the transport that actually failed (not blindly the first
      // one, which may be a still-untried transport)
      this.transports.splice(index, 1);
      this.open();
      return;
    }

    transport.open();
    this.setTransport(transport);
  }

  /**
   * Sets the current transport. Disables the existing one (if any).
   *
   * @param {Transport} transport - transport to forward events from
   * @api private
   */
  setTransport(transport) {
    debug("setting transport %s", transport.name);

    if (this.transport) {
      debug("clearing existing transport %s", this.transport.name);
      this.transport.removeAllListeners();
    }

    // set up transport
    this.transport = transport;

    // set up transport listeners
    transport
      .on("drain", () => {
        this.onDrain();
      })
      .on("packet", packet => {
        this.onPacket(packet);
      })
      .on("error", e => {
        this.onError(e);
      })
      .on("close", () => {
        this.onClose("transport close");
      });
  }

  /**
   * Probes a transport: opens it next to the current one, exchanges a
   * ping/pong "probe" and, on success, pauses the current transport and
   * switches to the probed one. Failures are reported through the
   * "upgradeError" event.
   *
   * @param {String} name - transport name
   * @api private
   */
  probe(name) {
    debug('probing transport "%s"', name);
    let transport = this.createTransport(name);
    let failed = false;
    const self = this;

    Socket.priorWebsocketSuccess = false;

    function onTransportOpen() {
      if (self.onlyBinaryUpgrades) {
        // NOTE(review): `this` is whatever the emitter binds when invoking
        // the "open" listener - presumably the probed transport; confirm.
        const upgradeLosesBinary =
          !this.supportsBinary && self.transport.supportsBinary;
        failed = failed || upgradeLosesBinary;
      }
      if (failed) return;

      debug('probe transport "%s" opened', name);
      transport.send([{ type: "ping", data: "probe" }]);
      transport.once("packet", function(msg) {
        if (failed) return;
        if ("pong" === msg.type && "probe" === msg.data) {
          debug('probe transport "%s" pong', name);
          self.upgrading = true;
          self.emit("upgrading", transport);
          // an "upgrading" listener may have frozen (nulled) this probe
          if (!transport) return;
          Socket.priorWebsocketSuccess = "websocket" === transport.name;

          debug('pausing current transport "%s"', self.transport.name);
          self.transport.pause(function() {
            if (failed) return;
            if ("closed" === self.readyState) return;
            debug("changing transport and sending upgrade packet");

            cleanup();

            self.setTransport(transport);
            transport.send([{ type: "upgrade" }]);
            self.emit("upgrade", transport);
            transport = null;
            self.upgrading = false;
            self.flush();
          });
        } else {
          debug('probe transport "%s" failed', name);
          const err = new Error("probe error");
          err.transport = transport.name;
          self.emit("upgradeError", err);
        }
      });
    }

    function freezeTransport() {
      if (failed) return;

      // Any callback called by transport should be ignored since now
      failed = true;

      cleanup();

      transport.close();
      transport = null;
    }

    // Handle any error that happens while probing
    function onerror(err) {
      const error = new Error("probe error: " + err);
      // read the name before freezeTransport() nulls `transport`
      error.transport = transport.name;

      freezeTransport();

      debug('probe transport "%s" failed because of error: %s', name, err);

      self.emit("upgradeError", error);
    }

    function onTransportClose() {
      onerror("transport closed");
    }

    // When the socket is closed while we're probing
    function onclose() {
      onerror("socket closed");
    }

    // When the socket is upgraded while we're probing
    function onupgrade(to) {
      if (transport && to.name !== transport.name) {
        debug('"%s" works - aborting "%s"', to.name, transport.name);
        freezeTransport();
      }
    }

    // Remove all listeners on the transport and on self
    function cleanup() {
      transport.removeListener("open", onTransportOpen);
      transport.removeListener("error", onerror);
      transport.removeListener("close", onTransportClose);
      self.removeListener("close", onclose);
      self.removeListener("upgrading", onupgrade);
    }

    transport.once("open", onTransportOpen);
    transport.once("error", onerror);
    transport.once("close", onTransportClose);

    this.once("close", onclose);
    this.once("upgrading", onupgrade);

    transport.open();
  }

  /**
   * Called when connection is deemed open. Emits "open", flushes buffered
   * packets and, when upgrades are enabled and the current transport can be
   * paused, probes every transport listed in `this.upgrades`.
   *
   * @api public
   */
  onOpen() {
    debug("socket open");
    this.readyState = "open";
    Socket.priorWebsocketSuccess = "websocket" === this.transport.name;
    this.emit("open");
    this.flush();

    // we check for `readyState` in case an `open`
    // listener already closed the socket
    if (
      "open" === this.readyState &&
      this.opts.upgrade &&
      this.transport.pause
    ) {
      debug("starting upgrade probes");
      let i = 0;
      const l = this.upgrades.length;
      for (; i < l; i++) {
        this.probe(this.upgrades[i]);
      }
    }
  }

  /**
   * Handles a packet. Packets are ignored (only logged) unless the socket is
   * "opening", "open" or "closing".
   *
   * @param {Object} packet - packet with `type` and `data` properties
   * @api private
   */
  onPacket(packet) {
    if (
      "opening" === this.readyState ||
      "open" === this.readyState ||
      "closing" === this.readyState
    ) {
      debug('socket receive: type "%s", data "%s"', packet.type, packet.data);

      this.emit("packet", packet);

      // Socket is live - any packet counts
      this.emit("heartbeat");

      switch (packet.type) {
        case "open":
          this.onHandshake(JSON.parse(packet.data));
          break;

        case "ping":
          this.resetPingTimeout();
          this.sendPacket("pong");
          this.emit("pong");
          break;

        case "error": {
          // block scope keeps `err` local to this case clause
          const err = new Error("server error");
          err.code = packet.data;
          this.onError(err);
          break;
        }

        case "message":
          this.emit("data", packet.data);
          this.emit("message", packet.data);
          break;
      }
    } else {
      debug('packet received with socket readyState "%s"', this.readyState);
    }
  }

  /**
   * Called upon handshake completion. Stores the session id, the usable
   * upgrades and the ping settings, then opens the socket.
   *
   * @param {Object} data - handshake obj (sid, upgrades, pingInterval,
   *   pingTimeout)
   * @api private
   */
  onHandshake(data) {
    this.emit("handshake", data);
    this.id = data.sid;
    this.transport.query.sid = data.sid;
    this.upgrades = this.filterUpgrades(data.upgrades);
    this.pingInterval = data.pingInterval;
    this.pingTimeout = data.pingTimeout;
    this.onOpen();
    // In case open handler closes socket
    if ("closed" === this.readyState) return;
    this.resetPingTimeout();
  }

  /**
   * Sets and resets ping timeout timer based on server pings. The socket is
   * closed with reason "ping timeout" when the timer is not reset within
   * `pingInterval + pingTimeout` milliseconds.
   *
   * @api private
   */
  resetPingTimeout() {
    clearTimeout(this.pingTimeoutTimer);
    this.pingTimeoutTimer = setTimeout(() => {
      this.onClose("ping timeout");
    }, this.pingInterval + this.pingTimeout);
  }

  /**
   * Called on `drain` event. Drops the packets that were handed to the
   * transport by the last flush, then either emits "drain" (buffer empty) or
   * flushes the packets queued in the meantime.
   *
   * @api private
   */
  onDrain() {
    this.writeBuffer.splice(0, this.prevBufferLen);

    // setting prevBufferLen = 0 is very important
    // for example, when upgrading, upgrade packet is sent over,
    // and a nonzero prevBufferLen could cause problems on `drain`
    this.prevBufferLen = 0;

    if (0 === this.writeBuffer.length) {
      this.emit("drain");
    } else {
      this.flush();
    }
  }

  /**
   * Flush write buffers. Does nothing while the socket is closed, has no
   * transport yet, the transport is not writable, an upgrade is in progress
   * or the buffer is empty.
   *
   * @api private
   */
  flush() {
    if (
      "closed" !== this.readyState &&
      // no transport exists when none could be created in open()
      this.transport &&
      this.transport.writable &&
      !this.upgrading &&
      this.writeBuffer.length
    ) {
      debug("flushing %d packets in socket", this.writeBuffer.length);
      this.transport.send(this.writeBuffer);
      // keep track of current length of writeBuffer
      // splice writeBuffer and callbackBuffer on `drain`
      this.prevBufferLen = this.writeBuffer.length;
      this.emit("flush");
    }
  }

  /**
   * Sends a message.
   *
   * @param {String} msg - message.
   * @param {Object} options - options (may be omitted; a function passed
   *   here is treated as the callback).
   * @param {Function} fn - callback function, invoked on the next "flush".
   * @return {Socket} for chaining.
   * @api public
   */
  write(msg, options, fn) {
    this.sendPacket("message", msg, options, fn);
    return this;
  }

  /**
   * Sends a message. Same behavior as `write`.
   *
   * @param {String} msg - message.
   * @param {Object} options - options (may be omitted; a function passed
   *   here is treated as the callback).
   * @param {Function} fn - callback function, invoked on the next "flush".
   * @return {Socket} for chaining.
   * @api public
   */
  send(msg, options, fn) {
    this.sendPacket("message", msg, options, fn);
    return this;
  }

  /**
   * Sends a packet. The packet is queued in `writeBuffer` and a flush is
   * attempted right away. Nothing is queued while the socket is "closing"
   * or "closed".
   *
   * @param {String} type - packet type.
   * @param {String} data - packet data.
   * @param {Object} options - options; `compress` defaults to true.
   * @param {Function} fn - callback function, invoked on the next "flush".
   * @api private
   */
  sendPacket(type, data, options, fn) {
    // support sendPacket(type, fn)
    if ("function" === typeof data) {
      fn = data;
      data = undefined;
    }

    // support sendPacket(type, data, fn)
    if ("function" === typeof options) {
      fn = options;
      options = null;
    }

    if ("closing" === this.readyState || "closed" === this.readyState) {
      return;
    }

    options = options || {};
    options.compress = false !== options.compress;

    const packet = {
      type: type,
      data: data,
      options: options
    };
    this.emit("packetCreate", packet);
    this.writeBuffer.push(packet);
    if (fn) this.once("flush", fn);
    this.flush();
  }

  /**
   * Closes the connection. When packets are still buffered the close is
   * deferred until "drain"; when an upgrade is in progress it is deferred
   * until the upgrade succeeds or fails.
   *
   * @return {Socket} for chaining.
   * @api private
   */
  close() {
    const forceClose = () => {
      this.onClose("forced close");
      debug("socket closing - telling transport to close");
      // no transport exists when none could be created in open()
      if (this.transport) this.transport.close();
    };

    const cleanupAndClose = () => {
      this.removeListener("upgrade", cleanupAndClose);
      this.removeListener("upgradeError", cleanupAndClose);
      forceClose();
    };

    const waitForUpgrade = () => {
      // wait for upgrade to finish since we can't send packets while pausing a transport
      this.once("upgrade", cleanupAndClose);
      this.once("upgradeError", cleanupAndClose);
    };

    if ("opening" === this.readyState || "open" === this.readyState) {
      this.readyState = "closing";

      if (this.writeBuffer.length) {
        this.once("drain", () => {
          if (this.upgrading) {
            waitForUpgrade();
          } else {
            forceClose();
          }
        });
      } else if (this.upgrading) {
        waitForUpgrade();
      } else {
        forceClose();
      }
    }

    return this;
  }

  /**
   * Called upon transport error. Emits "error" and closes the socket with
   * reason "transport error".
   *
   * @param {Error} err - the error to report
   * @api private
   */
  onError(err) {
    debug("socket error %j", err);
    Socket.priorWebsocketSuccess = false;
    this.emit("error", err);
    this.onClose("transport error", err);
  }

  /**
   * Called upon transport close. No-op unless the socket is "opening",
   * "open" or "closing".
   *
   * @param {String} reason - close reason, passed to the "close" event
   * @param {*} desc - optional description, passed to the "close" event
   * @api private
   */
  onClose(reason, desc) {
    if (
      "opening" === this.readyState ||
      "open" === this.readyState ||
      "closing" === this.readyState
    ) {
      debug('socket close with reason: "%s"', reason);

      // clear timers
      // (`pingIntervalTimer` is not assigned anywhere in this class; it is
      // cleared defensively)
      clearTimeout(this.pingIntervalTimer);
      clearTimeout(this.pingTimeoutTimer);

      // no transport exists when none could be created in open()
      if (this.transport) {
        // stop event from firing again for transport
        this.transport.removeAllListeners("close");

        // ensure transport won't stay open
        this.transport.close();

        // ignore further transport communication
        this.transport.removeAllListeners();
      }

      // set ready state
      this.readyState = "closed";

      // clear session id
      this.id = null;

      // emit close event
      this.emit("close", reason, desc);

      // clean buffers after, so users can still
      // grab the buffers on `close` event
      this.writeBuffer = [];
      this.prevBufferLen = 0;
    }
  }

  /**
   * Filters upgrades, returning only those matching client transports.
   *
   * @param {Array} upgrades - server upgrades
   * @return {Array} the upgrades that are also listed in `this.transports`
   * @api private
   *
   */
  filterUpgrades(upgrades) {
    const filteredUpgrades = [];
    let i = 0;
    const j = upgrades.length;
    for (; i < j; i++) {
      if (this.transports.indexOf(upgrades[i]) !== -1) {
        filteredUpgrades.push(upgrades[i]);
      }
    }
    return filteredUpgrades;
  }
}
/**
 * Shared (class-level) flag. `onOpen` sets it to whether the current
 * transport is named "websocket", `probe` and `onError` reset it to false,
 * and `open` reads it together with the `rememberUpgrade` option to decide
 * whether to start directly with the websocket transport.
 */
Socket.priorWebsocketSuccess = false;
/**
 * Protocol version, copied from the engine.io-parser module.
 *
 * @api public
 */
Socket.protocol = parser.protocol; // this is an int
/**
 * Creates a shallow copy of the own enumerable string-keyed properties of
 * `obj`. Inherited properties are skipped; `null` / `undefined` yield an
 * empty object.
 *
 * @param {Object} obj - object to copy
 * @return {Object} a new plain object
 * @api private
 */
function clone(obj) {
  const copy = {};
  for (const key in obj) {
    // call through Object.prototype: `obj` may have an own key named
    // "hasOwnProperty" (e.g. from a query string) or no prototype at all
    if (Object.prototype.hasOwnProperty.call(obj, key)) {
      copy[key] = obj[key];
    }
  }
  return copy;
}
module.exports = Socket;