feat: add promise-based acknowledgements

This commit adds some syntactic sugar around acknowledgements:

- `emitWithAck()`

```js
try {
  const responses = await io.timeout(1000).emitWithAck("some-event");
  console.log(responses); // one response per client
} catch (e) {
  // some clients did not acknowledge the event in the given delay
}

io.on("connection", async (socket) => {
    // without timeout
  const response = await socket.emitWithAck("hello", "world");

  // with a specific timeout
  try {
    const response = await socket.timeout(1000).emitWithAck("hello", "world");
  } catch (err) {
    // the client did not acknowledge the event in the given delay
  }
});
```

- `serverSideEmitWithAck()`

```js
try {
  const responses = await io.timeout(1000).serverSideEmitWithAck("some-event");
  console.log(responses); // one response per server (except itself)
} catch (e) {
  // some servers did not acknowledge the event in the given delay
}
```

Related:

- https://github.com/socketio/socket.io/issues/4175
- https://github.com/socketio/socket.io/issues/4577
- https://github.com/socketio/socket.io/issues/4583
This commit is contained in:
Damien Arrachequesne
2023-01-20 08:17:15 +01:00
parent 5d9220b69a
commit 184f3cf7af
9 changed files with 391 additions and 7 deletions

View File

@@ -7,8 +7,10 @@ import type {
EventNames,
EventsMap,
TypedEventBroadcaster,
DecorateAcknowledgements,
DecorateAcknowledgementsWithTimeoutAndMultipleResponses,
AllButLast,
Last,
SecondArg,
} from "./typed-events";
export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
@@ -276,6 +278,36 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
return true;
}
/**
* Emits an event and waits for an acknowledgement from all clients.
*
* @example
* try {
* const responses = await io.timeout(1000).emitWithAck("some-event");
* console.log(responses); // one response per client
* } catch (e) {
* // some clients did not acknowledge the event in the given delay
* }
*
* @return a Promise that will be fulfilled when all clients have acknowledged the event
*/
public emitWithAck<Ev extends EventNames<EmitEvents>>(
ev: Ev,
...args: AllButLast<EventParams<EmitEvents, Ev>>
): Promise<SecondArg<Last<EventParams<EmitEvents, Ev>>>> {
return new Promise((resolve, reject) => {
args.push((err, responses) => {
if (err) {
err.responses = responses;
return reject(err);
} else {
return resolve(responses);
}
});
this.emit(ev, ...(args as any[] as EventParams<EmitEvents, Ev>));
});
}
/**
* Gets a list of clients.
*

View File

@@ -34,8 +34,11 @@ import {
EventParams,
StrictEventEmitter,
EventNames,
DecorateAcknowledgements,
DecorateAcknowledgementsWithTimeoutAndMultipleResponses,
AllButLast,
Last,
FirstArg,
SecondArg,
} from "./typed-events";
import { patchAdapter, restoreAdapter, serveFile } from "./uws";
import type { BaseServer } from "engine.io/build/server";
@@ -811,6 +814,26 @@ export class Server<
return this.sockets.except(room);
}
/**
* Emits an event and waits for an acknowledgement from all clients.
*
* @example
* try {
* const responses = await io.timeout(1000).emitWithAck("some-event");
* console.log(responses); // one response per client
* } catch (e) {
* // some clients did not acknowledge the event in the given delay
* }
*
* @return a Promise that will be fulfilled when all clients have acknowledged the event
*/
public emitWithAck<Ev extends EventNames<EmitEvents>>(
ev: Ev,
...args: AllButLast<EventParams<EmitEvents, Ev>>
): Promise<SecondArg<Last<EventParams<EmitEvents, Ev>>>> {
return this.sockets.emitWithAck(ev, ...args);
}
/**
* Sends a `message` event to all clients.
*
@@ -854,9 +877,9 @@ export class Server<
* // acknowledgements (without binary content) are supported too:
* io.serverSideEmit("ping", (err, responses) => {
* if (err) {
* // some clients did not acknowledge the event in the given delay
* // some servers did not acknowledge the event in the given delay
* } else {
* console.log(responses); // one response per client
* console.log(responses); // one response per server (except the current one)
* }
* });
*
@@ -877,6 +900,29 @@ export class Server<
return this.sockets.serverSideEmit(ev, ...args);
}
/**
* Sends a message and expect an acknowledgement from the other Socket.IO servers of the cluster.
*
* @example
* try {
* const responses = await io.serverSideEmitWithAck("ping");
* console.log(responses); // one response per server (except the current one)
* } catch (e) {
* // some servers did not acknowledge the event in the given delay
* }
*
* @param ev - the event name
* @param args - an array of arguments
*
* @return a Promise that will be fulfilled when all servers have acknowledged the event
*/
public serverSideEmitWithAck<Ev extends EventNames<ServerSideEvents>>(
ev: Ev,
...args: AllButLast<EventParams<ServerSideEvents, Ev>>
): Promise<FirstArg<Last<EventParams<ServerSideEvents, Ev>>>[]> {
return this.sockets.serverSideEmitWithAck(ev, ...args);
}
/**
* Gets a list of socket ids.
*

View File

@@ -7,6 +7,10 @@ import {
StrictEventEmitter,
DefaultEventsMap,
DecorateAcknowledgementsWithTimeoutAndMultipleResponses,
AllButLast,
Last,
FirstArg,
SecondArg,
} from "./typed-events";
import type { Client } from "./client";
import debugModule from "debug";
@@ -433,6 +437,30 @@ export class Namespace<
);
}
/**
* Emits an event and waits for an acknowledgement from all clients.
*
* @example
* const myNamespace = io.of("/my-namespace");
*
* try {
* const responses = await myNamespace.timeout(1000).emitWithAck("some-event");
* console.log(responses); // one response per client
* } catch (e) {
* // some clients did not acknowledge the event in the given delay
* }
*
* @return a Promise that will be fulfilled when all clients have acknowledged the event
*/
public emitWithAck<Ev extends EventNames<EmitEvents>>(
ev: Ev,
...args: AllButLast<EventParams<EmitEvents, Ev>>
): Promise<SecondArg<Last<EventParams<EmitEvents, Ev>>>> {
return new BroadcastOperator<EmitEvents, SocketData>(
this.adapter
).emitWithAck(ev, ...args);
}
/**
* Sends a `message` event to all clients.
*
@@ -480,9 +508,9 @@ export class Namespace<
* // acknowledgements (without binary content) are supported too:
* myNamespace.serverSideEmit("ping", (err, responses) => {
* if (err) {
* // some clients did not acknowledge the event in the given delay
* // some servers did not acknowledge the event in the given delay
* } else {
* console.log(responses); // one response per client
* console.log(responses); // one response per server (except the current one)
* }
* });
*
@@ -508,6 +536,44 @@ export class Namespace<
return true;
}
/**
* Sends a message and expect an acknowledgement from the other Socket.IO servers of the cluster.
*
* @example
* const myNamespace = io.of("/my-namespace");
*
* try {
* const responses = await myNamespace.serverSideEmitWithAck("ping");
* console.log(responses); // one response per server (except the current one)
* } catch (e) {
* // some servers did not acknowledge the event in the given delay
* }
*
* @param ev - the event name
* @param args - an array of arguments
*
* @return a Promise that will be fulfilled when all servers have acknowledged the event
*/
public serverSideEmitWithAck<Ev extends EventNames<ServerSideEvents>>(
ev: Ev,
...args: AllButLast<EventParams<ServerSideEvents, Ev>>
): Promise<FirstArg<Last<EventParams<ServerSideEvents, Ev>>>[]> {
return new Promise((resolve, reject) => {
args.push((err, responses) => {
if (err) {
err.responses = responses;
return reject(err);
} else {
return resolve(responses);
}
});
this.serverSideEmit(
ev,
...(args as any[] as EventParams<ServerSideEvents, Ev>)
);
});
}
/**
* Called when a packet is received from another Socket.IO server
*

View File

@@ -2,12 +2,15 @@ import { Packet, PacketType } from "socket.io-parser";
import debugModule from "debug";
import type { Server } from "./index";
import {
AllButLast,
DecorateAcknowledgements,
DecorateAcknowledgementsWithMultipleResponses,
DefaultEventsMap,
EventNames,
EventParams,
EventsMap,
FirstArg,
Last,
StrictEventEmitter,
} from "./typed-events";
import type { Client } from "./client";
@@ -357,6 +360,42 @@ export class Socket<
return true;
}
/**
* Emits an event and waits for an acknowledgement
*
* @example
* io.on("connection", async (socket) => {
* // without timeout
* const response = await socket.emitWithAck("hello", "world");
*
* // with a specific timeout
* try {
* const response = await socket.timeout(1000).emitWithAck("hello", "world");
* } catch (err) {
* // the client did not acknowledge the event in the given delay
* }
* });
*
* @return a Promise that will be fulfilled when the client acknowledges the event
*/
public emitWithAck<Ev extends EventNames<EmitEvents>>(
ev: Ev,
...args: AllButLast<EventParams<EmitEvents, Ev>>
): Promise<FirstArg<Last<EventParams<EmitEvents, Ev>>>> {
// the timeout flag is optional
const withErr = this.flags.timeout !== undefined;
return new Promise((resolve, reject) => {
args.push((arg1, arg2) => {
if (withErr) {
return arg1 ? reject(arg1) : resolve(arg2);
} else {
return resolve(arg1);
}
});
this.emit(ev, ...(args as any[] as EventParams<EmitEvents, Ev>));
});
}
/**
* @private
*/

View File

@@ -179,6 +179,20 @@ export abstract class StrictEventEmitter<
}
}
export type Last<T extends any[]> = T extends [...infer H, infer L] ? L : any;
export type AllButLast<T extends any[]> = T extends [...infer H, infer L]
? H
: any[];
export type FirstArg<T> = T extends (arg: infer Param) => infer Result
? Param
: any;
export type SecondArg<T> = T extends (
err: Error,
arg: infer Param
) => infer Result
? Param
: any;
type PrependTimeoutError<T extends any[]> = {
[K in keyof T]: T[K] extends (...args: infer Params) => infer Result
? (err: Error, ...args: Params) => Result

View File

@@ -471,6 +471,74 @@ describe("messaging many", () => {
});
});
it("should broadcast and expect multiple acknowledgements (promise)", (done) => {
const io = new Server(0);
const socket1 = createClient(io, "/", { multiplex: false });
const socket2 = createClient(io, "/", { multiplex: false });
const socket3 = createClient(io, "/", { multiplex: false });
socket1.on("some event", (cb) => {
cb(1);
});
socket2.on("some event", (cb) => {
cb(2);
});
socket3.on("some event", (cb) => {
cb(3);
});
Promise.all([
waitFor(socket1, "connect"),
waitFor(socket2, "connect"),
waitFor(socket3, "connect"),
]).then(async () => {
const responses = await io.timeout(2000).emitWithAck("some event");
expect(responses).to.contain(1, 2, 3);
success(done, io, socket1, socket2, socket3);
});
});
it("should fail when a client does not acknowledge the event in the given delay (promise)", (done) => {
const io = new Server(0);
const socket1 = createClient(io, "/", { multiplex: false });
const socket2 = createClient(io, "/", { multiplex: false });
const socket3 = createClient(io, "/", { multiplex: false });
socket1.on("some event", (cb) => {
cb(1);
});
socket2.on("some event", (cb) => {
cb(2);
});
socket3.on("some event", () => {
// timeout
});
Promise.all([
waitFor(socket1, "connect"),
waitFor(socket2, "connect"),
waitFor(socket3, "connect"),
]).then(async () => {
try {
await io.timeout(200).emitWithAck("some event");
expect.fail();
} catch (err) {
expect(err).to.be.an(Error);
// @ts-ignore
expect(err.responses).to.have.length(2);
// @ts-ignore
expect(err.responses).to.contain(1, 2);
success(done, io, socket1, socket2, socket3);
}
});
});
it("should broadcast and return if the packet is sent to 0 client", (done) => {
const io = new Server(0);
const socket1 = createClient(io, "/", { multiplex: false });

View File

@@ -54,4 +54,34 @@ describe("timeout", () => {
});
});
});
it("should timeout if the client does not acknowledge the event (promise)", (done) => {
const io = new Server(0);
const client = createClient(io, "/");
io.on("connection", async (socket) => {
try {
await socket.timeout(50).emitWithAck("unknown");
expect.fail();
} catch (err) {
expect(err).to.be.an(Error);
success(done, io, client);
}
});
});
it("should not timeout if the client does acknowledge the event (promise)", (done) => {
const io = new Server(0);
const client = createClient(io, "/");
client.on("echo", (arg, cb) => {
cb(arg);
});
io.on("connection", async (socket) => {
const value = await socket.timeout(50).emitWithAck("echo", 42);
expect(value).to.be(42);
success(done, io, client);
});
});
});

View File

@@ -92,6 +92,28 @@ describe("server", () => {
});
});
});
describe("emitWithAck", () => {
it("accepts any parameters", () => {
const srv = createServer();
const sio = new Server(srv);
srv.listen(async () => {
const value = await sio
.timeout(1000)
.emitWithAck("ackFromServerSingleArg", true, "123");
expectType<any>(value);
sio.on("connection", async (s) => {
const value1 = await s.emitWithAck(
"ackFromServerSingleArg",
true,
"123"
);
expectType<any>(value1);
});
});
});
});
});
describe("single event map", () => {
@@ -181,6 +203,13 @@ describe("server", () => {
b: string,
ack: (c: boolean, d: string) => void
) => void;
ackFromServerSingleArg: (
a: boolean,
b: string,
ack: (c: string) => void
) => void;
multipleAckFromServer: (
a: boolean,
b: string,
@@ -295,6 +324,42 @@ describe("server", () => {
});
});
});
describe("emitWithAck", () => {
it("accepts arguments of the correct types", (done) => {
const srv = createServer();
const sio = new Server<ClientToServerEvents, ServerToClientEvents>(srv);
srv.listen(async () => {
const value = await sio
.timeout(1000)
.emitWithAck("multipleAckFromServer", true, "123");
expectType<string[]>(value);
sio.on("connection", async (s) => {
const value1 = await s
.timeout(1000)
.to("room")
.emitWithAck("multipleAckFromServer", true, "123");
expectType<string[]>(value1);
const value2 = await s
.to("room")
.timeout(1000)
.emitWithAck("multipleAckFromServer", true, "123");
expectType<string[]>(value2);
const value3 = await s.emitWithAck(
"ackFromServerSingleArg",
true,
"123"
);
expectType<string>(value3);
done();
});
});
});
});
});
describe("listen and emit event maps for the serverSideEmit method", () => {
@@ -323,7 +388,7 @@ describe("server", () => {
expectType<
Server<ClientToServerEvents, ServerToClientEvents, InterServerEvents>
>(sio);
srv.listen(() => {
srv.listen(async () => {
sio.serverSideEmit("helloFromServerToServer", "hello", 10);
sio
.of("/test")
@@ -343,6 +408,12 @@ describe("server", () => {
expectType<number[]>(bar);
});
const value = await sio.serverSideEmitWithAck(
"ackFromServerToServer",
"foo"
);
expectType<number[]>(value);
sio.on("ackFromServerToServer", (foo, cb) => {
expectType<string>(foo);
expectType<(bar: number) => void>(cb);

View File

@@ -605,6 +605,24 @@ describe("socket", () => {
});
});
it("should emit an event and wait for the acknowledgement", (done) => {
const io = new Server(0);
const socket = createClient(io);
io.on("connection", async (s) => {
socket.on("hi", (a, b, fn) => {
expect(a).to.be(1);
expect(b).to.be(2);
fn(3);
});
const val = await s.emitWithAck("hi", 1, 2);
expect(val).to.be(3);
success(done, io, socket);
});
});
it("should have access to the client", (done) => {
const io = new Server(0);
const socket = createClient(io);