diff --git a/.changeset/tall-mirrors-smile.md b/.changeset/tall-mirrors-smile.md new file mode 100644 index 00000000..3ad80fd7 --- /dev/null +++ b/.changeset/tall-mirrors-smile.md @@ -0,0 +1,8 @@ +--- +'@farcaster/flatbuffers': patch +'@farcaster/utils': patch +'@farcaster/grpc': patch +'@farcaster/js': patch +--- + +feat: add support for /subscribe gRPC method in @farcaster/js diff --git a/apps/hub/src/rpc/server/serviceImplementations/eventImplementation.ts b/apps/hub/src/rpc/server/serviceImplementations/eventImplementation.ts index 4af89d75..409cc5b9 100644 --- a/apps/hub/src/rpc/server/serviceImplementations/eventImplementation.ts +++ b/apps/hub/src/rpc/server/serviceImplementations/eventImplementation.ts @@ -44,11 +44,32 @@ export const eventImplementation = (engine: Engine) => { packAndWriteEventResponse(unpackedResponse, stream); }; - engine.eventHandler.on('mergeMessage', mergeMessageListener); - engine.eventHandler.on('pruneMessage', pruneMessageListener); - engine.eventHandler.on('revokeMessage', revokeMessageListener); - engine.eventHandler.on('mergeIdRegistryEvent', mergeIdRegistryEventListener); - engine.eventHandler.on('mergeNameRegistryEvent', mergeNameRegistryEventListener); + const { request } = stream; + + // if no type filters are provided, subscribe to all event types + if (request.eventTypesLength() === 0) { + engine.eventHandler.on('mergeMessage', mergeMessageListener); + engine.eventHandler.on('pruneMessage', pruneMessageListener); + engine.eventHandler.on('revokeMessage', revokeMessageListener); + engine.eventHandler.on('mergeIdRegistryEvent', mergeIdRegistryEventListener); + engine.eventHandler.on('mergeNameRegistryEvent', mergeNameRegistryEventListener); + } else { + for (let i = 0; i < request.eventTypesLength(); i++) { + const type = request.eventTypes(i); + + if (type === EventType.MergeMessage) { + engine.eventHandler.on('mergeMessage', mergeMessageListener); + } else if (type === EventType.PruneMessage) { + engine.eventHandler.on('pruneMessage', pruneMessageListener); + } else if (type === EventType.RevokeMessage) { + engine.eventHandler.on('revokeMessage', revokeMessageListener); + } else if (type === EventType.MergeIdRegistryEvent) { + engine.eventHandler.on('mergeIdRegistryEvent', mergeIdRegistryEventListener); + } else if (type === EventType.MergeNameRegistryEvent) { + engine.eventHandler.on('mergeNameRegistryEvent', mergeNameRegistryEventListener); + } + } + } stream.on('cancelled', () => { stream.destroy(); diff --git a/apps/hub/src/rpc/test/eventService.test.ts b/apps/hub/src/rpc/test/eventService.test.ts index c635d6fa..f23c9f1f 100644 --- a/apps/hub/src/rpc/test/eventService.test.ts +++ b/apps/hub/src/rpc/test/eventService.test.ts @@ -33,9 +33,11 @@ afterAll(async () => { }); const fid = Factories.FID.build(); +const fname = Factories.Fname.build(); const ethSigner = Factories.Eip712Signer.build(); const signer = Factories.Ed25519Signer.build(); let custodyEvent: IdRegistryEventModel; +let nameRegistryEvent: NameRegistryEventModel; let signerAdd: SignerAddModel; let castAdd: CastAddModel; @@ -43,6 +45,9 @@ beforeAll(async () => { custodyEvent = new IdRegistryEventModel( await Factories.IdRegistryEvent.create({ to: Array.from(ethSigner.signerKey), fid: Array.from(fid) }) ); + nameRegistryEvent = new NameRegistryEventModel( + await Factories.NameRegistryEvent.create({ to: Array.from(ethSigner.signerKey), fname: Array.from(fname) }) + ); const signerAddData = await Factories.SignerAddData.create({ body: Factories.SignerBody.build({ signer: Array.from(signer.signerKey) }), @@ -61,40 +66,85 @@ beforeAll(async () => { }); describe('subscribe', () => { - let stream: ClientReadableStream; - let events: [EventType, MessageModel | IdRegistryEventModel | NameRegistryEventModel][]; + const setupSubscription = (eventTypes?: EventType[]) => { + let stream: ClientReadableStream | undefined; + const events: [EventType, MessageModel | IdRegistryEventModel | NameRegistryEventModel][] = []; - beforeEach(async () => { - stream = (await client.subscribe())._unsafeUnwrap(); - events = []; - stream.on('data', (response: EventResponse) => { - if ( - response.type() === EventType.MergeMessage || - response.type() === EventType.PruneMessage || - response.type() === EventType.RevokeMessage - ) { - events.push([response.type(), MessageModel.from(response.bytesArray() ?? new Uint8Array())]); - } else if (response.type() === EventType.MergeIdRegistryEvent) { - events.push([response.type(), IdRegistryEventModel.from(response.bytesArray() ?? new Uint8Array())]); - } else if (response.type() === EventType.MergeNameRegistryEvent) { - events.push([response.type(), NameRegistryEventModel.from(response.bytesArray() ?? new Uint8Array())]); - } + beforeEach(async () => { + stream = (await client.subscribe(eventTypes))._unsafeUnwrap(); + stream.on('data', (response: EventResponse) => { + if ( + response.type() === EventType.MergeMessage || + response.type() === EventType.PruneMessage || + response.type() === EventType.RevokeMessage + ) { + events.push([response.type(), MessageModel.from(response.bytesArray() ?? new Uint8Array())]); + } else if (response.type() === EventType.MergeIdRegistryEvent) { + events.push([response.type(), IdRegistryEventModel.from(response.bytesArray() ?? new Uint8Array())]); + } else if (response.type() === EventType.MergeNameRegistryEvent) { + events.push([response.type(), NameRegistryEventModel.from(response.bytesArray() ?? new Uint8Array())]); + } + }); + }); + + afterEach(async () => { + await stream?.cancel(); + }); + + return { stream, events }; + }; + + describe('without type filters', () => { + const { events } = setupSubscription(); + + test('emits event', async () => { + await engine.mergeIdRegistryEvent(custodyEvent); + await engine.mergeMessage(signerAdd); + await engine.mergeMessage(castAdd); + await sleep(1_000); // Wait for server to send events over stream + expect(events).toEqual([ + [EventType.MergeIdRegistryEvent, custodyEvent], + [EventType.MergeMessage, signerAdd], + [EventType.MergeMessage, castAdd], + ]); }); }); - afterEach(async () => { - await stream.cancel(); + describe('with one type filter', () => { + const { events } = setupSubscription([EventType.MergeMessage]); + + test('emits event', async () => { + await engine.mergeIdRegistryEvent(custodyEvent); + await engine.mergeNameRegistryEvent(nameRegistryEvent); + await engine.mergeMessage(signerAdd); + await engine.mergeMessage(castAdd); + await sleep(1_000); // Wait for server to send events over stream + expect(events).toEqual([ + [EventType.MergeMessage, signerAdd], + [EventType.MergeMessage, castAdd], + ]); + }); }); - test('emits event', async () => { - await engine.mergeIdRegistryEvent(custodyEvent); - await engine.mergeMessage(signerAdd); - await engine.mergeMessage(castAdd); - await sleep(1_000); // Wait for server to send events over stream - expect(events).toEqual([ - [EventType.MergeIdRegistryEvent, custodyEvent], - [EventType.MergeMessage, signerAdd], - [EventType.MergeMessage, castAdd], + describe('with multiple type filters', () => { + const { events } = setupSubscription([ + EventType.MergeMessage, + EventType.MergeNameRegistryEvent, + EventType.MergeIdRegistryEvent, ]); + + test('emits event', async () => { + await engine.mergeIdRegistryEvent(custodyEvent); + await engine.mergeNameRegistryEvent(nameRegistryEvent); + await engine.mergeMessage(signerAdd); + await engine.mergeMessage(castAdd); + await sleep(1_000); // Wait for server to send events over stream + expect(events).toEqual([ + [EventType.MergeIdRegistryEvent, custodyEvent], + [EventType.MergeNameRegistryEvent, nameRegistryEvent], + [EventType.MergeMessage, signerAdd], + [EventType.MergeMessage, castAdd], + ]); + }); }); }); diff --git a/packages/flatbuffers/src/generated/rpc_generated.ts b/packages/flatbuffers/src/generated/rpc_generated.ts index d0af43dd..ec76038b 100644 --- a/packages/flatbuffers/src/generated/rpc_generated.ts +++ b/packages/flatbuffers/src/generated/rpc_generated.ts @@ -3047,8 +3047,39 @@ static getSizePrefixedRootAsSubscribeRequest(bb:flatbuffers.ByteBuffer, obj?:Sub return (obj || new SubscribeRequest()).__init(bb.readInt32(bb.position()) + bb.position(), bb); } +eventTypes(index: number):EventType|null { + const offset = this.bb!.__offset(this.bb_pos, 4); + return offset ? this.bb!.readUint8(this.bb!.__vector(this.bb_pos + offset) + index) : 0; +} + +eventTypesLength():number { + const offset = this.bb!.__offset(this.bb_pos, 4); + return offset ? this.bb!.__vector_len(this.bb_pos + offset) : 0; +} + +eventTypesArray():Uint8Array|null { + const offset = this.bb!.__offset(this.bb_pos, 4); + return offset ? new Uint8Array(this.bb!.bytes().buffer, this.bb!.bytes().byteOffset + this.bb!.__vector(this.bb_pos + offset), this.bb!.__vector_len(this.bb_pos + offset)) : null; +} + static startSubscribeRequest(builder:flatbuffers.Builder) { - builder.startObject(0); + builder.startObject(1); +} + +static addEventTypes(builder:flatbuffers.Builder, eventTypesOffset:flatbuffers.Offset) { + builder.addFieldOffset(0, eventTypesOffset, 0); +} + +static createEventTypesVector(builder:flatbuffers.Builder, data:EventType[]):flatbuffers.Offset { + builder.startVector(1, data.length, 1); + for (let i = data.length - 1; i >= 0; i--) { + builder.addInt8(data[i]!); + } + return builder.endVector(); +} + +static startEventTypesVector(builder:flatbuffers.Builder, numElems:number) { + builder.startVector(1, numElems, 1); } static endSubscribeRequest(builder:flatbuffers.Builder):flatbuffers.Offset { @@ -3056,25 +3087,36 @@ static endSubscribeRequest(builder:flatbuffers.Builder):flatbuffers.Offset { return offset; } -static createSubscribeRequest(builder:flatbuffers.Builder):flatbuffers.Offset { +static createSubscribeRequest(builder:flatbuffers.Builder, eventTypesOffset:flatbuffers.Offset):flatbuffers.Offset { SubscribeRequest.startSubscribeRequest(builder); + SubscribeRequest.addEventTypes(builder, eventTypesOffset); return SubscribeRequest.endSubscribeRequest(builder); } unpack(): SubscribeRequestT { - return new SubscribeRequestT(); + return new SubscribeRequestT( + this.bb!.createScalarList(this.eventTypes.bind(this), this.eventTypesLength()) + ); } -unpackTo(_o: SubscribeRequestT): void {} +unpackTo(_o: SubscribeRequestT): void { + _o.eventTypes = this.bb!.createScalarList(this.eventTypes.bind(this), this.eventTypesLength()); +} } export class SubscribeRequestT implements flatbuffers.IGeneratedObject { -constructor(){} +constructor( + public eventTypes: (EventType)[] = [] +){} pack(builder:flatbuffers.Builder): flatbuffers.Offset { - return SubscribeRequest.createSubscribeRequest(builder); + const eventTypes = SubscribeRequest.createEventTypesVector(builder, this.eventTypes); + + return SubscribeRequest.createSubscribeRequest(builder, + eventTypes + ); } } diff --git a/packages/flatbuffers/src/schemas/rpc.fbs b/packages/flatbuffers/src/schemas/rpc.fbs index d3c1a8da..9570b711 100644 --- a/packages/flatbuffers/src/schemas/rpc.fbs +++ b/packages/flatbuffers/src/schemas/rpc.fbs @@ -42,7 +42,7 @@ table TrieNodeMetadataResponse { prefix: [ubyte]; num_messages: uint64; hash: [ubyte]; - children: [TrieNodeMetadataResponse]; + children: [TrieNodeMetadataResponse]; } table TrieNodeSnapshotResponse { @@ -170,5 +170,7 @@ table GetTrieNodesByPrefixRequest { // Events Requests -table SubscribeRequest {} +table SubscribeRequest { + event_types: [EventType]; +} diff --git a/packages/grpc/src/client.ts b/packages/grpc/src/client.ts index 259c5500..fb070cfe 100644 --- a/packages/grpc/src/client.ts +++ b/packages/grpc/src/client.ts @@ -261,32 +261,13 @@ export class Client { /* Event Methods */ /* -------------------------------------------------------------------------- */ - async subscribe(): HubAsyncResult> { - const method = definitions.eventDefinition().subscribe; - const request = new flatbuffers.SubscribeRequest(); - const stream = this.client.makeServerStreamRequest( - method.path, - method.requestSerialize, - method.responseDeserialize, - request + async subscribe( + eventTypes?: flatbuffers.EventType[] + ): HubAsyncResult> { + return this.makeServerStreamRequest( + definitions.eventDefinition().subscribe, + requests.eventRequests.subscribeRequest(eventTypes) ); - stream.on('error', (e) => { - return e; // Suppress exceptions - }); - return new Promise((resolve, reject) => { - const timeout = setTimeout(() => { - stream.cancel(); // Cancel if not connected within timeout - reject(err(new HubError('unavailable.network_failure', 'subscribe timed out'))); - }, 1_000); - stream.on('metadata', (metadata: Metadata) => { - clearTimeout(timeout); - if (metadata.get('status')[0] === ('ready' as MetadataValue)) { - resolve(ok(stream)); - } else { - reject(err(new HubError('unavailable.network_failure', 'subscribe failed'))); - } - }); - }); } /* -------------------------------------------------------------------------- */ @@ -340,4 +321,36 @@ export class Client { ); }); } + + async makeServerStreamRequest( + method: grpc.MethodDefinition, + request: RequestType + ): HubAsyncResult> { + const stream = this.client.makeServerStreamRequest( + method.path, + method.requestSerialize, + method.responseDeserialize, + request + ); + + stream.on('error', (e) => { + return e; // Suppress exceptions + }); + + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + stream.cancel(); // Cancel if not connected within timeout + reject(err(new HubError('unavailable.network_failure', 'subscribe timed out'))); + }, 1_000); + + stream.on('metadata', (metadata: Metadata) => { + clearTimeout(timeout); + if (metadata.get('status')[0] === ('ready' as MetadataValue)) { + resolve(ok(stream)); + } else { + reject(err(new HubError('unavailable.network_failure', 'subscribe failed'))); + } + }); + }); + } } diff --git a/packages/grpc/src/requests/eventRequests.test.ts b/packages/grpc/src/requests/eventRequests.test.ts new file mode 100644 index 00000000..bafab67e --- /dev/null +++ b/packages/grpc/src/requests/eventRequests.test.ts @@ -0,0 +1,20 @@ +import * as flatbuffers from '@farcaster/flatbuffers'; +import { eventRequests } from './eventRequests'; + +describe('eventRequests', () => { + describe('subscribeRequest', () => { + test('succeeds with no eventTypes', () => { + const subscribeRequest = eventRequests.subscribeRequest(); + expect(subscribeRequest).toBeInstanceOf(flatbuffers.SubscribeRequest); + expect(subscribeRequest.eventTypesLength()).toBe(0); + }); + + test('succeeds with eventTypes', () => { + const eventTypes = [flatbuffers.EventType.MergeMessage, flatbuffers.EventType.PruneMessage]; + const subscribeRequest = eventRequests.subscribeRequest(eventTypes); + expect(subscribeRequest).toBeInstanceOf(flatbuffers.SubscribeRequest); + expect(subscribeRequest.eventTypesLength()).toBe(eventTypes.length); + eventTypes.forEach((eventType, index) => expect(subscribeRequest.eventTypes(index)).toEqual(eventType)); + }); + }); +}); diff --git a/packages/grpc/src/requests/eventRequests.ts b/packages/grpc/src/requests/eventRequests.ts new file mode 100644 index 00000000..e476b32e --- /dev/null +++ b/packages/grpc/src/requests/eventRequests.ts @@ -0,0 +1,11 @@ +import * as flatbuffers from '@farcaster/flatbuffers'; +import { Builder, ByteBuffer } from 'flatbuffers'; + +export const eventRequests = { + subscribeRequest: (eventTypes?: flatbuffers.EventType[]): flatbuffers.SubscribeRequest => { + const builder = new Builder(1); + const requestT = new flatbuffers.SubscribeRequestT(eventTypes); + builder.finish(requestT.pack(builder)); + return flatbuffers.SubscribeRequest.getRootAsSubscribeRequest(new ByteBuffer(builder.asUint8Array())); + }, +}; diff --git a/packages/grpc/src/requests/index.ts b/packages/grpc/src/requests/index.ts index a72a526a..7f56614f 100644 --- a/packages/grpc/src/requests/index.ts +++ b/packages/grpc/src/requests/index.ts @@ -1,6 +1,7 @@ export * from './ampRequests'; export * from './bulkRequests'; export * from './castRequests'; +export * from './eventRequests'; export * from './reactionRequests'; export * from './signerRequests'; export * from './userDataRequests'; diff --git a/packages/js/src/client.ts b/packages/js/src/client.ts index 6e6b883a..27e6fe08 100644 --- a/packages/js/src/client.ts +++ b/packages/js/src/client.ts @@ -16,6 +16,10 @@ import { serializeUserId, } from './utils'; +export type EventFilters = { + eventTypes?: flatbuffers.EventType[]; +}; + const deserializeCall = async ( call: HubAsyncResult, deserialize: (fbb: TFlatbuffer) => HubResult @@ -353,5 +357,10 @@ export class Client { /* Event Methods */ /* -------------------------------------------------------------------------- */ - // TODO: subscribe + /** + * Data from this stream can be parsed using `deserializeEventResponse`. + */ + async subscribe(filters: EventFilters = {}) { + return this._grpcClient.subscribe(filters.eventTypes); + } } diff --git a/packages/js/src/types.ts b/packages/js/src/types.ts index ce63b74a..9fcfec92 100644 --- a/packages/js/src/types.ts +++ b/packages/js/src/types.ts @@ -136,3 +136,23 @@ export type NameRegistryEvent = Readonly<{ from: string; // Hex string expiry: number; }>; + +export type MessageEventResponse = { + flatbuffer: flatbuffers.EventResponse; + type: flatbuffers.EventType.MergeMessage | flatbuffers.EventType.PruneMessage | flatbuffers.EventType.RevokeMessage; + message: Message; +}; + +export type IdRegistryEventResponse = { + flatbuffer: flatbuffers.EventResponse; + type: flatbuffers.EventType.MergeIdRegistryEvent; + idRegistryEvent: IdRegistryEvent; +}; + +export type NameRegistryEventResponse = { + flatbuffer: flatbuffers.EventResponse; + type: flatbuffers.EventType.MergeNameRegistryEvent; + nameRegistryEvent: NameRegistryEvent; +}; + +export type EventResponse = NameRegistryEventResponse | IdRegistryEventResponse | MessageEventResponse; diff --git a/packages/js/src/utils.test.ts b/packages/js/src/utils.test.ts index 5a86181d..e33cc2c3 100644 --- a/packages/js/src/utils.test.ts +++ b/packages/js/src/utils.test.ts @@ -806,3 +806,154 @@ describe('deserializeIdRegistryEvent', () => { expect(idRegistryEvent.type).toEqual(idRegistryEventFbb.type()); }); }); + +describe('deserializeEventResponse', () => { + let eventResponseFbb: flatbuffers.EventResponse; + + describe('MergeIdRegistryEvent event', () => { + const type = flatbuffers.EventType.MergeIdRegistryEvent; + let idRegistryEventFbb: flatbuffers.IdRegistryEvent; + let idRegistryEvent: types.IdRegistryEvent; + let eventResponse: types.IdRegistryEventResponse; + + beforeAll(async () => { + idRegistryEventFbb = await Factories.IdRegistryEvent.create(); + idRegistryEvent = await utils.deserializeIdRegistryEvent(idRegistryEventFbb)._unsafeUnwrap(); + eventResponseFbb = await Factories.EventResponse.create({ + type, + bytes: Array.from(idRegistryEventFbb.bb?.bytes() ?? new Uint8Array()), + }); + eventResponse = utils.deserializeEventResponse(eventResponseFbb)._unsafeUnwrap() as types.IdRegistryEventResponse; + }); + + test('flatbuffer', () => { + expect(eventResponse.flatbuffer).toBe(eventResponseFbb); + }); + + test('type', () => { + expect(eventResponse.type).toBe(type); + }); + + test('idRegistryEvent', () => { + expect(eventResponse.idRegistryEvent).toMatchObject(idRegistryEvent); + }); + }); + + describe('MergeNameRegistryEvent event', () => { + const type = flatbuffers.EventType.MergeNameRegistryEvent; + let nameRegistryEventFbb: flatbuffers.NameRegistryEvent; + let nameRegistryEvent: types.NameRegistryEvent; + let eventResponse: types.NameRegistryEventResponse; + + beforeAll(async () => { + nameRegistryEventFbb = await Factories.NameRegistryEvent.create(); + nameRegistryEvent = await utils.deserializeNameRegistryEvent(nameRegistryEventFbb)._unsafeUnwrap(); + eventResponseFbb = await Factories.EventResponse.create({ + type, + bytes: Array.from(nameRegistryEventFbb.bb?.bytes() ?? new Uint8Array()), + }); + eventResponse = utils + .deserializeEventResponse(eventResponseFbb) + ._unsafeUnwrap() as types.NameRegistryEventResponse; + }); + + test('flatbuffer', () => { + expect(eventResponse.flatbuffer).toBe(eventResponseFbb); + }); + + test('type', () => { + expect(eventResponse.type).toBe(type); + }); + + test('nameRegistryEvent', () => { + expect(eventResponse.nameRegistryEvent).toMatchObject(nameRegistryEvent); + }); + }); + + describe('MergeMessage event', () => { + const type = flatbuffers.EventType.MergeMessage; + let messageFbb: flatbuffers.Message; + let message: types.Message; + let eventResponse: types.MessageEventResponse; + + beforeAll(async () => { + messageFbb = await Factories.Message.create(); + message = await utils.deserializeMessage(messageFbb)._unsafeUnwrap(); + eventResponseFbb = await Factories.EventResponse.create({ + type, + bytes: Array.from(messageFbb.bb?.bytes() ?? new Uint8Array()), + }); + eventResponse = utils.deserializeEventResponse(eventResponseFbb)._unsafeUnwrap() as types.MessageEventResponse; + }); + + test('flatbuffer', () => { + expect(eventResponse.flatbuffer).toBe(eventResponseFbb); + }); + + test('type', () => { + expect(eventResponse.type).toBe(type); + }); + + test('message', () => { + expect(eventResponse.message).toMatchObject(message); + }); + }); + + describe('PruneMessage event', () => { + const type = flatbuffers.EventType.PruneMessage; + let messageFbb: flatbuffers.Message; + let message: types.Message; + let eventResponse: types.MessageEventResponse; + + beforeAll(async () => { + messageFbb = await Factories.Message.create(); + message = await utils.deserializeMessage(messageFbb)._unsafeUnwrap(); + eventResponseFbb = await Factories.EventResponse.create({ + type, + bytes: Array.from(messageFbb.bb?.bytes() ?? new Uint8Array()), + }); + eventResponse = utils.deserializeEventResponse(eventResponseFbb)._unsafeUnwrap() as types.MessageEventResponse; + }); + + test('flatbuffer', () => { + expect(eventResponse.flatbuffer).toBe(eventResponseFbb); + }); + + test('type', () => { + expect(eventResponse.type).toBe(type); + }); + + test('message', () => { + expect(eventResponse.message).toMatchObject(message); + }); + }); + + describe('RevokeMessage event', () => { + const type = flatbuffers.EventType.RevokeMessage; + let messageFbb: flatbuffers.Message; + let message: types.Message; + let eventResponse: types.MessageEventResponse; + + beforeAll(async () => { + messageFbb = await Factories.Message.create(); + message = await utils.deserializeMessage(messageFbb)._unsafeUnwrap(); + eventResponseFbb = await Factories.EventResponse.create({ + type, + bytes: Array.from(messageFbb.bb?.bytes() ?? new Uint8Array()), + }); + eventResponse = utils.deserializeEventResponse(eventResponseFbb)._unsafeUnwrap() as types.MessageEventResponse; + }); + + test('flatbuffer', () => { + expect(eventResponse.flatbuffer).toBe(eventResponseFbb); + }); + + test('type', () => { + expect(eventResponse.type).toBe(type); + }); + + test('message', () => { + expect(eventResponse.message).toMatchObject(message); + }); + }); +}); diff --git a/packages/js/src/utils.ts b/packages/js/src/utils.ts index 0d3600a7..07ada1e9 100644 --- a/packages/js/src/utils.ts +++ b/packages/js/src/utils.ts @@ -17,6 +17,54 @@ import { ByteBuffer } from 'flatbuffers'; import { err, ok, Result } from 'neverthrow'; import * as types from './types'; +/* -------------------------------------------------------------------------- */ +/* Event Response */ +/* -------------------------------------------------------------------------- */ + +export const deserializeEventResponse = (fbb: flatbuffers.EventResponse): HubResult => { + const type = fbb.type(); + + switch (type) { + case flatbuffers.EventType.MergeMessage: + case flatbuffers.EventType.PruneMessage: + case flatbuffers.EventType.RevokeMessage: { + return deserializeMessage( + flatbuffers.Message.getRootAsMessage(new ByteBuffer(fbb.bytesArray() ?? new Uint8Array())) + ).map((message) => { + return { + flatbuffer: fbb, + type, + message, + }; + }); + } + case flatbuffers.EventType.MergeIdRegistryEvent: { + return deserializeIdRegistryEvent( + flatbuffers.IdRegistryEvent.getRootAsIdRegistryEvent(new ByteBuffer(fbb.bytesArray() ?? new Uint8Array())) + ).map((idRegistryEvent) => { + return { + flatbuffer: fbb, + type, + idRegistryEvent, + }; + }); + } + case flatbuffers.EventType.MergeNameRegistryEvent: { + return deserializeNameRegistryEvent( + flatbuffers.NameRegistryEvent.getRootAsNameRegistryEvent(new ByteBuffer(fbb.bytesArray() ?? new Uint8Array())) + ).map((nameRegistryEvent) => { + return { + flatbuffer: fbb, + type, + nameRegistryEvent, + }; + }); + } + default: + return err(new HubError('bad_request.invalid_param', `unknown EventType '${type}'`)); + } +}; + /* -------------------------------------------------------------------------- */ /* Registry Events */ /* -------------------------------------------------------------------------- */ diff --git a/packages/utils/src/factories.test.ts b/packages/utils/src/factories.test.ts index a51f6790..afdadeed 100644 --- a/packages/utils/src/factories.test.ts +++ b/packages/utils/src/factories.test.ts @@ -111,3 +111,25 @@ describe('BytesFactory', () => { }); }); }); + +describe('EventResponseFactory', () => { + describe('build', () => { + const eventResponse = Factories.EventResponse.build(); + + test('generates an EventResponseT', () => { + expect(eventResponse).toBeInstanceOf(flatbuffers.EventResponseT); + }); + }); + + describe('create', () => { + let eventResponse: flatbuffers.EventResponse; + + beforeAll(async () => { + eventResponse = await Factories.EventResponse.create(); + }); + + test('generates an EventResponse', () => { + expect(eventResponse).toBeInstanceOf(flatbuffers.EventResponse); + }); + }); +}); diff --git a/packages/utils/src/factories.ts b/packages/utils/src/factories.ts index 8adf61fb..d438916e 100644 --- a/packages/utils/src/factories.ts +++ b/packages/utils/src/factories.ts @@ -547,6 +547,29 @@ const TsHashHexFactory = Factory.define(() => { return faker.datatype.hexadecimal({ length: 40, case: 'lower' }); }); +const EventTypeFactory = Factory.define(() => { + return faker.helpers.arrayElement([ + flatbuffers.EventType.MergeIdRegistryEvent, + flatbuffers.EventType.MergeNameRegistryEvent, + flatbuffers.EventType.MergeMessage, + flatbuffers.EventType.RevokeMessage, + flatbuffers.EventType.PruneMessage, + ]); +}); + +const EventResponseFactory = Factory.define( + ({ onCreate }) => { + onCreate((params) => { + const builder = new Builder(1); + builder.finish(params.pack(builder)); + return flatbuffers.EventResponse.getRootAsEventResponse(new ByteBuffer(builder.asUint8Array())); + }); + + const type = EventTypeFactory.build(); + return new flatbuffers.EventResponseT(type); + } +); + export const Factories = { Bytes: BytesFactory, FID: FIDFactory, @@ -596,4 +619,5 @@ export const Factories = { EthAddressHex: EthAddressHexFactory, Ed25519PublicKeyHex: Ed25519PublicKeyHexFactory, TsHashHex: TsHashHexFactory, + EventResponse: EventResponseFactory, };