mirror of https://github.com/ChainSafe/lodestar.git (synced 2026-01-09 15:48:08 -05:00)
fix: self rate limiter to handle dead requests (#8295)
**Motivation**

- node is slow to sync due to dead requests tracked in the self rate limiter

**Description**

- track request ids (with start timestamps) in the self rate limiter
- if a tracked request is older than 30s, it's considered dead and removed

Closes #8263, part of #8256

**Test**

- was able to sync fusaka-devnet-3 with no self rate limited errors

<img width="1677" height="645" alt="Screenshot 2025-08-29 at 20 38 45" src="https://github.com/user-attachments/assets/2388639e-7232-4941-a1bf-4b9ecac55a58" />

---------

Co-authored-by: Tuyen Nguyen <twoeths@users.noreply.github.com>
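The gist of the change, before the diff: instead of a per-protocol counter that leaks a slot whenever `requestCompleted()` is never called, the limiter now records each request id with its start time and lazily evicts entries older than `REQUEST_TIMEOUT_MS`. A minimal standalone sketch of that technique follows; the `TrackedRequests` class name is illustrative, not the actual Lodestar API.

```ts
// Sketch only: the real SelfRateLimiter below also keys this state
// per peer and per protocol, and cleans up disconnected peers.
const MAX_CONCURRENT_REQUESTS = 2; // per protocol per peer, as in the diff
const REQUEST_TIMEOUT_MS = 30 * 1000; // a request tracked longer than this is "dead"

class TrackedRequests {
  // requestId -> time the request started (ms)
  private readonly tracked = new Map<number, number>();

  allows(requestId: number, now = Date.now()): boolean {
    // lazily sweep dead requests so they never occupy a slot forever
    for (const [id, startedMs] of this.tracked) {
      if (startedMs + REQUEST_TIMEOUT_MS <= now) this.tracked.delete(id);
    }
    if (this.tracked.size >= MAX_CONCURRENT_REQUESTS) return false;
    this.tracked.set(requestId, now);
    return true;
  }

  requestCompleted(requestId: number): void {
    this.tracked.delete(requestId);
  }
}
```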
@@ -164,6 +164,8 @@ export class ReqResp {
     const protocols: (MixedProtocol | DialOnlyProtocol)[] = [];
     const protocolIDs: string[] = [];
+    // don't increase this.reqCount until we know the request will be sent
+    const requestId = this.reqCount + 1;
 
     for (const version of versions) {
       const protocolID = this.formatProtocolID({method, version, encoding});
@@ -172,7 +174,7 @@
         throw Error(`Request to send to protocol ${protocolID} but it has not been declared`);
       }
 
-      if (!this.selfRateLimiter.allows(peerIdStr, protocolID)) {
+      if (!this.selfRateLimiter.allows(peerIdStr, protocolID, requestId)) {
         // we technically don't send a request in this case but it's nice to track this in the same `outgoingErrorReasons` metric
         this.metrics?.outgoingErrorReasons.inc({reason: RequestErrorCode.REQUEST_SELF_RATE_LIMITED});
         throw new RequestError({code: RequestErrorCode.REQUEST_SELF_RATE_LIMITED});
@@ -183,6 +185,9 @@
       protocolIDs.push(protocolID);
     }
 
+    // requestId is now the same as reqCount
+    this.reqCount++;
+
     try {
       yield* sendRequest(
         {logger: this.logger, libp2p: this.libp2p, metrics: this.metrics, peerClient},
@@ -192,7 +197,7 @@
         body,
         this.controller.signal,
         this.opts,
-        this.reqCount++
+        requestId
       );
     } catch (e) {
       this.metrics?.outgoingErrors.inc({method});
@@ -209,7 +214,7 @@
       throw e;
     } finally {
       for (const protocolID of protocolIDs) {
-        this.selfRateLimiter.requestCompleted(peerIdStr, protocolID);
+        this.selfRateLimiter.requestCompleted(peerIdStr, protocolID, requestId);
       }
       timer?.();
     }
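Taken together, the hunks above define a small lifecycle contract on the caller side: allocate the request id before the rate-limit check, pass the same id to `sendRequest`, and release it in `finally` whether the request succeeds or fails. A condensed sketch of that flow, with `trySendRequest` and `send` as hypothetical stand-ins for the real ReqResp internals:

```ts
// Condensed caller-side flow implied by the hunks above. `limiter` is a
// SelfRateLimiter-like object; `send` stands in for sendRequest.
interface SelfRateLimiterLike {
  allows(peerIdStr: string, protocolID: string, requestId: number): boolean;
  requestCompleted(peerIdStr: string, protocolID: string, requestId: number): void;
}

let reqCount = 0;

async function trySendRequest(
  limiter: SelfRateLimiterLike,
  send: (requestId: number) => Promise<void>,
  peerIdStr: string,
  protocolID: string
): Promise<void> {
  // don't increase reqCount until we know the request will be sent
  const requestId = reqCount + 1;

  if (!limiter.allows(peerIdStr, protocolID, requestId)) {
    // nothing was tracked for this id, so requestCompleted must NOT run
    throw new Error("REQUEST_SELF_RATE_LIMITED");
  }

  // requestId is now the same as reqCount
  reqCount++;

  try {
    await send(requestId);
  } finally {
    // always release the slot, on success or failure
    limiter.requestCompleted(peerIdStr, protocolID, requestId);
  }
}
```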
@@ -1,4 +1,4 @@
-import {MapDef} from "@lodestar/utils";
+import {Logger, MapDef} from "@lodestar/utils";
 
 type PeerIdStr = string;
 type ProtocolID = string;
@@ -11,12 +11,22 @@ export const CHECK_DISCONNECTED_PEERS_INTERVAL_MS = 2 * 60 * 1000;
 /** Given PING_INTERVAL constants of 15s/20s, we consider a peer disconnected if there is no request in 1 minute */
 const DISCONNECTED_TIMEOUT_MS = 60 * 1000;
 
+/**
+ * Timeout after which a request is considered no longer in progress.
+ * This covers the case where `requestCompleted()` is not called due to unexpected errors,
+ * for example https://github.com/ChainSafe/lodestar/issues/8256
+ **/
+export const REQUEST_TIMEOUT_MS = 30 * 1000;
+
+type RequestId = number;
+type RequestIdMs = number;
+
 /**
  * Simple rate limiter that allows a maximum of 2 concurrent requests per protocol per peer.
  * The consumer should either prevent requests from being sent when the limit is reached or handle the case when the request is not allowed.
  */
 export class SelfRateLimiter {
-  private readonly rateLimitersPerPeer: MapDef<PeerIdStr, MapDef<ProtocolID, number>>;
+  private readonly rateLimitersPerPeer: MapDef<PeerIdStr, MapDef<ProtocolID, Map<RequestId, RequestIdMs>>>;
   /**
    * It's not convenient to handle a peer disconnected event so we track the last seen requests by peer.
    * This is the same design as `ReqRespRateLimiter`.
@@ -25,9 +35,9 @@ export class SelfRateLimiter {
   /** Interval to check lastSeenMessagesByPeer */
   private cleanupInterval: NodeJS.Timeout | undefined = undefined;
 
-  constructor() {
-    this.rateLimitersPerPeer = new MapDef<PeerIdStr, MapDef<ProtocolID, number>>(
-      () => new MapDef<ProtocolID, number>(() => 0)
+  constructor(private readonly logger?: Logger) {
+    this.rateLimitersPerPeer = new MapDef<PeerIdStr, MapDef<ProtocolID, Map<RequestId, RequestIdMs>>>(
+      () => new MapDef<ProtocolID, Map<RequestId, RequestIdMs>>(() => new Map())
     );
     this.lastSeenRequestsByPeer = new Map();
   }
@@ -46,15 +56,33 @@ export class SelfRateLimiter {
   /**
    * Called before we send a request to a peer.
    */
-  allows(peerId: PeerIdStr, protocolId: ProtocolID): boolean {
+  allows(peerId: PeerIdStr, protocolId: ProtocolID, requestId: RequestId): boolean {
+    const now = Date.now();
     const peerRateLimiter = this.rateLimitersPerPeer.getOrDefault(peerId);
-    const inProgressRequests = peerRateLimiter.getOrDefault(protocolId);
-    this.lastSeenRequestsByPeer.set(peerId, Date.now());
+    const trackedRequests = peerRateLimiter.getOrDefault(protocolId);
+    this.lastSeenRequestsByPeer.set(peerId, now);
+
+    let inProgressRequests = 0;
+    for (const [trackedRequestId, trackedRequestTimeMs] of trackedRequests.entries()) {
+      if (trackedRequestTimeMs + REQUEST_TIMEOUT_MS <= now) {
+        // request timed out, remove it
+        trackedRequests.delete(trackedRequestId);
+        this.logger?.debug("SelfRateLimiter: request timed out, removing it", {
+          requestId: trackedRequestId,
+          requestTime: trackedRequestTimeMs,
+          peerId,
+          protocolId,
+        });
+      } else {
+        inProgressRequests++;
+      }
+    }
 
     if (inProgressRequests >= MAX_CONCURRENT_REQUESTS) {
       return false;
     }
 
-    peerRateLimiter.set(protocolId, inProgressRequests + 1);
+    trackedRequests.set(requestId, now);
     return true;
   }
 
@@ -62,10 +90,10 @@ export class SelfRateLimiter {
    * Called when a request to a peer is completed, regardless of success or failure.
    * This should NOT be called when the request was not allowed.
    */
-  requestCompleted(peerId: PeerIdStr, protocolId: ProtocolID): void {
+  requestCompleted(peerId: PeerIdStr, protocolId: ProtocolID, requestId: RequestId): void {
     const peerRateLimiter = this.rateLimitersPerPeer.getOrDefault(peerId);
-    const inProgressRequests = peerRateLimiter.getOrDefault(protocolId);
-    peerRateLimiter.set(protocolId, Math.max(0, inProgressRequests - 1));
+    const trackedRequests = peerRateLimiter.getOrDefault(protocolId);
+    trackedRequests.delete(requestId);
   }
 
   getPeerCount(): number {
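`MapDef` above comes from `@lodestar/utils`; conceptually it is a `Map` whose `getOrDefault(key)` materializes and stores a default value on a miss, which is what lets the nested per-peer/per-protocol state be created lazily. A rough sketch of the idea, assuming only standard `Map` semantics (not the actual `@lodestar/utils` implementation):

```ts
// Rough sketch of the MapDef idea: a Map that lazily materializes a default
// value on first access, so nested per-peer/per-protocol state can be
// created on demand.
class MapDefSketch<K, V> extends Map<K, V> {
  constructor(private readonly getDefault: () => V) {
    super();
  }

  getOrDefault(key: K): V {
    let value = this.get(key);
    if (value === undefined) {
      value = this.getDefault();
      this.set(key, value);
    }
    return value;
  }
}

// Shape used by the limiter: peer -> protocol -> (requestId -> start time ms)
const requestsPerPeer = new MapDefSketch<string, MapDefSketch<string, Map<number, number>>>(
  () => new MapDefSketch<string, Map<number, number>>(() => new Map<number, number>())
);

requestsPerPeer.getOrDefault("peer1").getOrDefault("protocol1").set(1, Date.now());
```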
@@ -1,5 +1,9 @@
 import {afterEach, beforeEach, describe, expect, it, vi} from "vitest";
-import {CHECK_DISCONNECTED_PEERS_INTERVAL_MS, SelfRateLimiter} from "../../../src/rate_limiter/selfRateLimiter.js";
+import {
+  CHECK_DISCONNECTED_PEERS_INTERVAL_MS,
+  REQUEST_TIMEOUT_MS,
+  SelfRateLimiter,
+} from "../../../src/rate_limiter/selfRateLimiter.js";
 
 describe("SelfRateLimiter", () => {
   let selfRateLimiter: SelfRateLimiter;
@@ -16,30 +20,39 @@ describe("SelfRateLimiter", () => {
   });
 
   it("allows requests under the limit", () => {
-    expect(selfRateLimiter.allows("peer1", "protocol1")).toBe(true);
-    expect(selfRateLimiter.allows("peer1", "protocol1")).toBe(true);
+    expect(selfRateLimiter.allows("peer1", "protocol1", 1)).toBe(true);
+    expect(selfRateLimiter.allows("peer1", "protocol1", 2)).toBe(true);
   });
 
   it("blocks requests over the limit", () => {
-    selfRateLimiter.allows("peer1", "protocol1");
-    selfRateLimiter.allows("peer1", "protocol1");
-    expect(selfRateLimiter.allows("peer1", "protocol1")).toBe(false);
+    selfRateLimiter.allows("peer1", "protocol1", 1);
+    selfRateLimiter.allows("peer1", "protocol1", 2);
+    expect(selfRateLimiter.allows("peer1", "protocol1", 3)).toBe(false);
     // but allows a different protocol
-    expect(selfRateLimiter.allows("peer1", "protocol2")).toBe(true);
+    expect(selfRateLimiter.allows("peer1", "protocol2", 4)).toBe(true);
     // allows a different peer
-    expect(selfRateLimiter.allows("peer2", "protocol1")).toBe(true);
+    expect(selfRateLimiter.allows("peer2", "protocol1", 5)).toBe(true);
 
     // allow after request completed
-    selfRateLimiter.requestCompleted("peer1", "protocol1");
-    expect(selfRateLimiter.allows("peer1", "protocol1")).toBe(true);
+    selfRateLimiter.requestCompleted("peer1", "protocol1", 1);
+    expect(selfRateLimiter.allows("peer1", "protocol1", 6)).toBe(true);
   });
 
+  it("should timeout requests after REQUEST_TIMEOUT_MS", () => {
+    selfRateLimiter.allows("peer1", "protocol1", 1);
+    selfRateLimiter.allows("peer1", "protocol1", 2);
+    expect(selfRateLimiter.allows("peer1", "protocol1", 3)).toBe(false);
+
+    vi.advanceTimersByTime(REQUEST_TIMEOUT_MS);
+    expect(selfRateLimiter.allows("peer1", "protocol1", 4)).toBe(true);
+  });
+
   it("should remove disconnected peers after interval", () => {
-    selfRateLimiter.allows("peer1", "protocol1");
-    selfRateLimiter.allows("peer1", "protocol1");
-    expect(selfRateLimiter.allows("peer1", "protocol1")).toBe(false);
+    selfRateLimiter.allows("peer1", "protocol1", 1);
+    selfRateLimiter.allows("peer1", "protocol1", 2);
+    expect(selfRateLimiter.allows("peer1", "protocol1", 3)).toBe(false);
 
     vi.advanceTimersByTime(CHECK_DISCONNECTED_PEERS_INTERVAL_MS + 1);
-    expect(selfRateLimiter.allows("peer1", "protocol1")).toBe(true);
+    expect(selfRateLimiter.allows("peer1", "protocol1", 4)).toBe(true);
   });
 });