mirror of
https://github.com/farcasterxyz/hub-monorepo.git
synced 2026-01-23 03:58:07 -05:00
fix(hubble): reduce hub bandwidth with floodsub toggle (#1851)
## Motivation - Hub bandwidth usage has increased to around 400-500 KiB per second, or 40-50 GiB per day - Needless bandwidth utilization can increase resource requirements to run Hub ## Context - The default settings for our libp2p library publish to floodsub and subscribe to floodsub topic - Floodsub is a p2p protocol where every message a node receives gets forwarded to all known peers in the network - While there are benefits to Floodsub, it has significant network amplification factors and hits scalability limits as the number of hubs in the network grows ## Change Summary - By default, disable `floodPublish` and `fallbackToFloodsub` - Expose environment variables `GOSSIPSUB_FALLBACK_TO_FLOODSUB` and `GOSSIPSUB_FLOOD_PUBLISH` to toggle the values for gossipsub p2p ## Merge Checklist _Choose all relevant options below by adding an `x` now or at any time before submitting for review_ - [x] PR title adheres to the [conventional commits](https://www.conventionalcommits.org/en/v1.0.0/) standard - [x] PR has a [changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets) - [x] PR has been tagged with a change label(s) (i.e. documentation, feature, bugfix, or chore) - [ ] PR includes [documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs) if necessary. - [x] All [commits have been signed](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#22-signing-commits) ## Additional Context If this is a relatively large or complex change, provide more details here that will help reviewers <!-- start pr-codex --> --- ## PR-Codex overview This PR reduces hub bandwidth in the `hubble` app by adding toggles for `GOSSIPSUB_FALLBACK_TO_FLOODSUB` and `GOSSIPSUB_FLOOD_PUBLISH`. ### Detailed summary - Added toggles for `GOSSIPSUB_FALLBACK_TO_FLOODSUB` and `GOSSIPSUB_FLOOD_PUBLISH` in `gossipNode.ts` - Refactored callback handling in `server.ts` - Added environment variable checks for `fallbackToFloodsub` and `floodPublish` in `gossipNodeWorker.ts` - Added stats tracking for message sizes in `gossipNodeWorker.ts` > ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your question}` <!-- end pr-codex -->
This commit is contained in:
5
.changeset/flat-seahorses-pay.md
Normal file
5
.changeset/flat-seahorses-pay.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
"@farcaster/hubble": patch
|
||||
---
|
||||
|
||||
fix(hubble): reduce hub bandwidth, can be toggled with GOSSIPSUB_FALLBACK_TO_FLOODSUB and GOSSIPSUB_FLOOD_PUBLISH
|
||||
@@ -516,6 +516,8 @@ export class GossipNode extends TypedEmitter<NodeEvents> {
|
||||
data = Buffer.from(Object.values(detail.msg.data as unknown as Record<string, number>));
|
||||
}
|
||||
|
||||
statsd().gauge("gossip.message_size_bytes", data.length, { topic: detail.msg.topic });
|
||||
|
||||
const tags: { [key: string]: string } = {
|
||||
topic: detail.msg.topic,
|
||||
};
|
||||
|
||||
@@ -124,15 +124,25 @@ export class LibP2PNode {
|
||||
: LIBP2P_CONNECT_TIMEOUT_MS;
|
||||
}
|
||||
|
||||
const fallbackToFloodsub = process.env["GOSSIPSUB_FALLBACK_TO_FLOODSUB"]
|
||||
? process.env["GOSSIPSUB_FALLBACK_TO_FLOODSUB"] === "true"
|
||||
: false;
|
||||
|
||||
const floodPublish = process.env["GOSSIPSUB_FLOOD_PUBLISH"]
|
||||
? process.env["GOSSIPSUB_FLOOD_PUBLISH"] === "true"
|
||||
: false;
|
||||
|
||||
const gossip = gossipsub({
|
||||
emitSelf: false,
|
||||
gossipsubIWantFollowupMs: gossipsubIWantFollowupMs,
|
||||
allowPublishToZeroPeers: true,
|
||||
asyncValidation: true, // Do not forward messages until we've merged it (prevents forwarding known bad messages)
|
||||
canRelayMessage: true,
|
||||
directPeers: options.directPeers || [],
|
||||
emitSelf: false,
|
||||
fallbackToFloodsub: fallbackToFloodsub,
|
||||
floodPublish: floodPublish,
|
||||
gossipsubIWantFollowupMs: gossipsubIWantFollowupMs,
|
||||
globalSignaturePolicy: options.strictNoSign ? "StrictNoSign" : "StrictSign",
|
||||
msgIdFn: this.getMessageId.bind(this),
|
||||
directPeers: options.directPeers || [],
|
||||
canRelayMessage: true,
|
||||
seenTTL: GOSSIP_SEEN_TTL, // Bump up the default to handle large flood of messages. 2 mins was not sufficient to prevent a loop
|
||||
scoreThresholds: { ...options.scoreThresholds },
|
||||
scoreParams: {
|
||||
@@ -663,6 +673,8 @@ parentPort?.on("message", async (msg: LibP2PNodeMethodGenericMessage) => {
|
||||
const specificMsg = msg as LibP2PNodeMessage<"gossipMessage">;
|
||||
const [message] = specificMsg.args;
|
||||
|
||||
statsd().gauge("gossip.worker.gossip_message_size_bytes", message.length, 1, { method: "gossipSubmitMessage" });
|
||||
|
||||
const publishResult = Result.combine(await libp2pNode.gossipMessage(Message.decode(message)));
|
||||
const flattenedPeerIds = publishResult.isOk() ? publishResult.value.flatMap((r) => r.recipients) : [];
|
||||
|
||||
@@ -681,6 +693,8 @@ parentPort?.on("message", async (msg: LibP2PNodeMethodGenericMessage) => {
|
||||
const specificMsg = msg as LibP2PNodeMessage<"gossipContactInfo">;
|
||||
const [contactInfo] = specificMsg.args;
|
||||
|
||||
statsd().gauge("gossip.worker.gossip_message_size_bytes", contactInfo.length, 1, { method: "gossipContactInfo" });
|
||||
|
||||
const publishResult = Result.combine(await libp2pNode.gossipContactInfo(ContactInfoContent.decode(contactInfo)));
|
||||
const flattenedPeerIds = publishResult.isOk() ? publishResult.value.flatMap((r) => r.recipients) : [];
|
||||
|
||||
|
||||
@@ -485,7 +485,8 @@ export default class Server {
|
||||
messages = messages.filter((message) => message.data !== undefined && message.hash.length > 0);
|
||||
}
|
||||
|
||||
callback(null, MessagesResponse.create({ messages }));
|
||||
const response = MessagesResponse.create({ messages });
|
||||
callback(null, response);
|
||||
},
|
||||
(err: HubError) => {
|
||||
callback(toServiceError(err));
|
||||
|
||||
Reference in New Issue
Block a user