feat: add emitter based on Redis streams

Related: https://github.com/socketio/socket.io-redis-streams-adapter/issues/8
This commit is contained in:
Damien Arrachequesne
2025-11-06 17:57:56 +01:00
parent 693080cac7
commit 1c3e4711c1
12 changed files with 1355 additions and 66 deletions

142
package-lock.json generated
View File

@@ -16,7 +16,8 @@
"packages/socket.io-parser",
"packages/socket.io-client",
"packages/socket.io",
"packages/socket.io-postgres-emitter"
"packages/socket.io-postgres-emitter",
"packages/socket.io-redis-streams-emitter"
],
"devDependencies": {
"@babel/core": "^7.24.7",
@@ -31,6 +32,7 @@
"@rollup/plugin-node-resolve": "^15.2.3",
"@sinonjs/fake-timers": "^11.2.2",
"@socket.io/postgres-adapter": "^0.1.0",
"@socket.io/redis-streams-adapter": "~0.2.2",
"@types/debug": "^4.1.12",
"@types/expect.js": "^0.3.32",
"@types/mocha": "^10.0.7",
@@ -121,6 +123,7 @@
"resolved": "https://registry.npmjs.org/@babel/core/-/core-7.24.7.tgz",
"integrity": "sha512-nykK+LEK86ahTkX/3TgauT0ikKoNCfKHEaZYTUVupJdTLzGNvrblu4u6fa7DhZONAltdf8e662t/abY8idrd/g==",
"dev": true,
"peer": true,
"dependencies": {
"@ampproject/remapping": "^2.2.0",
"@babel/code-frame": "^7.24.7",
@@ -2809,6 +2812,7 @@
"resolved": "https://registry.npmjs.org/@redis/client/-/client-1.5.17.tgz",
"integrity": "sha512-IPvU9A31qRCZ7lds/x+ksuK/UMndd0EASveAvCvEtFFKIZjZ+m/a4a0L7S28KEWoR5ka8526hlSghDo4Hrc2Hg==",
"dev": true,
"peer": true,
"dependencies": {
"cluster-key-slot": "1.1.2",
"generic-pool": "3.9.0",
@@ -3209,6 +3213,45 @@
"resolved": "packages/socket.io-postgres-emitter",
"link": true
},
"node_modules/@socket.io/redis-streams-adapter": {
"version": "0.2.2",
"resolved": "https://registry.npmjs.org/@socket.io/redis-streams-adapter/-/redis-streams-adapter-0.2.2.tgz",
"integrity": "sha512-BMPa6oGC0wFgpMXoGksbJ75zMBwk+79pxjHc2YusdoK+X0BxN4fTsqEBuFV7yeXi9ekbi87rwlsT61+WZGVW9g==",
"dev": true,
"license": "MIT",
"dependencies": {
"@msgpack/msgpack": "~2.8.0",
"debug": "~4.3.1"
},
"engines": {
"node": ">=14.0.0"
},
"peerDependencies": {
"socket.io-adapter": "^2.5.4"
}
},
"node_modules/@socket.io/redis-streams-adapter/node_modules/debug": {
"version": "4.3.7",
"resolved": "https://registry.npmjs.org/debug/-/debug-4.3.7.tgz",
"integrity": "sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ==",
"dev": true,
"license": "MIT",
"dependencies": {
"ms": "^2.1.3"
},
"engines": {
"node": ">=6.0"
},
"peerDependenciesMeta": {
"supports-color": {
"optional": true
}
}
},
"node_modules/@socket.io/redis-streams-emitter": {
"resolved": "packages/socket.io-redis-streams-emitter",
"link": true
},
"node_modules/@szmarczak/http-timer": {
"version": "4.0.6",
"resolved": "https://registry.npmjs.org/@szmarczak/http-timer/-/http-timer-4.0.6.tgz",
@@ -3306,7 +3349,6 @@
"resolved": "https://registry.npmjs.org/@types/eslint-scope/-/eslint-scope-3.7.7.tgz",
"integrity": "sha512-MzMFlSLBqNF2gcHWO0G1vP/YQyfvrxZ0bF+u7mzUdZ1/xK4A4sru+nraZz5i3iEIk1l1uyicaDVTB4QbbEkAYg==",
"dev": true,
"peer": true,
"dependencies": {
"@types/eslint": "*",
"@types/estree": "*"
@@ -3395,7 +3437,8 @@
"node_modules/@types/node": {
"version": "18.15.3",
"resolved": "https://registry.npmjs.org/@types/node/-/node-18.15.3.tgz",
"integrity": "sha512-p6ua9zBxz5otCmbpb5D3U4B5Nanw6Pk3PPyX05xnxbB/fRv71N7CPmORg7uAD5P70T0xmx1pzAx/FUfa5X+3cw=="
"integrity": "sha512-p6ua9zBxz5otCmbpb5D3U4B5Nanw6Pk3PPyX05xnxbB/fRv71N7CPmORg7uAD5P70T0xmx1pzAx/FUfa5X+3cw==",
"peer": true
},
"node_modules/@types/normalize-package-data": {
"version": "2.4.4",
@@ -4559,7 +4602,6 @@
"integrity": "sha512-nuBEDgQfm1ccRp/8bCQrx1frohyufl4JlbMMZ4P1wpeOfDhF6FQkxZJ1b/e+PLwr6X1Nhw6OLme5usuBWYBvuQ==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"@webassemblyjs/helper-numbers": "1.13.2",
"@webassemblyjs/helper-wasm-bytecode": "1.13.2"
@@ -4570,24 +4612,21 @@
"resolved": "https://registry.npmjs.org/@webassemblyjs/floating-point-hex-parser/-/floating-point-hex-parser-1.13.2.tgz",
"integrity": "sha512-6oXyTOzbKxGH4steLbLNOu71Oj+C8Lg34n6CqRvqfS2O71BxY6ByfMDRhBytzknj9yGUPVJ1qIKhRlAwO1AovA==",
"dev": true,
"license": "MIT",
"peer": true
"license": "MIT"
},
"node_modules/@webassemblyjs/helper-api-error": {
"version": "1.13.2",
"resolved": "https://registry.npmjs.org/@webassemblyjs/helper-api-error/-/helper-api-error-1.13.2.tgz",
"integrity": "sha512-U56GMYxy4ZQCbDZd6JuvvNV/WFildOjsaWD3Tzzvmw/mas3cXzRJPMjP83JqEsgSbyrmaGjBfDtV7KDXV9UzFQ==",
"dev": true,
"license": "MIT",
"peer": true
"license": "MIT"
},
"node_modules/@webassemblyjs/helper-buffer": {
"version": "1.14.1",
"resolved": "https://registry.npmjs.org/@webassemblyjs/helper-buffer/-/helper-buffer-1.14.1.tgz",
"integrity": "sha512-jyH7wtcHiKssDtFPRB+iQdxlDf96m0E39yb0k5uJVhFGleZFoNw1c4aeIcVUPPbXUVJ94wwnMOAqUHyzoEPVMA==",
"dev": true,
"license": "MIT",
"peer": true
"license": "MIT"
},
"node_modules/@webassemblyjs/helper-numbers": {
"version": "1.13.2",
@@ -4595,7 +4634,6 @@
"integrity": "sha512-FE8aCmS5Q6eQYcV3gI35O4J789wlQA+7JrqTTpJqn5emA4U2hvwJmvFRC0HODS+3Ye6WioDklgd6scJ3+PLnEA==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"@webassemblyjs/floating-point-hex-parser": "1.13.2",
"@webassemblyjs/helper-api-error": "1.13.2",
@@ -4607,8 +4645,7 @@
"resolved": "https://registry.npmjs.org/@webassemblyjs/helper-wasm-bytecode/-/helper-wasm-bytecode-1.13.2.tgz",
"integrity": "sha512-3QbLKy93F0EAIXLh0ogEVR6rOubA9AoZ+WRYhNbFyuB70j3dRdwH9g+qXhLAO0kiYGlg3TxDV+I4rQTr/YNXkA==",
"dev": true,
"license": "MIT",
"peer": true
"license": "MIT"
},
"node_modules/@webassemblyjs/helper-wasm-section": {
"version": "1.14.1",
@@ -4616,7 +4653,6 @@
"integrity": "sha512-ds5mXEqTJ6oxRoqjhWDU83OgzAYjwsCV8Lo/N+oRsNDmx/ZDpqalmrtgOMkHwxsG0iI//3BwWAErYRHtgn0dZw==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"@webassemblyjs/ast": "1.14.1",
"@webassemblyjs/helper-buffer": "1.14.1",
@@ -4630,7 +4666,6 @@
"integrity": "sha512-4LtOzh58S/5lX4ITKxnAK2USuNEvpdVV9AlgGQb8rJDHaLeHciwG4zlGr0j/SNWlr7x3vO1lDEsuePvtcDNCkw==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"@xtuc/ieee754": "^1.2.0"
}
@@ -4641,7 +4676,6 @@
"integrity": "sha512-Lde1oNoIdzVzdkNEAWZ1dZ5orIbff80YPdHx20mrHwHrVNNTjNr8E3xz9BdpcGqRQbAEa+fkrCb+fRFTl/6sQw==",
"dev": true,
"license": "Apache-2.0",
"peer": true,
"dependencies": {
"@xtuc/long": "4.2.2"
}
@@ -4651,8 +4685,7 @@
"resolved": "https://registry.npmjs.org/@webassemblyjs/utf8/-/utf8-1.13.2.tgz",
"integrity": "sha512-3NQWGjKTASY1xV5m7Hr0iPeXD9+RDobLll3T9d2AO+g3my8xy5peVyjSag4I50mR1bBSN/Ct12lo+R9tJk0NZQ==",
"dev": true,
"license": "MIT",
"peer": true
"license": "MIT"
},
"node_modules/@webassemblyjs/wasm-edit": {
"version": "1.14.1",
@@ -4660,7 +4693,6 @@
"integrity": "sha512-RNJUIQH/J8iA/1NzlE4N7KtyZNHi3w7at7hDjvRNm5rcUXa00z1vRz3glZoULfJ5mpvYhLybmVcwcjGrC1pRrQ==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"@webassemblyjs/ast": "1.14.1",
"@webassemblyjs/helper-buffer": "1.14.1",
@@ -4678,7 +4710,6 @@
"integrity": "sha512-AmomSIjP8ZbfGQhumkNvgC33AY7qtMCXnN6bL2u2Js4gVCg8fp735aEiMSBbDR7UQIj90n4wKAFUSEd0QN2Ukg==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"@webassemblyjs/ast": "1.14.1",
"@webassemblyjs/helper-wasm-bytecode": "1.13.2",
@@ -4693,7 +4724,6 @@
"integrity": "sha512-PTcKLUNvBqnY2U6E5bdOQcSM+oVP/PmrDY9NzowJjislEjwP/C4an2303MCVS2Mg9d3AJpIGdUFIQQWbPds0Sw==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"@webassemblyjs/ast": "1.14.1",
"@webassemblyjs/helper-buffer": "1.14.1",
@@ -4707,7 +4737,6 @@
"integrity": "sha512-JLBl+KZ0R5qB7mCnud/yyX08jWFw5MsoalJ1pQ4EdFlgj9VdXKGuENGsiCIjegI1W7p91rUlcB/LB5yRJKNTcQ==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"@webassemblyjs/ast": "1.14.1",
"@webassemblyjs/helper-api-error": "1.13.2",
@@ -4723,7 +4752,6 @@
"integrity": "sha512-kPSSXE6De1XOR820C90RIo2ogvZG+c3KiHzqUoO/F34Y2shGzesfqv7o57xrxovZJH/MetF5UjroJ/R/3isoiw==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"@webassemblyjs/ast": "1.14.1",
"@xtuc/long": "4.2.2"
@@ -4734,16 +4762,14 @@
"resolved": "https://registry.npmjs.org/@xtuc/ieee754/-/ieee754-1.2.0.tgz",
"integrity": "sha512-DX8nKgqcGwsc0eJSqYt5lwP4DH5FlHnmuWWBRy7X0NcaGR0ZtuyeESgMwTYVEtxmsNGY+qit4QYT/MIYTOTPeA==",
"dev": true,
"license": "BSD-3-Clause",
"peer": true
"license": "BSD-3-Clause"
},
"node_modules/@xtuc/long": {
"version": "4.2.2",
"resolved": "https://registry.npmjs.org/@xtuc/long/-/long-4.2.2.tgz",
"integrity": "sha512-NuHqBY1PB/D8xU6s/thBgOAiAP7HOYDQ32+BFZILJ8ivkUkAHQnWfn6WhL79Owj1qmUnoN/YPhktdIoucipkAQ==",
"dev": true,
"license": "Apache-2.0",
"peer": true
"license": "Apache-2.0"
},
"node_modules/@zip.js/zip.js": {
"version": "2.7.45",
@@ -4786,6 +4812,7 @@
"integrity": "sha512-NZyJarBfL7nWwIq+FDL6Zp/yHEhePMNnnJ0y3qfieCrmNvYct8uvtiV41UvlSe6apAfk0fY1FbWx+NwfmpvtTg==",
"dev": true,
"license": "MIT",
"peer": true,
"bin": {
"acorn": "bin/acorn"
},
@@ -4799,7 +4826,6 @@
"integrity": "sha512-wKmbr/DDiIXzEOiWrTTUcDm24kQ2vGfZQvM2fwg2vXqR5uW6aapr7ObPtj1th32b9u90/Pf4AItvdTh42fBmVQ==",
"dev": true,
"license": "MIT",
"peer": true,
"engines": {
"node": ">=10.13.0"
},
@@ -5914,6 +5940,7 @@
}
],
"license": "MIT",
"peer": true,
"dependencies": {
"baseline-browser-mapping": "^2.8.9",
"caniuse-lite": "^1.0.30001746",
@@ -6313,7 +6340,6 @@
"integrity": "sha512-YclTJey34KUm5jB1aEJCq807bSievi7Nb/TU4Gu504fUYi3jw3KCIaH6L7nFWQhdEgH3V+wCh+kKD1P5cXnfxw==",
"dev": true,
"optional": true,
"peer": true,
"dependencies": {
"@types/node": "*",
"escape-string-regexp": "^4.0.0",
@@ -6333,7 +6359,6 @@
"integrity": "sha512-TtpcNJ3XAzx3Gq8sWRzJaVajRs0uVxA2YAkdb1jm2YkPz4G6egUFAyA3n5vtEIZefPk5Wa4UXbKuS5fKkJWdgA==",
"dev": true,
"optional": true,
"peer": true,
"engines": {
"node": ">=10"
},
@@ -6346,7 +6371,6 @@
"resolved": "https://registry.npmjs.org/chrome-trace-event/-/chrome-trace-event-1.0.4.tgz",
"integrity": "sha512-rNjApaLzuwaOTjCiT8lSDdGN1APCiqkChLMJxJPWLunPAt5fy8xgU9/jNOchV84wfIxrA0lRQB7oCT8jrn/wrQ==",
"dev": true,
"peer": true,
"engines": {
"node": ">=6.0"
}
@@ -7236,7 +7260,6 @@
"dev": true,
"license": "MIT",
"optional": true,
"peer": true,
"dependencies": {
"@types/node": "^22.2.0",
"@wdio/config": "8.46.0",
@@ -7262,7 +7285,8 @@
"resolved": "https://registry.npmjs.org/devtools-protocol/-/devtools-protocol-0.0.1400418.tgz",
"integrity": "sha512-U8j75zDOXF8IP3o0Cgb7K4tFA9uUHEOru2Wx64+EUqL4LNOh9dRe1i8WKR1k3mSpjcCe3aIkTDvEwq0YkI4hfw==",
"dev": true,
"license": "BSD-3-Clause"
"license": "BSD-3-Clause",
"peer": true
},
"node_modules/devtools/node_modules/@types/node": {
"version": "22.18.9",
@@ -7271,7 +7295,6 @@
"dev": true,
"license": "MIT",
"optional": true,
"peer": true,
"dependencies": {
"undici-types": "~6.21.0"
}
@@ -7283,7 +7306,6 @@
"dev": true,
"license": "MIT",
"optional": true,
"peer": true,
"dependencies": {
"@types/node": "^22.2.0"
},
@@ -7297,7 +7319,6 @@
"integrity": "sha512-LpB/54B+/2J5hqQ7imZHfdU31OlgQqx7ZicVlkm9kzg9/w8GKLEcFfJl/t7DCEDueOyBAD6zCCwTO6Fzs0NoEQ==",
"dev": true,
"optional": true,
"peer": true,
"engines": {
"node": ">=16"
}
@@ -7308,8 +7329,7 @@
"integrity": "sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==",
"dev": true,
"license": "MIT",
"optional": true,
"peer": true
"optional": true
},
"node_modules/devtools/node_modules/uuid": {
"version": "10.0.0",
@@ -7322,7 +7342,6 @@
],
"license": "MIT",
"optional": true,
"peer": true,
"bin": {
"uuid": "dist/bin/uuid"
}
@@ -7333,7 +7352,6 @@
"integrity": "sha512-GlaYyEb07DPxYCKhKzplCWBJtvxZcZMrL+4UkrTSJHHPyZU4mYYTv3qaOe77H7EODLSSopAUFAc6W8U4yqvscg==",
"dev": true,
"optional": true,
"peer": true,
"dependencies": {
"isexe": "^3.1.1"
},
@@ -7721,8 +7739,7 @@
"version": "1.5.4",
"resolved": "https://registry.npmjs.org/es-module-lexer/-/es-module-lexer-1.5.4.tgz",
"integrity": "sha512-MVNK56NiMrOwitFB7cqDwq0CQutbw+0BvLshJSse0MUNU+y1FC3bUS/AQg7oUng+/wKrrki7JfmwtVHkVfPLlw==",
"dev": true,
"peer": true
"dev": true
},
"node_modules/es-object-atoms": {
"version": "1.1.1",
@@ -7861,7 +7878,6 @@
"resolved": "https://registry.npmjs.org/eslint-scope/-/eslint-scope-5.1.1.tgz",
"integrity": "sha512-2NxwbF/hZ0KpepYN0cNbo+FN6XoK7GaHlQhgx/hIZl6Va0bF45RQOOwhLIy8lQDbuCiadSLCBnH2CFYquit5bw==",
"dev": true,
"peer": true,
"dependencies": {
"esrecurse": "^4.3.0",
"estraverse": "^4.1.1"
@@ -7884,7 +7900,6 @@
"resolved": "https://registry.npmjs.org/esrecurse/-/esrecurse-4.3.0.tgz",
"integrity": "sha512-KmfKL3b6G+RXvP8N1vr3Tq1kL/oCFgn2NYXEtqP8/L3pKapUA4G8cFVaoF3SU323CD4XypR/ffioHmkti6/Tag==",
"dev": true,
"peer": true,
"dependencies": {
"estraverse": "^5.2.0"
},
@@ -7897,7 +7912,6 @@
"resolved": "https://registry.npmjs.org/estraverse/-/estraverse-5.3.0.tgz",
"integrity": "sha512-MMdARuVEQziNTeJD8DgMqmhwR11BRQ/cBP+pLtYdSTnf3MIO8fFeiINEbX36ZdNlfU/7A9f3gUw49B3oQsvwBA==",
"dev": true,
"peer": true,
"engines": {
"node": ">=4.0"
}
@@ -7907,7 +7921,6 @@
"resolved": "https://registry.npmjs.org/estraverse/-/estraverse-4.3.0.tgz",
"integrity": "sha512-39nnKffWz8xN1BU/2c79n9nB9HDzo0niYUqx6xyqUnyoAnQyyWpOTdZEeiCch8BBu515t4wp9ZmgVfVhn9EBpw==",
"dev": true,
"peer": true,
"engines": {
"node": ">=4.0"
}
@@ -9053,8 +9066,7 @@
"resolved": "https://registry.npmjs.org/glob-to-regexp/-/glob-to-regexp-0.4.1.tgz",
"integrity": "sha512-lkX1HJXwyMcprw/5YUZc2s7DrpAiHB21/V+E1rHUrVNokkvB6bqMzT0VfV6/86ZNabt1k14YOIaT7nDvOX3Iiw==",
"dev": true,
"license": "BSD-2-Clause",
"peer": true
"license": "BSD-2-Clause"
},
"node_modules/globals": {
"version": "11.12.0",
@@ -9748,7 +9760,6 @@
"integrity": "sha512-F+i2BKsFrH66iaUFc0woD8sLy8getkwTwtOBjvs56Cx4CgJDeKQeqfz8wAYiSb8JOprWhHH5p77PbmYCvvUuXQ==",
"dev": true,
"optional": true,
"peer": true,
"bin": {
"is-docker": "cli.js"
},
@@ -9904,7 +9915,6 @@
"integrity": "sha512-fKzAra0rGJUUBwGBgNkHZuToZcn+TtXHpeCgmkMJMMYx1sQDYaCSyjJBSCa2nH1DGm7s3n1oBnohoVTBaN7Lww==",
"dev": true,
"optional": true,
"peer": true,
"dependencies": {
"is-docker": "^2.0.0"
},
@@ -10356,7 +10366,6 @@
"integrity": "sha512-ioBrW3s2i97noEmnXxmUq7cjIcVRjT5HBpAYy8zE11CxU9HqlWHHeRxfeN1tn8F7OEMVPIC9x1f8t3Z7US9ehQ==",
"dev": true,
"optional": true,
"peer": true,
"dependencies": {
"debug": "^2.6.9",
"marky": "^1.2.2"
@@ -10368,7 +10377,6 @@
"integrity": "sha512-bC7ElrdJaJnPbAP+1EotYvqZsb3ecl5wi6Bfi6BJTUcNowp6cvspg0jXznRTKDjm/E7AdgFBVeAPVMNcKGsHMA==",
"dev": true,
"optional": true,
"peer": true,
"dependencies": {
"ms": "2.0.0"
}
@@ -10378,8 +10386,7 @@
"resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz",
"integrity": "sha512-Tpp60P6IUJDTuOq/5Z8cdskzJujfwqfOTkrwIwj7IRISpnkJnT6SyJ4PCPnGMoFjC9ddhal5KVIYtAt97ix05A==",
"dev": true,
"optional": true,
"peer": true
"optional": true
},
"node_modules/lines-and-columns": {
"version": "1.2.4",
@@ -10399,7 +10406,6 @@
"resolved": "https://registry.npmjs.org/loader-runner/-/loader-runner-4.3.0.tgz",
"integrity": "sha512-3R/1M+yS3j5ou80Me59j7F9IMs4PXs3VqRrm0TU3AbKPxlmpoY1TNscJV/oGJXo8qCatFGTfDbY6W6ipGOYXfg==",
"dev": true,
"peer": true,
"engines": {
"node": ">=6.11.5"
}
@@ -10639,8 +10645,7 @@
"resolved": "https://registry.npmjs.org/marky/-/marky-1.2.5.tgz",
"integrity": "sha512-q9JtQJKjpsVxCRVgQ+WapguSbKC3SQ5HEzFGPAJMStgh3QjCawp00UKv3MTTAArTmGmmPUvllHZoNbZ3gs0I+Q==",
"dev": true,
"optional": true,
"peer": true
"optional": true
},
"node_modules/math-intrinsics": {
"version": "1.1.0",
@@ -11111,8 +11116,7 @@
"version": "2.6.2",
"resolved": "https://registry.npmjs.org/neo-async/-/neo-async-2.6.2.tgz",
"integrity": "sha512-Yd3UES5mWCSqR+qNT93S3UoYUkqAZ9lLg8a7g9rimsWmYGK8cVToA4/sF3RrshdyV3sAGMXVUmpMYOw+dLpOuw==",
"dev": true,
"peer": true
"dev": true
},
"node_modules/netmask": {
"version": "2.0.2",
@@ -11973,6 +11977,7 @@
"integrity": "sha512-enxc1h0jA/aq5oSDMvqyW3q89ra6XIIDZgCX9vkMrnz5DFTw/Ny3Li2lFQ+pt3L6MCgm/5o2o8HW9hiJji+xvw==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"pg-connection-string": "^2.9.1",
"pg-pool": "^3.10.1",
@@ -13209,6 +13214,7 @@
"integrity": "sha512-fS6iqSPZDs3dr/y7Od6y5nha8dW1YnbgtsyotCVvoFGKbERG++CVRFv1meyGDE1SNItQA8BrnCw7ScdAhRJ3XQ==",
"dev": true,
"license": "MIT",
"peer": true,
"bin": {
"rollup": "dist/bin/rollup"
},
@@ -13362,6 +13368,7 @@
"resolved": "https://registry.npmjs.org/ajv/-/ajv-8.16.0.tgz",
"integrity": "sha512-F0twR8U1ZU67JIEtekUcLkXkoO5mMMmgGD8sK/xUFzJ805jxHQl92hImFAqqXMyMYjSPOyUPAwHYhB72g5sTXw==",
"dev": true,
"peer": true,
"dependencies": {
"fast-deep-equal": "^3.1.3",
"json-schema-traverse": "^1.0.0",
@@ -14480,7 +14487,6 @@
"integrity": "sha512-vkZjpUjb6OMS7dhV+tILUW6BhpDR7P2L/aQSAv+Uwk+m8KATX9EccViHTJR2qDtACKPIYndLGCyl3FMo+r2LMw==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"@jridgewell/trace-mapping": "^0.3.25",
"jest-worker": "^27.4.5",
@@ -14516,7 +14522,6 @@
"integrity": "sha512-7vuh85V5cdDofPyxn58nrPjBktZo0u9x1g8WtjQol+jZDaE+fhN+cIvTj11GndBnMnyfrUOG1sZQxCdjKh+DKg==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"@types/node": "*",
"merge-stream": "^2.0.0",
@@ -14897,6 +14902,7 @@
"resolved": "https://registry.npmjs.org/typescript/-/typescript-5.5.3.tgz",
"integrity": "sha512-/hreyEujaB0w76zKo6717l3L0o/qEUtRgdvUBvlkhoWeOVMjMuHNHk0BRBzikzuGDqNmPQbg5ifMEqsHLiIUcQ==",
"dev": true,
"peer": true,
"bin": {
"tsc": "bin/tsc",
"tsserver": "bin/tsserver"
@@ -14925,7 +14931,6 @@
}
],
"optional": true,
"peer": true,
"engines": {
"node": "*"
}
@@ -15213,7 +15218,6 @@
"integrity": "sha512-c5EGNOiyxxV5qmTtAB7rbiXxi1ooX1pQKMLX/MIabJjRA0SJBQOjKF+KSVfHkr9U1cADPon0mRiVe/riyaiDUA==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"glob-to-regexp": "^0.4.1",
"graceful-fs": "^4.1.2"
@@ -15669,7 +15673,6 @@
"integrity": "sha512-yd1RBzSGanHkitROoPFd6qsrxt+oFhg/129YzheDGqeustzX0vTZJZsSsQjVQC4yzBQ56K55XU8gaNCtIzOnTg==",
"dev": true,
"license": "MIT",
"peer": true,
"engines": {
"node": ">=10.13.0"
}
@@ -16115,7 +16118,7 @@
},
"packages/socket.io-cluster-adapter": {
"name": "@socket.io/cluster-adapter",
"version": "0.2.2",
"version": "0.3.0",
"license": "MIT",
"dependencies": {
"debug": "~4.4.1"
@@ -16177,6 +16180,15 @@
"@msgpack/msgpack": "^2.7.0",
"debug": "~4.4.1"
}
},
"packages/socket.io-redis-streams-emitter": {
"name": "@socket.io/redis-streams-emitter",
"version": "0.0.1",
"license": "MIT",
"dependencies": {
"@msgpack/msgpack": "~2.8.0",
"debug": "~4.4.1"
}
}
}
}

View File

@@ -12,7 +12,8 @@
"packages/socket.io-parser",
"packages/socket.io-client",
"packages/socket.io",
"packages/socket.io-postgres-emitter"
"packages/socket.io-postgres-emitter",
"packages/socket.io-redis-streams-emitter"
],
"overrides": {
"@types/estree": "0.0.52",
@@ -32,6 +33,7 @@
"@rollup/plugin-node-resolve": "^15.2.3",
"@sinonjs/fake-timers": "^11.2.2",
"@socket.io/postgres-adapter": "^0.1.0",
"@socket.io/redis-streams-adapter": "~0.2.2",
"@types/debug": "^4.1.12",
"@types/expect.js": "^0.3.32",
"@types/mocha": "^10.0.7",

View File

@@ -0,0 +1,7 @@
Copyright (c) 2025-present Guillermo Rauch and Socket.IO contributors
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@@ -0,0 +1,220 @@
# Socket.IO Redis Streams emitter
The `@socket.io/redis-streams-emitter` package allows you to easily communicate with a group of Socket.IO servers from another Node.js process (server-side).
It must be used in conjunction with [`@socket.io/redis-streams-adapter`](https://github.com/socketio/socket.io-redis-streams-adapter).
**Table of contents**
<!-- TOC -->
* [Installation](#installation)
* [Usage](#usage)
* [With the `redis` package](#with-the-redis-package)
* [With the `redis` package and a Redis cluster](#with-the-redis-package-and-a-redis-cluster)
* [With the `ioredis` package](#with-the-ioredis-package)
* [With the `ioredis` package and a Redis cluster](#with-the-ioredis-package-and-a-redis-cluster)
* [Options](#options)
* [API](#api)
* [`Emitter(redisClient[, nsp][, opts])`](#emitterredisclient-nsp-opts)
* [`Emitter#to(room:string):BroadcastOperator`](#emittertoroomstringbroadcastoperator)
* [`Emitter#in(room:string):BroadcastOperator`](#emitterinroomstringbroadcastoperator)
* [`Emitter#except(room:string):BroadcastOperator`](#emitterexceptroomstringbroadcastoperator)
* [`Emitter#of(namespace:string):Emitter`](#emitterofnamespacestringemitter)
* [`Emitter#socketsJoin(rooms:string|string[])`](#emittersocketsjoinroomsstringstring)
* [`Emitter#socketsLeave(rooms:string|string[])`](#emittersocketsleaveroomsstringstring)
* [`Emitter#disconnectSockets(close:boolean)`](#emitterdisconnectsocketscloseboolean)
* [`Emitter#serverSideEmit(ev:string[,...args:any[]])`](#emitterserversideemitevstringargsany)
* [License](#license)
<!-- TOC -->
## Installation
```
npm install @socket.io/redis-streams-emitter redis
```
## Usage
### With the `redis` package
```js
import { createClient } from "redis";
import { Emitter } from "@socket.io/redis-streams-emitter";
const redisClient = createClient({
url: "redis://localhost:6379"
});
await redisClient.connect();
const io = new Emitter(redisClient);
setInterval(() => {
io.emit("ping", new Date());
}, 1000);
```
### With the `redis` package and a Redis cluster
```js
import { createCluster } from "redis";
import { Emitter } from "@socket.io/redis-streams-emitter";
const redisClient = createCluster({
rootNodes: [
{
url: "redis://localhost:7000",
},
{
url: "redis://localhost:7001",
},
{
url: "redis://localhost:7002",
},
],
});
await redisClient.connect();
const io = new Emitter(redisClient);
setInterval(() => {
io.emit("ping", new Date());
}, 1000);
```
### With the `ioredis` package
```js
import { Redis } from "ioredis";
import { Emitter } from "@socket.io/redis-streams-emitter";
const redisClient = new Redis();
const io = new Emitter(redisClient);
setInterval(() => {
io.emit("ping", new Date());
}, 1000);
```
### With the `ioredis` package and a Redis cluster
```js
import { Cluster } from "ioredis";
import { Emitter } from "@socket.io/redis-streams-emitter";
const redisClient = new Cluster([
{
host: "localhost",
port: 7000,
},
{
host: "localhost",
port: 7001,
},
{
host: "localhost",
port: 7002,
},
]);
const io = new Emitter(redisClient);
setInterval(() => {
io.emit("ping", new Date());
}, 1000);
```
## Options
| Name | Description | Default value |
|--------------|--------------------------------------------------------------------|---------------|
| `streamName` | The name of the Redis stream. | `socket.io` |
| `maxLen` | The maximum size of the stream. Almost exact trimming (~) is used. | `10_000` |
## API
### `Emitter(redisClient[, nsp][, opts])`
```js
const io = new Emitter(redisClient);
```
### `Emitter#to(room:string):BroadcastOperator`
### `Emitter#in(room:string):BroadcastOperator`
Specifies a specific `room` that you want to emit to.
```js
io.to("room1").emit("hello");
```
### `Emitter#except(room:string):BroadcastOperator`
Specifies a specific `room` that you want to exclude from broadcasting.
```js
io.except("room2").emit("hello");
```
### `Emitter#of(namespace:string):Emitter`
Specifies a specific namespace that you want to emit to.
```js
const customNamespace = io.of("/custom");
customNamespace.emit("hello");
```
### `Emitter#socketsJoin(rooms:string|string[])`
Makes the matching socket instances join the specified rooms:
```js
// make all Socket instances join the "room1" room
io.socketsJoin("room1");
// make all Socket instances of the "admin" namespace in the "room1" room join the "room2" room
io.of("/admin").in("room1").socketsJoin("room2");
```
### `Emitter#socketsLeave(rooms:string|string[])`
Makes the matching socket instances leave the specified rooms:
```js
// make all Socket instances leave the "room1" room
io.socketsLeave("room1");
// make all Socket instances of the "admin" namespace in the "room1" room leave the "room2" room
io.of("/admin").in("room1").socketsLeave("room2");
```
### `Emitter#disconnectSockets(close:boolean)`
Makes the matching socket instances disconnect:
```js
// make all Socket instances disconnect
io.disconnectSockets();
// make all Socket instances of the "admin" namespace in the "room1" room disconnect
io.of("/admin").in("room1").disconnectSockets();
// this also works with a single socket ID
io.of("/admin").in(theSocketId).disconnectSockets();
```
### `Emitter#serverSideEmit(ev:string[,...args:any[]])`
Emits an event that will be received by each Socket.IO server of the cluster.
```js
io.serverSideEmit("ping");
```
## License
[MIT](./LICENSE)

View File

@@ -0,0 +1,15 @@
services:
redis:
image: redis:5
ports:
- "6379:6379"
redis-cluster:
image: grokzen/redis-cluster:7.0.10
ports:
- "7000-7005:7000-7005"
valkey:
image: valkey/valkey:8
ports:
- "6389:6379"

View File

@@ -0,0 +1,496 @@
import debugModule from "debug";
import type {
DefaultEventsMap,
EventNames,
EventParams,
EventsMap,
TypedEventBroadcaster,
} from "./typed-events";
import { encode } from "@msgpack/msgpack";
import { hasBinary, XADD } from "./util";
import { ClusterMessage, MessageType, BroadcastFlags } from "socket.io-adapter";
const debug = debugModule("socket.io-redis-streams-emitter");
const EMITTER_UID = "emitter";
type DistributiveOmit<T, K extends keyof any> = T extends any
? Omit<T, K>
: never;
// TODO move to the socket.io-adapter package
abstract class BaseEmitter<
EmitEvents extends EventsMap = DefaultEventsMap,
ServerSideEvents extends EventsMap = DefaultEventsMap,
> {
protected abstract publish(
message: DistributiveOmit<ClusterMessage, "uid" | "nsp">,
): void;
/**
* Emits to all clients.
*
* @return Always true
* @public
*/
public emit<Ev extends EventNames<EmitEvents>>(
ev: Ev,
...args: EventParams<EmitEvents, Ev>
): true {
return this.newBroadcastOperator().emit(ev, ...args);
}
/**
* Targets a room when emitting.
*
* @param room
* @return BroadcastOperator
* @public
*/
public to(
room: string | string[],
): BroadcastOperator<EmitEvents, ServerSideEvents> {
return this.newBroadcastOperator().to(room);
}
/**
* Targets a room when emitting.
*
* @param room
* @return BroadcastOperator
* @public
*/
public in(
room: string | string[],
): BroadcastOperator<EmitEvents, ServerSideEvents> {
return this.newBroadcastOperator().in(room);
}
/**
* Excludes a room when emitting.
*
* @param room
* @return BroadcastOperator
* @public
*/
public except(
room: string | string[],
): BroadcastOperator<EmitEvents, ServerSideEvents> {
return this.newBroadcastOperator().except(room);
}
/**
* Sets a modifier for a subsequent event emission that the event data may be lost if the client is not ready to
* receive messages (because of network slowness or other issues, or because theyre connected through long polling
* and is in the middle of a request-response cycle).
*
* @return BroadcastOperator
* @public
*/
public get volatile(): BroadcastOperator<EmitEvents, ServerSideEvents> {
return this.newBroadcastOperator().volatile;
}
/**
* Sets the compress flag.
*
* @param compress - if `true`, compresses the sending data
* @return BroadcastOperator
* @public
*/
public compress(
compress: boolean,
): BroadcastOperator<EmitEvents, ServerSideEvents> {
return this.newBroadcastOperator().compress(compress);
}
/**
* Makes the matching socket instances join the specified rooms
*
* @param rooms
* @public
*/
public socketsJoin(rooms: string | string[]): void {
return this.newBroadcastOperator().socketsJoin(rooms);
}
/**
* Makes the matching socket instances leave the specified rooms
*
* @param rooms
* @public
*/
public socketsLeave(rooms: string | string[]): void {
return this.newBroadcastOperator().socketsLeave(rooms);
}
/**
* Makes the matching socket instances disconnect
*
* @param close - whether to close the underlying connection
* @public
*/
public disconnectSockets(close: boolean = false): void {
return this.newBroadcastOperator().disconnectSockets(close);
}
/**
* Send a packet to the Socket.IO servers in the cluster
*
* @param ev - the event name
* @param args - any number of serializable arguments
*/
public serverSideEmit<Ev extends EventNames<ServerSideEvents>>(
ev: Ev,
...args: EventParams<ServerSideEvents, Ev>
): void {
return this.newBroadcastOperator().serverSideEmit(ev, ...args);
}
private newBroadcastOperator() {
return new BroadcastOperator<EmitEvents, ServerSideEvents>((msg) =>
this.publish(msg),
);
}
}
export const RESERVED_EVENTS: ReadonlySet<string | Symbol> = new Set(<const>[
"connect",
"connect_error",
"disconnect",
"disconnecting",
"newListener",
"removeListener",
]);
export class BroadcastOperator<
EmitEvents extends EventsMap,
ServerSideEvents extends EventsMap,
> implements TypedEventBroadcaster<EmitEvents>
{
constructor(
private readonly publish: (
message: DistributiveOmit<ClusterMessage, "uid" | "nsp">,
) => void,
private readonly rooms: Set<string> = new Set<string>(),
private readonly exceptRooms: Set<string> = new Set<string>(),
private readonly flags: BroadcastFlags = {},
) {}
/**
* Targets a room when emitting.
*
* @param room
* @return a new BroadcastOperator instance
* @public
*/
public to(
room: string | string[],
): BroadcastOperator<EmitEvents, ServerSideEvents> {
const rooms = new Set(this.rooms);
if (Array.isArray(room)) {
room.forEach((r) => rooms.add(r));
} else {
rooms.add(room);
}
return new BroadcastOperator(
this.publish,
rooms,
this.exceptRooms,
this.flags,
);
}
/**
* Targets a room when emitting.
*
* @param room
* @return a new BroadcastOperator instance
* @public
*/
public in(
room: string | string[],
): BroadcastOperator<EmitEvents, ServerSideEvents> {
return this.to(room);
}
/**
* Excludes a room when emitting.
*
* @param room
* @return a new BroadcastOperator instance
* @public
*/
public except(
room: string | string[],
): BroadcastOperator<EmitEvents, ServerSideEvents> {
const exceptRooms = new Set(this.exceptRooms);
if (Array.isArray(room)) {
room.forEach((r) => exceptRooms.add(r));
} else {
exceptRooms.add(room);
}
return new BroadcastOperator(
this.publish,
this.rooms,
exceptRooms,
this.flags,
);
}
/**
* Sets the compress flag.
*
* @param compress - if `true`, compresses the sending data
* @return a new BroadcastOperator instance
* @public
*/
public compress(
compress: boolean,
): BroadcastOperator<EmitEvents, ServerSideEvents> {
const flags = Object.assign({}, this.flags, { compress });
return new BroadcastOperator(
this.publish,
this.rooms,
this.exceptRooms,
flags,
);
}
/**
* Sets a modifier for a subsequent event emission that the event data may be lost if the client is not ready to
* receive messages (because of network slowness or other issues, or because theyre connected through long polling
* and is in the middle of a request-response cycle).
*
* @return a new BroadcastOperator instance
* @public
*/
public get volatile(): BroadcastOperator<EmitEvents, ServerSideEvents> {
const flags = Object.assign({}, this.flags, { volatile: true });
return new BroadcastOperator(
this.publish,
this.rooms,
this.exceptRooms,
flags,
);
}
/**
* Emits to all clients.
*
* @return Always true
* @public
*/
public emit<Ev extends EventNames<EmitEvents>>(
ev: Ev,
...args: EventParams<EmitEvents, Ev>
): true {
if (RESERVED_EVENTS.has(ev)) {
throw new Error(`"${String(ev)}" is a reserved event name`);
}
// set up packet object
const data = [ev, ...args];
const packet = {
type: 2, // EVENT
data: data,
};
const opts = {
rooms: [...this.rooms],
flags: this.flags,
except: [...this.exceptRooms],
};
this.publish({
type: MessageType.BROADCAST,
data: {
packet,
opts,
},
});
return true;
}
/**
* Makes the matching socket instances join the specified rooms
*
* @param rooms
* @public
*/
public socketsJoin(rooms: string | string[]): void {
this.publish({
type: MessageType.SOCKETS_JOIN,
data: {
opts: {
rooms: [...this.rooms],
except: [...this.exceptRooms],
flags: {},
},
rooms: Array.isArray(rooms) ? rooms : [rooms],
},
});
}
/**
* Makes the matching socket instances leave the specified rooms
*
* @param rooms
* @public
*/
public socketsLeave(rooms: string | string[]): void {
this.publish({
type: MessageType.SOCKETS_LEAVE,
data: {
opts: {
rooms: [...this.rooms],
except: [...this.exceptRooms],
flags: {},
},
rooms: Array.isArray(rooms) ? rooms : [rooms],
},
});
}
/**
* Makes the matching socket instances disconnect
*
* @param close - whether to close the underlying connection
* @public
*/
public disconnectSockets(close: boolean = false): void {
this.publish({
type: MessageType.DISCONNECT_SOCKETS,
data: {
opts: {
rooms: [...this.rooms],
except: [...this.exceptRooms],
flags: {},
},
close,
},
});
}
/**
* Send a packet to the Socket.IO servers in the cluster
*
* @param ev - the event name
* @param args - any number of serializable arguments
*/
public serverSideEmit<Ev extends EventNames<ServerSideEvents>>(
ev: Ev,
...args: EventParams<ServerSideEvents, Ev>
): void {
const withAck = args.length && typeof args[args.length - 1] === "function";
if (withAck) {
throw new Error("Acknowledgements are not supported");
}
this.publish({
type: MessageType.SERVER_SIDE_EMIT,
data: {
packet: [ev, ...args],
},
});
}
}
function flattenPayload(message: ClusterMessage) {
const rawMessage = {
uid: message.uid,
nsp: message.nsp,
type: message.type.toString(),
data: undefined as string | undefined,
};
// @ts-expect-error
const data = message.data;
if (data) {
const mayContainBinary = [
MessageType.BROADCAST,
MessageType.FETCH_SOCKETS_RESPONSE,
MessageType.SERVER_SIDE_EMIT,
MessageType.SERVER_SIDE_EMIT_RESPONSE,
MessageType.BROADCAST_ACK,
].includes(message.type);
if (mayContainBinary && hasBinary(data)) {
rawMessage.data = Buffer.from(encode(data)).toString("base64");
} else {
rawMessage.data = JSON.stringify(data);
}
}
return rawMessage;
}
export interface RedisStreamsEmitterOptions {
/**
* The name of the Redis stream.
* @default "socket.io"
*/
streamName?: string;
/**
* The maximum size of the stream. Almost exact trimming (~) is used.
* @default 10_000
*/
maxLen?: number;
}
export class Emitter<
EmitEvents extends EventsMap = DefaultEventsMap,
ServerSideEvents extends EventsMap = DefaultEventsMap,
> extends BaseEmitter<EmitEvents, ServerSideEvents> {
readonly #redisClient: any;
readonly #opts: Required<RedisStreamsEmitterOptions>;
readonly #nsp: string;
constructor(
redisClient: any,
opts: RedisStreamsEmitterOptions = {},
nsp = "/",
) {
super();
this.#redisClient = redisClient;
this.#opts = Object.assign(
{
streamName: "socket.io",
maxLen: 10_000,
},
opts,
);
this.#nsp = nsp;
}
public of(nsp: string) {
return new Emitter(this.#redisClient, this.#opts, nsp);
}
protected override publish(
message: DistributiveOmit<ClusterMessage, "uid" | "nsp">,
) {
(message as ClusterMessage).uid = EMITTER_UID;
(message as ClusterMessage).nsp = this.#nsp;
debug(
"publishing message %s to stream %s",
message.type,
this.#opts.streamName,
);
if (message.type === MessageType.BROADCAST) {
// @ts-expect-error FIXME untyped packet object
message.data.packet.nsp = this.#nsp;
}
return XADD(
this.#redisClient,
this.#opts.streamName,
flattenPayload(message as ClusterMessage),
this.#opts.maxLen,
);
}
}

View File

@@ -0,0 +1,37 @@
/**
* An events map is an interface that maps event names to their value, which
* represents the type of the `on` listener.
*/
export interface EventsMap {
[event: string]: any;
}
/**
* The default events map, used if no EventsMap is given. Using this EventsMap
* is equivalent to accepting all event names, and any data.
*/
export interface DefaultEventsMap {
[event: string]: (...args: any[]) => void;
}
/**
* Returns a union type containing all the keys of an event map.
*/
export type EventNames<Map extends EventsMap> = keyof Map & (string | symbol);
/** The tuple type representing the parameters of an event listener */
export type EventParams<
Map extends EventsMap,
Ev extends EventNames<Map>,
> = Parameters<Map[Ev]>;
/**
* Interface for classes that aren't `EventEmitter`s, but still expose a
* strictly typed `emit` method.
*/
export interface TypedEventBroadcaster<EmitEvents extends EventsMap> {
emit<Ev extends EventNames<EmitEvents>>(
ev: Ev,
...args: EventParams<EmitEvents, Ev>
): boolean;
}

View File

@@ -0,0 +1,68 @@
export function hasBinary(obj: any, toJSON?: boolean): boolean {
if (!obj || typeof obj !== "object") {
return false;
}
if (obj instanceof ArrayBuffer || ArrayBuffer.isView(obj)) {
return true;
}
if (Array.isArray(obj)) {
for (let i = 0, l = obj.length; i < l; i++) {
if (hasBinary(obj[i])) {
return true;
}
}
return false;
}
for (const key in obj) {
if (Object.prototype.hasOwnProperty.call(obj, key) && hasBinary(obj[key])) {
return true;
}
}
if (obj.toJSON && typeof obj.toJSON === "function" && !toJSON) {
return hasBinary(obj.toJSON(), true);
}
return false;
}
/**
* Whether the client comes from the `redis` package
*
* @param redisClient
*
* @see https://github.com/redis/node-redis
*/
function isRedisV4Client(redisClient: any) {
return typeof redisClient.sSubscribe === "function";
}
/**
* @see https://redis.io/commands/xadd/
*/
export function XADD(
redisClient: any,
streamName: string,
payload: any,
maxLenThreshold: number,
) {
if (isRedisV4Client(redisClient)) {
return redisClient.xAdd(streamName, "*", payload, {
TRIM: {
strategy: "MAXLEN",
strategyModifier: "~",
threshold: maxLenThreshold,
},
});
} else {
const args = [streamName, "MAXLEN", "~", maxLenThreshold, "*"];
Object.keys(payload).forEach((k) => {
args.push(k, payload[k]);
});
return redisClient.xadd.call(redisClient, args);
}
}

View File

@@ -0,0 +1,38 @@
{
"name": "@socket.io/redis-streams-emitter",
"version": "0.0.1",
"description": "The Socket.IO Redis streams emitter, allowing to communicate with a group of Socket.IO servers from another Node.js process",
"license": "MIT",
"homepage": "https://github.com/socketio/socket.io/tree/main/packages/socket.io-redis-streams-emitter#readme",
"repository": {
"type": "git",
"url": "git+https://github.com/socketio/socket.io.git"
},
"files": [
"dist/"
],
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"scripts": {
"compile": "rimraf ./dist && tsc",
"format:check": "prettier --parser typescript --check 'lib/**/*.ts' 'test/**/*.ts'",
"format:fix": "prettier --parser typescript --write 'lib/**/*.ts' 'test/**/*.ts'",
"prepack": "npm run compile",
"test": "npm run format:check && npm run compile && npm run test:redis-standalone && npm run test:ioredis-standalone",
"test:redis-standalone": "nyc mocha --import=tsx test/**/*.ts",
"test:redis-cluster": "REDIS_CLUSTER=1 mocha --import=tsx test/**/*.ts",
"test:ioredis-standalone": "REDIS_LIB=ioredis mocha --import=tsx test/**/*.ts",
"test:ioredis-cluster": "REDIS_LIB=ioredis REDIS_CLUSTER=1 mocha --import=tsx test/**/*.ts",
"test:valkey-standalone": "VALKEY=1 mocha --import=tsx test/**/*.ts"
},
"dependencies": {
"@msgpack/msgpack": "~2.8.0",
"debug": "~4.4.1"
},
"keywords": [
"socket.io",
"redis",
"redis-streams",
"emitter"
]
}

View File

@@ -0,0 +1,225 @@
import { type Server, type Socket as ServerSocket } from "socket.io";
import { type Socket as ClientSocket } from "socket.io-client";
import expect = require("expect.js");
import { times, sleep, setup, initRedisClient } from "./util";
import { Emitter } from "../lib";
const PROPAGATION_DELAY_IN_MS = 100;
describe("@socket.io/redis-streams-emitter", () => {
let servers: Server[],
serverSockets: ServerSocket[],
clientSockets: ClientSocket[],
cleanup: () => void,
emitter: Emitter;
beforeEach(async () => {
const testContext = await setup();
servers = testContext.servers;
serverSockets = testContext.serverSockets;
clientSockets = testContext.clientSockets;
const redisClient = await initRedisClient();
emitter = new Emitter(redisClient);
cleanup = () => {
testContext.cleanup();
redisClient.quit();
};
});
afterEach(() => cleanup());
describe("broadcast", function () {
it("broadcasts to all clients", (done) => {
const partialDone = times(3, done);
clientSockets.forEach((clientSocket) => {
clientSocket.on("test", (arg1, arg2, arg3) => {
expect(arg1).to.eql(1);
expect(arg2).to.eql("2");
expect(Buffer.isBuffer(arg3)).to.be(true);
partialDone();
});
});
emitter.emit("test", 1, "2", Buffer.from([3, 4]));
});
it("broadcasts to all clients in a namespace", (done) => {
const partialDone = times(3, () => {
servers.forEach((server) => server.of("/custom").adapter.close());
done();
});
servers.forEach((server) => server.of("/custom"));
const onConnect = times(3, async () => {
await sleep(PROPAGATION_DELAY_IN_MS);
emitter.of("/custom").emit("test");
});
clientSockets.forEach((clientSocket) => {
const socket = clientSocket.io.socket("/custom");
socket.on("connect", onConnect);
socket.on("test", () => {
socket.disconnect();
partialDone();
});
});
});
it("broadcasts to all clients in a room", (done) => {
serverSockets[1].join("room1");
clientSockets[0].on("test", () => {
done(new Error("should not happen"));
});
clientSockets[1].on("test", () => {
done();
});
clientSockets[2].on("test", () => {
done(new Error("should not happen"));
});
emitter.to("room1").emit("test");
});
it("broadcasts to all clients except in room", (done) => {
const partialDone = times(2, done);
serverSockets[1].join("room1");
clientSockets[0].on("test", () => {
partialDone();
});
clientSockets[1].on("test", () => {
done(new Error("should not happen"));
});
clientSockets[2].on("test", () => {
partialDone();
});
emitter.of("/").except("room1").emit("test");
});
});
describe("socketsJoin", () => {
it("makes all socket instances join the specified room", async () => {
emitter.socketsJoin("room1");
await sleep(PROPAGATION_DELAY_IN_MS);
expect(serverSockets[0].rooms.has("room1")).to.be(true);
expect(serverSockets[1].rooms.has("room1")).to.be(true);
expect(serverSockets[2].rooms.has("room1")).to.be(true);
});
it("makes the matching socket instances join the specified room", async () => {
serverSockets[0].join("room1");
serverSockets[2].join("room1");
emitter.in("room1").socketsJoin("room2");
await sleep(PROPAGATION_DELAY_IN_MS);
expect(serverSockets[0].rooms.has("room2")).to.be(true);
expect(serverSockets[1].rooms.has("room2")).to.be(false);
expect(serverSockets[2].rooms.has("room2")).to.be(true);
});
it("makes the given socket instance join the specified room", async () => {
emitter.in(serverSockets[1].id).socketsJoin("room3");
await sleep(PROPAGATION_DELAY_IN_MS);
expect(serverSockets[0].rooms.has("room3")).to.be(false);
expect(serverSockets[1].rooms.has("room3")).to.be(true);
expect(serverSockets[2].rooms.has("room3")).to.be(false);
});
});
describe("socketsLeave", () => {
it("makes all socket instances leave the specified room", async () => {
serverSockets[0].join("room1");
serverSockets[2].join("room1");
emitter.socketsLeave("room1");
await sleep(PROPAGATION_DELAY_IN_MS);
expect(serverSockets[0].rooms.has("room1")).to.be(false);
expect(serverSockets[1].rooms.has("room1")).to.be(false);
expect(serverSockets[2].rooms.has("room1")).to.be(false);
});
it("makes the matching socket instances leave the specified room", async () => {
serverSockets[0].join(["room1", "room2"]);
serverSockets[1].join(["room1", "room2"]);
serverSockets[2].join(["room2"]);
emitter.in("room1").socketsLeave("room2");
await sleep(PROPAGATION_DELAY_IN_MS);
expect(serverSockets[0].rooms.has("room2")).to.be(false);
expect(serverSockets[1].rooms.has("room2")).to.be(false);
expect(serverSockets[2].rooms.has("room2")).to.be(true);
});
it("makes the given socket instance leave the specified room", async () => {
serverSockets[0].join("room3");
serverSockets[1].join("room3");
serverSockets[2].join("room3");
emitter.in(serverSockets[1].id).socketsLeave("room3");
await sleep(PROPAGATION_DELAY_IN_MS);
expect(serverSockets[0].rooms.has("room3")).to.be(true);
expect(serverSockets[1].rooms.has("room3")).to.be(false);
expect(serverSockets[2].rooms.has("room3")).to.be(true);
});
});
describe("disconnectSockets", () => {
it("makes all socket instances disconnect", (done) => {
const partialDone = times(3, done);
clientSockets.forEach((clientSocket) => {
clientSocket.on("disconnect", (reason) => {
expect(reason).to.eql("io server disconnect");
partialDone();
});
});
emitter.disconnectSockets();
});
});
describe("serverSideEmit", () => {
it("sends an event to other server instances", (done) => {
const partialDone = times(3, done);
emitter.serverSideEmit("hello", "world", 1, "2");
servers[0].on("hello", (arg1, arg2, arg3) => {
expect(arg1).to.eql("world");
expect(arg2).to.eql(1);
expect(arg3).to.eql("2");
partialDone();
});
servers[1].on("hello", (arg1, arg2, arg3) => {
partialDone();
});
servers[2].of("/").on("hello", () => {
partialDone();
});
});
});
});

View File

@@ -0,0 +1,156 @@
import { createServer } from "node:http";
import { type AddressInfo } from "node:net";
import { createClient, createCluster } from "redis";
import { Redis, Cluster } from "ioredis";
import { Server, type Socket as ServerSocket } from "socket.io";
import { io as ioc, type Socket as ClientSocket } from "socket.io-client";
import { createAdapter } from "@socket.io/redis-streams-adapter";
export function times(count: number, fn: () => void) {
let i = 0;
return () => {
i++;
if (i === count) {
fn();
} else if (i > count) {
throw new Error(`too many calls: ${i} instead of ${count}`);
}
};
}
export function sleep(duration: number) {
return new Promise((resolve) => setTimeout(resolve, duration));
}
const mode = process.env.REDIS_CLUSTER === "1" ? "cluster" : "standalone";
const lib = process.env.REDIS_LIB || "redis";
console.log(`[INFO] testing in ${mode} mode with ${lib}`);
export async function initRedisClient() {
if (mode === "cluster") {
if (lib === "ioredis") {
return new Cluster([
{
host: "localhost",
port: 7000,
},
{
host: "localhost",
port: 7001,
},
{
host: "localhost",
port: 7002,
},
{
host: "localhost",
port: 7003,
},
{
host: "localhost",
port: 7004,
},
{
host: "localhost",
port: 7005,
},
]);
} else {
const redisClient = createCluster({
rootNodes: [
{
url: "redis://localhost:7000",
},
{
url: "redis://localhost:7001",
},
{
url: "redis://localhost:7002",
},
{
url: "redis://localhost:7003",
},
{
url: "redis://localhost:7004",
},
{
url: "redis://localhost:7005",
},
],
});
await redisClient.connect();
return redisClient;
}
} else {
if (lib === "ioredis") {
return new Redis();
} else {
const port = process.env.VALKEY === "1" ? 6389 : 6379;
const redisClient = createClient({
url: `redis://localhost:${port}`,
});
await redisClient.connect();
return redisClient;
}
}
}
async function init() {
const redisClient = await initRedisClient();
const httpServer = createServer();
const io = new Server(httpServer, {
adapter: createAdapter(redisClient, {
readCount: 1, // return as soon as possible
}),
});
return new Promise<{
io: Server;
socket: ServerSocket;
clientSocket: ClientSocket;
cleanup: () => void;
}>((resolve) => {
httpServer.listen(() => {
const port = (httpServer.address() as AddressInfo).port;
const clientSocket = ioc(`http://localhost:${port}`);
io.on("connection", async (socket) => {
resolve({
io,
socket,
clientSocket,
cleanup: () => {
io.close();
clientSocket.disconnect();
redisClient.quit();
},
});
});
});
});
}
export async function setup() {
const results = await Promise.all([init(), init(), init()]);
const servers = results.map(({ io }) => io);
const serverSockets = results.map(({ socket }) => socket);
const clientSockets = results.map(({ clientSocket }) => clientSocket);
const cleanupMethods = results.map(({ cleanup }) => cleanup);
return {
servers,
serverSockets,
clientSockets,
cleanup: () => {
for (const cleanup of cleanupMethods) {
cleanup();
}
},
};
}

View File

@@ -0,0 +1,13 @@
{
"compilerOptions": {
"outDir": "./dist",
"allowJs": false,
"target": "es2017",
"module": "commonjs",
"declaration": true,
"strict": true
},
"include": [
"./lib/**/*"
]
}