feat: support fromId param in subscribe method (#604)

* support fromId param in subscribe method

* add fromId to js event filters

* support filtering by fromId and eventTypes

* add changeset
This commit is contained in:
Paul Fletcher-Hill
2023-02-23 12:32:05 -05:00
committed by GitHub
parent 22a9d46015
commit d04d5d4a3c
10 changed files with 292 additions and 92 deletions

View File

@@ -0,0 +1,7 @@
---
'@farcaster/protobufs': patch
'@farcaster/utils': patch
'@farcaster/js': patch
---
add fromId to SubscribeRequest protobuf and subscribe gRPC method

View File

@@ -531,9 +531,9 @@ export class Hub implements HubInterface {
const mergeResult = await this.engine.mergeMessage(message);
mergeResult.match(
() => {
(eventId) => {
logMessage.info(
`submitMessage success: fid ${message.data?.fid} merged ${messageTypeToName(
`submitMessage success ${eventId}: fid ${message.data?.fid} merged ${messageTypeToName(
message.data?.type
)} ${bytesToHexString(message.hash)._unsafeUnwrap()}`
);
@@ -552,9 +552,9 @@ export class Hub implements HubInterface {
const mergeResult = await this.engine.mergeIdRegistryEvent(event);
mergeResult.match(
() => {
(eventId) => {
logEvent.info(
`submitIdRegistryEvent success: fid ${event.fid} assigned to ${bytesToHexString(
`submitIdRegistryEvent success ${eventId}: fid ${event.fid} assigned to ${bytesToHexString(
event.to
)._unsafeUnwrap()} in block ${event.blockNumber}`
);
@@ -573,9 +573,9 @@ export class Hub implements HubInterface {
const mergeResult = await this.engine.mergeNameRegistryEvent(event);
mergeResult.match(
() => {
(eventId) => {
logEvent.info(
`submitNameRegistryEvent success: fname ${bytesToUtf8String(
`submitNameRegistryEvent success ${eventId}: fname ${bytesToUtf8String(
event.fname
)._unsafeUnwrap()} assigned to ${bytesToHexString(event.to)._unsafeUnwrap()} in block ${event.blockNumber}`
);

View File

@@ -544,13 +544,34 @@ export default class Server {
}
);
},
// Unary RPC handler: look up a single HubEvent by id via the engine and
// return it, mapping a HubError to a gRPC ServiceError on failure.
// NOTE(review): if this.engine is undefined, both optional chains short-circuit
// and callback is never invoked, leaving the client call hanging — confirm the
// server always has an engine before these handlers are registered.
getEvent: async (call, callback) => {
const result = await this.engine?.getEvent(call.request.id);
result?.match(
(event: HubEvent) => callback(null, event),
(err: HubError) => callback(toServiceError(err))
);
},
subscribe: async (stream) => {
const { request } = stream;
if (this.engine && request.fromId) {
const eventsIterator = this.engine.eventHandler.getEventsIterator(request.fromId);
if (eventsIterator.isErr()) {
stream.destroy(eventsIterator.error);
return;
}
for await (const [, value] of eventsIterator.value) {
const event = HubEvent.decode(Uint8Array.from(value as Buffer));
if (request.eventTypes.length === 0 || request.eventTypes.includes(event.type)) {
stream.write(event);
}
}
}
const eventListener = (event: HubEvent) => {
stream.write(event);
};
const { request } = stream;
// if no type filters are provided, subscribe to all event types
if (request.eventTypes.length === 0) {
this.engine?.eventHandler.on('mergeMessage', eventListener);
@@ -585,10 +606,6 @@ export default class Server {
this.engine?.eventHandler.off('mergeIdRegistryEvent', eventListener);
this.engine?.eventHandler.off('mergeNameRegistryEvent', eventListener);
});
const readyMetadata = new Metadata();
readyMetadata.add('status', 'ready');
stream.sendMetadata(readyMetadata);
},
};
};

View File

@@ -32,6 +32,20 @@ let custodyEvent: protobufs.IdRegistryEvent;
let nameRegistryEvent: protobufs.NameRegistryEvent;
let signerAdd: protobufs.SignerAddMessage;
let castAdd: protobufs.CastAddMessage;
let reactionAdd: protobufs.ReactionAddMessage;
let events: [protobufs.HubEventType, any][];
let stream: protobufs.ClientReadableStream<protobufs.HubEvent>;
beforeEach(() => {
events = [];
});
afterEach(() => {
if (stream) {
stream.cancel();
}
});
beforeAll(async () => {
custodyEvent = Factories.IdRegistryEvent.build({ to: ethSigner.signerKey, fid });
@@ -41,51 +55,49 @@ beforeAll(async () => {
{ transient: { signer: ethSigner } }
);
castAdd = await Factories.CastAddMessage.create({ data: { fid } }, { transient: { signer } });
reactionAdd = await Factories.ReactionAddMessage.create({ data: { fid } }, { transient: { signer } });
});
// Test helper: opens a subscribe stream against the hub with the given
// SubscribeRequest options (optional eventTypes filter and/or fromId backfill
// point) and records every received HubEvent into `events` as a
// [eventType, JSON-serialized body] tuple so tests can assert with toEqual.
// Returns the live stream; callers are responsible for cancelling it
// (the suite's afterEach does this).
const setupSubscription = async (
events: [protobufs.HubEventType, any][],
options: { eventTypes?: protobufs.HubEventType[]; fromId?: number } = {}
): Promise<protobufs.ClientReadableStream<protobufs.HubEvent>> => {
const request = protobufs.SubscribeRequest.create(options);
const streamResult = await client.subscribe(request);
expect(streamResult.isOk()).toBeTruthy();
const stream = streamResult._unsafeUnwrap();
stream.on('data', (event: protobufs.HubEvent) => {
// Record only the body relevant to each event type; any other event types
// are silently ignored by this helper.
if (protobufs.isMergeMessageHubEvent(event)) {
events.push([event.type, protobufs.Message.toJSON(event.mergeMessageBody.message!)]);
} else if (protobufs.isPruneMessageHubEvent(event)) {
events.push([event.type, protobufs.Message.toJSON(event.pruneMessageBody.message!)]);
} else if (protobufs.isRevokeMessageHubEvent(event)) {
events.push([event.type, protobufs.Message.toJSON(event.revokeMessageBody.message!)]);
} else if (protobufs.isMergeIdRegistryEventHubEvent(event)) {
events.push([event.type, protobufs.IdRegistryEvent.toJSON(event.mergeIdRegistryEventBody.idRegistryEvent!)]);
} else if (protobufs.isMergeNameRegistryEventHubEvent(event)) {
events.push([
event.type,
protobufs.NameRegistryEvent.toJSON(event.mergeNameRegistryEventBody.nameRegistryEvent!),
]);
}
});
await sleep(100); // Wait for server to start listeners
return stream;
};
describe('subscribe', () => {
const setupSubscription = (eventTypes?: protobufs.HubEventType[]) => {
let stream: protobufs.ClientReadableStream<protobufs.HubEvent>;
const events: [protobufs.HubEventType, any][] = [];
beforeEach(async () => {
const request = protobufs.SubscribeRequest.create({ eventTypes: eventTypes ?? [] });
stream = (await client.subscribe(request))._unsafeUnwrap();
stream.on('data', (event: protobufs.HubEvent) => {
// events.push(protobufs.HubEvent.toJSON(event));
if (protobufs.isMergeMessageHubEvent(event)) {
events.push([event.type, protobufs.Message.toJSON(event.mergeMessageBody.message!)]);
} else if (protobufs.isPruneMessageHubEvent(event)) {
events.push([event.type, protobufs.Message.toJSON(event.pruneMessageBody.message!)]);
} else if (protobufs.isRevokeMessageHubEvent(event)) {
events.push([event.type, protobufs.Message.toJSON(event.revokeMessageBody.message!)]);
} else if (protobufs.isMergeIdRegistryEventHubEvent(event)) {
events.push([event.type, protobufs.IdRegistryEvent.toJSON(event.mergeIdRegistryEventBody.idRegistryEvent!)]);
} else if (protobufs.isMergeNameRegistryEventHubEvent(event)) {
events.push([
event.type,
protobufs.NameRegistryEvent.toJSON(event.mergeNameRegistryEventBody.nameRegistryEvent!),
]);
}
});
});
afterEach(async () => {
await stream?.cancel();
});
return { events };
};
describe('without type filters', () => {
const { events } = setupSubscription();
test('emits event', async () => {
test('emits events', async () => {
stream = await setupSubscription(events);
await engine.mergeIdRegistryEvent(custodyEvent);
await engine.mergeMessage(signerAdd);
await engine.mergeMessage(castAdd);
await sleep(1_000); // Wait for server to send events over stream
await sleep(100); // Wait for server to send events over stream
expect(events).toEqual([
[protobufs.HubEventType.HUB_EVENT_TYPE_MERGE_ID_REGISTRY_EVENT, protobufs.IdRegistryEvent.toJSON(custodyEvent)],
[protobufs.HubEventType.HUB_EVENT_TYPE_MERGE_MESSAGE, protobufs.Message.toJSON(signerAdd)],
@@ -95,14 +107,16 @@ describe('subscribe', () => {
});
describe('with one type filter', () => {
const { events } = setupSubscription([protobufs.HubEventType.HUB_EVENT_TYPE_MERGE_MESSAGE]);
test('emits events', async () => {
stream = await setupSubscription(events, {
eventTypes: [protobufs.HubEventType.HUB_EVENT_TYPE_MERGE_MESSAGE],
});
test('emits event', async () => {
await engine.mergeIdRegistryEvent(custodyEvent);
await engine.mergeNameRegistryEvent(nameRegistryEvent);
await engine.mergeMessage(signerAdd);
await engine.mergeMessage(castAdd);
await sleep(1_000); // Wait for server to send events over stream
await sleep(100); // Wait for server to send events over stream
expect(events).toEqual([
[protobufs.HubEventType.HUB_EVENT_TYPE_MERGE_MESSAGE, protobufs.Message.toJSON(signerAdd)],
[protobufs.HubEventType.HUB_EVENT_TYPE_MERGE_MESSAGE, protobufs.Message.toJSON(castAdd)],
@@ -111,18 +125,20 @@ describe('subscribe', () => {
});
describe('with multiple type filters', () => {
const { events } = setupSubscription([
protobufs.HubEventType.HUB_EVENT_TYPE_MERGE_MESSAGE,
protobufs.HubEventType.HUB_EVENT_TYPE_MERGE_NAME_REGISTRY_EVENT,
protobufs.HubEventType.HUB_EVENT_TYPE_MERGE_ID_REGISTRY_EVENT,
]);
test('emits events', async () => {
stream = await setupSubscription(events, {
eventTypes: [
protobufs.HubEventType.HUB_EVENT_TYPE_MERGE_MESSAGE,
protobufs.HubEventType.HUB_EVENT_TYPE_MERGE_NAME_REGISTRY_EVENT,
protobufs.HubEventType.HUB_EVENT_TYPE_MERGE_ID_REGISTRY_EVENT,
],
});
test('emits event', async () => {
await engine.mergeIdRegistryEvent(custodyEvent);
await engine.mergeNameRegistryEvent(nameRegistryEvent);
await engine.mergeMessage(signerAdd);
await engine.mergeMessage(castAdd);
await sleep(1_000); // Wait for server to send events over stream
await sleep(100); // Wait for server to send events over stream
expect(events).toEqual([
[protobufs.HubEventType.HUB_EVENT_TYPE_MERGE_ID_REGISTRY_EVENT, protobufs.IdRegistryEvent.toJSON(custodyEvent)],
[
@@ -134,4 +150,60 @@ describe('subscribe', () => {
]);
});
});
describe('with fromId', () => {
// Events merged BEFORE subscribing should be replayed starting at fromId,
// and live events merged afterwards should follow in order.
test('emits events from id onwards', async () => {
await engine.mergeIdRegistryEvent(custodyEvent);
const idResult = await engine.mergeMessage(signerAdd);
await engine.mergeMessage(castAdd);
// Subscribe from the signerAdd event id: the earlier custodyEvent must NOT appear.
stream = await setupSubscription(events, { fromId: idResult._unsafeUnwrap() });
await engine.mergeNameRegistryEvent(nameRegistryEvent);
await engine.mergeMessage(reactionAdd);
await sleep(100);
expect(events).toEqual([
[protobufs.HubEventType.HUB_EVENT_TYPE_MERGE_MESSAGE, protobufs.Message.toJSON(signerAdd)],
[protobufs.HubEventType.HUB_EVENT_TYPE_MERGE_MESSAGE, protobufs.Message.toJSON(castAdd)],
[
protobufs.HubEventType.HUB_EVENT_TYPE_MERGE_NAME_REGISTRY_EVENT,
protobufs.NameRegistryEvent.toJSON(nameRegistryEvent),
],
[protobufs.HubEventType.HUB_EVENT_TYPE_MERGE_MESSAGE, protobufs.Message.toJSON(reactionAdd)],
]);
});
// A fromId earlier than any stored event replays the entire event store.
// NOTE(review): this test asserts immediately after setupSubscription and
// relies on the helper's trailing sleep(100) for backfilled events to arrive.
test('emits events with early id', async () => {
await engine.mergeIdRegistryEvent(custodyEvent);
await engine.mergeMessage(signerAdd);
stream = await setupSubscription(events, { fromId: 1 });
expect(events).toEqual([
[protobufs.HubEventType.HUB_EVENT_TYPE_MERGE_ID_REGISTRY_EVENT, protobufs.IdRegistryEvent.toJSON(custodyEvent)],
[protobufs.HubEventType.HUB_EVENT_TYPE_MERGE_MESSAGE, protobufs.Message.toJSON(signerAdd)],
]);
});
});
describe('with fromId and type filters', () => {
// fromId backfill and eventTypes filtering combine: replayed AND live events
// are both restricted to the requested types (the earlier nameRegistryEvent
// is stored but filtered out).
test('emits events', async () => {
const idResult = await engine.mergeNameRegistryEvent(nameRegistryEvent);
await engine.mergeIdRegistryEvent(custodyEvent);
stream = await setupSubscription(events, {
fromId: idResult._unsafeUnwrap(),
eventTypes: [
protobufs.HubEventType.HUB_EVENT_TYPE_MERGE_MESSAGE,
protobufs.HubEventType.HUB_EVENT_TYPE_MERGE_ID_REGISTRY_EVENT,
],
});
await engine.mergeMessage(signerAdd);
await engine.mergeMessages([castAdd, reactionAdd]);
await sleep(100);
expect(events).toEqual([
[protobufs.HubEventType.HUB_EVENT_TYPE_MERGE_ID_REGISTRY_EVENT, protobufs.IdRegistryEvent.toJSON(custodyEvent)],
[protobufs.HubEventType.HUB_EVENT_TYPE_MERGE_MESSAGE, protobufs.Message.toJSON(signerAdd)],
[protobufs.HubEventType.HUB_EVENT_TYPE_MERGE_MESSAGE, protobufs.Message.toJSON(castAdd)],
[protobufs.HubEventType.HUB_EVENT_TYPE_MERGE_MESSAGE, protobufs.Message.toJSON(reactionAdd)],
]);
});
});
});

View File

@@ -109,6 +109,14 @@ class Engine {
return ok(undefined);
}
/* -------------------------------------------------------------------------- */
/* Event Methods */
/* -------------------------------------------------------------------------- */
/**
 * Fetches a single HubEvent by id. Thin delegation to the store event
 * handler; errors (e.g. unknown id) surface through the HubAsyncResult.
 */
async getEvent(id: number): HubAsyncResult<protobufs.HubEvent> {
return this.eventHandler.getEvent(id);
}
/* -------------------------------------------------------------------------- */
/* Sync Methods */
/* -------------------------------------------------------------------------- */

View File

@@ -15,7 +15,7 @@ import {
PruneMessageHubEvent,
RevokeMessageHubEvent,
} from '@farcaster/protobufs';
import { FARCASTER_EPOCH, HubAsyncResult, HubError, HubResult } from '@farcaster/utils';
import { bytesIncrement, FARCASTER_EPOCH, HubAsyncResult, HubError, HubResult } from '@farcaster/utils';
import { err, ok, ResultAsync } from 'neverthrow';
import AbstractRocksDB from 'rocksdb';
import { TypedEmitter } from 'tiny-typed-emitter';
@@ -126,14 +126,28 @@ class StoreEventHandler extends TypedEmitter<StoreEvents> {
this._generator = new HubEventIdGenerator({ epoch: FARCASTER_EPOCH });
}
getEventsIterator(): AbstractRocksDB.Iterator {
const prefix = Buffer.from([RootPrefix.HubEvents]);
return this._db.iteratorByPrefix(prefix, { keys: false, valueAsBuffer: true });
/**
 * Reads the HubEvent stored under the given event id.
 * Returns err(HubError) when the db get fails (e.g. key not found);
 * otherwise decodes the stored protobuf bytes into a HubEvent.
 */
async getEvent(id: number): HubAsyncResult<HubEvent> {
const key = makeEventKey(id);
const result = await ResultAsync.fromPromise(this._db.get(key), (e) => e as HubError);
return result.map((buffer) => HubEvent.decode(new Uint8Array(buffer as Buffer)));
}
async getEvents(): HubAsyncResult<HubEvent[]> {
/**
 * Returns a RocksDB iterator over stored hub events, ordered by event key.
 * @param fromId - inclusive lower bound (gte); when omitted, makeEventKey()
 *   presumably yields the prefix-only key so iteration starts at the first event.
 * The exclusive upper bound is the event-key prefix incremented by one byte,
 * so iteration never crosses into the next RootPrefix keyspace.
 * Values only (keys: false), returned as Buffers.
 */
getEventsIterator(fromId?: number): HubResult<AbstractRocksDB.Iterator> {
const minKey = makeEventKey(fromId);
const maxKey = bytesIncrement(Uint8Array.from(makeEventKey()));
if (maxKey.isErr()) {
return err(maxKey.error);
}
return ok(this._db.iterator({ gte: minKey, lt: Buffer.from(maxKey.value), keys: false, valueAsBuffer: true }));
}
async getEvents(fromId?: number): HubAsyncResult<HubEvent[]> {
const events: HubEvent[] = [];
for await (const [, value] of this.getEventsIterator()) {
const iterator = this.getEventsIterator(fromId);
if (iterator.isErr()) {
return err(iterator.error);
}
for await (const [, value] of iterator.value) {
const event = HubEvent.decode(Uint8Array.from(value as Buffer));
events.push(event);
}

View File

@@ -6,6 +6,7 @@ import * as utils from './utils';
export type EventFilters = {
eventTypes?: protobufs.HubEventType[];
fromId?: number;
};
const deserializeCall = async <TDeserialized, TProtobuf>(
@@ -228,7 +229,7 @@ export class Client {
/* -------------------------------------------------------------------------- */
/**
* Data from this stream can be parsed using `deserializeEventResponse`.
* Data from this stream can be parsed using `deserializeHubEvent`.
*/
async subscribe(filters: EventFilters = {}) {
const request = protobufs.SubscribeRequest.create({ ...filters });

View File

@@ -34,6 +34,11 @@ export interface Empty {
export interface SubscribeRequest {
eventTypes: HubEventType[];
fromId: number;
}
export interface EventRequest {
id: number;
}
/** Response Types for the Sync RPC Methods */
@@ -157,7 +162,7 @@ export const Empty = {
};
function createBaseSubscribeRequest(): SubscribeRequest {
return { eventTypes: [] };
return { eventTypes: [], fromId: 0 };
}
export const SubscribeRequest = {
@@ -167,6 +172,9 @@ export const SubscribeRequest = {
writer.int32(v);
}
writer.ldelim();
if (message.fromId !== 0) {
writer.uint32(16).uint64(message.fromId);
}
return writer;
},
@@ -187,6 +195,9 @@ export const SubscribeRequest = {
message.eventTypes.push(reader.int32() as any);
}
break;
case 2:
message.fromId = longToNumber(reader.uint64() as Long);
break;
default:
reader.skipType(tag & 7);
break;
@@ -198,6 +209,7 @@ export const SubscribeRequest = {
fromJSON(object: any): SubscribeRequest {
return {
eventTypes: Array.isArray(object?.eventTypes) ? object.eventTypes.map((e: any) => hubEventTypeFromJSON(e)) : [],
fromId: isSet(object.fromId) ? Number(object.fromId) : 0,
};
},
@@ -208,6 +220,7 @@ export const SubscribeRequest = {
} else {
obj.eventTypes = [];
}
message.fromId !== undefined && (obj.fromId = Math.round(message.fromId));
return obj;
},
@@ -218,6 +231,58 @@ export const SubscribeRequest = {
fromPartial<I extends Exact<DeepPartial<SubscribeRequest>, I>>(object: I): SubscribeRequest {
const message = createBaseSubscribeRequest();
message.eventTypes = object.eventTypes?.map((e) => e) || [];
message.fromId = object.fromId ?? 0;
return message;
},
};
// Generated code (appears to be ts-proto output — do not hand-edit):
// constructs an EventRequest with proto3 default scalar values.
function createBaseEventRequest(): EventRequest {
return { id: 0 };
}
// Generated codec for EventRequest (appears to be ts-proto output — do not
// hand-edit). Provides binary encode/decode plus JSON and partial-object
// conversion helpers; `id` is wire field 1, uint64.
export const EventRequest = {
encode(message: EventRequest, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.id !== 0) {
writer.uint32(8).uint64(message.id);
}
return writer;
},
decode(input: _m0.Reader | Uint8Array, length?: number): EventRequest {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseEventRequest();
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
message.id = longToNumber(reader.uint64() as Long);
break;
default:
reader.skipType(tag & 7);
break;
}
}
return message;
},
fromJSON(object: any): EventRequest {
return { id: isSet(object.id) ? Number(object.id) : 0 };
},
toJSON(message: EventRequest): unknown {
const obj: any = {};
message.id !== undefined && (obj.id = Math.round(message.id));
return obj;
},
create<I extends Exact<DeepPartial<EventRequest>, I>>(base?: I): EventRequest {
return EventRequest.fromPartial(base ?? {});
},
fromPartial<I extends Exact<DeepPartial<EventRequest>, I>>(object: I): EventRequest {
const message = createBaseEventRequest();
message.id = object.id ?? 0;
return message;
},
};
@@ -1230,6 +1295,15 @@ export const HubServiceService = {
responseSerialize: (value: HubEvent) => Buffer.from(HubEvent.encode(value).finish()),
responseDeserialize: (value: Buffer) => HubEvent.decode(value),
},
getEvent: {
path: "/HubService/GetEvent",
requestStream: false,
responseStream: false,
requestSerialize: (value: EventRequest) => Buffer.from(EventRequest.encode(value).finish()),
requestDeserialize: (value: Buffer) => EventRequest.decode(value),
responseSerialize: (value: HubEvent) => Buffer.from(HubEvent.encode(value).finish()),
responseDeserialize: (value: Buffer) => HubEvent.decode(value),
},
/** Casts */
getCast: {
path: "/HubService/GetCast",
@@ -1482,6 +1556,7 @@ export interface HubServiceServer extends UntypedServiceImplementation {
submitNameRegistryEvent: handleUnaryCall<NameRegistryEvent, NameRegistryEvent>;
/** Event Methods */
subscribe: handleServerStreamingCall<SubscribeRequest, HubEvent>;
getEvent: handleUnaryCall<EventRequest, HubEvent>;
/** Casts */
getCast: handleUnaryCall<CastId, Message>;
getCastsByFid: handleUnaryCall<FidRequest, MessagesResponse>;
@@ -1568,6 +1643,18 @@ export interface HubServiceClient extends Client {
metadata?: Metadata,
options?: Partial<CallOptions>,
): ClientReadableStream<HubEvent>;
getEvent(request: EventRequest, callback: (error: ServiceError | null, response: HubEvent) => void): ClientUnaryCall;
getEvent(
request: EventRequest,
metadata: Metadata,
callback: (error: ServiceError | null, response: HubEvent) => void,
): ClientUnaryCall;
getEvent(
request: EventRequest,
metadata: Metadata,
options: Partial<CallOptions>,
callback: (error: ServiceError | null, response: HubEvent) => void,
): ClientUnaryCall;
/** Casts */
getCast(request: CastId, callback: (error: ServiceError | null, response: Message) => void): ClientUnaryCall;
getCast(

View File

@@ -9,6 +9,11 @@ message Empty {}
message SubscribeRequest {
// Optional filter: only stream events of these types; empty means all types.
repeated HubEventType event_types = 1;
// Hub event id to replay stored events from before streaming live events;
// 0 (proto3 default / unset) means live events only.
uint64 from_id = 2;
}

// Request for the GetEvent unary RPC: looks up a single HubEvent by id.
message EventRequest {
uint64 id = 1;
}
// Response Types for the Sync RPC Methods
@@ -20,25 +25,25 @@ message HubInfoResponse {
}
message TrieNodeMetadataResponse {
bytes prefix = 1;
uint64 num_messages = 2;
string hash = 3;
repeated TrieNodeMetadataResponse children = 4;
bytes prefix = 1;
uint64 num_messages = 2;
string hash = 3;
repeated TrieNodeMetadataResponse children = 4;
}
message TrieNodeSnapshotResponse {
bytes prefix = 1;
repeated string excluded_hashes = 2;
uint64 num_messages = 3;
string root_hash = 4;
bytes prefix = 1;
repeated string excluded_hashes = 2;
uint64 num_messages = 3;
string root_hash = 4;
}
message TrieNodePrefix {
bytes prefix = 1;
bytes prefix = 1;
}
message SyncIds {
repeated bytes sync_ids = 1;
repeated bytes sync_ids = 1;
}
message FidRequest {
@@ -96,6 +101,7 @@ service HubService {
// Event Methods
rpc Subscribe(SubscribeRequest) returns (stream HubEvent);
rpc GetEvent(EventRequest) returns (HubEvent);
// Casts
rpc GetCast(CastId) returns (Message);

View File

@@ -83,22 +83,10 @@ const promisifyClient = <C extends Client>(client: C) => {
const stream = func.call(target, ...args);
stream.on('error', (e: unknown) => {
return e; // Suppress exceptions
return e; // TODO: improve stream error handling
});
const timeout = setTimeout(() => {
stream.cancel(); // Cancel if not connected within timeout
resolve(err(new HubError('unavailable.network_failure', 'subscribe timed out')));
}, 1_000);
stream.on('metadata', (metadata: Metadata) => {
clearTimeout(timeout);
if (metadata.get('status')[0] === 'ready') {
resolve(ok(stream));
} else {
resolve(err(new HubError('unavailable.network_failure', 'subscribe failed')));
}
});
resolve(ok(stream));
});
};
}