From 83a95868442c266e5ec829ff1e123d52227a1108 Mon Sep 17 00:00:00 2001 From: Aditi Srinivasan Date: Mon, 7 Jul 2025 16:56:49 -0400 Subject: [PATCH] fix: make shard id required for the hub subscriber (#2623) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Why is this change needed? It's hard for the client to determine where to resume from if they're subscribed to multiple shards. ## Merge Checklist _Choose all relevant options below by adding an `x` now or at any time before submitting for review_ - [x] PR title adheres to the [conventional commits](https://www.conventionalcommits.org/en/v1.0.0/) standard - [x] PR has a [changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets) - [x] PR has been tagged with a change label(s) (i.e. documentation, feature, bugfix, or chore) - [ ] PR includes [documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs) if necessary. --- ## PR-Codex overview This PR focuses on making the `shardIndex` parameter required for the `hubSubscriber` and removing the optional `totalShards` parameter. It ensures that the `shardIndex` is always provided when creating instances of `BaseHubSubscriber` and `EventStreamHubSubscriber`. ### Detailed summary - Made `shardIndex` a required parameter in the constructors of `BaseHubSubscriber` and `EventStreamHubSubscriber`. - Removed the optional `totalShards` parameter from both constructors. - Updated the constructor calls to reflect the removal of `totalShards`. > ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your question}` --- .changeset/spotty-icons-move.md | 5 +++++ packages/shuttle/src/shuttle/hubSubscriber.ts | 15 +++++---------- 2 files changed, 10 insertions(+), 10 deletions(-) create mode 100644 .changeset/spotty-icons-move.md diff --git a/.changeset/spotty-icons-move.md b/.changeset/spotty-icons-move.md new file mode 100644 index 00000000..13b91187 --- /dev/null +++ b/.changeset/spotty-icons-move.md @@ -0,0 +1,5 @@ +--- +"@farcaster/shuttle": patch +--- + +fix: make shard id required for the hub subscriber diff --git a/packages/shuttle/src/shuttle/hubSubscriber.ts b/packages/shuttle/src/shuttle/hubSubscriber.ts index f83c8b76..e7c0f200 100644 --- a/packages/shuttle/src/shuttle/hubSubscriber.ts +++ b/packages/shuttle/src/shuttle/hubSubscriber.ts @@ -59,24 +59,21 @@ export class BaseHubSubscriber extends HubSubscriber { protected eventTypes: HubEventType[]; private stream: ClientReadableStream | null = null; - private totalShards: number | undefined; - private shardIndex: number | undefined; + private shardIndex: number; private connectionTimeout: number; // milliseconds constructor( label: string, hubClient: HubRpcClient, + shardIndex: number, log: Logger, eventTypes?: HubEventType[], - totalShards?: number, - shardIndex?: number, connectionTimeout = 30000, ) { super(); this.label = label; this.hubClient = hubClient; this.log = log; - this.totalShards = totalShards; this.shardIndex = shardIndex; this.eventTypes = eventTypes || DEFAULT_EVENT_TYPES; this.connectionTimeout = connectionTimeout; @@ -120,7 +117,6 @@ export class BaseHubSubscriber extends HubSubscriber { const subscribeParams = { eventTypes: this.eventTypes, - totalShards: this.totalShards, shardIndex: this.shardIndex, fromId, }; @@ -131,7 +127,7 @@ export class BaseHubSubscriber extends HubSubscriber { this.log.info( `HubSubscriber ${this.label} subscribed to hub events (types ${JSON.stringify(this.eventTypes)}, shard: ${ this.shardIndex - }/${this.totalShards})`, + })`, ); this.stream = stream; this.stopped = false; @@ -217,17 +213,16 @@ export class EventStreamHubSubscriber extends BaseHubSubscriber { constructor( label: string, hubClient: HubClient, + shardIndex: number, eventStream: EventStreamConnection, redis: RedisClient, shardKey: string, log: Logger, eventTypes?: HubEventType[], - totalShards?: number, - shardIndex?: number, connectionTimeout?: number, options?: EventStreamHubSubscriberOptions, ) { - super(label, hubClient.client, log, eventTypes, totalShards, shardIndex, connectionTimeout); + super(label, hubClient.client, shardIndex, log, eventTypes, connectionTimeout); this.eventStream = eventStream; this.redis = redis; this.streamKey = `hub:${hubClient.host}:evt:msg:${shardKey}`;