mirror of
https://github.com/socketio/socket.io.git
synced 2026-01-13 17:07:54 -05:00
This commit restores the ability to send additional data in the
middleware functions, which was removed during the rewrite to
Typescript ([1]).
The only difference with the previous implementation is that the client
will now emit a "connect_error" (previously, "error") event with an
actual Error object, with both the message and an optional "data"
attribute.
```js
// server-side
io.use((socket, next) => {
const err = new Error("not authorized");
err.data = { content: "Please retry later" };
next(err);
});
// client-side
socket.on("connect_error", err => {
console.log(err.message); // not authorized
console.log(err.data.content); // Please retry later
});
```
[1]: a5581a9789
716 lines
18 KiB
TypeScript
716 lines
18 KiB
TypeScript
import http from "http";
|
||
import { createReadStream } from "fs";
|
||
import { createDeflate, createGzip, createBrotliCompress } from "zlib";
|
||
import accepts = require("accepts");
|
||
import { pipeline } from "stream";
|
||
import path from "path";
|
||
import engine from "engine.io";
|
||
import { Client } from "./client";
|
||
import { EventEmitter } from "events";
|
||
import { ExtendedError, Namespace } from "./namespace";
|
||
import { ParentNamespace } from "./parent-namespace";
|
||
import { Adapter, Room, SocketId } from "socket.io-adapter";
|
||
import * as parser from "socket.io-parser";
|
||
import { Encoder } from "socket.io-parser";
|
||
import debugModule from "debug";
|
||
import { Socket } from "./socket";
|
||
import { CookieSerializeOptions } from "cookie";
|
||
import { CorsOptions } from "cors";
|
||
|
||
const debug = debugModule("socket.io:server");
|
||
|
||
const clientVersion = require("../package.json").version;
|
||
const dotMapRegex = /\.map/;
|
||
|
||
type Transport = "polling" | "websocket";
|
||
|
||
interface EngineOptions {
|
||
/**
|
||
* how many ms without a pong packet to consider the connection closed
|
||
* @default 5000
|
||
*/
|
||
pingTimeout: number;
|
||
/**
|
||
* how many ms before sending a new ping packet
|
||
* @default 25000
|
||
*/
|
||
pingInterval: number;
|
||
/**
|
||
* how many ms before an uncompleted transport upgrade is cancelled
|
||
* @default 10000
|
||
*/
|
||
upgradeTimeout: number;
|
||
/**
|
||
* how many bytes or characters a message can be, before closing the session (to avoid DoS).
|
||
* @default 1e5 (100 KB)
|
||
*/
|
||
maxHttpBufferSize: number;
|
||
/**
|
||
* A function that receives a given handshake or upgrade request as its first parameter,
|
||
* and can decide whether to continue or not. The second argument is a function that needs
|
||
* to be called with the decided information: fn(err, success), where success is a boolean
|
||
* value where false means that the request is rejected, and err is an error code.
|
||
*/
|
||
allowRequest: (
|
||
req: http.IncomingMessage,
|
||
fn: (err: string | null | undefined, success: boolean) => void
|
||
) => void;
|
||
/**
|
||
* the low-level transports that are enabled
|
||
* @default ["polling", "websocket"]
|
||
*/
|
||
transports: Transport[];
|
||
/**
|
||
* whether to allow transport upgrades
|
||
* @default true
|
||
*/
|
||
allowUpgrades: boolean;
|
||
/**
|
||
* parameters of the WebSocket permessage-deflate extension (see ws module api docs). Set to false to disable.
|
||
* @default false
|
||
*/
|
||
perMessageDeflate: boolean | object;
|
||
/**
|
||
* parameters of the http compression for the polling transports (see zlib api docs). Set to false to disable.
|
||
* @default true
|
||
*/
|
||
httpCompression: boolean | object;
|
||
/**
|
||
* what WebSocket server implementation to use. Specified module must
|
||
* conform to the ws interface (see ws module api docs). Default value is ws.
|
||
* An alternative c++ addon is also available by installing uws module.
|
||
*/
|
||
wsEngine: string;
|
||
/**
|
||
* an optional packet which will be concatenated to the handshake packet emitted by Engine.IO.
|
||
*/
|
||
initialPacket: any;
|
||
/**
|
||
* configuration of the cookie that contains the client sid to send as part of handshake response headers. This cookie
|
||
* might be used for sticky-session. Defaults to not sending any cookie.
|
||
* @default false
|
||
*/
|
||
cookie: CookieSerializeOptions | boolean;
|
||
/**
|
||
* the options that will be forwarded to the cors module
|
||
*/
|
||
cors: CorsOptions;
|
||
}
|
||
|
||
interface AttachOptions {
|
||
/**
|
||
* name of the path to capture
|
||
* @default "/engine.io"
|
||
*/
|
||
path: string;
|
||
/**
|
||
* destroy unhandled upgrade requests
|
||
* @default true
|
||
*/
|
||
destroyUpgrade: boolean;
|
||
/**
|
||
* milliseconds after which unhandled requests are ended
|
||
* @default 1000
|
||
*/
|
||
destroyUpgradeTimeout: number;
|
||
}
|
||
|
||
interface EngineAttachOptions extends EngineOptions, AttachOptions {}
|
||
|
||
interface ServerOptions extends EngineAttachOptions {
|
||
/**
|
||
* name of the path to capture
|
||
* @default "/socket.io"
|
||
*/
|
||
path: string;
|
||
/**
|
||
* whether to serve the client files
|
||
* @default true
|
||
*/
|
||
serveClient: boolean;
|
||
/**
|
||
* the adapter to use
|
||
* @default the in-memory adapter (https://github.com/socketio/socket.io-adapter)
|
||
*/
|
||
adapter: any;
|
||
/**
|
||
* the parser to use
|
||
* @default the default parser (https://github.com/socketio/socket.io-parser)
|
||
*/
|
||
parser: any;
|
||
/**
|
||
* how many ms before a client without namespace is closed
|
||
* @default 45000
|
||
*/
|
||
connectTimeout: number;
|
||
}
|
||
|
||
export class Server extends EventEmitter {
|
||
public readonly sockets: Namespace;
|
||
|
||
/** @private */
|
||
readonly _parser;
|
||
/** @private */
|
||
readonly encoder: Encoder;
|
||
|
||
/**
|
||
* @private
|
||
*/
|
||
_nsps: Map<string, Namespace> = new Map();
|
||
private parentNsps: Map<
|
||
| string
|
||
| RegExp
|
||
| ((
|
||
name: string,
|
||
query: object,
|
||
fn: (err: Error, success: boolean) => void
|
||
) => void),
|
||
ParentNamespace
|
||
> = new Map();
|
||
private _adapter: any;
|
||
private _serveClient: boolean;
|
||
private eio;
|
||
private engine;
|
||
private _path: string;
|
||
private clientPathRegex: RegExp;
|
||
|
||
/**
|
||
* @private
|
||
*/
|
||
_connectTimeout: number;
|
||
private httpServer: http.Server;
|
||
|
||
/**
|
||
* Server constructor.
|
||
*
|
||
* @param {http.Server|Number|Object} srv http server, port or options
|
||
* @param {Object} [opts]
|
||
* @public
|
||
*/
|
||
constructor(opts?: Partial<ServerOptions>);
|
||
constructor(srv: http.Server, opts?: Partial<ServerOptions>);
|
||
constructor(srv: number, opts?: Partial<ServerOptions>);
|
||
constructor(srv?: any, opts: Partial<ServerOptions> = {}) {
|
||
super();
|
||
if ("object" == typeof srv && srv instanceof Object && !srv.listen) {
|
||
opts = srv;
|
||
srv = null;
|
||
}
|
||
this.path(opts.path || "/socket.io");
|
||
this.connectTimeout(opts.connectTimeout || 45000);
|
||
this.serveClient(false !== opts.serveClient);
|
||
this._parser = opts.parser || parser;
|
||
this.encoder = new this._parser.Encoder();
|
||
this.adapter(opts.adapter || Adapter);
|
||
this.sockets = this.of("/");
|
||
if (srv) this.attach(srv, opts);
|
||
}
|
||
|
||
/**
|
||
* Sets/gets whether client code is being served.
|
||
*
|
||
* @param {Boolean} v - whether to serve client code
|
||
* @return {Server|Boolean} self when setting or value when getting
|
||
* @public
|
||
*/
|
||
public serveClient(v: boolean): Server;
|
||
public serveClient(): boolean;
|
||
public serveClient(v?: boolean): Server | boolean {
|
||
if (!arguments.length) return this._serveClient;
|
||
this._serveClient = v;
|
||
return this;
|
||
}
|
||
|
||
/**
|
||
* Executes the middleware for an incoming namespace not already created on the server.
|
||
*
|
||
* @param {String} name - name of incoming namespace
|
||
* @param {Object} auth - the auth parameters
|
||
* @param {Function} fn - callback
|
||
*
|
||
* @private
|
||
*/
|
||
_checkNamespace(
|
||
name: string,
|
||
auth: object,
|
||
fn: (nsp: Namespace | boolean) => void
|
||
) {
|
||
if (this.parentNsps.size === 0) return fn(false);
|
||
|
||
const keysIterator = this.parentNsps.keys();
|
||
|
||
const run = () => {
|
||
let nextFn = keysIterator.next();
|
||
if (nextFn.done) {
|
||
return fn(false);
|
||
}
|
||
nextFn.value(name, auth, (err, allow) => {
|
||
if (err || !allow) {
|
||
run();
|
||
} else {
|
||
fn(this.parentNsps.get(nextFn.value).createChild(name));
|
||
}
|
||
});
|
||
};
|
||
|
||
run();
|
||
}
|
||
|
||
/**
|
||
* Sets the client serving path.
|
||
*
|
||
* @param {String} v pathname
|
||
* @return {Server|String} self when setting or value when getting
|
||
* @public
|
||
*/
|
||
public path(v: string): Server;
|
||
public path(): string;
|
||
public path(v?: string): Server | string {
|
||
if (!arguments.length) return this._path;
|
||
|
||
this._path = v.replace(/\/$/, "");
|
||
|
||
const escapedPath = this._path.replace(/[-\/\\^$*+?.()|[\]{}]/g, "\\$&");
|
||
this.clientPathRegex = new RegExp(
|
||
"^" +
|
||
escapedPath +
|
||
"/socket\\.io(\\.min|\\.msgpack\\.min)?\\.js(\\.map)?$"
|
||
);
|
||
return this;
|
||
}
|
||
|
||
/**
|
||
* Set the delay after which a client without namespace is closed
|
||
* @param v
|
||
* @public
|
||
*/
|
||
public connectTimeout(v: number): Server;
|
||
public connectTimeout(): number;
|
||
public connectTimeout(v?: number): Server | number {
|
||
if (v === undefined) return this._connectTimeout;
|
||
this._connectTimeout = v;
|
||
return this;
|
||
}
|
||
|
||
/**
|
||
* Sets the adapter for rooms.
|
||
*
|
||
* @param {Adapter} v pathname
|
||
* @return {Server|Adapter} self when setting or value when getting
|
||
* @public
|
||
*/
|
||
public adapter(): any;
|
||
public adapter(v: any);
|
||
public adapter(v?): Server | any {
|
||
if (!arguments.length) return this._adapter;
|
||
this._adapter = v;
|
||
for (const nsp of this._nsps.values()) {
|
||
nsp._initAdapter();
|
||
}
|
||
return this;
|
||
}
|
||
|
||
/**
|
||
* Attaches socket.io to a server or port.
|
||
*
|
||
* @param {http.Server|Number} srv - server or port
|
||
* @param {Object} opts - options passed to engine.io
|
||
* @return {Server} self
|
||
* @public
|
||
*/
|
||
public listen(srv: http.Server, opts?: Partial<ServerOptions>): Server;
|
||
public listen(srv: number, opts?: Partial<ServerOptions>): Server;
|
||
public listen(srv: any, opts: Partial<ServerOptions> = {}): Server {
|
||
return this.attach(srv, opts);
|
||
}
|
||
|
||
/**
|
||
* Attaches socket.io to a server or port.
|
||
*
|
||
* @param {http.Server|Number} srv - server or port
|
||
* @param {Object} opts - options passed to engine.io
|
||
* @return {Server} self
|
||
* @public
|
||
*/
|
||
public attach(srv: http.Server, opts?: Partial<ServerOptions>): Server;
|
||
public attach(port: number, opts?: Partial<ServerOptions>): Server;
|
||
public attach(srv: any, opts: Partial<ServerOptions> = {}): Server {
|
||
if ("function" == typeof srv) {
|
||
const msg =
|
||
"You are trying to attach socket.io to an express " +
|
||
"request handler function. Please pass a http.Server instance.";
|
||
throw new Error(msg);
|
||
}
|
||
|
||
// handle a port as a string
|
||
if (Number(srv) == srv) {
|
||
srv = Number(srv);
|
||
}
|
||
|
||
if ("number" == typeof srv) {
|
||
debug("creating http server and binding to %d", srv);
|
||
const port = srv;
|
||
srv = http.createServer((req, res) => {
|
||
res.writeHead(404);
|
||
res.end();
|
||
});
|
||
srv.listen(port);
|
||
}
|
||
|
||
// set engine.io path to `/socket.io`
|
||
opts.path = opts.path || this._path;
|
||
|
||
this.initEngine(srv, opts);
|
||
|
||
return this;
|
||
}
|
||
|
||
/**
|
||
* Initialize engine
|
||
*
|
||
* @param srv - the server to attach to
|
||
* @param opts - options passed to engine.io
|
||
* @private
|
||
*/
|
||
private initEngine(srv: http.Server, opts: Partial<EngineAttachOptions>) {
|
||
// initialize engine
|
||
debug("creating engine.io instance with opts %j", opts);
|
||
this.eio = engine.attach(srv, opts);
|
||
|
||
// attach static file serving
|
||
if (this._serveClient) this.attachServe(srv);
|
||
|
||
// Export http server
|
||
this.httpServer = srv;
|
||
|
||
// bind to engine events
|
||
this.bind(this.eio);
|
||
}
|
||
|
||
/**
|
||
* Attaches the static file serving.
|
||
*
|
||
* @param {Function|http.Server} srv http server
|
||
* @private
|
||
*/
|
||
private attachServe(srv) {
|
||
debug("attaching client serving req handler");
|
||
|
||
const evs = srv.listeners("request").slice(0);
|
||
srv.removeAllListeners("request");
|
||
srv.on("request", (req, res) => {
|
||
if (this.clientPathRegex.test(req.url)) {
|
||
this.serve(req, res);
|
||
} else {
|
||
for (let i = 0; i < evs.length; i++) {
|
||
evs[i].call(srv, req, res);
|
||
}
|
||
}
|
||
});
|
||
}
|
||
|
||
/**
|
||
* Handles a request serving of client source and map
|
||
*
|
||
* @param {http.IncomingMessage} req
|
||
* @param {http.ServerResponse} res
|
||
* @private
|
||
*/
|
||
private serve(req: http.IncomingMessage, res: http.ServerResponse) {
|
||
const filename = req.url.replace(this._path, "");
|
||
const isMap = dotMapRegex.test(filename);
|
||
const type = isMap ? "map" : "source";
|
||
|
||
// Per the standard, ETags must be quoted:
|
||
// https://tools.ietf.org/html/rfc7232#section-2.3
|
||
const expectedEtag = '"' + clientVersion + '"';
|
||
|
||
const etag = req.headers["if-none-match"];
|
||
if (etag) {
|
||
if (expectedEtag == etag) {
|
||
debug("serve client %s 304", type);
|
||
res.writeHead(304);
|
||
res.end();
|
||
return;
|
||
}
|
||
}
|
||
|
||
debug("serve client %s", type);
|
||
|
||
res.setHeader("Cache-Control", "public, max-age=0");
|
||
res.setHeader(
|
||
"Content-Type",
|
||
"application/" + (isMap ? "json" : "javascript")
|
||
);
|
||
res.setHeader("ETag", expectedEtag);
|
||
|
||
if (!isMap) {
|
||
res.setHeader("X-SourceMap", filename.substring(1) + ".map");
|
||
}
|
||
Server.sendFile(filename, req, res);
|
||
}
|
||
|
||
/**
|
||
* @param filename
|
||
* @param req
|
||
* @param res
|
||
* @private
|
||
*/
|
||
private static sendFile(
|
||
filename: string,
|
||
req: http.IncomingMessage,
|
||
res: http.ServerResponse
|
||
) {
|
||
const readStream = createReadStream(
|
||
path.join(__dirname, "../client-dist/", filename)
|
||
);
|
||
const encoding = accepts(req).encodings(["br", "gzip", "deflate"]);
|
||
|
||
const onError = err => {
|
||
if (err) {
|
||
res.end();
|
||
}
|
||
};
|
||
|
||
switch (encoding) {
|
||
case "br":
|
||
res.writeHead(200, { "content-encoding": "br" });
|
||
readStream.pipe(createBrotliCompress()).pipe(res);
|
||
pipeline(readStream, createBrotliCompress(), res, onError);
|
||
break;
|
||
case "gzip":
|
||
res.writeHead(200, { "content-encoding": "gzip" });
|
||
pipeline(readStream, createGzip(), res, onError);
|
||
break;
|
||
case "deflate":
|
||
res.writeHead(200, { "content-encoding": "deflate" });
|
||
pipeline(readStream, createDeflate(), res, onError);
|
||
break;
|
||
default:
|
||
res.writeHead(200);
|
||
pipeline(readStream, res, onError);
|
||
}
|
||
}
|
||
|
||
/**
|
||
* Binds socket.io to an engine.io instance.
|
||
*
|
||
* @param {engine.Server} engine engine.io (or compatible) server
|
||
* @return {Server} self
|
||
* @public
|
||
*/
|
||
public bind(engine): Server {
|
||
this.engine = engine;
|
||
this.engine.on("connection", this.onconnection.bind(this));
|
||
return this;
|
||
}
|
||
|
||
/**
|
||
* Called with each incoming transport connection.
|
||
*
|
||
* @param {engine.Socket} conn
|
||
* @return {Server} self
|
||
* @private
|
||
*/
|
||
private onconnection(conn): Server {
|
||
debug("incoming connection with id %s", conn.id);
|
||
new Client(this, conn);
|
||
return this;
|
||
}
|
||
|
||
/**
|
||
* Looks up a namespace.
|
||
*
|
||
* @param {String|RegExp|Function} name nsp name
|
||
* @param {Function} [fn] optional, nsp `connection` ev handler
|
||
* @public
|
||
*/
|
||
public of(
|
||
name:
|
||
| string
|
||
| RegExp
|
||
| ((
|
||
name: string,
|
||
query: object,
|
||
fn: (err: Error, success: boolean) => void
|
||
) => void),
|
||
fn?: (socket: Socket) => void
|
||
) {
|
||
if (typeof name === "function" || name instanceof RegExp) {
|
||
const parentNsp = new ParentNamespace(this);
|
||
debug("initializing parent namespace %s", parentNsp.name);
|
||
if (typeof name === "function") {
|
||
this.parentNsps.set(name, parentNsp);
|
||
} else {
|
||
this.parentNsps.set(
|
||
(nsp, conn, next) => next(null, (name as RegExp).test(nsp)),
|
||
parentNsp
|
||
);
|
||
}
|
||
if (fn) {
|
||
// @ts-ignore
|
||
parentNsp.on("connect", fn);
|
||
}
|
||
return parentNsp;
|
||
}
|
||
|
||
if (String(name)[0] !== "/") name = "/" + name;
|
||
|
||
let nsp = this._nsps.get(name);
|
||
if (!nsp) {
|
||
debug("initializing namespace %s", name);
|
||
nsp = new Namespace(this, name);
|
||
this._nsps.set(name, nsp);
|
||
}
|
||
if (fn) nsp.on("connect", fn);
|
||
return nsp;
|
||
}
|
||
|
||
/**
|
||
* Closes server connection
|
||
*
|
||
* @param {Function} [fn] optional, called as `fn([err])` on error OR all conns closed
|
||
* @public
|
||
*/
|
||
public close(fn?: (err?: Error) => void): void {
|
||
for (const socket of this.sockets.sockets.values()) {
|
||
socket._onclose("server shutting down");
|
||
}
|
||
|
||
this.engine.close();
|
||
|
||
if (this.httpServer) {
|
||
this.httpServer.close(fn);
|
||
} else {
|
||
fn && fn();
|
||
}
|
||
}
|
||
|
||
/**
|
||
* Sets up namespace middleware.
|
||
*
|
||
* @return {Server} self
|
||
* @public
|
||
*/
|
||
public use(
|
||
fn: (socket: Socket, next: (err?: ExtendedError) => void) => void
|
||
): Server {
|
||
this.sockets.use(fn);
|
||
return this;
|
||
}
|
||
|
||
/**
|
||
* Targets a room when emitting.
|
||
*
|
||
* @param {String} name
|
||
* @return {Server} self
|
||
* @public
|
||
*/
|
||
public to(name: Room): Server {
|
||
this.sockets.to(name);
|
||
return this;
|
||
}
|
||
|
||
/**
|
||
* Targets a room when emitting.
|
||
*
|
||
* @param {String} name
|
||
* @return {Server} self
|
||
* @public
|
||
*/
|
||
public in(name: Room): Server {
|
||
this.sockets.in(name);
|
||
return this;
|
||
}
|
||
|
||
/**
|
||
* Sends a `message` event to all clients.
|
||
*
|
||
* @return {Server} self
|
||
* @public
|
||
*/
|
||
public send(...args): Server {
|
||
args.unshift("message");
|
||
this.sockets.emit.apply(this.sockets, args);
|
||
return this;
|
||
}
|
||
|
||
/**
|
||
* Sends a `message` event to all clients.
|
||
*
|
||
* @return {Server} self
|
||
* @public
|
||
*/
|
||
public write(...args): Server {
|
||
args.unshift("message");
|
||
this.sockets.emit.apply(this.sockets, args);
|
||
return this;
|
||
}
|
||
|
||
/**
|
||
* Gets a list of socket ids.
|
||
*
|
||
* @public
|
||
*/
|
||
public allSockets(): Promise<Set<SocketId>> {
|
||
return this.sockets.allSockets();
|
||
}
|
||
|
||
/**
|
||
* Sets the compress flag.
|
||
*
|
||
* @param {Boolean} compress - if `true`, compresses the sending data
|
||
* @return {Server} self
|
||
* @public
|
||
*/
|
||
public compress(compress: boolean): Server {
|
||
this.sockets.compress(compress);
|
||
return this;
|
||
}
|
||
|
||
/**
|
||
* Sets a modifier for a subsequent event emission that the event data may be lost if the client is not ready to
|
||
* receive messages (because of network slowness or other issues, or because they’re connected through long polling
|
||
* and is in the middle of a request-response cycle).
|
||
*
|
||
* @return {Server} self
|
||
* @public
|
||
*/
|
||
public get volatile(): Server {
|
||
this.sockets.volatile;
|
||
return this;
|
||
}
|
||
|
||
/**
|
||
* Sets a modifier for a subsequent event emission that the event data will only be broadcast to the current node.
|
||
*
|
||
* @return {Server} self
|
||
* @public
|
||
*/
|
||
public get local(): Server {
|
||
this.sockets.local;
|
||
return this;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* Expose main namespace (/).
|
||
*/
|
||
|
||
const emitterMethods = Object.keys(EventEmitter.prototype).filter(function(
|
||
key
|
||
) {
|
||
return typeof EventEmitter.prototype[key] === "function";
|
||
});
|
||
|
||
emitterMethods.forEach(function(fn) {
|
||
Server.prototype[fn] = function() {
|
||
return this.sockets[fn].apply(this.sockets, arguments);
|
||
};
|
||
});
|
||
|
||
module.exports = (srv?, opts?) => new Server(srv, opts);
|
||
module.exports.Server = Server;
|
||
|
||
export { Socket }; // for "connect" event typing
|