diff --git a/lib/broadcast-operator.ts b/lib/broadcast-operator.ts index fb78ad68..0426e4d0 100644 --- a/lib/broadcast-operator.ts +++ b/lib/broadcast-operator.ts @@ -7,8 +7,10 @@ import type { EventNames, EventsMap, TypedEventBroadcaster, - DecorateAcknowledgements, DecorateAcknowledgementsWithTimeoutAndMultipleResponses, + AllButLast, + Last, + SecondArg, } from "./typed-events"; export class BroadcastOperator @@ -276,6 +278,36 @@ export class BroadcastOperator return true; } + /** + * Emits an event and waits for an acknowledgement from all clients. + * + * @example + * try { + * const responses = await io.timeout(1000).emitWithAck("some-event"); + * console.log(responses); // one response per client + * } catch (e) { + * // some clients did not acknowledge the event in the given delay + * } + * + * @return a Promise that will be fulfilled when all clients have acknowledged the event + */ + public emitWithAck>( + ev: Ev, + ...args: AllButLast> + ): Promise>>> { + return new Promise((resolve, reject) => { + args.push((err, responses) => { + if (err) { + err.responses = responses; + return reject(err); + } else { + return resolve(responses); + } + }); + this.emit(ev, ...(args as any[] as EventParams)); + }); + } + /** * Gets a list of clients. * diff --git a/lib/index.ts b/lib/index.ts index 1b7e54c3..00807164 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -34,8 +34,11 @@ import { EventParams, StrictEventEmitter, EventNames, - DecorateAcknowledgements, DecorateAcknowledgementsWithTimeoutAndMultipleResponses, + AllButLast, + Last, + FirstArg, + SecondArg, } from "./typed-events"; import { patchAdapter, restoreAdapter, serveFile } from "./uws"; import type { BaseServer } from "engine.io/build/server"; @@ -811,6 +814,26 @@ export class Server< return this.sockets.except(room); } + /** + * Emits an event and waits for an acknowledgement from all clients. + * + * @example + * try { + * const responses = await io.timeout(1000).emitWithAck("some-event"); + * console.log(responses); // one response per client + * } catch (e) { + * // some clients did not acknowledge the event in the given delay + * } + * + * @return a Promise that will be fulfilled when all clients have acknowledged the event + */ + public emitWithAck>( + ev: Ev, + ...args: AllButLast> + ): Promise>>> { + return this.sockets.emitWithAck(ev, ...args); + } + /** * Sends a `message` event to all clients. * @@ -854,9 +877,9 @@ export class Server< * // acknowledgements (without binary content) are supported too: * io.serverSideEmit("ping", (err, responses) => { * if (err) { - * // some clients did not acknowledge the event in the given delay + * // some servers did not acknowledge the event in the given delay * } else { - * console.log(responses); // one response per client + * console.log(responses); // one response per server (except the current one) * } * }); * @@ -877,6 +900,29 @@ export class Server< return this.sockets.serverSideEmit(ev, ...args); } + /** + * Sends a message and expect an acknowledgement from the other Socket.IO servers of the cluster. + * + * @example + * try { + * const responses = await io.serverSideEmitWithAck("ping"); + * console.log(responses); // one response per server (except the current one) + * } catch (e) { + * // some servers did not acknowledge the event in the given delay + * } + * + * @param ev - the event name + * @param args - an array of arguments + * + * @return a Promise that will be fulfilled when all servers have acknowledged the event + */ + public serverSideEmitWithAck>( + ev: Ev, + ...args: AllButLast> + ): Promise>>[]> { + return this.sockets.serverSideEmitWithAck(ev, ...args); + } + /** * Gets a list of socket ids. * diff --git a/lib/namespace.ts b/lib/namespace.ts index fd298e6e..c5658f76 100644 --- a/lib/namespace.ts +++ b/lib/namespace.ts @@ -7,6 +7,10 @@ import { StrictEventEmitter, DefaultEventsMap, DecorateAcknowledgementsWithTimeoutAndMultipleResponses, + AllButLast, + Last, + FirstArg, + SecondArg, } from "./typed-events"; import type { Client } from "./client"; import debugModule from "debug"; @@ -433,6 +437,30 @@ export class Namespace< ); } + /** + * Emits an event and waits for an acknowledgement from all clients. + * + * @example + * const myNamespace = io.of("/my-namespace"); + * + * try { + * const responses = await myNamespace.timeout(1000).emitWithAck("some-event"); + * console.log(responses); // one response per client + * } catch (e) { + * // some clients did not acknowledge the event in the given delay + * } + * + * @return a Promise that will be fulfilled when all clients have acknowledged the event + */ + public emitWithAck>( + ev: Ev, + ...args: AllButLast> + ): Promise>>> { + return new BroadcastOperator( + this.adapter + ).emitWithAck(ev, ...args); + } + /** * Sends a `message` event to all clients. * @@ -480,9 +508,9 @@ export class Namespace< * // acknowledgements (without binary content) are supported too: * myNamespace.serverSideEmit("ping", (err, responses) => { * if (err) { - * // some clients did not acknowledge the event in the given delay + * // some servers did not acknowledge the event in the given delay * } else { - * console.log(responses); // one response per client + * console.log(responses); // one response per server (except the current one) * } * }); * @@ -508,6 +536,44 @@ export class Namespace< return true; } + /** + * Sends a message and expect an acknowledgement from the other Socket.IO servers of the cluster. + * + * @example + * const myNamespace = io.of("/my-namespace"); + * + * try { + * const responses = await myNamespace.serverSideEmitWithAck("ping"); + * console.log(responses); // one response per server (except the current one) + * } catch (e) { + * // some servers did not acknowledge the event in the given delay + * } + * + * @param ev - the event name + * @param args - an array of arguments + * + * @return a Promise that will be fulfilled when all servers have acknowledged the event + */ + public serverSideEmitWithAck>( + ev: Ev, + ...args: AllButLast> + ): Promise>>[]> { + return new Promise((resolve, reject) => { + args.push((err, responses) => { + if (err) { + err.responses = responses; + return reject(err); + } else { + return resolve(responses); + } + }); + this.serverSideEmit( + ev, + ...(args as any[] as EventParams) + ); + }); + } + /** * Called when a packet is received from another Socket.IO server * diff --git a/lib/socket.ts b/lib/socket.ts index 8dd5d4ff..d2056bc7 100644 --- a/lib/socket.ts +++ b/lib/socket.ts @@ -2,12 +2,15 @@ import { Packet, PacketType } from "socket.io-parser"; import debugModule from "debug"; import type { Server } from "./index"; import { + AllButLast, DecorateAcknowledgements, DecorateAcknowledgementsWithMultipleResponses, DefaultEventsMap, EventNames, EventParams, EventsMap, + FirstArg, + Last, StrictEventEmitter, } from "./typed-events"; import type { Client } from "./client"; @@ -357,6 +360,42 @@ export class Socket< return true; } + /** + * Emits an event and waits for an acknowledgement + * + * @example + * io.on("connection", async (socket) => { + * // without timeout + * const response = await socket.emitWithAck("hello", "world"); + * + * // with a specific timeout + * try { + * const response = await socket.timeout(1000).emitWithAck("hello", "world"); + * } catch (err) { + * // the client did not acknowledge the event in the given delay + * } + * }); + * + * @return a Promise that will be fulfilled when the client acknowledges the event + */ + public emitWithAck>( + ev: Ev, + ...args: AllButLast> + ): Promise>>> { + // the timeout flag is optional + const withErr = this.flags.timeout !== undefined; + return new Promise((resolve, reject) => { + args.push((arg1, arg2) => { + if (withErr) { + return arg1 ? reject(arg1) : resolve(arg2); + } else { + return resolve(arg1); + } + }); + this.emit(ev, ...(args as any[] as EventParams)); + }); + } + /** * @private */ diff --git a/lib/typed-events.ts b/lib/typed-events.ts index 20b71226..0dca57a6 100644 --- a/lib/typed-events.ts +++ b/lib/typed-events.ts @@ -179,6 +179,20 @@ export abstract class StrictEventEmitter< } } +export type Last = T extends [...infer H, infer L] ? L : any; +export type AllButLast = T extends [...infer H, infer L] + ? H + : any[]; +export type FirstArg = T extends (arg: infer Param) => infer Result + ? Param + : any; +export type SecondArg = T extends ( + err: Error, + arg: infer Param +) => infer Result + ? Param + : any; + type PrependTimeoutError = { [K in keyof T]: T[K] extends (...args: infer Params) => infer Result ? (err: Error, ...args: Params) => Result diff --git a/test/messaging-many.ts b/test/messaging-many.ts index ec0f617a..983ea236 100644 --- a/test/messaging-many.ts +++ b/test/messaging-many.ts @@ -471,6 +471,74 @@ describe("messaging many", () => { }); }); + it("should broadcast and expect multiple acknowledgements (promise)", (done) => { + const io = new Server(0); + const socket1 = createClient(io, "/", { multiplex: false }); + const socket2 = createClient(io, "/", { multiplex: false }); + const socket3 = createClient(io, "/", { multiplex: false }); + + socket1.on("some event", (cb) => { + cb(1); + }); + + socket2.on("some event", (cb) => { + cb(2); + }); + + socket3.on("some event", (cb) => { + cb(3); + }); + + Promise.all([ + waitFor(socket1, "connect"), + waitFor(socket2, "connect"), + waitFor(socket3, "connect"), + ]).then(async () => { + const responses = await io.timeout(2000).emitWithAck("some event"); + expect(responses).to.contain(1, 2, 3); + + success(done, io, socket1, socket2, socket3); + }); + }); + + it("should fail when a client does not acknowledge the event in the given delay (promise)", (done) => { + const io = new Server(0); + const socket1 = createClient(io, "/", { multiplex: false }); + const socket2 = createClient(io, "/", { multiplex: false }); + const socket3 = createClient(io, "/", { multiplex: false }); + + socket1.on("some event", (cb) => { + cb(1); + }); + + socket2.on("some event", (cb) => { + cb(2); + }); + + socket3.on("some event", () => { + // timeout + }); + + Promise.all([ + waitFor(socket1, "connect"), + waitFor(socket2, "connect"), + waitFor(socket3, "connect"), + ]).then(async () => { + try { + await io.timeout(200).emitWithAck("some event"); + expect.fail(); + } catch (err) { + expect(err).to.be.an(Error); + // @ts-ignore + expect(err.responses).to.have.length(2); + // @ts-ignore + expect(err.responses).to.contain(1, 2); + + success(done, io, socket1, socket2, socket3); + } + }); + }); + it("should broadcast and return if the packet is sent to 0 client", (done) => { const io = new Server(0); const socket1 = createClient(io, "/", { multiplex: false }); diff --git a/test/socket-timeout.ts b/test/socket-timeout.ts index ee682eae..813ec9a2 100644 --- a/test/socket-timeout.ts +++ b/test/socket-timeout.ts @@ -54,4 +54,34 @@ describe("timeout", () => { }); }); }); + + it("should timeout if the client does not acknowledge the event (promise)", (done) => { + const io = new Server(0); + const client = createClient(io, "/"); + + io.on("connection", async (socket) => { + try { + await socket.timeout(50).emitWithAck("unknown"); + expect.fail(); + } catch (err) { + expect(err).to.be.an(Error); + success(done, io, client); + } + }); + }); + + it("should not timeout if the client does acknowledge the event (promise)", (done) => { + const io = new Server(0); + const client = createClient(io, "/"); + + client.on("echo", (arg, cb) => { + cb(arg); + }); + + io.on("connection", async (socket) => { + const value = await socket.timeout(50).emitWithAck("echo", 42); + expect(value).to.be(42); + success(done, io, client); + }); + }); }); diff --git a/test/socket.io.test-d.ts b/test/socket.io.test-d.ts index 1785da34..b3dcde96 100644 --- a/test/socket.io.test-d.ts +++ b/test/socket.io.test-d.ts @@ -92,6 +92,28 @@ describe("server", () => { }); }); }); + + describe("emitWithAck", () => { + it("accepts any parameters", () => { + const srv = createServer(); + const sio = new Server(srv); + srv.listen(async () => { + const value = await sio + .timeout(1000) + .emitWithAck("ackFromServerSingleArg", true, "123"); + expectType(value); + + sio.on("connection", async (s) => { + const value1 = await s.emitWithAck( + "ackFromServerSingleArg", + true, + "123" + ); + expectType(value1); + }); + }); + }); + }); }); describe("single event map", () => { @@ -181,6 +203,13 @@ describe("server", () => { b: string, ack: (c: boolean, d: string) => void ) => void; + + ackFromServerSingleArg: ( + a: boolean, + b: string, + ack: (c: string) => void + ) => void; + multipleAckFromServer: ( a: boolean, b: string, @@ -295,6 +324,42 @@ describe("server", () => { }); }); }); + + describe("emitWithAck", () => { + it("accepts arguments of the correct types", (done) => { + const srv = createServer(); + const sio = new Server(srv); + srv.listen(async () => { + const value = await sio + .timeout(1000) + .emitWithAck("multipleAckFromServer", true, "123"); + expectType(value); + + sio.on("connection", async (s) => { + const value1 = await s + .timeout(1000) + .to("room") + .emitWithAck("multipleAckFromServer", true, "123"); + expectType(value1); + + const value2 = await s + .to("room") + .timeout(1000) + .emitWithAck("multipleAckFromServer", true, "123"); + expectType(value2); + + const value3 = await s.emitWithAck( + "ackFromServerSingleArg", + true, + "123" + ); + expectType(value3); + + done(); + }); + }); + }); + }); }); describe("listen and emit event maps for the serverSideEmit method", () => { @@ -323,7 +388,7 @@ describe("server", () => { expectType< Server >(sio); - srv.listen(() => { + srv.listen(async () => { sio.serverSideEmit("helloFromServerToServer", "hello", 10); sio .of("/test") @@ -343,6 +408,12 @@ describe("server", () => { expectType(bar); }); + const value = await sio.serverSideEmitWithAck( + "ackFromServerToServer", + "foo" + ); + expectType(value); + sio.on("ackFromServerToServer", (foo, cb) => { expectType(foo); expectType<(bar: number) => void>(cb); diff --git a/test/socket.ts b/test/socket.ts index a815679e..94613c53 100644 --- a/test/socket.ts +++ b/test/socket.ts @@ -605,6 +605,24 @@ describe("socket", () => { }); }); + it("should emit an event and wait for the acknowledgement", (done) => { + const io = new Server(0); + const socket = createClient(io); + + io.on("connection", async (s) => { + socket.on("hi", (a, b, fn) => { + expect(a).to.be(1); + expect(b).to.be(2); + fn(3); + }); + + const val = await s.emitWithAck("hi", 1, 2); + expect(val).to.be(3); + + success(done, io, socket); + }); + }); + it("should have access to the client", (done) => { const io = new Server(0); const socket = createClient(io);