mirror of
https://github.com/meteor/meteor.git
synced 2026-05-02 03:01:46 -04:00
fix: address review feedback on uws transport
- Use const/let instead of var - Use WeakMap instead of Map for socket listeners to prevent memory leaks - Write 502 Bad Gateway response before destroying socket on proxy error - Wire up idleTimeout setting in uWS config
This commit is contained in:
@@ -15,25 +15,26 @@ export function createUwsTransport() {
|
||||
return {
|
||||
name: 'uws',
|
||||
setup(httpServer, pathPrefix, options) {
|
||||
var emitter = new EventEmitter();
|
||||
var uws = Npm.require('uWebSockets.js');
|
||||
const emitter = new EventEmitter();
|
||||
const uws = Npm.require('uWebSockets.js');
|
||||
|
||||
var settings = __meteor_runtime_config__?.meteorSettings?.packages?.['ddp-server']?.uws || {};
|
||||
var uwsPort = Number(settings.port) || 5001;
|
||||
var uwsPayloadLength = Number(settings.payloadLength) || 48;
|
||||
var uwsSocketTimeout = Number(settings.timeout) || 45;
|
||||
var uwsHost = settings.host || '127.0.0.1';
|
||||
var uwsProxyHost = uwsHost === '0.0.0.0'
|
||||
const settings = __meteor_runtime_config__?.meteorSettings?.packages?.['ddp-server']?.uws || {};
|
||||
const uwsPort = Number(settings.port) || 5001;
|
||||
const uwsPayloadLength = Number(settings.payloadLength) || 48;
|
||||
const uwsSocketTimeout = Number(settings.timeout) || 45;
|
||||
const uwsHost = settings.host || '127.0.0.1';
|
||||
const uwsProxyHost = uwsHost === '0.0.0.0'
|
||||
? '127.0.0.1'
|
||||
: uwsHost === '::'
|
||||
? '::1'
|
||||
: uwsHost;
|
||||
|
||||
// Internal Maps for event listeners (uWS sockets don't have EventEmitter)
|
||||
var closeListeners = new Map();
|
||||
var messageListeners = new Map();
|
||||
// WeakMaps for event listeners (uWS sockets don't have EventEmitter).
|
||||
// WeakMap allows automatic GC if uWS drops a socket without firing close.
|
||||
const closeListeners = new WeakMap();
|
||||
const messageListeners = new WeakMap();
|
||||
|
||||
var uwsApp = uws.App();
|
||||
const uwsApp = uws.App();
|
||||
|
||||
uwsApp.get('/*', function (res) {
|
||||
res.end('OK');
|
||||
@@ -42,6 +43,7 @@ export function createUwsTransport() {
|
||||
uwsApp.ws('/*', {
|
||||
maxBackpressure: 16 * 1024 * 1024,
|
||||
maxPayloadLength: uwsPayloadLength * 1024,
|
||||
idleTimeout: uwsSocketTimeout,
|
||||
|
||||
open(socket) {
|
||||
// Adapt uWS socket to the interface expected by _onConnection.
|
||||
@@ -65,7 +67,7 @@ export function createUwsTransport() {
|
||||
},
|
||||
|
||||
upgrade(res, req, context) {
|
||||
var headers = {};
|
||||
const headers = {};
|
||||
req.forEach((key, value) => {
|
||||
headers[key] = value;
|
||||
});
|
||||
@@ -81,7 +83,7 @@ export function createUwsTransport() {
|
||||
|
||||
close(socket) {
|
||||
socket.isClosed = true;
|
||||
var closeListener = closeListeners.get(socket);
|
||||
const closeListener = closeListeners.get(socket);
|
||||
closeListeners.delete(socket);
|
||||
messageListeners.delete(socket);
|
||||
if (closeListener) closeListener();
|
||||
@@ -89,8 +91,8 @@ export function createUwsTransport() {
|
||||
|
||||
message(socket, message, isBinary) {
|
||||
if (isBinary) return;
|
||||
var str = Buffer.from(message).toString('utf-8');
|
||||
var messageListener = messageListeners.get(socket);
|
||||
const str = Buffer.from(message).toString('utf-8');
|
||||
const messageListener = messageListeners.get(socket);
|
||||
if (messageListener) messageListener(str);
|
||||
}
|
||||
});
|
||||
@@ -105,7 +107,7 @@ export function createUwsTransport() {
|
||||
|
||||
// Reject plain HTTP requests to /websocket
|
||||
WebApp.rawConnectHandlers.use(function (req, res, next) {
|
||||
var pathname = new URL(req.url, 'http://localhost').pathname;
|
||||
const pathname = new URL(req.url, 'http://localhost').pathname;
|
||||
if (pathname === pathPrefix + '/websocket' ||
|
||||
pathname === pathPrefix + '/websocket/') {
|
||||
res.writeHead(400, { 'Content-Type': 'text/plain' });
|
||||
@@ -129,23 +131,23 @@ export function createUwsTransport() {
|
||||
* to the uWebSockets.js server via a raw TCP connection.
|
||||
*/
|
||||
function proxyWebsocketToUws(httpServer, pathPrefix, uwsHost, uwsPort) {
|
||||
var oldUpgradeListeners = httpServer.listeners('upgrade').slice(0);
|
||||
const oldUpgradeListeners = httpServer.listeners('upgrade').slice(0);
|
||||
httpServer.removeAllListeners('upgrade');
|
||||
|
||||
httpServer.on('upgrade', function (req, rawSocket, head) {
|
||||
var pathname = new URL(req.url, 'http://localhost').pathname;
|
||||
const pathname = new URL(req.url, 'http://localhost').pathname;
|
||||
|
||||
if (pathname === pathPrefix + '/websocket' ||
|
||||
pathname === pathPrefix + '/websocket/') {
|
||||
|
||||
// Build the raw HTTP upgrade request to forward to uWS
|
||||
var uwsSocket = net.createConnection(uwsPort, uwsHost, function () {
|
||||
var headers = '';
|
||||
for (var i = 0; i < req.rawHeaders.length; i += 2) {
|
||||
const uwsSocket = net.createConnection(uwsPort, uwsHost, function () {
|
||||
let headers = '';
|
||||
for (let i = 0; i < req.rawHeaders.length; i += 2) {
|
||||
headers += req.rawHeaders[i] + ': ' + req.rawHeaders[i + 1] + '\r\n';
|
||||
}
|
||||
|
||||
var httpRequest =
|
||||
const httpRequest =
|
||||
req.method + ' ' + req.url + ' HTTP/' + req.httpVersion + '\r\n' +
|
||||
headers + '\r\n';
|
||||
|
||||
@@ -157,15 +159,24 @@ function proxyWebsocketToUws(httpServer, pathPrefix, uwsHost, uwsPort) {
|
||||
});
|
||||
|
||||
uwsSocket.on('error', function () {
|
||||
if (rawSocket.writable) {
|
||||
rawSocket.write(
|
||||
'HTTP/1.1 502 Bad Gateway\r\n' +
|
||||
'Connection: close\r\n' +
|
||||
'Content-Type: text/plain\r\n' +
|
||||
'\r\n' +
|
||||
'502 Bad Gateway: Upstream WebSocket server unreachable.'
|
||||
);
|
||||
}
|
||||
rawSocket.destroy();
|
||||
});
|
||||
|
||||
rawSocket.on('error', function () {
|
||||
uwsSocket.destroy();
|
||||
if (uwsSocket.writable) uwsSocket.destroy();
|
||||
});
|
||||
} else {
|
||||
// Pass to other upgrade handlers (HMR, etc.)
|
||||
for (var i = 0; i < oldUpgradeListeners.length; i++) {
|
||||
for (let i = 0; i < oldUpgradeListeners.length; i++) {
|
||||
oldUpgradeListeners[i].call(httpServer, req, rawSocket, head);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user