mirror of
https://github.com/farcasterxyz/hub-monorepo.git
synced 2026-02-03 01:15:38 -05:00
feat: add support for /subscribe gRPC method in @farcaster/js package (#429)
* feat: add EventResponse types and deserialize fn * expose subscribe on @farcaster/js client * add type filtering to subscribe * refactor event to event_types * increase test coverage * add changeset
This commit is contained in:
8
.changeset/tall-mirrors-smile.md
Normal file
8
.changeset/tall-mirrors-smile.md
Normal file
@@ -0,0 +1,8 @@
|
||||
---
|
||||
'@farcaster/flatbuffers': patch
|
||||
'@farcaster/utils': patch
|
||||
'@farcaster/grpc': patch
|
||||
'@farcaster/js': patch
|
||||
---
|
||||
|
||||
feat: add support for /subscribe gRPC method in @farcaster/js
|
||||
@@ -44,11 +44,32 @@ export const eventImplementation = (engine: Engine) => {
|
||||
packAndWriteEventResponse(unpackedResponse, stream);
|
||||
};
|
||||
|
||||
engine.eventHandler.on('mergeMessage', mergeMessageListener);
|
||||
engine.eventHandler.on('pruneMessage', pruneMessageListener);
|
||||
engine.eventHandler.on('revokeMessage', revokeMessageListener);
|
||||
engine.eventHandler.on('mergeIdRegistryEvent', mergeIdRegistryEventListener);
|
||||
engine.eventHandler.on('mergeNameRegistryEvent', mergeNameRegistryEventListener);
|
||||
const { request } = stream;
|
||||
|
||||
// if no type filters are provided, subscribe to all event types
|
||||
if (request.eventTypesLength() === 0) {
|
||||
engine.eventHandler.on('mergeMessage', mergeMessageListener);
|
||||
engine.eventHandler.on('pruneMessage', pruneMessageListener);
|
||||
engine.eventHandler.on('revokeMessage', revokeMessageListener);
|
||||
engine.eventHandler.on('mergeIdRegistryEvent', mergeIdRegistryEventListener);
|
||||
engine.eventHandler.on('mergeNameRegistryEvent', mergeNameRegistryEventListener);
|
||||
} else {
|
||||
for (let i = 0; i < request.eventTypesLength(); i++) {
|
||||
const type = request.eventTypes(i);
|
||||
|
||||
if (type === EventType.MergeMessage) {
|
||||
engine.eventHandler.on('mergeMessage', mergeMessageListener);
|
||||
} else if (type === EventType.PruneMessage) {
|
||||
engine.eventHandler.on('pruneMessage', pruneMessageListener);
|
||||
} else if (type === EventType.RevokeMessage) {
|
||||
engine.eventHandler.on('revokeMessage', revokeMessageListener);
|
||||
} else if (type === EventType.MergeIdRegistryEvent) {
|
||||
engine.eventHandler.on('mergeIdRegistryEvent', mergeIdRegistryEventListener);
|
||||
} else if (type === EventType.MergeNameRegistryEvent) {
|
||||
engine.eventHandler.on('mergeNameRegistryEvent', mergeNameRegistryEventListener);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
stream.on('cancelled', () => {
|
||||
stream.destroy();
|
||||
|
||||
@@ -33,9 +33,11 @@ afterAll(async () => {
|
||||
});
|
||||
|
||||
const fid = Factories.FID.build();
|
||||
const fname = Factories.Fname.build();
|
||||
const ethSigner = Factories.Eip712Signer.build();
|
||||
const signer = Factories.Ed25519Signer.build();
|
||||
let custodyEvent: IdRegistryEventModel;
|
||||
let nameRegistryEvent: NameRegistryEventModel;
|
||||
let signerAdd: SignerAddModel;
|
||||
let castAdd: CastAddModel;
|
||||
|
||||
@@ -43,6 +45,9 @@ beforeAll(async () => {
|
||||
custodyEvent = new IdRegistryEventModel(
|
||||
await Factories.IdRegistryEvent.create({ to: Array.from(ethSigner.signerKey), fid: Array.from(fid) })
|
||||
);
|
||||
nameRegistryEvent = new NameRegistryEventModel(
|
||||
await Factories.NameRegistryEvent.create({ to: Array.from(ethSigner.signerKey), fname: Array.from(fname) })
|
||||
);
|
||||
|
||||
const signerAddData = await Factories.SignerAddData.create({
|
||||
body: Factories.SignerBody.build({ signer: Array.from(signer.signerKey) }),
|
||||
@@ -61,40 +66,85 @@ beforeAll(async () => {
|
||||
});
|
||||
|
||||
describe('subscribe', () => {
|
||||
let stream: ClientReadableStream<EventResponse>;
|
||||
let events: [EventType, MessageModel | IdRegistryEventModel | NameRegistryEventModel][];
|
||||
const setupSubscription = (eventTypes?: EventType[]) => {
|
||||
let stream: ClientReadableStream<EventResponse> | undefined;
|
||||
const events: [EventType, MessageModel | IdRegistryEventModel | NameRegistryEventModel][] = [];
|
||||
|
||||
beforeEach(async () => {
|
||||
stream = (await client.subscribe())._unsafeUnwrap();
|
||||
events = [];
|
||||
stream.on('data', (response: EventResponse) => {
|
||||
if (
|
||||
response.type() === EventType.MergeMessage ||
|
||||
response.type() === EventType.PruneMessage ||
|
||||
response.type() === EventType.RevokeMessage
|
||||
) {
|
||||
events.push([response.type(), MessageModel.from(response.bytesArray() ?? new Uint8Array())]);
|
||||
} else if (response.type() === EventType.MergeIdRegistryEvent) {
|
||||
events.push([response.type(), IdRegistryEventModel.from(response.bytesArray() ?? new Uint8Array())]);
|
||||
} else if (response.type() === EventType.MergeNameRegistryEvent) {
|
||||
events.push([response.type(), NameRegistryEventModel.from(response.bytesArray() ?? new Uint8Array())]);
|
||||
}
|
||||
beforeEach(async () => {
|
||||
stream = (await client.subscribe(eventTypes))._unsafeUnwrap();
|
||||
stream.on('data', (response: EventResponse) => {
|
||||
if (
|
||||
response.type() === EventType.MergeMessage ||
|
||||
response.type() === EventType.PruneMessage ||
|
||||
response.type() === EventType.RevokeMessage
|
||||
) {
|
||||
events.push([response.type(), MessageModel.from(response.bytesArray() ?? new Uint8Array())]);
|
||||
} else if (response.type() === EventType.MergeIdRegistryEvent) {
|
||||
events.push([response.type(), IdRegistryEventModel.from(response.bytesArray() ?? new Uint8Array())]);
|
||||
} else if (response.type() === EventType.MergeNameRegistryEvent) {
|
||||
events.push([response.type(), NameRegistryEventModel.from(response.bytesArray() ?? new Uint8Array())]);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await stream?.cancel();
|
||||
});
|
||||
|
||||
return { stream, events };
|
||||
};
|
||||
|
||||
describe('without type filters', () => {
|
||||
const { events } = setupSubscription();
|
||||
|
||||
test('emits event', async () => {
|
||||
await engine.mergeIdRegistryEvent(custodyEvent);
|
||||
await engine.mergeMessage(signerAdd);
|
||||
await engine.mergeMessage(castAdd);
|
||||
await sleep(1_000); // Wait for server to send events over stream
|
||||
expect(events).toEqual([
|
||||
[EventType.MergeIdRegistryEvent, custodyEvent],
|
||||
[EventType.MergeMessage, signerAdd],
|
||||
[EventType.MergeMessage, castAdd],
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await stream.cancel();
|
||||
describe('with one type filter', () => {
|
||||
const { events } = setupSubscription([EventType.MergeMessage]);
|
||||
|
||||
test('emits event', async () => {
|
||||
await engine.mergeIdRegistryEvent(custodyEvent);
|
||||
await engine.mergeNameRegistryEvent(nameRegistryEvent);
|
||||
await engine.mergeMessage(signerAdd);
|
||||
await engine.mergeMessage(castAdd);
|
||||
await sleep(1_000); // Wait for server to send events over stream
|
||||
expect(events).toEqual([
|
||||
[EventType.MergeMessage, signerAdd],
|
||||
[EventType.MergeMessage, castAdd],
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
||||
test('emits event', async () => {
|
||||
await engine.mergeIdRegistryEvent(custodyEvent);
|
||||
await engine.mergeMessage(signerAdd);
|
||||
await engine.mergeMessage(castAdd);
|
||||
await sleep(1_000); // Wait for server to send events over stream
|
||||
expect(events).toEqual([
|
||||
[EventType.MergeIdRegistryEvent, custodyEvent],
|
||||
[EventType.MergeMessage, signerAdd],
|
||||
[EventType.MergeMessage, castAdd],
|
||||
describe('with multiple type filters', () => {
|
||||
const { events } = setupSubscription([
|
||||
EventType.MergeMessage,
|
||||
EventType.MergeNameRegistryEvent,
|
||||
EventType.MergeIdRegistryEvent,
|
||||
]);
|
||||
|
||||
test('emits event', async () => {
|
||||
await engine.mergeIdRegistryEvent(custodyEvent);
|
||||
await engine.mergeNameRegistryEvent(nameRegistryEvent);
|
||||
await engine.mergeMessage(signerAdd);
|
||||
await engine.mergeMessage(castAdd);
|
||||
await sleep(1_000); // Wait for server to send events over stream
|
||||
expect(events).toEqual([
|
||||
[EventType.MergeIdRegistryEvent, custodyEvent],
|
||||
[EventType.MergeNameRegistryEvent, nameRegistryEvent],
|
||||
[EventType.MergeMessage, signerAdd],
|
||||
[EventType.MergeMessage, castAdd],
|
||||
]);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -3047,8 +3047,39 @@ static getSizePrefixedRootAsSubscribeRequest(bb:flatbuffers.ByteBuffer, obj?:Sub
|
||||
return (obj || new SubscribeRequest()).__init(bb.readInt32(bb.position()) + bb.position(), bb);
|
||||
}
|
||||
|
||||
eventTypes(index: number):EventType|null {
|
||||
const offset = this.bb!.__offset(this.bb_pos, 4);
|
||||
return offset ? this.bb!.readUint8(this.bb!.__vector(this.bb_pos + offset) + index) : 0;
|
||||
}
|
||||
|
||||
eventTypesLength():number {
|
||||
const offset = this.bb!.__offset(this.bb_pos, 4);
|
||||
return offset ? this.bb!.__vector_len(this.bb_pos + offset) : 0;
|
||||
}
|
||||
|
||||
eventTypesArray():Uint8Array|null {
|
||||
const offset = this.bb!.__offset(this.bb_pos, 4);
|
||||
return offset ? new Uint8Array(this.bb!.bytes().buffer, this.bb!.bytes().byteOffset + this.bb!.__vector(this.bb_pos + offset), this.bb!.__vector_len(this.bb_pos + offset)) : null;
|
||||
}
|
||||
|
||||
static startSubscribeRequest(builder:flatbuffers.Builder) {
|
||||
builder.startObject(0);
|
||||
builder.startObject(1);
|
||||
}
|
||||
|
||||
static addEventTypes(builder:flatbuffers.Builder, eventTypesOffset:flatbuffers.Offset) {
|
||||
builder.addFieldOffset(0, eventTypesOffset, 0);
|
||||
}
|
||||
|
||||
static createEventTypesVector(builder:flatbuffers.Builder, data:EventType[]):flatbuffers.Offset {
|
||||
builder.startVector(1, data.length, 1);
|
||||
for (let i = data.length - 1; i >= 0; i--) {
|
||||
builder.addInt8(data[i]!);
|
||||
}
|
||||
return builder.endVector();
|
||||
}
|
||||
|
||||
static startEventTypesVector(builder:flatbuffers.Builder, numElems:number) {
|
||||
builder.startVector(1, numElems, 1);
|
||||
}
|
||||
|
||||
static endSubscribeRequest(builder:flatbuffers.Builder):flatbuffers.Offset {
|
||||
@@ -3056,25 +3087,36 @@ static endSubscribeRequest(builder:flatbuffers.Builder):flatbuffers.Offset {
|
||||
return offset;
|
||||
}
|
||||
|
||||
static createSubscribeRequest(builder:flatbuffers.Builder):flatbuffers.Offset {
|
||||
static createSubscribeRequest(builder:flatbuffers.Builder, eventTypesOffset:flatbuffers.Offset):flatbuffers.Offset {
|
||||
SubscribeRequest.startSubscribeRequest(builder);
|
||||
SubscribeRequest.addEventTypes(builder, eventTypesOffset);
|
||||
return SubscribeRequest.endSubscribeRequest(builder);
|
||||
}
|
||||
|
||||
unpack(): SubscribeRequestT {
|
||||
return new SubscribeRequestT();
|
||||
return new SubscribeRequestT(
|
||||
this.bb!.createScalarList<EventType>(this.eventTypes.bind(this), this.eventTypesLength())
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
unpackTo(_o: SubscribeRequestT): void {}
|
||||
unpackTo(_o: SubscribeRequestT): void {
|
||||
_o.eventTypes = this.bb!.createScalarList<EventType>(this.eventTypes.bind(this), this.eventTypesLength());
|
||||
}
|
||||
}
|
||||
|
||||
export class SubscribeRequestT implements flatbuffers.IGeneratedObject {
|
||||
constructor(){}
|
||||
constructor(
|
||||
public eventTypes: (EventType)[] = []
|
||||
){}
|
||||
|
||||
|
||||
pack(builder:flatbuffers.Builder): flatbuffers.Offset {
|
||||
return SubscribeRequest.createSubscribeRequest(builder);
|
||||
const eventTypes = SubscribeRequest.createEventTypesVector(builder, this.eventTypes);
|
||||
|
||||
return SubscribeRequest.createSubscribeRequest(builder,
|
||||
eventTypes
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -42,7 +42,7 @@ table TrieNodeMetadataResponse {
|
||||
prefix: [ubyte];
|
||||
num_messages: uint64;
|
||||
hash: [ubyte];
|
||||
children: [TrieNodeMetadataResponse];
|
||||
children: [TrieNodeMetadataResponse];
|
||||
}
|
||||
|
||||
table TrieNodeSnapshotResponse {
|
||||
@@ -170,5 +170,7 @@ table GetTrieNodesByPrefixRequest {
|
||||
|
||||
// Events Requests
|
||||
|
||||
table SubscribeRequest {}
|
||||
table SubscribeRequest {
|
||||
event_types: [EventType];
|
||||
}
|
||||
|
||||
|
||||
@@ -261,32 +261,13 @@ export class Client {
|
||||
/* Event Methods */
|
||||
/* -------------------------------------------------------------------------- */
|
||||
|
||||
async subscribe(): HubAsyncResult<ClientReadableStream<flatbuffers.EventResponse>> {
|
||||
const method = definitions.eventDefinition().subscribe;
|
||||
const request = new flatbuffers.SubscribeRequest();
|
||||
const stream = this.client.makeServerStreamRequest(
|
||||
method.path,
|
||||
method.requestSerialize,
|
||||
method.responseDeserialize,
|
||||
request
|
||||
async subscribe(
|
||||
eventTypes?: flatbuffers.EventType[]
|
||||
): HubAsyncResult<ClientReadableStream<flatbuffers.EventResponse>> {
|
||||
return this.makeServerStreamRequest(
|
||||
definitions.eventDefinition().subscribe,
|
||||
requests.eventRequests.subscribeRequest(eventTypes)
|
||||
);
|
||||
stream.on('error', (e) => {
|
||||
return e; // Suppress exceptions
|
||||
});
|
||||
return new Promise((resolve, reject) => {
|
||||
const timeout = setTimeout(() => {
|
||||
stream.cancel(); // Cancel if not connected within timeout
|
||||
reject(err(new HubError('unavailable.network_failure', 'subscribe timed out')));
|
||||
}, 1_000);
|
||||
stream.on('metadata', (metadata: Metadata) => {
|
||||
clearTimeout(timeout);
|
||||
if (metadata.get('status')[0] === ('ready' as MetadataValue)) {
|
||||
resolve(ok(stream));
|
||||
} else {
|
||||
reject(err(new HubError('unavailable.network_failure', 'subscribe failed')));
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/* -------------------------------------------------------------------------- */
|
||||
@@ -340,4 +321,36 @@ export class Client {
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
async makeServerStreamRequest<RequestType, ResponseType>(
|
||||
method: grpc.MethodDefinition<RequestType, ResponseType>,
|
||||
request: RequestType
|
||||
): HubAsyncResult<ClientReadableStream<ResponseType>> {
|
||||
const stream = this.client.makeServerStreamRequest(
|
||||
method.path,
|
||||
method.requestSerialize,
|
||||
method.responseDeserialize,
|
||||
request
|
||||
);
|
||||
|
||||
stream.on('error', (e) => {
|
||||
return e; // Suppress exceptions
|
||||
});
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const timeout = setTimeout(() => {
|
||||
stream.cancel(); // Cancel if not connected within timeout
|
||||
reject(err(new HubError('unavailable.network_failure', 'subscribe timed out')));
|
||||
}, 1_000);
|
||||
|
||||
stream.on('metadata', (metadata: Metadata) => {
|
||||
clearTimeout(timeout);
|
||||
if (metadata.get('status')[0] === ('ready' as MetadataValue)) {
|
||||
resolve(ok(stream));
|
||||
} else {
|
||||
reject(err(new HubError('unavailable.network_failure', 'subscribe failed')));
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
20
packages/grpc/src/requests/eventRequests.test.ts
Normal file
20
packages/grpc/src/requests/eventRequests.test.ts
Normal file
@@ -0,0 +1,20 @@
|
||||
import * as flatbuffers from '@farcaster/flatbuffers';
|
||||
import { eventRequests } from './eventRequests';
|
||||
|
||||
describe('eventRequests', () => {
|
||||
describe('subscribeRequest', () => {
|
||||
test('succeeds with no eventTypes', () => {
|
||||
const subscribeRequest = eventRequests.subscribeRequest();
|
||||
expect(subscribeRequest).toBeInstanceOf(flatbuffers.SubscribeRequest);
|
||||
expect(subscribeRequest.eventTypesLength()).toBe(0);
|
||||
});
|
||||
|
||||
test('succeeds with eventTypes', () => {
|
||||
const eventTypes = [flatbuffers.EventType.MergeMessage, flatbuffers.EventType.PruneMessage];
|
||||
const subscribeRequest = eventRequests.subscribeRequest(eventTypes);
|
||||
expect(subscribeRequest).toBeInstanceOf(flatbuffers.SubscribeRequest);
|
||||
expect(subscribeRequest.eventTypesLength()).toBe(eventTypes.length);
|
||||
eventTypes.forEach((eventType, index) => expect(subscribeRequest.eventTypes(index)).toEqual(eventType));
|
||||
});
|
||||
});
|
||||
});
|
||||
11
packages/grpc/src/requests/eventRequests.ts
Normal file
11
packages/grpc/src/requests/eventRequests.ts
Normal file
@@ -0,0 +1,11 @@
|
||||
import * as flatbuffers from '@farcaster/flatbuffers';
|
||||
import { Builder, ByteBuffer } from 'flatbuffers';
|
||||
|
||||
export const eventRequests = {
|
||||
subscribeRequest: (eventTypes?: flatbuffers.EventType[]): flatbuffers.SubscribeRequest => {
|
||||
const builder = new Builder(1);
|
||||
const requestT = new flatbuffers.SubscribeRequestT(eventTypes);
|
||||
builder.finish(requestT.pack(builder));
|
||||
return flatbuffers.SubscribeRequest.getRootAsSubscribeRequest(new ByteBuffer(builder.asUint8Array()));
|
||||
},
|
||||
};
|
||||
@@ -1,6 +1,7 @@
|
||||
export * from './ampRequests';
|
||||
export * from './bulkRequests';
|
||||
export * from './castRequests';
|
||||
export * from './eventRequests';
|
||||
export * from './reactionRequests';
|
||||
export * from './signerRequests';
|
||||
export * from './userDataRequests';
|
||||
|
||||
@@ -16,6 +16,10 @@ import {
|
||||
serializeUserId,
|
||||
} from './utils';
|
||||
|
||||
export type EventFilters = {
|
||||
eventTypes?: flatbuffers.EventType[];
|
||||
};
|
||||
|
||||
const deserializeCall = async <TDeserialized, TFlatbuffer>(
|
||||
call: HubAsyncResult<TFlatbuffer>,
|
||||
deserialize: (fbb: TFlatbuffer) => HubResult<TDeserialized>
|
||||
@@ -353,5 +357,10 @@ export class Client {
|
||||
/* Event Methods */
|
||||
/* -------------------------------------------------------------------------- */
|
||||
|
||||
// TODO: subscribe
|
||||
/**
|
||||
* Data from this stream can be parsed using `deserializeEventResponse`.
|
||||
*/
|
||||
async subscribe(filters: EventFilters = {}) {
|
||||
return this._grpcClient.subscribe(filters.eventTypes);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -136,3 +136,23 @@ export type NameRegistryEvent = Readonly<{
|
||||
from: string; // Hex string
|
||||
expiry: number;
|
||||
}>;
|
||||
|
||||
export type MessageEventResponse = {
|
||||
flatbuffer: flatbuffers.EventResponse;
|
||||
type: flatbuffers.EventType.MergeMessage | flatbuffers.EventType.PruneMessage | flatbuffers.EventType.RevokeMessage;
|
||||
message: Message;
|
||||
};
|
||||
|
||||
export type IdRegistryEventResponse = {
|
||||
flatbuffer: flatbuffers.EventResponse;
|
||||
type: flatbuffers.EventType.MergeIdRegistryEvent;
|
||||
idRegistryEvent: IdRegistryEvent;
|
||||
};
|
||||
|
||||
export type NameRegistryEventResponse = {
|
||||
flatbuffer: flatbuffers.EventResponse;
|
||||
type: flatbuffers.EventType.MergeNameRegistryEvent;
|
||||
nameRegistryEvent: NameRegistryEvent;
|
||||
};
|
||||
|
||||
export type EventResponse = NameRegistryEventResponse | IdRegistryEventResponse | MessageEventResponse;
|
||||
|
||||
@@ -806,3 +806,154 @@ describe('deserializeIdRegistryEvent', () => {
|
||||
expect(idRegistryEvent.type).toEqual(idRegistryEventFbb.type());
|
||||
});
|
||||
});
|
||||
|
||||
describe('deserializeEventResponse', () => {
|
||||
let eventResponseFbb: flatbuffers.EventResponse;
|
||||
|
||||
describe('MergeIdRegistryEvent event', () => {
|
||||
const type = flatbuffers.EventType.MergeIdRegistryEvent;
|
||||
let idRegistryEventFbb: flatbuffers.IdRegistryEvent;
|
||||
let idRegistryEvent: types.IdRegistryEvent;
|
||||
let eventResponse: types.IdRegistryEventResponse;
|
||||
|
||||
beforeAll(async () => {
|
||||
idRegistryEventFbb = await Factories.IdRegistryEvent.create();
|
||||
idRegistryEvent = await utils.deserializeIdRegistryEvent(idRegistryEventFbb)._unsafeUnwrap();
|
||||
eventResponseFbb = await Factories.EventResponse.create({
|
||||
type,
|
||||
bytes: Array.from(idRegistryEventFbb.bb?.bytes() ?? new Uint8Array()),
|
||||
});
|
||||
eventResponse = utils.deserializeEventResponse(eventResponseFbb)._unsafeUnwrap() as types.IdRegistryEventResponse;
|
||||
});
|
||||
|
||||
test('flatbuffer', () => {
|
||||
expect(eventResponse.flatbuffer).toBe(eventResponseFbb);
|
||||
});
|
||||
|
||||
test('type', () => {
|
||||
expect(eventResponse.type).toBe(type);
|
||||
});
|
||||
|
||||
test('idRegistryEvent', () => {
|
||||
expect(eventResponse.idRegistryEvent).toMatchObject(idRegistryEvent);
|
||||
});
|
||||
});
|
||||
|
||||
describe('MergeNameRegistryEvent event', () => {
|
||||
const type = flatbuffers.EventType.MergeNameRegistryEvent;
|
||||
let nameRegistryEventFbb: flatbuffers.NameRegistryEvent;
|
||||
let nameRegistryEvent: types.NameRegistryEvent;
|
||||
let eventResponse: types.NameRegistryEventResponse;
|
||||
|
||||
beforeAll(async () => {
|
||||
nameRegistryEventFbb = await Factories.NameRegistryEvent.create();
|
||||
nameRegistryEvent = await utils.deserializeNameRegistryEvent(nameRegistryEventFbb)._unsafeUnwrap();
|
||||
eventResponseFbb = await Factories.EventResponse.create({
|
||||
type,
|
||||
bytes: Array.from(nameRegistryEventFbb.bb?.bytes() ?? new Uint8Array()),
|
||||
});
|
||||
eventResponse = utils
|
||||
.deserializeEventResponse(eventResponseFbb)
|
||||
._unsafeUnwrap() as types.NameRegistryEventResponse;
|
||||
});
|
||||
|
||||
test('flatbuffer', () => {
|
||||
expect(eventResponse.flatbuffer).toBe(eventResponseFbb);
|
||||
});
|
||||
|
||||
test('type', () => {
|
||||
expect(eventResponse.type).toBe(type);
|
||||
});
|
||||
|
||||
test('nameRegistryEvent', () => {
|
||||
expect(eventResponse.nameRegistryEvent).toMatchObject(nameRegistryEvent);
|
||||
});
|
||||
});
|
||||
|
||||
describe('MergeMessage event', () => {
|
||||
const type = flatbuffers.EventType.MergeMessage;
|
||||
let messageFbb: flatbuffers.Message;
|
||||
let message: types.Message;
|
||||
let eventResponse: types.MessageEventResponse;
|
||||
|
||||
beforeAll(async () => {
|
||||
messageFbb = await Factories.Message.create();
|
||||
message = await utils.deserializeMessage(messageFbb)._unsafeUnwrap();
|
||||
eventResponseFbb = await Factories.EventResponse.create({
|
||||
type,
|
||||
bytes: Array.from(messageFbb.bb?.bytes() ?? new Uint8Array()),
|
||||
});
|
||||
eventResponse = utils.deserializeEventResponse(eventResponseFbb)._unsafeUnwrap() as types.MessageEventResponse;
|
||||
});
|
||||
|
||||
test('flatbuffer', () => {
|
||||
expect(eventResponse.flatbuffer).toBe(eventResponseFbb);
|
||||
});
|
||||
|
||||
test('type', () => {
|
||||
expect(eventResponse.type).toBe(type);
|
||||
});
|
||||
|
||||
test('message', () => {
|
||||
expect(eventResponse.message).toMatchObject(message);
|
||||
});
|
||||
});
|
||||
|
||||
describe('PruneMessage event', () => {
|
||||
const type = flatbuffers.EventType.PruneMessage;
|
||||
let messageFbb: flatbuffers.Message;
|
||||
let message: types.Message;
|
||||
let eventResponse: types.MessageEventResponse;
|
||||
|
||||
beforeAll(async () => {
|
||||
messageFbb = await Factories.Message.create();
|
||||
message = await utils.deserializeMessage(messageFbb)._unsafeUnwrap();
|
||||
eventResponseFbb = await Factories.EventResponse.create({
|
||||
type,
|
||||
bytes: Array.from(messageFbb.bb?.bytes() ?? new Uint8Array()),
|
||||
});
|
||||
eventResponse = utils.deserializeEventResponse(eventResponseFbb)._unsafeUnwrap() as types.MessageEventResponse;
|
||||
});
|
||||
|
||||
test('flatbuffer', () => {
|
||||
expect(eventResponse.flatbuffer).toBe(eventResponseFbb);
|
||||
});
|
||||
|
||||
test('type', () => {
|
||||
expect(eventResponse.type).toBe(type);
|
||||
});
|
||||
|
||||
test('message', () => {
|
||||
expect(eventResponse.message).toMatchObject(message);
|
||||
});
|
||||
});
|
||||
|
||||
describe('RevokeMessage event', () => {
|
||||
const type = flatbuffers.EventType.RevokeMessage;
|
||||
let messageFbb: flatbuffers.Message;
|
||||
let message: types.Message;
|
||||
let eventResponse: types.MessageEventResponse;
|
||||
|
||||
beforeAll(async () => {
|
||||
messageFbb = await Factories.Message.create();
|
||||
message = await utils.deserializeMessage(messageFbb)._unsafeUnwrap();
|
||||
eventResponseFbb = await Factories.EventResponse.create({
|
||||
type,
|
||||
bytes: Array.from(messageFbb.bb?.bytes() ?? new Uint8Array()),
|
||||
});
|
||||
eventResponse = utils.deserializeEventResponse(eventResponseFbb)._unsafeUnwrap() as types.MessageEventResponse;
|
||||
});
|
||||
|
||||
test('flatbuffer', () => {
|
||||
expect(eventResponse.flatbuffer).toBe(eventResponseFbb);
|
||||
});
|
||||
|
||||
test('type', () => {
|
||||
expect(eventResponse.type).toBe(type);
|
||||
});
|
||||
|
||||
test('message', () => {
|
||||
expect(eventResponse.message).toMatchObject(message);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -17,6 +17,54 @@ import { ByteBuffer } from 'flatbuffers';
|
||||
import { err, ok, Result } from 'neverthrow';
|
||||
import * as types from './types';
|
||||
|
||||
/* -------------------------------------------------------------------------- */
|
||||
/* Event Response */
|
||||
/* -------------------------------------------------------------------------- */
|
||||
|
||||
export const deserializeEventResponse = (fbb: flatbuffers.EventResponse): HubResult<types.EventResponse> => {
|
||||
const type = fbb.type();
|
||||
|
||||
switch (type) {
|
||||
case flatbuffers.EventType.MergeMessage:
|
||||
case flatbuffers.EventType.PruneMessage:
|
||||
case flatbuffers.EventType.RevokeMessage: {
|
||||
return deserializeMessage(
|
||||
flatbuffers.Message.getRootAsMessage(new ByteBuffer(fbb.bytesArray() ?? new Uint8Array()))
|
||||
).map((message) => {
|
||||
return {
|
||||
flatbuffer: fbb,
|
||||
type,
|
||||
message,
|
||||
};
|
||||
});
|
||||
}
|
||||
case flatbuffers.EventType.MergeIdRegistryEvent: {
|
||||
return deserializeIdRegistryEvent(
|
||||
flatbuffers.IdRegistryEvent.getRootAsIdRegistryEvent(new ByteBuffer(fbb.bytesArray() ?? new Uint8Array()))
|
||||
).map((idRegistryEvent) => {
|
||||
return {
|
||||
flatbuffer: fbb,
|
||||
type,
|
||||
idRegistryEvent,
|
||||
};
|
||||
});
|
||||
}
|
||||
case flatbuffers.EventType.MergeNameRegistryEvent: {
|
||||
return deserializeNameRegistryEvent(
|
||||
flatbuffers.NameRegistryEvent.getRootAsNameRegistryEvent(new ByteBuffer(fbb.bytesArray() ?? new Uint8Array()))
|
||||
).map((nameRegistryEvent) => {
|
||||
return {
|
||||
flatbuffer: fbb,
|
||||
type,
|
||||
nameRegistryEvent,
|
||||
};
|
||||
});
|
||||
}
|
||||
default:
|
||||
return err(new HubError('bad_request.invalid_param', `unknown EventType '${type}'`));
|
||||
}
|
||||
};
|
||||
|
||||
/* -------------------------------------------------------------------------- */
|
||||
/* Registry Events */
|
||||
/* -------------------------------------------------------------------------- */
|
||||
|
||||
@@ -111,3 +111,25 @@ describe('BytesFactory', () => {
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('EventResponseFactory', () => {
|
||||
describe('build', () => {
|
||||
const eventResponse = Factories.EventResponse.build();
|
||||
|
||||
test('generates an EventResponseT', () => {
|
||||
expect(eventResponse).toBeInstanceOf(flatbuffers.EventResponseT);
|
||||
});
|
||||
});
|
||||
|
||||
describe('create', () => {
|
||||
let eventResponse: flatbuffers.EventResponse;
|
||||
|
||||
beforeAll(async () => {
|
||||
eventResponse = await Factories.EventResponse.create();
|
||||
});
|
||||
|
||||
test('generates an EventResponse', () => {
|
||||
expect(eventResponse).toBeInstanceOf(flatbuffers.EventResponse);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -547,6 +547,29 @@ const TsHashHexFactory = Factory.define<string>(() => {
|
||||
return faker.datatype.hexadecimal({ length: 40, case: 'lower' });
|
||||
});
|
||||
|
||||
const EventTypeFactory = Factory.define<flatbuffers.EventType>(() => {
|
||||
return faker.helpers.arrayElement([
|
||||
flatbuffers.EventType.MergeIdRegistryEvent,
|
||||
flatbuffers.EventType.MergeNameRegistryEvent,
|
||||
flatbuffers.EventType.MergeMessage,
|
||||
flatbuffers.EventType.RevokeMessage,
|
||||
flatbuffers.EventType.PruneMessage,
|
||||
]);
|
||||
});
|
||||
|
||||
const EventResponseFactory = Factory.define<flatbuffers.EventResponseT, unknown, flatbuffers.EventResponse>(
|
||||
({ onCreate }) => {
|
||||
onCreate((params) => {
|
||||
const builder = new Builder(1);
|
||||
builder.finish(params.pack(builder));
|
||||
return flatbuffers.EventResponse.getRootAsEventResponse(new ByteBuffer(builder.asUint8Array()));
|
||||
});
|
||||
|
||||
const type = EventTypeFactory.build();
|
||||
return new flatbuffers.EventResponseT(type);
|
||||
}
|
||||
);
|
||||
|
||||
export const Factories = {
|
||||
Bytes: BytesFactory,
|
||||
FID: FIDFactory,
|
||||
@@ -596,4 +619,5 @@ export const Factories = {
|
||||
EthAddressHex: EthAddressHexFactory,
|
||||
Ed25519PublicKeyHex: Ed25519PublicKeyHexFactory,
|
||||
TsHashHex: TsHashHexFactory,
|
||||
EventResponse: EventResponseFactory,
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user