fix: prune checkpoint states in processSlotsToNearestCheckpoint (#7497)

**Motivation**

- in holesky, there was a single block killing lodestar because we have
to process multiple epoch transitions at the same time, see
https://github.com/ChainSafe/lodestar/issues/7495#issuecomment-2680800898

**Description**

- for single epoch transition, let the next block prune it at the last
1/3 slot of next epoch
- if there are multiple epoch transitions, it means network is not
healthy so prune it right away or we'll get OOM
- also fix "Can only get block root in the past" in #7495

Closes #7495

**Steps to test or reproduce**

Test holesky

---------

Co-authored-by: Tuyen Nguyen <twoeths@users.noreply.github.com>
Co-authored-by: Cayman <caymannava@gmail.com>
This commit is contained in:
twoeths
2025-02-25 23:43:16 +07:00
committed by GitHub
parent b2ebe4921b
commit 2fc54906b3
3 changed files with 181 additions and 7 deletions

View File

@@ -11,7 +11,7 @@ import {
processSlots,
stateTransition,
} from "@lodestar/state-transition";
import {BeaconBlock, RootHex, SignedBeaconBlock, Slot, phase0} from "@lodestar/types";
import {BeaconBlock, RootHex, SignedBeaconBlock, Slot, phase0, ssz} from "@lodestar/types";
import {Logger, fromHex, toRootHex} from "@lodestar/utils";
import {IBeaconDb} from "../../db/index.js";
import {Metrics} from "../../metrics/index.js";
@@ -318,7 +318,12 @@ export class StateRegenerator implements IStateRegeneratorInternal {
* emitting "checkpoint" events after every epoch processed.
*/
async function processSlotsByCheckpoint(
modules: {checkpointStateCache: CheckpointStateCache; metrics: Metrics | null; emitter: ChainEventEmitter},
modules: {
checkpointStateCache: CheckpointStateCache;
metrics: Metrics | null;
emitter: ChainEventEmitter;
logger: Logger;
},
preState: CachedBeaconStateAllForks,
slot: Slot,
regenCaller: RegenCaller,
@@ -338,8 +343,13 @@ async function processSlotsByCheckpoint(
*
* Stops processing after no more full epochs can be processed.
*/
async function processSlotsToNearestCheckpoint(
modules: {checkpointStateCache: CheckpointStateCache; metrics: Metrics | null; emitter: ChainEventEmitter},
export async function processSlotsToNearestCheckpoint(
modules: {
checkpointStateCache: CheckpointStateCache;
metrics: Metrics | null;
emitter: ChainEventEmitter | null;
logger: Logger | null;
},
preState: CachedBeaconStateAllForks,
slot: Slot,
regenCaller: RegenCaller,
@@ -349,16 +359,23 @@ async function processSlotsToNearestCheckpoint(
const postSlot = slot;
const preEpoch = computeEpochAtSlot(preSlot);
let postState = preState;
const {checkpointStateCache, emitter, metrics} = modules;
const {checkpointStateCache, emitter, metrics, logger} = modules;
let count = 0;
for (
let nextEpochSlot = computeStartSlotAtEpoch(preEpoch + 1);
nextEpochSlot <= postSlot;
nextEpochSlot += SLOTS_PER_EPOCH
) {
logger?.verbose("Processing slots over epochs", {
slot: postState.slot,
nextEpochSlot,
postSlot,
caller: regenCaller,
});
// processSlots calls .clone() before mutating
postState = processSlots(postState, nextEpochSlot, opts, metrics);
modules.metrics?.epochTransitionByCaller.inc({caller: regenCaller});
metrics?.epochTransitionByCaller.inc({caller: regenCaller});
// this is usually added when we prepare for next slot or validate gossip block
// then when we process the 1st block of epoch, we don't have to do state transition again
@@ -368,7 +385,31 @@ async function processSlotsToNearestCheckpoint(
const cp = getCheckpointFromState(checkpointState);
checkpointStateCache.add(cp, checkpointState);
// consumers should not mutate or get the transfered cache
emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone(true));
emitter?.emit(ChainEvent.checkpoint, cp, checkpointState.clone(true));
if (count >= 1) {
// in normal condition, we only process 1 epoch so never reach this
// in that case, we want to prune state at the last 1/3 slot of slot 0 of the next epoch after importing the 1st block of epoch
// in non-finality time, we may process a lot of epochs so need to prune the cache to keep the node healthy
// this happened to holesky on Feb 2025, see https://github.com/ChainSafe/lodestar/issues/7495#issuecomment-2680800898
// cannot use getBlockRootAtSlot() because nextEpochSlot = postState
const latestBlockHex = toRootHex(cp.root);
try {
const persistCount = await checkpointStateCache.processState(latestBlockHex, checkpointState);
logger?.verbose("pruning checkpointStateCache during processSlotsToNearestCheckpoint", {
root: latestBlockHex,
epoch: cp.epoch,
persistCount,
});
} catch (e) {
logger?.debug(
"CheckpointStateCache failed to process checkpoint state",
{root: latestBlockHex, epoch: cp.epoch},
e as Error
);
}
}
count++;
// this avoids keeping our node busy processing blocks
await nextEventLoop();

View File

@@ -474,6 +474,12 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
const persistEpochs = sortedEpochs.slice(0, sortedEpochs.length - this.maxEpochsInMemory);
for (const lowestEpoch of persistEpochs) {
if (state.slot < computeStartSlotAtEpoch(lowestEpoch)) {
// there is no checkpoint states of epochs newer than this state
// otherwise get "Can only get block root in the past" error from getBlockRootAtSlot() api below
// see https://github.com/ChainSafe/lodestar/issues/7495
break;
}
// usually there is only 0 or 1 epoch to persist in this loop
persistCount += await this.processPastEpoch(blockRootHex, state, lowestEpoch);
}

View File

@@ -0,0 +1,127 @@
import {SLOTS_PER_EPOCH, SLOTS_PER_HISTORICAL_ROOT} from "@lodestar/params";
import {computeEpochAtSlot, computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {beforeAll, beforeEach, describe, expect, it} from "vitest";
import {RegenCaller} from "../../../../src/chain/regen/interface.js";
import {processSlotsToNearestCheckpoint} from "../../../../src/chain/regen/regen.js";
import {FIFOBlockStateCache} from "../../../../src/chain/stateCache/fifoBlockStateCache.js";
import {PersistentCheckpointStateCache} from "../../../../src/chain/stateCache/persistentCheckpointsCache.js";
import {getTestDatastore} from "../../../utils/chain/stateCache/datastore.js";
import {testLogger} from "../../../utils/logger.js";
import {generateCachedState} from "../../../utils/state.js";
describe("regen", () => {
//
// epoch: 19 20 21 22 23
// |-----------|-----------|-----------|-----------|
// ^^ ^
// || |
// |0b--------root1
// |
// 0a
describe("processSlotsToNearestCheckpoint", () => {
const fileApisBuffer = new Map();
const datastore = getTestDatastore(fileApisBuffer);
const root0a = Buffer.alloc(32);
const root0b = Buffer.alloc(32, 1);
const root1 = Buffer.alloc(32, 2);
// root0a is of the last slot of epoch 19
const cp0a = {epoch: 20, root: root0a};
// root0b is of the first slot of epoch 20
const cp0b = {epoch: 20, root: root0b};
const cp1 = {epoch: 21, root: root1};
const startSlotEpoch20 = computeStartSlotAtEpoch(20);
const startSlotEpoch21 = computeStartSlotAtEpoch(21);
const startSlotEpoch22 = computeStartSlotAtEpoch(22);
const allStates = [cp0a, cp0b, cp1]
.map((cp) => generateCachedState({slot: cp.epoch * SLOTS_PER_EPOCH}))
.map((state, i) => {
const stateEpoch = computeEpochAtSlot(state.slot);
if (stateEpoch === 20 && i === 0) {
// cp0a
state.blockRoots.set((startSlotEpoch20 - 1) % SLOTS_PER_HISTORICAL_ROOT, root0a);
state.blockRoots.set(startSlotEpoch20 % SLOTS_PER_HISTORICAL_ROOT, root0a);
return state;
}
// other states based on cp0b
state.blockRoots.set((startSlotEpoch20 - 1) % SLOTS_PER_HISTORICAL_ROOT, root0a);
state.blockRoots.set(startSlotEpoch20 % SLOTS_PER_HISTORICAL_ROOT, root0b);
if (stateEpoch >= 21) {
state.blockRoots.set(startSlotEpoch21 % SLOTS_PER_HISTORICAL_ROOT, root1);
}
// if (stateEpoch >= 22) {
// state.blockRoots.set(startSlotEpoch22 % SLOTS_PER_HISTORICAL_ROOT, root2);
// }
return state;
});
const states = {
// Previous Root Checkpoint State of epoch 20
cp0a: allStates[0],
// Current Root Checkpoint State of epoch 20
cp0b: allStates[1],
// Current Root Checkpoint State of epoch 21
cp1: allStates[2],
// // Current Root Checkpoint State of epoch 22
// cp2: allStates[3],
};
let cache: PersistentCheckpointStateCache;
beforeEach(() => {
cache = new PersistentCheckpointStateCache(
{
datastore,
logger: testLogger(),
blockStateCache: new FIFOBlockStateCache({}, {}),
},
{maxCPStateEpochsInMemory: 2}
);
cache.add(cp0a, states["cp0a"]);
cache.add(cp0b, states["cp0b"]);
cache.add(cp1, states["cp1"]);
});
/**
* PreState is root1 at epoch 21 and dial to epoch 22
* There are single epoch transitions so it'd not prune/persist states
*/
it("should not prune checkpoint states when processing a single epoch transition", async () => {
// no state is persisted at the beginning
expect(fileApisBuffer.size).toEqual(0);
const modules = {checkpointStateCache: cache, metrics: null, emitter: null, logger: null};
const preState = states["cp1"];
await processSlotsToNearestCheckpoint(modules, preState, startSlotEpoch22, RegenCaller.processBlocksInEpoch, {
dontTransferCache: true,
});
// even through there are 3 epochs in memory, no state is pruned/persisted
// it'll do it at the last 1/3 slot of epoch 22
expect(fileApisBuffer.size).toEqual(0);
});
/**
* PreState is 0b at epoch 20 and dial to epoch 22
* There are multiple epoch transitions so it'd prune/persist states if needed
* There are 3 epochs in memory but maxCPStateEpochsInMemory = 2 so there should be 1 persisted
*/
it("should prune checkpoint states when processing multiple epoch transitions", async () => {
// no state is persisted at the beginning
expect(fileApisBuffer.size).toEqual(0);
const modules = {checkpointStateCache: cache, metrics: null, emitter: null, logger: null};
const preState = states["cp0b"];
await processSlotsToNearestCheckpoint(modules, preState, startSlotEpoch22, RegenCaller.processBlocksInEpoch, {
dontTransferCache: true,
});
// there are 2 epoch transitions, so the checkpoint states of epoch 20 should be pruned
expect(fileApisBuffer.size).toEqual(1);
});
});
});