fix: make shard id required for the hub subscriber (#2623)

## Why is this change needed?
It's hard for the client to determine where to resume from if they're
subscribed to multiple shards.

## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [x] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [x] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [x] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [ ] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.

<!-- start pr-codex -->

---

## PR-Codex overview
This PR focuses on making the `shardIndex` parameter required for the
`hubSubscriber` and removing the optional `totalShards` parameter. It
ensures that the `shardIndex` is always provided when creating instances
of `BaseHubSubscriber` and `EventStreamHubSubscriber`.

### Detailed summary
- Made `shardIndex` a required parameter in the constructors of
`BaseHubSubscriber` and `EventStreamHubSubscriber`.
- Removed the optional `totalShards` parameter from both constructors.
- Updated the constructor calls to reflect the removal of `totalShards`.

>  Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
This commit is contained in:
Aditi Srinivasan
2025-07-07 16:56:49 -04:00
committed by GitHub
parent 74149586c9
commit 83a9586844
2 changed files with 10 additions and 10 deletions

View File

@@ -0,0 +1,5 @@
---
"@farcaster/shuttle": patch
---
fix: make shard id required for the hub subscriber

View File

@@ -59,24 +59,21 @@ export class BaseHubSubscriber extends HubSubscriber {
protected eventTypes: HubEventType[];
private stream: ClientReadableStream<HubEvent> | null = null;
private totalShards: number | undefined;
private shardIndex: number | undefined;
private shardIndex: number;
private connectionTimeout: number; // milliseconds
constructor(
label: string,
hubClient: HubRpcClient,
shardIndex: number,
log: Logger,
eventTypes?: HubEventType[],
totalShards?: number,
shardIndex?: number,
connectionTimeout = 30000,
) {
super();
this.label = label;
this.hubClient = hubClient;
this.log = log;
this.totalShards = totalShards;
this.shardIndex = shardIndex;
this.eventTypes = eventTypes || DEFAULT_EVENT_TYPES;
this.connectionTimeout = connectionTimeout;
@@ -120,7 +117,6 @@ export class BaseHubSubscriber extends HubSubscriber {
const subscribeParams = {
eventTypes: this.eventTypes,
totalShards: this.totalShards,
shardIndex: this.shardIndex,
fromId,
};
@@ -131,7 +127,7 @@ export class BaseHubSubscriber extends HubSubscriber {
this.log.info(
`HubSubscriber ${this.label} subscribed to hub events (types ${JSON.stringify(this.eventTypes)}, shard: ${
this.shardIndex
}/${this.totalShards})`,
})`,
);
this.stream = stream;
this.stopped = false;
@@ -217,17 +213,16 @@ export class EventStreamHubSubscriber extends BaseHubSubscriber {
constructor(
label: string,
hubClient: HubClient,
shardIndex: number,
eventStream: EventStreamConnection,
redis: RedisClient,
shardKey: string,
log: Logger,
eventTypes?: HubEventType[],
totalShards?: number,
shardIndex?: number,
connectionTimeout?: number,
options?: EventStreamHubSubscriberOptions,
) {
super(label, hubClient.client, log, eventTypes, totalShards, shardIndex, connectionTimeout);
super(label, hubClient.client, shardIndex, log, eventTypes, connectionTimeout);
this.eventStream = eventStream;
this.redis = redis;
this.streamKey = `hub:${hubClient.host}:evt:msg:${shardKey}`;