feat: transfer pending gossipsub message msg data (#8689)

**Motivation**

- improve memory by transferirng gossipsub message data from network
thread to the main thread
- In snappy decompression in #8647 we had to do `Buffer.alloc()` instead
of `Buffer.allocUnsafe()`. We don't have to feel bad about that because
`Buffer.allocUnsafe()` does not work with this PR, and we don't waste
any memory.

**Description**

- use `transferList` param when posting messages from network thread to
the main thread

part of #8629

**Testing**
I've tested this on `feat2` for 3 days, the previous branch was #8671 so
it's basically the current stable, does not see significant improvement
but some good data for different nodes
- no change on 1k or `novc`
- on hoodi `sas` node we have better memory there on main thread with
same mesh peers, same memory on network thread

<img width="851" height="511" alt="Screenshot 2025-12-12 at 11 05 27"
src="https://github.com/user-attachments/assets/8d7b2c2f-8213-4f89-87e0-437d016bc24a"
/>

- on mainnnet `sas` node, we have better memory on network thread, a
little bit worse on the main thread
<img width="854" height="504" alt="Screenshot 2025-12-12 at 11 08 42"
src="https://github.com/user-attachments/assets/7e638149-2dbe-4c7e-849c-ef78f6ff4d6f"
/>

- but for this mainnet node, the most interesting metric is `forward msg
avg peers`, we're faster than majority of them

<img width="1378" height="379" alt="Screenshot 2025-12-12 at 11 11 00"
src="https://github.com/user-attachments/assets/3ba5eeaa-5a11-4cad-adfa-1e0f68a81f16"
/>

---------

Co-authored-by: Tuyen Nguyen <twoeths@users.noreply.github.com>
This commit is contained in:
twoeths
2025-12-16 20:47:13 +07:00
committed by GitHub
parent 3bf4734ba9
commit d4a47659a5
4 changed files with 18 additions and 11 deletions

View File

@@ -2,7 +2,7 @@ import EventEmitter from "node:events";
import {ResponseIncoming, ResponseOutgoing} from "@lodestar/reqresp";
import {AsyncIterableEventBus, IteratorEvent, RequestEvent} from "../../util/asyncIterableToEvents.js";
import {StrictEventEmitterSingleArg} from "../../util/strictEvents.js";
import {EventDirection} from "../../util/workerEvents.js";
import {EventDirection} from "../events.js";
import {IncomingRequestArgs, OutgoingRequestArgs} from "../reqresp/types.js";
export enum ReqRespBridgeEvent {

View File

@@ -3,7 +3,6 @@ import {PeerId, TopicValidatorResult} from "@libp2p/interface";
import {CustodyIndex, Status} from "@lodestar/types";
import {PeerIdStr} from "../util/peerId.js";
import {StrictEventEmitterSingleArg} from "../util/strictEvents.js";
import {EventDirection} from "../util/workerEvents.js";
import {PendingGossipsubMessage} from "./processor/types.js";
import {RequestTypedContainer} from "./reqresp/ReqRespBeaconNode.js";
@@ -38,6 +37,13 @@ export type NetworkEventData = {
};
};
export enum EventDirection {
workerToMain,
mainToWorker,
/** Event not emitted through worker boundary */
none,
}
export const networkEventDirection: Record<NetworkEvent, EventDirection> = {
[NetworkEvent.peerConnected]: EventDirection.workerToMain,
[NetworkEvent.peerDisconnected]: EventDirection.workerToMain,

View File

@@ -1,9 +1,11 @@
import {MessagePort, Worker} from "node:worker_threads";
import {Message} from "@libp2p/interface";
import {Thread} from "@chainsafe/threads";
import {Logger} from "@lodestar/logger";
import {sleep} from "@lodestar/utils";
import {Metrics} from "../metrics/metrics.js";
import {NetworkCoreWorkerMetrics} from "../network/core/metrics.js";
import {EventDirection, NetworkEvent} from "../network/events.js";
import {StrictEventEmitterSingleArg} from "./strictEvents.js";
const NANO_TO_SECOND_CONVERSION = 1e9;
@@ -15,13 +17,6 @@ export type WorkerBridgeEvent<EventData> = {
data: EventData[keyof EventData];
};
export enum EventDirection {
workerToMain,
mainToWorker,
/** Event not emitted through worker boundary */
none,
}
/**
* Bridges events from worker to main thread
* Each event can only have one direction:
@@ -63,7 +58,13 @@ export function wireEventsOnWorkerThread<EventData>(
posted: process.hrtime(),
data,
};
parentPort.postMessage(workerEvent);
let transferList: ArrayBuffer[] | undefined = undefined;
if (eventName === NetworkEvent.pendingGossipsubMessage) {
const payload = data as {msg: Message};
// Transfer the underlying ArrayBuffer to avoid copy for PendingGossipsubMessage
transferList = [payload.msg.data.buffer as ArrayBuffer];
}
parentPort.postMessage(workerEvent, transferList);
});
}
}

View File

@@ -9,6 +9,7 @@ import {ZERO_HASH, ZERO_HASH_HEX} from "../../../../src/constants/constants.js";
import {ReqRespBridgeEvent, ReqRespBridgeEventData} from "../../../../src/network/core/events.js";
import {NetworkWorkerApi} from "../../../../src/network/core/index.js";
import {
EventDirection,
GossipType,
NetworkEvent,
NetworkEventData,
@@ -18,7 +19,6 @@ import {
} from "../../../../src/network/index.js";
import {CommitteeSubscription} from "../../../../src/network/subnets/interface.js";
import {IteratorEventType} from "../../../../src/util/asyncIterableToEvents.js";
import {EventDirection} from "../../../../src/util/workerEvents.js";
import {getValidPeerId, validPeerIdStr} from "../../../utils/peer.js";
import {EchoWorker, getEchoWorker} from "./workerEchoHandler.js";