Merge remote-tracking branch 'socket.io-postgres-emitter/main' into monorepo

This commit is contained in:
Damien Arrachequesne
2025-09-04 09:28:36 +02:00
15 changed files with 2703 additions and 21 deletions

View File

@@ -35,6 +35,18 @@ jobs:
ports:
- 6379:6379
postgres:
image: postgres:14
env:
POSTGRES_PASSWORD: changeit
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
steps:
- name: Checkout repository
uses: actions/checkout@v4

250
package-lock.json generated
View File

@@ -14,7 +14,8 @@
"packages/socket.io-adapter",
"packages/socket.io-parser",
"packages/socket.io-client",
"packages/socket.io"
"packages/socket.io",
"packages/socket.io-postgres-emitter"
],
"devDependencies": {
"@babel/core": "^7.24.7",
@@ -28,10 +29,12 @@
"@rollup/plugin-commonjs": "^26.0.1",
"@rollup/plugin-node-resolve": "^15.2.3",
"@sinonjs/fake-timers": "^11.2.2",
"@socket.io/postgres-adapter": "^0.1.0",
"@types/debug": "^4.1.12",
"@types/expect.js": "^0.3.32",
"@types/mocha": "^10.0.7",
"@types/node": "18.15.3",
"@types/pg": "^8.6.0",
"@types/sinonjs__fake-timers": "^8.1.5",
"@wdio/cli": "^8.39.1",
"@wdio/local-runner": "^8.39.1",
@@ -54,6 +57,7 @@
"mocha": "^10.6.0",
"node-forge": "^1.3.1",
"nyc": "^17.0.0",
"pg": "^8.6.0",
"prettier": "^3.3.2",
"redis": "^4.6.15",
"rimraf": "^6.0.0",
@@ -2074,15 +2078,6 @@
"node": ">=8"
}
},
"node_modules/@istanbuljs/load-nyc-config/node_modules/resolve-from": {
"version": "5.0.0",
"resolved": "https://registry.npmjs.org/resolve-from/-/resolve-from-5.0.0.tgz",
"integrity": "sha512-qYg9KP24dD5qka9J47d0aVky0N+b4fTU89LN9iDnjB5waksiC49rvMB0PrUJQGoTmH50XPiqOvAjDfaijGxYZw==",
"dev": true,
"engines": {
"node": ">=8"
}
},
"node_modules/@istanbuljs/schema": {
"version": "0.1.3",
"resolved": "https://registry.npmjs.org/@istanbuljs/schema/-/schema-0.1.3.tgz",
@@ -2750,6 +2745,42 @@
"resolved": "packages/socket.io-component-emitter",
"link": true
},
"node_modules/@socket.io/postgres-adapter": {
"version": "0.1.1",
"resolved": "https://registry.npmjs.org/@socket.io/postgres-adapter/-/postgres-adapter-0.1.1.tgz",
"integrity": "sha512-xUWeLC1Tz771XIEoJ0iuPTRUbEg8aQdyb1CSHXAT2y+WxvtGy7XxDN5kwkGSumyfxb+Qo8RSX8ZfTmbdx+gXXQ==",
"dev": true,
"license": "MIT",
"dependencies": {
"@msgpack/msgpack": "~2.7.0",
"debug": "~4.3.1",
"socket.io-adapter": "~2.3.0"
},
"engines": {
"node": ">=10.0.0"
}
},
"node_modules/@socket.io/postgres-adapter/node_modules/@msgpack/msgpack": {
"version": "2.7.2",
"resolved": "https://registry.npmjs.org/@msgpack/msgpack/-/msgpack-2.7.2.tgz",
"integrity": "sha512-rYEi46+gIzufyYUAoHDnRzkWGxajpD9vVXFQ3g1vbjrBm6P7MBmm+s/fqPa46sxa+8FOUdEuRQKaugo5a4JWpw==",
"dev": true,
"license": "ISC",
"engines": {
"node": ">= 10"
}
},
"node_modules/@socket.io/postgres-adapter/node_modules/socket.io-adapter": {
"version": "2.3.3",
"resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.3.3.tgz",
"integrity": "sha512-Qd/iwn3VskrpNO60BeRyCyr8ZWw9CPZyitW4AQwmRZ8zCiyDiL+znRnWX6tDHXnWn1sJrM1+b6Mn6wEDJJ4aYQ==",
"dev": true,
"license": "MIT"
},
"node_modules/@socket.io/postgres-emitter": {
"resolved": "packages/socket.io-postgres-emitter",
"link": true
},
"node_modules/@szmarczak/http-timer": {
"version": "4.0.6",
"resolved": "https://registry.npmjs.org/@szmarczak/http-timer/-/http-timer-4.0.6.tgz",
@@ -2948,6 +2979,18 @@
"integrity": "sha512-37i+OaWTh9qeK4LSHPsyRC7NahnGotNuZvjLSgcPzblpHB3rrCJxAOgI5gCdKm7coonsaX1Of0ILiTcnZjbfxA==",
"dev": true
},
"node_modules/@types/pg": {
"version": "8.15.5",
"resolved": "https://registry.npmjs.org/@types/pg/-/pg-8.15.5.tgz",
"integrity": "sha512-LF7lF6zWEKxuT3/OR8wAZGzkg4ENGXFNyiV/JeOt9z5B+0ZVwbql9McqX5c/WStFq1GaGso7H1AzP/qSzmlCKQ==",
"dev": true,
"license": "MIT",
"dependencies": {
"@types/node": "*",
"pg-protocol": "*",
"pg-types": "^2.2.0"
}
},
"node_modules/@types/resolve": {
"version": "1.20.2",
"resolved": "https://registry.npmjs.org/@types/resolve/-/resolve-1.20.2.tgz",
@@ -10927,15 +10970,6 @@
"node": ">=8"
}
},
"node_modules/nyc/node_modules/resolve-from": {
"version": "5.0.0",
"resolved": "https://registry.npmjs.org/resolve-from/-/resolve-from-5.0.0.tgz",
"integrity": "sha512-qYg9KP24dD5qka9J47d0aVky0N+b4fTU89LN9iDnjB5waksiC49rvMB0PrUJQGoTmH50XPiqOvAjDfaijGxYZw==",
"dev": true,
"engines": {
"node": ">=8"
}
},
"node_modules/nyc/node_modules/rimraf": {
"version": "3.0.2",
"resolved": "https://registry.npmjs.org/rimraf/-/rimraf-3.0.2.tgz",
@@ -11441,6 +11475,103 @@
"integrity": "sha512-F3asv42UuXchdzt+xXqfW1OGlVBe+mxa2mqI0pg5yAHZPvFmY3Y6drSf/GQ1A86WgWEN9Kzh/WrgKa6iGcHXLg==",
"dev": true
},
"node_modules/pg": {
"version": "8.16.3",
"resolved": "https://registry.npmjs.org/pg/-/pg-8.16.3.tgz",
"integrity": "sha512-enxc1h0jA/aq5oSDMvqyW3q89ra6XIIDZgCX9vkMrnz5DFTw/Ny3Li2lFQ+pt3L6MCgm/5o2o8HW9hiJji+xvw==",
"dev": true,
"license": "MIT",
"dependencies": {
"pg-connection-string": "^2.9.1",
"pg-pool": "^3.10.1",
"pg-protocol": "^1.10.3",
"pg-types": "2.2.0",
"pgpass": "1.0.5"
},
"engines": {
"node": ">= 16.0.0"
},
"optionalDependencies": {
"pg-cloudflare": "^1.2.7"
},
"peerDependencies": {
"pg-native": ">=3.0.1"
},
"peerDependenciesMeta": {
"pg-native": {
"optional": true
}
}
},
"node_modules/pg-cloudflare": {
"version": "1.2.7",
"resolved": "https://registry.npmjs.org/pg-cloudflare/-/pg-cloudflare-1.2.7.tgz",
"integrity": "sha512-YgCtzMH0ptvZJslLM1ffsY4EuGaU0cx4XSdXLRFae8bPP4dS5xL1tNB3k2o/N64cHJpwU7dxKli/nZ2lUa5fLg==",
"dev": true,
"license": "MIT",
"optional": true
},
"node_modules/pg-connection-string": {
"version": "2.9.1",
"resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.9.1.tgz",
"integrity": "sha512-nkc6NpDcvPVpZXxrreI/FOtX3XemeLl8E0qFr6F2Lrm/I8WOnaWNhIPK2Z7OHpw7gh5XJThi6j6ppgNoaT1w4w==",
"dev": true,
"license": "MIT"
},
"node_modules/pg-int8": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/pg-int8/-/pg-int8-1.0.1.tgz",
"integrity": "sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==",
"dev": true,
"license": "ISC",
"engines": {
"node": ">=4.0.0"
}
},
"node_modules/pg-pool": {
"version": "3.10.1",
"resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.10.1.tgz",
"integrity": "sha512-Tu8jMlcX+9d8+QVzKIvM/uJtp07PKr82IUOYEphaWcoBhIYkoHpLXN3qO59nAI11ripznDsEzEv8nUxBVWajGg==",
"dev": true,
"license": "MIT",
"peerDependencies": {
"pg": ">=8.0"
}
},
"node_modules/pg-protocol": {
"version": "1.10.3",
"resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.10.3.tgz",
"integrity": "sha512-6DIBgBQaTKDJyxnXaLiLR8wBpQQcGWuAESkRBX/t6OwA8YsqP+iVSiond2EDy6Y/dsGk8rh/jtax3js5NeV7JQ==",
"dev": true,
"license": "MIT"
},
"node_modules/pg-types": {
"version": "2.2.0",
"resolved": "https://registry.npmjs.org/pg-types/-/pg-types-2.2.0.tgz",
"integrity": "sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==",
"dev": true,
"license": "MIT",
"dependencies": {
"pg-int8": "1.0.1",
"postgres-array": "~2.0.0",
"postgres-bytea": "~1.0.0",
"postgres-date": "~1.0.4",
"postgres-interval": "^1.1.0"
},
"engines": {
"node": ">=4"
}
},
"node_modules/pgpass": {
"version": "1.0.5",
"resolved": "https://registry.npmjs.org/pgpass/-/pgpass-1.0.5.tgz",
"integrity": "sha512-FdW9r/jQZhSeohs1Z3sI1yxFQNFvMcnmfuj4WBMUTxOrAyLMaTcE1aAMBiTlbMNaXvBCQuVi0R7hd8udDSP7ug==",
"dev": true,
"license": "MIT",
"dependencies": {
"split2": "^4.1.0"
}
},
"node_modules/picocolors": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/picocolors/-/picocolors-1.0.1.tgz",
@@ -11571,6 +11702,49 @@
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/postgres-array": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/postgres-array/-/postgres-array-2.0.0.tgz",
"integrity": "sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA==",
"dev": true,
"license": "MIT",
"engines": {
"node": ">=4"
}
},
"node_modules/postgres-bytea": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/postgres-bytea/-/postgres-bytea-1.0.0.tgz",
"integrity": "sha512-xy3pmLuQqRBZBXDULy7KbaitYqLcmxigw14Q5sj8QBVLqEwXfeybIKVWiqAXTlcvdvb0+xkOtDbfQMOf4lST1w==",
"dev": true,
"license": "MIT",
"engines": {
"node": ">=0.10.0"
}
},
"node_modules/postgres-date": {
"version": "1.0.7",
"resolved": "https://registry.npmjs.org/postgres-date/-/postgres-date-1.0.7.tgz",
"integrity": "sha512-suDmjLVQg78nMK2UZ454hAG+OAW+HQPZ6n++TNDUX+L0+uUlLywnoxJKDou51Zm+zTCjrCl0Nq6J9C5hP9vK/Q==",
"dev": true,
"license": "MIT",
"engines": {
"node": ">=0.10.0"
}
},
"node_modules/postgres-interval": {
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/postgres-interval/-/postgres-interval-1.2.0.tgz",
"integrity": "sha512-9ZhXKM/rw350N1ovuWHbGxnGh/SNJ4cnxHiM0rxE4VN41wsg8P8zWn9hv/buK00RP4WvlOyr/RBDiptyxVbkZQ==",
"dev": true,
"license": "MIT",
"dependencies": {
"xtend": "^4.0.0"
},
"engines": {
"node": ">=0.10.0"
}
},
"node_modules/prebuild-install": {
"version": "7.1.2",
"resolved": "https://registry.npmjs.org/prebuild-install/-/prebuild-install-7.1.2.tgz",
@@ -12443,6 +12617,16 @@
"integrity": "sha512-0a1F4l73/ZFZOakJnQ3FvkJ2+gSTQWz/r2KE5OdDY0TxPm5h4GkqkWWfM47T7HsbnOtcJVEF4epCVy6u7Q3K+g==",
"dev": true
},
"node_modules/resolve-from": {
"version": "5.0.0",
"resolved": "https://registry.npmjs.org/resolve-from/-/resolve-from-5.0.0.tgz",
"integrity": "sha512-qYg9KP24dD5qka9J47d0aVky0N+b4fTU89LN9iDnjB5waksiC49rvMB0PrUJQGoTmH50XPiqOvAjDfaijGxYZw==",
"dev": true,
"license": "MIT",
"engines": {
"node": ">=8"
}
},
"node_modules/responselike": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/responselike/-/responselike-2.0.1.tgz",
@@ -15507,7 +15691,7 @@
}
},
"packages/engine.io": {
"version": "6.6.3",
"version": "6.6.4",
"license": "MIT",
"dependencies": {
"@types/cors": "^2.8.12",
@@ -15676,6 +15860,32 @@
"optional": true
}
}
},
"packages/socket.io-postgres-emitter": {
"name": "@socket.io/postgres-emitter",
"version": "0.1.0",
"license": "MIT",
"dependencies": {
"@msgpack/msgpack": "^2.7.0",
"debug": "~4.3.1"
}
},
"packages/socket.io-postgres-emitter/node_modules/debug": {
"version": "4.3.1",
"resolved": "https://registry.npmjs.org/debug/-/debug-4.3.1.tgz",
"integrity": "sha512-doEwdvm4PCeK4K3RQN2ZC2BYUBaxwLARCqZmMjtF8a51J2Rb0xpVloFRnCODwqjpwnAoao4pelN8l3RJdv3gRQ==",
"license": "MIT",
"dependencies": {
"ms": "2.1.2"
},
"engines": {
"node": ">=6.0"
},
"peerDependenciesMeta": {
"supports-color": {
"optional": true
}
}
}
}
}

View File

@@ -10,7 +10,8 @@
"packages/socket.io-adapter",
"packages/socket.io-parser",
"packages/socket.io-client",
"packages/socket.io"
"packages/socket.io",
"packages/socket.io-postgres-emitter"
],
"overrides": {
"@types/estree": "0.0.52",
@@ -29,10 +30,12 @@
"@rollup/plugin-commonjs": "^26.0.1",
"@rollup/plugin-node-resolve": "^15.2.3",
"@sinonjs/fake-timers": "^11.2.2",
"@socket.io/postgres-adapter": "^0.1.0",
"@types/debug": "^4.1.12",
"@types/expect.js": "^0.3.32",
"@types/mocha": "^10.0.7",
"@types/node": "18.15.3",
"@types/pg": "^8.6.0",
"@types/sinonjs__fake-timers": "^8.1.5",
"@wdio/cli": "^8.39.1",
"@wdio/local-runner": "^8.39.1",
@@ -55,6 +58,7 @@
"mocha": "^10.6.0",
"node-forge": "^1.3.1",
"nyc": "^17.0.0",
"pg": "^8.6.0",
"prettier": "^3.3.2",
"redis": "^4.6.15",
"rimraf": "^6.0.0",

View File

@@ -0,0 +1,4 @@
# 0.1.0 (2021-06-14)
Initial commit

View File

@@ -0,0 +1,7 @@
Copyright (c) 2021 Damien Arrachequesne (@darrachequesne)
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@@ -0,0 +1,149 @@
# Socket.IO Postgres emitter
The `@socket.io/postgres-emitter` package allows you to easily communicate with a group of Socket.IO servers from another Node.js process (server-side).
![Emitter diagram](./assets/emitter.png)
It must be used in conjunction with [`@socket.io/postgres-adapter`](https://github.com/socketio/socket.io-postgres-adapter/).
Supported features:
- [broadcasting](https://socket.io/docs/v4/broadcasting-events/)
- [utility methods](https://socket.io/docs/v4/server-instance/#Utility-methods)
- [`socketsJoin`](https://socket.io/docs/v4/server-instance/#socketsJoin)
- [`socketsLeave`](https://socket.io/docs/v4/server-instance/#socketsLeave)
- [`disconnectSockets`](https://socket.io/docs/v4/server-instance/#disconnectSockets)
- [`serverSideEmit`](https://socket.io/docs/v4/server-instance/#serverSideEmit)
Related packages:
- Postgres adapter: https://github.com/socketio/socket.io-postgres-adapter/
- Redis adapter: https://github.com/socketio/socket.io-redis-adapter/
- Redis emitter: https://github.com/socketio/socket.io-redis-emitter/
- MongoDB adapter: https://github.com/socketio/socket.io-mongo-adapter/
- MongoDB emitter: https://github.com/socketio/socket.io-mongo-emitter/
**Table of contents**
- [Installation](#installation)
- [Usage](#usage)
- [API](#api)
- [Known errors](#known-errors)
- [License](#license)
## Installation
```
npm install @socket.io/postgres-emitter pg
```
For TypeScript users, you might also need `@types/pg`.
## Usage
```js
const { Emitter } = require("@socket.io/postgres-emitter");
const { Pool } = require("pg");
const pool = new Pool({
user: "postgres",
host: "localhost",
database: "postgres",
password: "changeit",
port: 5432,
});
const io = new Emitter(pool);
setInterval(() => {
io.emit("ping", new Date());
}, 1000);
```
## API
### `Emitter(pool[, nsp][, opts])`
```js
const io = new Emitter(pool);
```
The `pool` argument is a [Pool object](https://node-postgres.com/api/pool) from the `pg` package.
### `Emitter#to(room:string):BroadcastOperator`
### `Emitter#in(room:string):BroadcastOperator`
Specifies a specific `room` that you want to emit to.
```js
io.to("room1").emit("hello");
```
### `Emitter#except(room:string):BroadcastOperator`
Specifies a specific `room` that you want to exclude from broadcasting.
```js
io.except("room2").emit("hello");
```
### `Emitter#of(namespace:string):Emitter`
Specifies a specific namespace that you want to emit to.
```js
const customNamespace = io.of("/custom");
customNamespace.emit("hello");
```
### `Emitter#socketsJoin(rooms:string|string[])`
Makes the matching socket instances join the specified rooms:
```js
// make all Socket instances join the "room1" room
io.socketsJoin("room1");
// make all Socket instances of the "admin" namespace in the "room1" room join the "room2" room
io.of("/admin").in("room1").socketsJoin("room2");
```
### `Emitter#socketsLeave(rooms:string|string[])`
Makes the matching socket instances leave the specified rooms:
```js
// make all Socket instances leave the "room1" room
io.socketsLeave("room1");
// make all Socket instances of the "admin" namespace in the "room1" room leave the "room2" room
io.of("/admin").in("room1").socketsLeave("room2");
```
### `Emitter#disconnectSockets(close:boolean)`
Makes the matching socket instances disconnect:
```js
// make all Socket instances disconnect
io.disconnectSockets();
// make all Socket instances of the "admin" namespace in the "room1" room disconnect
io.of("/admin").in("room1").disconnectSockets();
// this also works with a single socket ID
io.of("/admin").in(theSocketId).disconnectSockets();
```
### `Emitter#serverSideEmit(ev:string[,...args:any[]])`
Emits an event that will be received by each Socket.IO server of the cluster.
```js
io.serverSideEmit("ping");
```
## License
[MIT](LICENSE)

File diff suppressed because it is too large Load Diff

Binary file not shown.

After

Width:  |  Height:  |  Size: 100 KiB

View File

@@ -0,0 +1,9 @@
version: "3"
services:
postgres:
image: postgres:12
ports:
- "5432:5432"
environment:
POSTGRES_PASSWORD: "changeit"

View File

@@ -0,0 +1,534 @@
import debugModule from "debug";
import type {
DefaultEventsMap,
EventNames,
EventParams,
EventsMap,
TypedEventBroadcaster,
} from "./typed-events";
import { encode } from "@msgpack/msgpack";
const debug = debugModule("socket.io-postgres-emitter");
const EMITTER_UID = "emitter";
const hasBinary = (obj: any, toJSON?: boolean): boolean => {
if (!obj || typeof obj !== "object") {
return false;
}
if (obj instanceof ArrayBuffer || ArrayBuffer.isView(obj)) {
return true;
}
if (Array.isArray(obj)) {
for (let i = 0, l = obj.length; i < l; i++) {
if (hasBinary(obj[i])) {
return true;
}
}
return false;
}
for (const key in obj) {
if (Object.prototype.hasOwnProperty.call(obj, key) && hasBinary(obj[key])) {
return true;
}
}
if (obj.toJSON && typeof obj.toJSON === "function" && !toJSON) {
return hasBinary(obj.toJSON(), true);
}
return false;
};
/**
* Event types, for messages between nodes
*/
enum EventType {
INITIAL_HEARTBEAT = 1,
HEARTBEAT,
BROADCAST,
SOCKETS_JOIN,
SOCKETS_LEAVE,
DISCONNECT_SOCKETS,
FETCH_SOCKETS,
FETCH_SOCKETS_RESPONSE,
SERVER_SIDE_EMIT,
SERVER_SIDE_EMIT_RESPONSE,
}
interface BroadcastFlags {
volatile?: boolean;
compress?: boolean;
}
export interface PostgresEmitterOptions {
/**
* The prefix of the notification channel
* @default "socket.io"
*/
channelPrefix: string;
/**
* The name of the table for payloads over the 8000 bytes limit or containing binary data
*/
tableName: string;
/**
* The threshold for the payload size in bytes (see https://www.postgresql.org/docs/current/sql-notify.html)
* @default 8000
*/
payloadThreshold: number;
}
export class Emitter<
EmitEvents extends EventsMap = DefaultEventsMap,
ServerSideEvents extends EventsMap = DefaultEventsMap,
> {
public readonly channel: string;
public readonly tableName: string;
public payloadThreshold: number;
constructor(
readonly pool: any,
readonly nsp: string = "/",
opts: Partial<PostgresEmitterOptions> = {},
) {
const channelPrefix = opts.channelPrefix || "socket.io";
this.channel = `${channelPrefix}#${nsp}`;
this.tableName = opts.tableName || "socket_io_attachments";
this.payloadThreshold = opts.payloadThreshold || 8000;
}
/**
* Return a new emitter for the given namespace.
*
* @param nsp - namespace
* @public
*/
public of(nsp: string): Emitter {
return new Emitter(this.pool, (nsp[0] !== "/" ? "/" : "") + nsp);
}
/**
* Emits to all clients.
*
* @return Always true
* @public
*/
public emit<Ev extends EventNames<EmitEvents>>(
ev: Ev,
...args: EventParams<EmitEvents, Ev>
): true {
return new BroadcastOperator<EmitEvents, ServerSideEvents>(this).emit(
ev,
...args,
);
}
/**
* Targets a room when emitting.
*
* @param room
* @return BroadcastOperator
* @public
*/
public to(
room: string | string[],
): BroadcastOperator<EmitEvents, ServerSideEvents> {
return new BroadcastOperator(this).to(room);
}
/**
* Targets a room when emitting.
*
* @param room
* @return BroadcastOperator
* @public
*/
public in(
room: string | string[],
): BroadcastOperator<EmitEvents, ServerSideEvents> {
return new BroadcastOperator(this).in(room);
}
/**
* Excludes a room when emitting.
*
* @param room
* @return BroadcastOperator
* @public
*/
public except(
room: string | string[],
): BroadcastOperator<EmitEvents, ServerSideEvents> {
return new BroadcastOperator(this).except(room);
}
/**
* Sets a modifier for a subsequent event emission that the event data may be lost if the client is not ready to
* receive messages (because of network slowness or other issues, or because theyre connected through long polling
* and is in the middle of a request-response cycle).
*
* @return BroadcastOperator
* @public
*/
public get volatile(): BroadcastOperator<EmitEvents, ServerSideEvents> {
return new BroadcastOperator(this).volatile;
}
/**
* Sets the compress flag.
*
* @param compress - if `true`, compresses the sending data
* @return BroadcastOperator
* @public
*/
public compress(
compress: boolean,
): BroadcastOperator<EmitEvents, ServerSideEvents> {
return new BroadcastOperator(this).compress(compress);
}
/**
* Makes the matching socket instances join the specified rooms
*
* @param rooms
* @public
*/
public socketsJoin(rooms: string | string[]): void {
return new BroadcastOperator(this).socketsJoin(rooms);
}
/**
* Makes the matching socket instances leave the specified rooms
*
* @param rooms
* @public
*/
public socketsLeave(rooms: string | string[]): void {
return new BroadcastOperator(this).socketsLeave(rooms);
}
/**
* Makes the matching socket instances disconnect
*
* @param close - whether to close the underlying connection
* @public
*/
public disconnectSockets(close: boolean = false): void {
return new BroadcastOperator(this).disconnectSockets(close);
}
/**
* Send a packet to the Socket.IO servers in the cluster
*
* @param ev - the event name
* @param args - any number of serializable arguments
*/
public serverSideEmit<Ev extends EventNames<ServerSideEvents>>(
ev: Ev,
...args: EventParams<ServerSideEvents, Ev>
): void {
return new BroadcastOperator<EmitEvents, ServerSideEvents>(
this,
).serverSideEmit(ev, ...args);
}
}
export const RESERVED_EVENTS: ReadonlySet<string | Symbol> = new Set(<const>[
"connect",
"connect_error",
"disconnect",
"disconnecting",
"newListener",
"removeListener",
]);
export class BroadcastOperator<
EmitEvents extends EventsMap,
ServerSideEvents extends EventsMap,
> implements TypedEventBroadcaster<EmitEvents>
{
constructor(
private readonly emitter: Emitter,
private readonly rooms: Set<string> = new Set<string>(),
private readonly exceptRooms: Set<string> = new Set<string>(),
private readonly flags: BroadcastFlags = {},
) {}
/**
* Targets a room when emitting.
*
* @param room
* @return a new BroadcastOperator instance
* @public
*/
public to(
room: string | string[],
): BroadcastOperator<EmitEvents, ServerSideEvents> {
const rooms = new Set(this.rooms);
if (Array.isArray(room)) {
room.forEach((r) => rooms.add(r));
} else {
rooms.add(room);
}
return new BroadcastOperator(
this.emitter,
rooms,
this.exceptRooms,
this.flags,
);
}
/**
* Targets a room when emitting.
*
* @param room
* @return a new BroadcastOperator instance
* @public
*/
public in(
room: string | string[],
): BroadcastOperator<EmitEvents, ServerSideEvents> {
return this.to(room);
}
/**
* Excludes a room when emitting.
*
* @param room
* @return a new BroadcastOperator instance
* @public
*/
public except(
room: string | string[],
): BroadcastOperator<EmitEvents, ServerSideEvents> {
const exceptRooms = new Set(this.exceptRooms);
if (Array.isArray(room)) {
room.forEach((r) => exceptRooms.add(r));
} else {
exceptRooms.add(room);
}
return new BroadcastOperator(
this.emitter,
this.rooms,
exceptRooms,
this.flags,
);
}
/**
* Sets the compress flag.
*
* @param compress - if `true`, compresses the sending data
* @return a new BroadcastOperator instance
* @public
*/
public compress(
compress: boolean,
): BroadcastOperator<EmitEvents, ServerSideEvents> {
const flags = Object.assign({}, this.flags, { compress });
return new BroadcastOperator(
this.emitter,
this.rooms,
this.exceptRooms,
flags,
);
}
/**
* Sets a modifier for a subsequent event emission that the event data may be lost if the client is not ready to
* receive messages (because of network slowness or other issues, or because theyre connected through long polling
* and is in the middle of a request-response cycle).
*
* @return a new BroadcastOperator instance
* @public
*/
public get volatile(): BroadcastOperator<EmitEvents, ServerSideEvents> {
const flags = Object.assign({}, this.flags, { volatile: true });
return new BroadcastOperator(
this.emitter,
this.rooms,
this.exceptRooms,
flags,
);
}
private async publish(document: any) {
document.uid = EMITTER_UID;
try {
if (
[
EventType.BROADCAST,
EventType.SERVER_SIDE_EMIT,
EventType.SERVER_SIDE_EMIT_RESPONSE,
].includes(document.type) &&
hasBinary(document)
) {
return await this.publishWithAttachment(document);
}
const payload = JSON.stringify(document);
if (Buffer.byteLength(payload) > this.emitter.payloadThreshold) {
return await this.publishWithAttachment(document);
}
debug(
"sending event of type %s to channel %s",
document.type,
this.emitter.channel,
);
await this.emitter.pool.query(
`NOTIFY "${this.emitter.channel}", '${payload}'`,
);
} catch (err) {
// @ts-ignore
this.emit("error", err);
}
}
private async publishWithAttachment(document: any) {
const payload = encode(document);
debug(
"sending event of type %s with attachment to channel %s",
document.type,
this.emitter.channel,
);
const result = await this.emitter.pool.query(
`INSERT INTO ${this.emitter.tableName} (payload) VALUES ($1) RETURNING id;`,
[payload],
);
const attachmentId = result.rows[0].id;
const headerPayload = JSON.stringify({
uid: document.uid,
type: document.type,
attachmentId,
});
this.emitter.pool.query(
`NOTIFY "${this.emitter.channel}", '${headerPayload}'`,
);
}
/**
* Emits to all clients.
*
* @return Always true
* @public
*/
public emit<Ev extends EventNames<EmitEvents>>(
ev: Ev,
...args: EventParams<EmitEvents, Ev>
): true {
if (RESERVED_EVENTS.has(ev)) {
throw new Error(`"${String(ev)}" is a reserved event name`);
}
// set up packet object
const data = [ev, ...args];
const packet = {
type: 2, // EVENT
data: data,
nsp: this.emitter.nsp,
};
const opts = {
rooms: [...this.rooms],
flags: this.flags,
except: [...this.exceptRooms],
};
this.publish({
type: EventType.BROADCAST,
data: {
packet,
opts,
},
});
return true;
}
/**
* Makes the matching socket instances join the specified rooms
*
* @param rooms
* @public
*/
public socketsJoin(rooms: string | string[]): void {
this.publish({
type: EventType.SOCKETS_JOIN,
data: {
opts: {
rooms: [...this.rooms],
except: [...this.exceptRooms],
},
rooms: Array.isArray(rooms) ? rooms : [rooms],
},
});
}
/**
* Makes the matching socket instances leave the specified rooms
*
* @param rooms
* @public
*/
public socketsLeave(rooms: string | string[]): void {
this.publish({
type: EventType.SOCKETS_LEAVE,
data: {
opts: {
rooms: [...this.rooms],
except: [...this.exceptRooms],
},
rooms: Array.isArray(rooms) ? rooms : [rooms],
},
});
}
/**
* Makes the matching socket instances disconnect
*
* @param close - whether to close the underlying connection
* @public
*/
public disconnectSockets(close: boolean = false): void {
this.publish({
type: EventType.DISCONNECT_SOCKETS,
data: {
opts: {
rooms: [...this.rooms],
except: [...this.exceptRooms],
},
close,
},
});
}
/**
* Send a packet to the Socket.IO servers in the cluster
*
* @param ev - the event name
* @param args - any number of serializable arguments
*/
public serverSideEmit<Ev extends EventNames<ServerSideEvents>>(
ev: Ev,
...args: EventParams<ServerSideEvents, Ev>
): void {
const withAck = args.length && typeof args[args.length - 1] === "function";
if (withAck) {
throw new Error("Acknowledgements are not supported");
}
this.publish({
type: EventType.SERVER_SIDE_EMIT,
data: {
packet: [ev, ...args],
},
});
}
}

View File

@@ -0,0 +1,37 @@
/**
* An events map is an interface that maps event names to their value, which
* represents the type of the `on` listener.
*/
export interface EventsMap {
[event: string]: any;
}
/**
* The default events map, used if no EventsMap is given. Using this EventsMap
* is equivalent to accepting all event names, and any data.
*/
export interface DefaultEventsMap {
[event: string]: (...args: any[]) => void;
}
/**
* Returns a union type containing all the keys of an event map.
*/
export type EventNames<Map extends EventsMap> = keyof Map & (string | symbol);
/** The tuple type representing the parameters of an event listener */
export type EventParams<
Map extends EventsMap,
Ev extends EventNames<Map>,
> = Parameters<Map[Ev]>;
/**
* Interface for classes that aren't `EventEmitter`s, but still expose a
* strictly typed `emit` method.
*/
export interface TypedEventBroadcaster<EmitEvents extends EventsMap> {
emit<Ev extends EventNames<EmitEvents>>(
ev: Ev,
...args: EventParams<EmitEvents, Ev>
): boolean;
}

View File

@@ -0,0 +1,33 @@
{
"name": "@socket.io/postgres-emitter",
"version": "0.1.0",
"description": "The Socket.IO Postgres emitter, allowing to communicate with a group of Socket.IO servers from another Node.js process",
"license": "MIT",
"homepage": "https://github.com/socketio/socket.io/tree/main/packages/socket.io-postgres-emitter#readme",
"repository": {
"type": "git",
"url": "git+https://github.com/socketio/socket.io.git"
},
"files": [
"dist/"
],
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"scripts": {
"compile": "rimraf ./dist && tsc",
"test": "npm run format:check && npm run compile && nyc mocha --require ts-node/register --timeout 5000 test/index.ts",
"format:check": "prettier --parser typescript --check 'lib/**/*.ts' 'test/**/*.ts'",
"format:fix": "prettier --parser typescript --write 'lib/**/*.ts' 'test/**/*.ts'",
"prepack": "npm run compile"
},
"dependencies": {
"@msgpack/msgpack": "^2.7.0",
"debug": "~4.3.1"
},
"keywords": [
"socket.io",
"postgres",
"postgresql",
"emitter"
]
}

View File

@@ -0,0 +1,278 @@
import { createServer } from "http";
import { Server, Socket as ServerSocket } from "socket.io";
import { io as ioc, Socket as ClientSocket } from "socket.io-client";
import expect = require("expect.js");
import { createAdapter } from "@socket.io/postgres-adapter";
import type { AddressInfo } from "net";
import { Pool } from "pg";
import { times, sleep } from "./util";
import { Emitter } from "..";
const NODES_COUNT = 3;
describe("@socket.io/postgres-emitter", () => {
let servers: Server[],
serverSockets: ServerSocket[],
clientSockets: ClientSocket[],
pool: Pool,
emitter: Emitter;
beforeEach((done) => {
servers = [];
serverSockets = [];
clientSockets = [];
pool = new Pool({
user: "postgres",
host: "localhost",
database: "postgres",
password: "changeit",
port: 5432,
});
pool.query(
`
CREATE TABLE IF NOT EXISTS socket_io_attachments (
id bigserial UNIQUE,
created_at timestamptz DEFAULT NOW(),
payload bytea
);
`,
() => {},
);
emitter = new Emitter(pool);
for (let i = 1; i <= NODES_COUNT; i++) {
const httpServer = createServer();
const io = new Server(httpServer);
// @ts-ignore
io.adapter(createAdapter(pool));
httpServer.listen(() => {
const port = (httpServer.address() as AddressInfo).port;
const clientSocket = ioc(`http://localhost:${port}`);
io.on("connection", async (socket) => {
clientSockets.push(clientSocket);
serverSockets.push(socket);
servers.push(io);
if (servers.length === NODES_COUNT) {
// ensure all nodes know each other
servers[0].emit("ping");
servers[1].emit("ping");
servers[2].emit("ping");
await sleep(200);
done();
}
});
});
}
});
afterEach((done) => {
servers.forEach((server) => {
// @ts-ignore
server.httpServer.close();
server.of("/").adapter.close();
});
clientSockets.forEach((socket) => {
socket.disconnect();
});
pool.end(done);
});
describe("broadcast", function () {
it("broadcasts to all clients", (done) => {
const partialDone = times(3, done);
clientSockets.forEach((clientSocket) => {
clientSocket.on("test", (arg1, arg2, arg3) => {
expect(arg1).to.eql(1);
expect(arg2).to.eql("2");
expect(Buffer.isBuffer(arg3)).to.be(true);
partialDone();
});
});
emitter.emit("test", 1, "2", Buffer.from([3, 4]));
});
it("broadcasts to all clients in a namespace", (done) => {
const partialDone = times(3, () => {
servers.forEach((server) => server.of("/custom").adapter.close());
done();
});
servers.forEach((server) => server.of("/custom"));
const onConnect = times(3, async () => {
await sleep(200);
emitter.of("/custom").emit("test");
});
clientSockets.forEach((clientSocket) => {
const socket = clientSocket.io.socket("/custom");
socket.on("connect", onConnect);
socket.on("test", () => {
socket.disconnect();
partialDone();
});
});
});
it("broadcasts to all clients in a room", (done) => {
serverSockets[1].join("room1");
clientSockets[0].on("test", () => {
done(new Error("should not happen"));
});
clientSockets[1].on("test", () => {
done();
});
clientSockets[2].on("test", () => {
done(new Error("should not happen"));
});
emitter.to("room1").emit("test");
});
it("broadcasts to all clients except in room", (done) => {
const partialDone = times(2, done);
serverSockets[1].join("room1");
clientSockets[0].on("test", () => {
partialDone();
});
clientSockets[1].on("test", () => {
done(new Error("should not happen"));
});
clientSockets[2].on("test", () => {
partialDone();
});
emitter.of("/").except("room1").emit("test");
});
});
describe("socketsJoin", () => {
it("makes all socket instances join the specified room", async () => {
emitter.socketsJoin("room1");
await sleep(200);
expect(serverSockets[0].rooms.has("room1")).to.be(true);
expect(serverSockets[1].rooms.has("room1")).to.be(true);
expect(serverSockets[2].rooms.has("room1")).to.be(true);
});
it("makes the matching socket instances join the specified room", async () => {
serverSockets[0].join("room1");
serverSockets[2].join("room1");
emitter.in("room1").socketsJoin("room2");
await sleep(200);
expect(serverSockets[0].rooms.has("room2")).to.be(true);
expect(serverSockets[1].rooms.has("room2")).to.be(false);
expect(serverSockets[2].rooms.has("room2")).to.be(true);
});
it("makes the given socket instance join the specified room", async () => {
emitter.in(serverSockets[1].id).socketsJoin("room3");
await sleep(200);
expect(serverSockets[0].rooms.has("room3")).to.be(false);
expect(serverSockets[1].rooms.has("room3")).to.be(true);
expect(serverSockets[2].rooms.has("room3")).to.be(false);
});
});
describe("socketsLeave", () => {
it("makes all socket instances leave the specified room", async () => {
serverSockets[0].join("room1");
serverSockets[2].join("room1");
emitter.socketsLeave("room1");
await sleep(200);
expect(serverSockets[0].rooms.has("room1")).to.be(false);
expect(serverSockets[1].rooms.has("room1")).to.be(false);
expect(serverSockets[2].rooms.has("room1")).to.be(false);
});
it("makes the matching socket instances leave the specified room", async () => {
serverSockets[0].join(["room1", "room2"]);
serverSockets[1].join(["room1", "room2"]);
serverSockets[2].join(["room2"]);
emitter.in("room1").socketsLeave("room2");
await sleep(200);
expect(serverSockets[0].rooms.has("room2")).to.be(false);
expect(serverSockets[1].rooms.has("room2")).to.be(false);
expect(serverSockets[2].rooms.has("room2")).to.be(true);
});
it("makes the given socket instance leave the specified room", async () => {
serverSockets[0].join("room3");
serverSockets[1].join("room3");
serverSockets[2].join("room3");
emitter.in(serverSockets[1].id).socketsLeave("room3");
await sleep(200);
expect(serverSockets[0].rooms.has("room3")).to.be(true);
expect(serverSockets[1].rooms.has("room3")).to.be(false);
expect(serverSockets[2].rooms.has("room3")).to.be(true);
});
});
describe("disconnectSockets", () => {
it("makes all socket instances disconnect", (done) => {
const partialDone = times(3, done);
clientSockets.forEach((clientSocket) => {
clientSocket.on("disconnect", (reason) => {
expect(reason).to.eql("io server disconnect");
partialDone();
});
});
emitter.disconnectSockets();
});
});
describe("serverSideEmit", () => {
it("sends an event to other server instances", (done) => {
const partialDone = times(3, done);
emitter.serverSideEmit("hello", "world", 1, "2");
servers[0].on("hello", (arg1, arg2, arg3) => {
expect(arg1).to.eql("world");
expect(arg2).to.eql(1);
expect(arg3).to.eql("2");
partialDone();
});
servers[1].on("hello", (arg1, arg2, arg3) => {
partialDone();
});
servers[2].of("/").on("hello", () => {
partialDone();
});
});
});
});

View File

@@ -0,0 +1,13 @@
export function times(count: number, fn: () => void) {
let i = 0;
return () => {
i++;
if (i === count) {
fn();
}
};
}
export function sleep(duration: number) {
return new Promise((resolve) => setTimeout(resolve, duration));
}

View File

@@ -0,0 +1,12 @@
{
"compilerOptions": {
"outDir": "./dist",
"allowJs": false,
"target": "es2017",
"module": "commonjs",
"declaration": true
},
"include": [
"./lib/**/*"
]
}