From d4a47659a5d3065b6a2a571b4e137bf6f33531d7 Mon Sep 17 00:00:00 2001 From: twoeths <10568965+twoeths@users.noreply.github.com> Date: Tue, 16 Dec 2025 20:47:13 +0700 Subject: [PATCH] feat: transfer pending gossipsub message msg data (#8689) **Motivation** - improve memory by transferirng gossipsub message data from network thread to the main thread - In snappy decompression in #8647 we had to do `Buffer.alloc()` instead of `Buffer.allocUnsafe()`. We don't have to feel bad about that because `Buffer.allocUnsafe()` does not work with this PR, and we don't waste any memory. **Description** - use `transferList` param when posting messages from network thread to the main thread part of #8629 **Testing** I've tested this on `feat2` for 3 days, the previous branch was #8671 so it's basically the current stable, does not see significant improvement but some good data for different nodes - no change on 1k or `novc` - on hoodi `sas` node we have better memory there on main thread with same mesh peers, same memory on network thread Screenshot 2025-12-12 at 11 05 27 - on mainnnet `sas` node, we have better memory on network thread, a little bit worse on the main thread Screenshot 2025-12-12 at 11 08 42 - but for this mainnet node, the most interesting metric is `forward msg avg peers`, we're faster than majority of them Screenshot 2025-12-12 at 11 11 00 --------- Co-authored-by: Tuyen Nguyen --- packages/beacon-node/src/network/core/events.ts | 2 +- packages/beacon-node/src/network/events.ts | 8 +++++++- packages/beacon-node/src/util/workerEvents.ts | 17 +++++++++-------- .../network/onWorker/dataSerialization.test.ts | 2 +- 4 files changed, 18 insertions(+), 11 deletions(-) diff --git a/packages/beacon-node/src/network/core/events.ts b/packages/beacon-node/src/network/core/events.ts index 31bb0cb5d8..d97341569a 100644 --- a/packages/beacon-node/src/network/core/events.ts +++ b/packages/beacon-node/src/network/core/events.ts @@ -2,7 +2,7 @@ import EventEmitter from "node:events"; import {ResponseIncoming, ResponseOutgoing} from "@lodestar/reqresp"; import {AsyncIterableEventBus, IteratorEvent, RequestEvent} from "../../util/asyncIterableToEvents.js"; import {StrictEventEmitterSingleArg} from "../../util/strictEvents.js"; -import {EventDirection} from "../../util/workerEvents.js"; +import {EventDirection} from "../events.js"; import {IncomingRequestArgs, OutgoingRequestArgs} from "../reqresp/types.js"; export enum ReqRespBridgeEvent { diff --git a/packages/beacon-node/src/network/events.ts b/packages/beacon-node/src/network/events.ts index a960ade5b8..20d34b9966 100644 --- a/packages/beacon-node/src/network/events.ts +++ b/packages/beacon-node/src/network/events.ts @@ -3,7 +3,6 @@ import {PeerId, TopicValidatorResult} from "@libp2p/interface"; import {CustodyIndex, Status} from "@lodestar/types"; import {PeerIdStr} from "../util/peerId.js"; import {StrictEventEmitterSingleArg} from "../util/strictEvents.js"; -import {EventDirection} from "../util/workerEvents.js"; import {PendingGossipsubMessage} from "./processor/types.js"; import {RequestTypedContainer} from "./reqresp/ReqRespBeaconNode.js"; @@ -38,6 +37,13 @@ export type NetworkEventData = { }; }; +export enum EventDirection { + workerToMain, + mainToWorker, + /** Event not emitted through worker boundary */ + none, +} + export const networkEventDirection: Record = { [NetworkEvent.peerConnected]: EventDirection.workerToMain, [NetworkEvent.peerDisconnected]: EventDirection.workerToMain, diff --git a/packages/beacon-node/src/util/workerEvents.ts b/packages/beacon-node/src/util/workerEvents.ts index 807bf7a306..24941bd316 100644 --- a/packages/beacon-node/src/util/workerEvents.ts +++ b/packages/beacon-node/src/util/workerEvents.ts @@ -1,9 +1,11 @@ import {MessagePort, Worker} from "node:worker_threads"; +import {Message} from "@libp2p/interface"; import {Thread} from "@chainsafe/threads"; import {Logger} from "@lodestar/logger"; import {sleep} from "@lodestar/utils"; import {Metrics} from "../metrics/metrics.js"; import {NetworkCoreWorkerMetrics} from "../network/core/metrics.js"; +import {EventDirection, NetworkEvent} from "../network/events.js"; import {StrictEventEmitterSingleArg} from "./strictEvents.js"; const NANO_TO_SECOND_CONVERSION = 1e9; @@ -15,13 +17,6 @@ export type WorkerBridgeEvent = { data: EventData[keyof EventData]; }; -export enum EventDirection { - workerToMain, - mainToWorker, - /** Event not emitted through worker boundary */ - none, -} - /** * Bridges events from worker to main thread * Each event can only have one direction: @@ -63,7 +58,13 @@ export function wireEventsOnWorkerThread( posted: process.hrtime(), data, }; - parentPort.postMessage(workerEvent); + let transferList: ArrayBuffer[] | undefined = undefined; + if (eventName === NetworkEvent.pendingGossipsubMessage) { + const payload = data as {msg: Message}; + // Transfer the underlying ArrayBuffer to avoid copy for PendingGossipsubMessage + transferList = [payload.msg.data.buffer as ArrayBuffer]; + } + parentPort.postMessage(workerEvent, transferList); }); } } diff --git a/packages/beacon-node/test/e2e/network/onWorker/dataSerialization.test.ts b/packages/beacon-node/test/e2e/network/onWorker/dataSerialization.test.ts index 0411c70a5a..35fdc57f4d 100644 --- a/packages/beacon-node/test/e2e/network/onWorker/dataSerialization.test.ts +++ b/packages/beacon-node/test/e2e/network/onWorker/dataSerialization.test.ts @@ -9,6 +9,7 @@ import {ZERO_HASH, ZERO_HASH_HEX} from "../../../../src/constants/constants.js"; import {ReqRespBridgeEvent, ReqRespBridgeEventData} from "../../../../src/network/core/events.js"; import {NetworkWorkerApi} from "../../../../src/network/core/index.js"; import { + EventDirection, GossipType, NetworkEvent, NetworkEventData, @@ -18,7 +19,6 @@ import { } from "../../../../src/network/index.js"; import {CommitteeSubscription} from "../../../../src/network/subnets/interface.js"; import {IteratorEventType} from "../../../../src/util/asyncIterableToEvents.js"; -import {EventDirection} from "../../../../src/util/workerEvents.js"; import {getValidPeerId, validPeerIdStr} from "../../../utils/peer.js"; import {EchoWorker, getEchoWorker} from "./workerEchoHandler.js";