test: fix few e2e flaky tests (#7762)

**Motivation**

Make our tests more stable on CI. 

**Description**

Fix the following flaky tests 

- `packages/beacon-node/test/e2e/network/reqresp.test.ts` 
- [AxiosError: Request failed with status code
429](https://github.com/ChainSafe/lodestar/actions/runs/14731164867/job/41349996153?pr=7762#step:6:10755)


Partially Closes #6358

**Steps to test or reproduce**

- Run all tests on CI
This commit is contained in:
Nazar Hussain
2025-05-07 14:20:08 +02:00
committed by GitHub
parent 0f6a8b1a77
commit 0eb947d0d0
8 changed files with 90 additions and 36 deletions

View File

@@ -19,6 +19,10 @@ export const e2eProject = defineProject({
singleFork: true,
},
},
sequence: {
concurrent: false,
shuffle: false,
},
},
});
@@ -41,5 +45,9 @@ export const e2eMainnetProject = defineProject({
singleFork: true,
},
},
sequence: {
concurrent: false,
shuffle: false,
},
},
});

View File

@@ -84,7 +84,7 @@ export async function createNodeJsLibp2p(
},
}),
],
streamMuxers: [mplex({maxInboundStreams: 256})],
streamMuxers: [mplex({maxInboundStreams: 256, disconnectThreshold: networkOpts.disconnectThreshold})],
peerDiscovery,
metrics: nodeJsLibp2pOpts.metrics
? prometheusMetrics({

View File

@@ -221,10 +221,10 @@ export class Network implements INetwork {
this.chain.emitter.off(routes.events.EventType.lightClientFinalityUpdate, this.onLightClientFinalityUpdate);
this.chain.emitter.off(routes.events.EventType.lightClientOptimisticUpdate, this.onLightClientOptimisticUpdate);
await this.core.close();
this.logger.debug("network core closed");
// Used only for sleep() statements
this.controller.abort();
this.logger.debug("network core closed");
}
async scrapeMetrics(): Promise<string> {

View File

@@ -23,6 +23,25 @@ export interface NetworkOptions
useWorker?: boolean;
maxYoungGenerationSizeMb?: number;
disableLightClientServer?: boolean;
/**
* During E2E tests we observe a lot of the following `missing stream` messages:
*
* > libp2p:mplex receiver stream with id 2 and protocol /eth2/beacon_chain/req/metadata/2/ssz_snappy ended
* > libp2p:mplex initiator stream with id 4 and protocol /eth2/beacon_chain/req/metadata/2/ssz_snappy ended
* > libp2p:mplex initiator stream with id 2 and protocol /eth2/beacon_chain/req/metadata/2/ssz_snappy ended
* > libp2p:mplex missing stream 2 for message type CLOSE_INITIATOR
* > libp2p:mplex missing stream 2 for message type CLOSE_RECEIVER
* > libp2p:mplex missing stream 4 for message type CLOSE_INITIATOR
*
* which results in the following rate-limit error and causes the connection to close, failing the e2e tests
* > libp2p:mplex rate limit hit when receiving messages for streams that do not exist - closing remote connection
* > libp2p:mplex:stream:initiator:3 abort with error Error: Too many messages for missing streams
*
* The default value for `disconnectThreshold` in libp2p is set to `5`.
* We need to increase this only for testing purposes
*/
disconnectThreshold?: number;
}
export const defaultNetworkOptions: NetworkOptions = {

View File

@@ -3,7 +3,7 @@ import {chainConfig} from "@lodestar/config/default";
import {ForkName} from "@lodestar/params";
import {RequestError, RequestErrorCode, ResponseOutgoing} from "@lodestar/reqresp";
import {Root, SignedBeaconBlock, altair, phase0, ssz} from "@lodestar/types";
import {sleep as _sleep} from "@lodestar/utils";
import {sleep} from "@lodestar/utils";
import {afterEach, beforeEach, describe, expect, it, vi} from "vitest";
import {Network, ReqRespBeaconNodeOpts} from "../../../src/network/index.js";
import {GetReqRespHandlerFn, ReqRespMethod} from "../../../src/network/reqresp/types.js";
@@ -33,8 +33,14 @@ function runTests({useWorker}: {useWorker: boolean}): void {
...chainConfig,
ALTAIR_FORK_EPOCH: 0,
});
let controller: AbortController;
const afterEachCallbacks: (() => Promise<void> | void)[] = [];
beforeEach(() => {
controller = new AbortController();
});
afterEach(async () => {
while (afterEachCallbacks.length > 0) {
const callback = afterEachCallbacks.pop();
@@ -42,15 +48,6 @@ function runTests({useWorker}: {useWorker: boolean}): void {
}
});
let controller: AbortController;
beforeEach(() => {
controller = new AbortController();
});
afterEach(() => controller.abort());
async function sleep(ms: number): Promise<void> {
await _sleep(ms, controller.signal);
}
async function createAndConnectPeers(
getReqRespHandler?: GetReqRespHandlerFn,
opts?: ReqRespBeaconNodeOpts
@@ -68,11 +65,15 @@ function runTests({useWorker}: {useWorker: boolean}): void {
await closeA();
await closeB();
});
const connected = Promise.all([onPeerConnect(netA), onPeerConnect(netB)]);
await connect(netA, netB);
await connect(netA, netB, controller.signal);
await connected;
controller.signal.addEventListener("abort", async () => {
await closeA();
await closeB();
});
return [netA, netB, await getPeerIdOf(netA), await getPeerIdOf(netB)];
}
@@ -242,7 +243,7 @@ function runTests({useWorker}: {useWorker: boolean}): void {
);
});
it("trigger a TTFB_TIMEOUT error", async () => {
it("should trigger TTFB_TIMEOUT error if first response is delayed", async () => {
const ttfbTimeoutMs = 250;
const [netA, _, _0, peerIdB] = await createAndConnectPeers(
@@ -250,7 +251,7 @@ function runTests({useWorker}: {useWorker: boolean}): void {
async function* onRequest() {
if (method === ReqRespMethod.BeaconBlocksByRange) {
// Wait for too long before sending first response chunk
await sleep(ttfbTimeoutMs * 10);
await sleep(ttfbTimeoutMs * 10, controller.signal);
yield wrapBlockAsEncodedPayload(config, config.getForkTypes(0).SignedBeaconBlock.defaultValue());
}
},
@@ -263,8 +264,9 @@ function runTests({useWorker}: {useWorker: boolean}): void {
);
});
it("trigger a RESP_TIMEOUT error", async () => {
const respTimeoutMs = 250;
it("should trigger a RESP_TIMEOUT error if first byte is on time but later delayed", async () => {
const ttfbTimeoutMs = 250;
const respTimeoutMs = 300;
const [netA, _, _0, peerIdB] = await createAndConnectPeers(
(method) =>
@@ -272,11 +274,11 @@ function runTests({useWorker}: {useWorker: boolean}): void {
if (method === ReqRespMethod.BeaconBlocksByRange) {
yield getEmptyEncodedPayloadSignedBeaconBlock(config);
// Wait for too long before sending second response chunk
await sleep(respTimeoutMs * 5);
await sleep(respTimeoutMs * 5, controller.signal);
yield getEmptyEncodedPayloadSignedBeaconBlock(config);
}
},
{respTimeoutMs}
{ttfbTimeoutMs, respTimeoutMs}
);
await expectRejectedWithLodestarError(
@@ -285,16 +287,19 @@ function runTests({useWorker}: {useWorker: boolean}): void {
);
});
it("Sleep infinite on first byte", async () => {
it("should trigger TTFB_TIMEOUT error if respTimeoutMs and ttfbTimeoutMs is the same", async () => {
const ttfbTimeoutMs = 250;
const respTimeoutMs = 250;
const [netA, _, _0, peerIdB] = await createAndConnectPeers(
(method) =>
// biome-ignore lint/correctness/useYield: No need for yield in test context
async function* onRequest() {
if (method === ReqRespMethod.BeaconBlocksByRange) {
await sleep(100000000);
await sleep(100000000, controller.signal);
}
},
{respTimeoutMs: 250, ttfbTimeoutMs: 250}
{respTimeoutMs, ttfbTimeoutMs}
);
await expectRejectedWithLodestarError(
@@ -303,13 +308,13 @@ function runTests({useWorker}: {useWorker: boolean}): void {
);
});
it("Sleep infinite on second response chunk", async () => {
it("should trigger a RESP_TIMEOUT error if first byte is on time but sleep infinite", async () => {
const [netA, _, _0, peerIdB] = await createAndConnectPeers(
(method) =>
async function* onRequest() {
if (method === ReqRespMethod.BeaconBlocksByRange) {
yield getEmptyEncodedPayloadSignedBeaconBlock(config);
await sleep(100000000);
await sleep(100000000, controller.signal);
}
},
{respTimeoutMs: 250, ttfbTimeoutMs: 250}

View File

@@ -3,6 +3,7 @@ import {generateKeyPair} from "@libp2p/crypto/keys";
import {PrivateKey} from "@libp2p/interface";
import {ATTESTATION_SUBNET_COUNT, SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params";
import {SubnetID} from "@lodestar/types";
import {sleep} from "@lodestar/utils";
import {INetwork, Network, NetworkEvent} from "../../src/network/index.js";
import {Libp2p} from "../../src/network/interface.js";
import {createNodeJsLibp2p} from "../../src/network/libp2p/index.js";
@@ -35,9 +36,13 @@ type INetworkDebug = Pick<INetwork, "connectToPeer" | "disconnectPeer" | "getNet
// Helpers to manipulate network's libp2p instance for testing only
export async function connect(netDial: INetworkDebug, netServer: INetworkDebug): Promise<void> {
export async function connect(netDial: INetworkDebug, netServer: INetworkDebug, signal?: AbortSignal): Promise<void> {
const netServerId = await netServer.getNetworkIdentity();
await netDial.connectToPeer(netServerId.peerId, netServerId.p2pAddresses);
// We see a lot of "Muxer already closed" in e2e tests on CI
// This is a way to give a grace period for connections to open and exchange metadata
await sleep(50, signal);
}
export async function disconnect(network: INetworkDebug, peer: string): Promise<void> {

View File

@@ -1,6 +1,7 @@
import {generateKeyPair} from "@libp2p/crypto/keys";
import {ChainForkConfig, createBeaconConfig} from "@lodestar/config";
import {ssz} from "@lodestar/types";
import {sleep} from "@lodestar/utils";
import {BeaconChain} from "../../src/chain/chain.js";
import {Eth1ForBlockProductionDisabled} from "../../src/eth1/index.js";
import {ExecutionEngineDisabled} from "../../src/execution/index.js";
@@ -89,15 +90,18 @@ export async function getNetworkForTest(
privateKey: await generateKeyPair("secp256k1"),
opts: {
...defaultNetworkOptions,
maxPeers: 1,
maxPeers: 10,
targetPeers: 1,
bootMultiaddrs: [],
localMultiaddrs: ["/ip4/127.0.0.1/tcp/0"],
localMultiaddrs: ["/ip4/0.0.0.0/tcp/0"],
discv5FirstQueryDelayMs: 0,
discv5: null,
skipParamsLog: true,
// Disable rate limiting
rateLimitMultiplier: 0,
// Increase of following value is just to circumvent the following error in e2e tests
// > libp2p:mplex rate limit hit when receiving messages
disconnectThreshold: 255,
...opts.opts,
},
logger,
@@ -108,6 +112,14 @@ export async function getNetworkForTest(
async function closeAll() {
await network.close();
await chain.close();
/**
* We choose a random port for the libp2p network. Though our libp2p instance is closed, the
* system still holds the port momentarily. If the next test randomly selects the same port,
* it fails with ERR_CONNECTION_REFUSED. To avoid such a situation, we give a grace period
* for the system to also clean up resources.
*/
await sleep(100);
},
];
}

View File

@@ -52,14 +52,19 @@ function assertCorrectPreset(localPreset: BeaconPreset, remotePreset: BeaconPres
}
async function downloadRemoteConfig(preset: "mainnet" | "minimal", commit: string): Promise<BeaconPreset> {
const downloadedParams = await Promise.all(
Object.values(ForkName).map((forkName) =>
axios({
url: `https://raw.githubusercontent.com/ethereum/consensus-specs/${commit}/presets/${preset}/${forkName}.yaml`,
timeout: 30 * 1000,
}).then((response) => loadConfigYaml(response.data))
)
);
const downloadedParams: Record<string, unknown>[] = [];
for (const forkName of Object.values(ForkName)) {
const response = await axios({
url: `https://raw.githubusercontent.com/ethereum/consensus-specs/${commit}/presets/${preset}/${forkName}.yaml`,
timeout: 30 * 1000,
});
downloadedParams.push(loadConfigYaml(response.data));
// We get the error `Request failed with status code 429`
// which is `Too Many Requests`, so we add a bit of delay between requests
await new Promise((resolve) => setTimeout(resolve, 200));
}
// Merge all the fetched yamls for the different forks
const beaconPresetRaw: Record<string, unknown> = Object.assign(