mirror of
https://github.com/ChainSafe/lodestar.git
synced 2026-01-09 15:48:08 -05:00
fix: avoid BeaconState commit() clone() in beacon-node
This commit is contained in:
@@ -2,7 +2,7 @@ import {PubkeyIndexMap} from "@chainsafe/pubkey-index-map";
|
||||
import {routes} from "@lodestar/api";
|
||||
import {CheckpointWithHex, IForkChoice} from "@lodestar/fork-choice";
|
||||
import {GENESIS_SLOT} from "@lodestar/params";
|
||||
import {BeaconStateAllForks} from "@lodestar/state-transition";
|
||||
import {BeaconStateAllForks, CachedBeaconStateAllForks} from "@lodestar/state-transition";
|
||||
import {BLSPubkey, Epoch, RootHex, Slot, ValidatorIndex, getValidatorStatus, phase0} from "@lodestar/types";
|
||||
import {fromHex} from "@lodestar/utils";
|
||||
import {IBeaconChain} from "../../../../chain/index.js";
|
||||
@@ -41,30 +41,10 @@ export function resolveStateId(
|
||||
return blockSlot;
|
||||
}
|
||||
|
||||
export async function getStateResponse(
|
||||
chain: IBeaconChain,
|
||||
inStateId: routes.beacon.StateId
|
||||
): Promise<{state: BeaconStateAllForks; executionOptimistic: boolean; finalized: boolean}> {
|
||||
const stateId = resolveStateId(chain.forkChoice, inStateId);
|
||||
|
||||
const res =
|
||||
typeof stateId === "string"
|
||||
? await chain.getStateByStateRoot(stateId)
|
||||
: typeof stateId === "number"
|
||||
? await chain.getStateBySlot(stateId)
|
||||
: chain.getStateByCheckpoint(stateId);
|
||||
|
||||
if (!res) {
|
||||
throw new ApiError(404, `State not found for id '${inStateId}'`);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
export async function getStateResponseWithRegen(
|
||||
chain: IBeaconChain,
|
||||
inStateId: routes.beacon.StateId
|
||||
): Promise<{state: BeaconStateAllForks | Uint8Array; executionOptimistic: boolean; finalized: boolean}> {
|
||||
): Promise<{state: CachedBeaconStateAllForks | Uint8Array; executionOptimistic: boolean; finalized: boolean}> {
|
||||
const stateId = resolveStateId(chain.forkChoice, inStateId);
|
||||
|
||||
const res =
|
||||
|
||||
@@ -198,7 +198,7 @@ export function getLodestarApi({
|
||||
const {state, executionOptimistic, finalized} = await getStateResponseWithRegen(chain, stateId);
|
||||
|
||||
const stateView = (
|
||||
state instanceof Uint8Array ? loadState(config, chain.getHeadState(), state).state : state.clone()
|
||||
state instanceof Uint8Array ? loadState(config, chain.getHeadState(), state).state : state
|
||||
) as BeaconStateCapella;
|
||||
|
||||
const fork = config.getForkName(stateView.slot);
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import {CompactMultiProof, ProofType, createProof} from "@chainsafe/persistent-merkle-tree";
|
||||
import {routes} from "@lodestar/api";
|
||||
import {ApplicationMethods} from "@lodestar/api/server";
|
||||
import {loadState} from "@lodestar/state-transition";
|
||||
import {ApiOptions} from "../../options.js";
|
||||
import {getBlockResponse} from "../beacon/blocks/utils.js";
|
||||
import {getStateResponseWithRegen} from "../beacon/state/utils.js";
|
||||
@@ -24,11 +23,14 @@ export function getProofApi(
|
||||
|
||||
const res = await getStateResponseWithRegen(chain, stateId);
|
||||
|
||||
const state =
|
||||
res.state instanceof Uint8Array ? loadState(config, chain.getHeadState(), res.state).state : res.state;
|
||||
if (res.state instanceof Uint8Array) {
|
||||
// this should not happen because stateId is a string, check `getStateResponseWithRegen` implementation
|
||||
throw new Error(`State for stateId ${stateId} is not available for proof generation.`);
|
||||
}
|
||||
|
||||
// Commit any changes before computing the state root. In normal cases the state should have no changes here
|
||||
state.commit();
|
||||
const state = res.state;
|
||||
|
||||
// there should be no state changes in beacon-node so no need to commit() here
|
||||
const stateNode = state.node;
|
||||
|
||||
const proof = createProof(stateNode, {type: ProofType.compactMulti, descriptor});
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import {PubkeyIndexMap} from "@chainsafe/pubkey-index-map";
|
||||
import {routes} from "@lodestar/api";
|
||||
import {ApplicationMethods} from "@lodestar/api/server";
|
||||
import {ExecutionStatus} from "@lodestar/fork-choice";
|
||||
@@ -24,10 +23,8 @@ import {
|
||||
computeEpochAtSlot,
|
||||
computeStartSlotAtEpoch,
|
||||
computeTimeAtSlot,
|
||||
createCachedBeaconState,
|
||||
getBlockRootAtSlot,
|
||||
getCurrentSlot,
|
||||
loadState,
|
||||
proposerShufflingDecisionRoot,
|
||||
} from "@lodestar/state-transition";
|
||||
import {
|
||||
@@ -1039,21 +1036,12 @@ export function getValidatorApi(
|
||||
} else {
|
||||
const res = await getStateResponseWithRegen(chain, startSlot);
|
||||
|
||||
const stateViewDU =
|
||||
res.state instanceof Uint8Array
|
||||
? loadState(config, chain.getHeadState(), res.state).state
|
||||
: res.state.clone();
|
||||
if (res.state instanceof Uint8Array) {
|
||||
// should not happen because we query a startSlot as a number in getStateResponseWithRegen() api
|
||||
throw new Error(`Cannot load CachedBeaconState for slot ${startSlot} for proposer duties`);
|
||||
}
|
||||
|
||||
state = createCachedBeaconState(
|
||||
stateViewDU,
|
||||
{
|
||||
config: chain.config,
|
||||
// Not required to compute proposers
|
||||
pubkey2index: new PubkeyIndexMap(),
|
||||
index2pubkey: [],
|
||||
},
|
||||
{skipSyncPubkeys: true, skipSyncCommitteeCache: true}
|
||||
);
|
||||
state = res.state;
|
||||
|
||||
if (state.epochCtx.epoch !== epoch) {
|
||||
throw Error(`Loaded state epoch ${state.epochCtx.epoch} does not match requested epoch ${epoch}`);
|
||||
|
||||
@@ -423,8 +423,8 @@ export async function importBlock(
|
||||
const checkpointState = postState;
|
||||
const cp = getCheckpointFromState(checkpointState);
|
||||
this.regen.addCheckpointState(cp, checkpointState);
|
||||
// consumers should not mutate or get the transfered cache
|
||||
this.emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone(true));
|
||||
// consumers should not mutate state ever
|
||||
this.emitter.emit(ChainEvent.checkpoint, cp, checkpointState);
|
||||
|
||||
// Note: in-lined code from previos handler of ChainEvent.checkpoint
|
||||
this.logger.verbose("Checkpoint processed", toCheckpointHex(cp));
|
||||
|
||||
@@ -59,6 +59,7 @@ export async function verifyBlocksStateTransitionOnly(
|
||||
// if block is trusted don't verify proposer or op signature
|
||||
verifyProposer: !useBlsBatchVerify && !validSignatures && !validProposerSignature,
|
||||
verifySignatures: !useBlsBatchVerify && !validSignatures,
|
||||
dontTransferCache: false,
|
||||
},
|
||||
{metrics, validatorMonitor}
|
||||
);
|
||||
|
||||
@@ -504,7 +504,7 @@ export class BeaconChain implements IBeaconChain {
|
||||
async getStateBySlot(
|
||||
slot: Slot,
|
||||
opts?: StateGetOpts
|
||||
): Promise<{state: BeaconStateAllForks; executionOptimistic: boolean; finalized: boolean} | null> {
|
||||
): Promise<{state: CachedBeaconStateAllForks; executionOptimistic: boolean; finalized: boolean} | null> {
|
||||
const finalizedBlock = this.forkChoice.getFinalizedBlock();
|
||||
|
||||
if (slot < finalizedBlock.slot) {
|
||||
@@ -559,7 +559,7 @@ export class BeaconChain implements IBeaconChain {
|
||||
async getStateByStateRoot(
|
||||
stateRoot: RootHex,
|
||||
opts?: StateGetOpts
|
||||
): Promise<{state: BeaconStateAllForks; executionOptimistic: boolean; finalized: boolean} | null> {
|
||||
): Promise<{state: CachedBeaconStateAllForks; executionOptimistic: boolean; finalized: boolean} | null> {
|
||||
if (opts?.allowRegen) {
|
||||
const state = await this.regen.getState(stateRoot, RegenCaller.restApi);
|
||||
const block = this.forkChoice.getBlock(state.latestBlockHeader.hashTreeRoot());
|
||||
@@ -587,8 +587,11 @@ export class BeaconChain implements IBeaconChain {
|
||||
};
|
||||
}
|
||||
|
||||
const data = await this.db.stateArchive.getByRoot(fromHex(stateRoot));
|
||||
return data && {state: data, executionOptimistic: false, finalized: true};
|
||||
// we can do a `this.db.stateArchive.getByRoot()` here, but
|
||||
// returning 1 per 100s of states that are persisted in the archive state is not useful enough
|
||||
// and it causes consumers having to loadState and createCachedBeaconState again
|
||||
// if state is finalized, consumers need to use getHistoricalStateBySlot() api instead
|
||||
return null;
|
||||
}
|
||||
|
||||
async getPersistedCheckpointState(checkpoint?: phase0.Checkpoint): Promise<Uint8Array | null> {
|
||||
@@ -1297,7 +1300,7 @@ export class BeaconChain implements IBeaconChain {
|
||||
|
||||
const postState = this.regen.getStateSync(toRootHex(block.stateRoot)) ?? undefined;
|
||||
|
||||
return computeBlockRewards(this.config, block, preState.clone(), postState?.clone());
|
||||
return computeBlockRewards(this.config, block, preState, postState);
|
||||
}
|
||||
|
||||
async getAttestationsRewards(
|
||||
@@ -1338,6 +1341,6 @@ export class BeaconChain implements IBeaconChain {
|
||||
|
||||
preState = processSlots(preState, block.slot); // Dial preState's slot to block.slot
|
||||
|
||||
return computeSyncCommitteeRewards(this.config, this.index2pubkey, block, preState.clone(), validatorIds);
|
||||
return computeSyncCommitteeRewards(this.config, this.index2pubkey, block, preState, validatorIds);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -168,12 +168,12 @@ export interface IBeaconChain {
|
||||
getStateBySlot(
|
||||
slot: Slot,
|
||||
opts?: StateGetOpts
|
||||
): Promise<{state: BeaconStateAllForks; executionOptimistic: boolean; finalized: boolean} | null>;
|
||||
): Promise<{state: CachedBeaconStateAllForks; executionOptimistic: boolean; finalized: boolean} | null>;
|
||||
/** Returns a local state by state root */
|
||||
getStateByStateRoot(
|
||||
stateRoot: RootHex,
|
||||
opts?: StateGetOpts
|
||||
): Promise<{state: BeaconStateAllForks; executionOptimistic: boolean; finalized: boolean} | null>;
|
||||
): Promise<{state: CachedBeaconStateAllForks; executionOptimistic: boolean; finalized: boolean} | null>;
|
||||
/** Return serialized bytes of a persisted checkpoint state */
|
||||
getPersistedCheckpointState(checkpoint?: phase0.Checkpoint): Promise<Uint8Array | null>;
|
||||
/** Returns a cached state by checkpoint */
|
||||
|
||||
@@ -12,7 +12,6 @@ import {BeaconBlockBody, SSZTypesFor, ssz} from "@lodestar/types";
|
||||
import {SyncCommitteeWitness} from "./types.js";
|
||||
|
||||
export function getSyncCommitteesWitness(fork: ForkName, state: BeaconStateAllForks): SyncCommitteeWitness {
|
||||
state.commit();
|
||||
const n1 = state.node;
|
||||
let witness: Uint8Array[];
|
||||
let currentSyncCommitteeRoot: Uint8Array;
|
||||
@@ -71,7 +70,6 @@ export function getCurrentSyncCommitteeBranch(syncCommitteesWitness: SyncCommitt
|
||||
}
|
||||
|
||||
export function getFinalizedRootProof(state: CachedBeaconStateAllForks): Uint8Array[] {
|
||||
state.commit();
|
||||
const finalizedRootGindex = state.epochCtx.isPostElectra() ? FINALIZED_ROOT_GINDEX_ELECTRA : FINALIZED_ROOT_GINDEX;
|
||||
return new Tree(state.node).getSingleProof(BigInt(finalizedRootGindex));
|
||||
}
|
||||
|
||||
@@ -148,6 +148,7 @@ export class PrepareNextSlotScheduler {
|
||||
updatedPrepareState = (await this.chain.regen.getBlockSlotState(
|
||||
proposerHeadRoot,
|
||||
prepareSlot,
|
||||
// only transfer cache if epoch transition because that's the state we will use to stateTransition() the 1st block of epoch
|
||||
{dontTransferCache: !isEpochTransition},
|
||||
RegenCaller.predictProposerHead
|
||||
)) as CachedBeaconStateExecutions;
|
||||
|
||||
@@ -90,5 +90,5 @@ export interface IStateRegeneratorInternal {
|
||||
/**
|
||||
* Return the exact state with `stateRoot`
|
||||
*/
|
||||
getState(stateRoot: RootHex, rCaller: RegenCaller, opts?: StateRegenerationOpts): Promise<CachedBeaconStateAllForks>;
|
||||
getState(stateRoot: RootHex, rCaller: RegenCaller): Promise<CachedBeaconStateAllForks>;
|
||||
}
|
||||
|
||||
@@ -82,7 +82,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
|
||||
* This is not for block processing so don't transfer cache
|
||||
*/
|
||||
getStateSync(stateRoot: RootHex): CachedBeaconStateAllForks | null {
|
||||
return this.blockStateCache.get(stateRoot, {dontTransferCache: true});
|
||||
return this.blockStateCache.get(stateRoot);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -90,10 +90,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
|
||||
* By default, do not transfer cache except for the block at clock slot
|
||||
* which is usually the gossip block.
|
||||
*/
|
||||
getPreStateSync(
|
||||
block: BeaconBlock,
|
||||
opts: StateRegenerationOpts = {dontTransferCache: true}
|
||||
): CachedBeaconStateAllForks | null {
|
||||
getPreStateSync(block: BeaconBlock): CachedBeaconStateAllForks | null {
|
||||
const parentRoot = toRootHex(block.parentRoot);
|
||||
const parentBlock = this.forkChoice.getBlockHex(parentRoot);
|
||||
if (!parentBlock) {
|
||||
@@ -108,7 +105,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
|
||||
|
||||
// Check the checkpoint cache (if the pre-state is a checkpoint state)
|
||||
if (parentEpoch < blockEpoch) {
|
||||
const checkpointState = this.checkpointStateCache.getLatest(parentRoot, blockEpoch, opts);
|
||||
const checkpointState = this.checkpointStateCache.getLatest(parentRoot, blockEpoch);
|
||||
if (checkpointState && computeEpochAtSlot(checkpointState.slot) === blockEpoch) {
|
||||
return checkpointState;
|
||||
}
|
||||
@@ -118,7 +115,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
|
||||
// Otherwise the state transition may not be cached and wasted. Queue for regen since the
|
||||
// work required will still be significant.
|
||||
if (parentEpoch === blockEpoch) {
|
||||
const state = this.blockStateCache.get(parentBlock.stateRoot, opts);
|
||||
const state = this.blockStateCache.get(parentBlock.stateRoot);
|
||||
if (state) {
|
||||
return state;
|
||||
}
|
||||
@@ -135,18 +132,14 @@ export class QueuedStateRegenerator implements IStateRegenerator {
|
||||
* Get checkpoint state from cache, this function is not for block processing so don't transfer cache
|
||||
*/
|
||||
getCheckpointStateSync(cp: CheckpointHex): CachedBeaconStateAllForks | null {
|
||||
return this.checkpointStateCache.get(cp, {dontTransferCache: true});
|
||||
return this.checkpointStateCache.get(cp);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get state closest to head, this function is not for block processing so don't transfer cache
|
||||
*/
|
||||
getClosestHeadState(head: ProtoBlock): CachedBeaconStateAllForks | null {
|
||||
const opts = {dontTransferCache: true};
|
||||
return (
|
||||
this.checkpointStateCache.getLatest(head.blockRoot, Infinity, opts) ||
|
||||
this.blockStateCache.get(head.stateRoot, opts)
|
||||
);
|
||||
return this.checkpointStateCache.getLatest(head.blockRoot, Infinity) || this.blockStateCache.get(head.stateRoot);
|
||||
}
|
||||
|
||||
pruneOnCheckpoint(finalizedEpoch: Epoch, justifiedEpoch: Epoch, headStateRoot: RootHex): void {
|
||||
@@ -181,10 +174,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
|
||||
maybeHeadStateRoot,
|
||||
};
|
||||
const headState =
|
||||
newHeadStateRoot === maybeHeadStateRoot
|
||||
? maybeHeadState
|
||||
: // maybeHeadState was already in block state cache so we don't transfer the cache
|
||||
this.blockStateCache.get(newHeadStateRoot, {dontTransferCache: true});
|
||||
newHeadStateRoot === maybeHeadStateRoot ? maybeHeadState : this.blockStateCache.get(newHeadStateRoot);
|
||||
|
||||
if (headState) {
|
||||
this.blockStateCache.setHeadState(headState);
|
||||
@@ -199,9 +189,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
|
||||
|
||||
// for the new FIFOBlockStateCache, it's important to reload state to regen head state here if needed
|
||||
const allowDiskReload = true;
|
||||
// transfer cache here because we want to regen state asap
|
||||
const cloneOpts = {dontTransferCache: false};
|
||||
this.regen.getState(newHeadStateRoot, RegenCaller.processBlock, cloneOpts, allowDiskReload).then(
|
||||
this.regen.getState(newHeadStateRoot, RegenCaller.processBlock, allowDiskReload).then(
|
||||
(headStateRegen) => this.blockStateCache.setHeadState(headStateRegen),
|
||||
(e) => this.logger.error("Error on head state regen", logCtx, e)
|
||||
);
|
||||
@@ -224,7 +212,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
|
||||
this.metrics?.regenFnCallTotal.inc({caller: rCaller, entrypoint: RegenFnName.getPreState});
|
||||
|
||||
// First attempt to fetch the state from caches before queueing
|
||||
const cachedState = this.getPreStateSync(block, opts);
|
||||
const cachedState = this.getPreStateSync(block);
|
||||
|
||||
if (cachedState !== null) {
|
||||
return cachedState;
|
||||
@@ -243,7 +231,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
|
||||
this.metrics?.regenFnCallTotal.inc({caller: rCaller, entrypoint: RegenFnName.getCheckpointState});
|
||||
|
||||
// First attempt to fetch the state from cache before queueing
|
||||
const checkpointState = this.checkpointStateCache.get(toCheckpointHex(cp), opts);
|
||||
const checkpointState = this.checkpointStateCache.get(toCheckpointHex(cp));
|
||||
if (checkpointState) {
|
||||
return checkpointState;
|
||||
}
|
||||
@@ -271,22 +259,18 @@ export class QueuedStateRegenerator implements IStateRegenerator {
|
||||
return this.jobQueue.push({key: "getBlockSlotState", args: [blockRoot, slot, opts, rCaller]});
|
||||
}
|
||||
|
||||
async getState(
|
||||
stateRoot: RootHex,
|
||||
rCaller: RegenCaller,
|
||||
opts: StateRegenerationOpts = {dontTransferCache: true}
|
||||
): Promise<CachedBeaconStateAllForks> {
|
||||
async getState(stateRoot: RootHex, rCaller: RegenCaller): Promise<CachedBeaconStateAllForks> {
|
||||
this.metrics?.regenFnCallTotal.inc({caller: rCaller, entrypoint: RegenFnName.getState});
|
||||
|
||||
// First attempt to fetch the state from cache before queueing
|
||||
const state = this.blockStateCache.get(stateRoot, opts);
|
||||
const state = this.blockStateCache.get(stateRoot);
|
||||
if (state) {
|
||||
return state;
|
||||
}
|
||||
|
||||
// The state is not immediately available in the cache, enqueue the job
|
||||
this.metrics?.regenFnQueuedTotal.inc({caller: rCaller, entrypoint: RegenFnName.getState});
|
||||
return this.jobQueue.push({key: "getState", args: [stateRoot, rCaller, opts]});
|
||||
return this.jobQueue.push({key: "getState", args: [stateRoot, rCaller]});
|
||||
}
|
||||
|
||||
private jobQueueProcessor = async (regenRequest: RegenRequest): Promise<CachedBeaconStateAllForks> => {
|
||||
|
||||
@@ -78,7 +78,7 @@ export class StateRegenerator implements IStateRegeneratorInternal {
|
||||
}
|
||||
|
||||
// Otherwise, get the state normally.
|
||||
return this.getState(parentBlock.stateRoot, regenCaller, opts, allowDiskReload);
|
||||
return this.getState(parentBlock.stateRoot, regenCaller, allowDiskReload);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -124,8 +124,8 @@ export class StateRegenerator implements IStateRegeneratorInternal {
|
||||
const {checkpointStateCache} = this.modules;
|
||||
const epoch = computeEpochAtSlot(slot);
|
||||
const latestCheckpointStateCtx = allowDiskReload
|
||||
? await checkpointStateCache.getOrReloadLatest(blockRoot, epoch, opts)
|
||||
: checkpointStateCache.getLatest(blockRoot, epoch, opts);
|
||||
? await checkpointStateCache.getOrReloadLatest(blockRoot, epoch)
|
||||
: checkpointStateCache.getLatest(blockRoot, epoch);
|
||||
|
||||
// If a checkpoint state exists with the given checkpoint root, it either is in requested epoch
|
||||
// or needs to have empty slots processed until the requested epoch
|
||||
@@ -136,7 +136,7 @@ export class StateRegenerator implements IStateRegeneratorInternal {
|
||||
// Otherwise, use the fork choice to get the stateRoot from block at the checkpoint root
|
||||
// regenerate that state,
|
||||
// then process empty slots until the requested epoch
|
||||
const blockStateCtx = await this.getState(block.stateRoot, regenCaller, opts, allowDiskReload);
|
||||
const blockStateCtx = await this.getState(block.stateRoot, regenCaller, allowDiskReload);
|
||||
return processSlotsByCheckpoint(this.modules, blockStateCtx, slot, regenCaller, opts);
|
||||
}
|
||||
|
||||
@@ -148,23 +148,15 @@ export class StateRegenerator implements IStateRegeneratorInternal {
|
||||
async getState(
|
||||
stateRoot: RootHex,
|
||||
caller: RegenCaller,
|
||||
opts?: StateRegenerationOpts,
|
||||
// internal option, don't want to expose to external caller
|
||||
allowDiskReload = false
|
||||
): Promise<CachedBeaconStateAllForks> {
|
||||
// Trivial case, state at stateRoot is already cached
|
||||
const cachedStateCtx = this.modules.blockStateCache.get(stateRoot, opts);
|
||||
const cachedStateCtx = this.modules.blockStateCache.get(stateRoot);
|
||||
if (cachedStateCtx) {
|
||||
return cachedStateCtx;
|
||||
}
|
||||
|
||||
// in block gossip validation (getPreState() call), dontTransferCache is specified as true because we only want to transfer cache in verifyBlocksStateTransitionOnly()
|
||||
// but here we want to process blocks as fast as possible so force to transfer cache in this case
|
||||
if (opts && allowDiskReload) {
|
||||
// if there is no `opts` specified, it already means "false"
|
||||
opts.dontTransferCache = false;
|
||||
}
|
||||
|
||||
// Otherwise we have to use the fork choice to traverse backwards, block by block,
|
||||
// searching the state caches
|
||||
// then replay blocks forward to the desired stateRoot
|
||||
@@ -179,7 +171,7 @@ export class StateRegenerator implements IStateRegeneratorInternal {
|
||||
const getSeedStateTimer = this.modules.metrics?.regenGetState.getSeedState.startTimer({caller});
|
||||
// iterateAncestorBlocks only returns ancestor blocks, not the block itself
|
||||
for (const b of this.modules.forkChoice.iterateAncestorBlocks(block.blockRoot)) {
|
||||
state = this.modules.blockStateCache.get(b.stateRoot, opts);
|
||||
state = this.modules.blockStateCache.get(b.stateRoot);
|
||||
if (state) {
|
||||
break;
|
||||
}
|
||||
@@ -187,8 +179,8 @@ export class StateRegenerator implements IStateRegeneratorInternal {
|
||||
if (!lastBlockToReplay) continue;
|
||||
const epoch = computeEpochAtSlot(lastBlockToReplay.slot - 1);
|
||||
state = allowDiskReload
|
||||
? await checkpointStateCache.getOrReloadLatest(b.blockRoot, epoch, opts)
|
||||
: checkpointStateCache.getLatest(b.blockRoot, epoch, opts);
|
||||
? await checkpointStateCache.getOrReloadLatest(b.blockRoot, epoch)
|
||||
: checkpointStateCache.getLatest(b.blockRoot, epoch);
|
||||
if (state) {
|
||||
break;
|
||||
}
|
||||
@@ -255,6 +247,7 @@ export class StateRegenerator implements IStateRegeneratorInternal {
|
||||
try {
|
||||
// Only advances state trusting block's signture and hashes.
|
||||
// We are only running the state transition to get a specific state's data.
|
||||
// stateTransition() does the clone() inside, transfer cache to make the regen faster
|
||||
state = stateTransition(
|
||||
state,
|
||||
block,
|
||||
@@ -265,6 +258,7 @@ export class StateRegenerator implements IStateRegeneratorInternal {
|
||||
verifyStateRoot: false,
|
||||
verifyProposer: false,
|
||||
verifySignatures: false,
|
||||
dontTransferCache: false,
|
||||
},
|
||||
this.modules
|
||||
);
|
||||
@@ -390,8 +384,8 @@ export async function processSlotsToNearestCheckpoint(
|
||||
const checkpointState = postState;
|
||||
const cp = getCheckpointFromState(checkpointState);
|
||||
checkpointStateCache.add(cp, checkpointState);
|
||||
// consumers should not mutate or get the transfered cache
|
||||
emitter?.emit(ChainEvent.checkpoint, cp, checkpointState.clone(true));
|
||||
// consumers should not mutate state ever
|
||||
emitter?.emit(ChainEvent.checkpoint, cp, checkpointState);
|
||||
|
||||
if (count >= 1) {
|
||||
// in normal condition, we only process 1 epoch so never reach this
|
||||
|
||||
@@ -3,7 +3,6 @@ import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
|
||||
import {Epoch, RootHex} from "@lodestar/types";
|
||||
import {toRootHex} from "@lodestar/utils";
|
||||
import {Metrics} from "../../metrics/index.js";
|
||||
import {StateRegenerationOpts} from "../regen/interface.js";
|
||||
import {MapTracker} from "./mapMetrics.js";
|
||||
import {BlockStateCache} from "./types.js";
|
||||
|
||||
@@ -39,7 +38,7 @@ export class BlockStateCacheImpl implements BlockStateCache {
|
||||
}
|
||||
}
|
||||
|
||||
get(rootHex: RootHex, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null {
|
||||
get(rootHex: RootHex): CachedBeaconStateAllForks | null {
|
||||
this.metrics?.lookups.inc();
|
||||
const item = this.head?.stateRoot === rootHex ? this.head.state : this.cache.get(rootHex);
|
||||
if (!item) {
|
||||
@@ -49,7 +48,7 @@ export class BlockStateCacheImpl implements BlockStateCache {
|
||||
this.metrics?.hits.inc();
|
||||
this.metrics?.stateClonedCount.observe(item.clonedCount);
|
||||
|
||||
return item.clone(opts?.dontTransferCache);
|
||||
return item;
|
||||
}
|
||||
|
||||
add(item: CachedBeaconStateAllForks): void {
|
||||
|
||||
@@ -4,7 +4,6 @@ import {RootHex} from "@lodestar/types";
|
||||
import {toRootHex} from "@lodestar/utils";
|
||||
import {Metrics} from "../../metrics/index.js";
|
||||
import {LinkedList} from "../../util/array.js";
|
||||
import {StateRegenerationOpts} from "../regen/interface.js";
|
||||
import {MapTracker} from "./mapMetrics.js";
|
||||
import {BlockStateCache} from "./types.js";
|
||||
|
||||
@@ -86,14 +85,14 @@ export class FIFOBlockStateCache implements BlockStateCache {
|
||||
}
|
||||
|
||||
const firstState = firstValue.value;
|
||||
// don't transfer cache because consumer only use this cache to reload another state from disc
|
||||
return firstState.clone(true);
|
||||
// consumers should not mutate the returned state
|
||||
return firstState;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a state from this cache given a state root hex.
|
||||
*/
|
||||
get(rootHex: RootHex, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null {
|
||||
get(rootHex: RootHex): CachedBeaconStateAllForks | null {
|
||||
this.metrics?.lookups.inc();
|
||||
const item = this.cache.get(rootHex);
|
||||
if (!item) {
|
||||
@@ -103,7 +102,7 @@ export class FIFOBlockStateCache implements BlockStateCache {
|
||||
this.metrics?.hits.inc();
|
||||
this.metrics?.stateClonedCount.observe(item.clonedCount);
|
||||
|
||||
return item.clone(opts?.dontTransferCache);
|
||||
return item;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -3,7 +3,6 @@ import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
|
||||
import {Epoch, RootHex, phase0} from "@lodestar/types";
|
||||
import {MapDef, toRootHex} from "@lodestar/utils";
|
||||
import {Metrics} from "../../metrics/index.js";
|
||||
import {StateRegenerationOpts} from "../regen/interface.js";
|
||||
import {MapTracker} from "./mapMetrics.js";
|
||||
import {CacheItemType, CheckpointStateCache} from "./types.js";
|
||||
|
||||
@@ -42,21 +41,16 @@ export class InMemoryCheckpointStateCache implements CheckpointStateCache {
|
||||
this.maxEpochs = maxEpochs;
|
||||
}
|
||||
|
||||
async getOrReload(cp: CheckpointHex, opts?: StateRegenerationOpts): Promise<CachedBeaconStateAllForks | null> {
|
||||
return this.get(cp, opts);
|
||||
async getOrReload(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | null> {
|
||||
return this.get(cp);
|
||||
}
|
||||
|
||||
async getStateOrBytes(cp: CheckpointHex): Promise<Uint8Array | CachedBeaconStateAllForks | null> {
|
||||
// no need to transfer cache for this api
|
||||
return this.get(cp, {dontTransferCache: true});
|
||||
return this.get(cp);
|
||||
}
|
||||
|
||||
async getOrReloadLatest(
|
||||
rootHex: string,
|
||||
maxEpoch: number,
|
||||
opts?: StateRegenerationOpts
|
||||
): Promise<CachedBeaconStateAllForks | null> {
|
||||
return this.getLatest(rootHex, maxEpoch, opts);
|
||||
async getOrReloadLatest(rootHex: string, maxEpoch: number): Promise<CachedBeaconStateAllForks | null> {
|
||||
return this.getLatest(rootHex, maxEpoch);
|
||||
}
|
||||
|
||||
async processState(): Promise<number> {
|
||||
@@ -64,7 +58,7 @@ export class InMemoryCheckpointStateCache implements CheckpointStateCache {
|
||||
return 0;
|
||||
}
|
||||
|
||||
get(cp: CheckpointHex, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null {
|
||||
get(cp: CheckpointHex): CachedBeaconStateAllForks | null {
|
||||
this.metrics?.lookups.inc();
|
||||
const cpKey = toCheckpointKey(cp);
|
||||
const item = this.cache.get(cpKey);
|
||||
@@ -81,7 +75,7 @@ export class InMemoryCheckpointStateCache implements CheckpointStateCache {
|
||||
|
||||
this.metrics?.stateClonedCount.observe(item.clonedCount);
|
||||
|
||||
return item.clone(opts?.dontTransferCache);
|
||||
return item;
|
||||
}
|
||||
|
||||
add(cp: phase0.Checkpoint, item: CachedBeaconStateAllForks): void {
|
||||
@@ -98,14 +92,14 @@ export class InMemoryCheckpointStateCache implements CheckpointStateCache {
|
||||
/**
|
||||
* Searches for the latest cached state with a `root`, starting with `epoch` and descending
|
||||
*/
|
||||
getLatest(rootHex: RootHex, maxEpoch: Epoch, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null {
|
||||
getLatest(rootHex: RootHex, maxEpoch: Epoch): CachedBeaconStateAllForks | null {
|
||||
// sort epochs in descending order, only consider epochs lte `epoch`
|
||||
const epochs = Array.from(this.epochIndex.keys())
|
||||
.sort((a, b) => b - a)
|
||||
.filter((e) => e <= maxEpoch);
|
||||
for (const epoch of epochs) {
|
||||
if (this.epochIndex.get(epoch)?.has(rootHex)) {
|
||||
return this.get({rootHex, epoch}, opts);
|
||||
return this.get({rootHex, epoch});
|
||||
}
|
||||
}
|
||||
return null;
|
||||
|
||||
@@ -11,7 +11,6 @@ import {Logger, MapDef, fromHex, sleep, toHex, toRootHex} from "@lodestar/utils"
|
||||
import {Metrics} from "../../metrics/index.js";
|
||||
import {AllocSource, BufferPool, BufferWithKey} from "../../util/bufferPool.js";
|
||||
import {IClock} from "../../util/clock.js";
|
||||
import {StateRegenerationOpts} from "../regen/interface.js";
|
||||
import {serializeState} from "../serializeState.js";
|
||||
import {CPStateDatastore, DatastoreKey} from "./datastore/index.js";
|
||||
import {MapTracker} from "./mapMetrics.js";
|
||||
@@ -204,10 +203,10 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
|
||||
* - Get block for processing
|
||||
* - Regen head state
|
||||
*/
|
||||
async getOrReload(cp: CheckpointHex, opts?: StateRegenerationOpts): Promise<CachedBeaconStateAllForks | null> {
|
||||
const stateOrStateBytesData = await this.getStateOrLoadDb(cp, opts);
|
||||
async getOrReload(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | null> {
|
||||
const stateOrStateBytesData = await this.getStateOrLoadDb(cp);
|
||||
if (stateOrStateBytesData === null || isCachedBeaconState(stateOrStateBytesData)) {
|
||||
return stateOrStateBytesData?.clone(opts?.dontTransferCache) ?? null;
|
||||
return stateOrStateBytesData ?? null;
|
||||
}
|
||||
const {persistedKey, stateBytes} = stateOrStateBytesData;
|
||||
const logMeta = {persistedKey: toHex(persistedKey)};
|
||||
@@ -233,7 +232,8 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
|
||||
sszTimer?.();
|
||||
const timer = this.metrics?.cpStateCache.stateReloadDuration.startTimer();
|
||||
const newCachedState = loadCachedBeaconState(seedState, stateBytes, {}, validatorsBytes);
|
||||
newCachedState.commit();
|
||||
// hashTreeRoot() calls the commit() inside
|
||||
// there is no modification inside the state, it's just that we want to compute and cache all roots
|
||||
const stateRoot = toRootHex(newCachedState.hashTreeRoot());
|
||||
timer?.();
|
||||
|
||||
@@ -252,7 +252,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
|
||||
this.cache.set(cpKey, {type: CacheItemType.inMemory, state: newCachedState, persistedKey});
|
||||
this.epochIndex.getOrDefault(cp.epoch).add(cp.rootHex);
|
||||
// don't prune from memory here, call it at the last 1/3 of slot 0 of an epoch
|
||||
return newCachedState.clone(opts?.dontTransferCache);
|
||||
return newCachedState;
|
||||
} catch (e) {
|
||||
this.logger.debug("Reload: error loading cached state", logMeta, e as Error);
|
||||
return null;
|
||||
@@ -263,8 +263,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
|
||||
* Return either state or state bytes loaded from db.
|
||||
*/
|
||||
async getStateOrBytes(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | Uint8Array | null> {
|
||||
// don't have to transfer cache for this specific api
|
||||
const stateOrLoadedState = await this.getStateOrLoadDb(cp, {dontTransferCache: true});
|
||||
const stateOrLoadedState = await this.getStateOrLoadDb(cp);
|
||||
if (stateOrLoadedState === null || isCachedBeaconState(stateOrLoadedState)) {
|
||||
return stateOrLoadedState;
|
||||
}
|
||||
@@ -274,12 +273,9 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
|
||||
/**
|
||||
* Return either state or state bytes with persisted key loaded from db.
|
||||
*/
|
||||
async getStateOrLoadDb(
|
||||
cp: CheckpointHex,
|
||||
opts?: StateRegenerationOpts
|
||||
): Promise<CachedBeaconStateAllForks | LoadedStateBytesData | null> {
|
||||
async getStateOrLoadDb(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | LoadedStateBytesData | null> {
|
||||
const cpKey = toCacheKey(cp);
|
||||
const inMemoryState = this.get(cpKey, opts);
|
||||
const inMemoryState = this.get(cpKey);
|
||||
if (inMemoryState) {
|
||||
return inMemoryState;
|
||||
}
|
||||
@@ -308,7 +304,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
|
||||
/**
|
||||
* Similar to get() api without reloading from disk
|
||||
*/
|
||||
get(cpOrKey: CheckpointHex | string, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null {
|
||||
get(cpOrKey: CheckpointHex | string): CachedBeaconStateAllForks | null {
|
||||
this.metrics?.cpStateCache.lookups.inc();
|
||||
const cpKey = typeof cpOrKey === "string" ? cpOrKey : toCacheKey(cpOrKey);
|
||||
const cacheItem = this.cache.get(cpKey);
|
||||
@@ -326,7 +322,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
|
||||
if (isInMemoryCacheItem(cacheItem)) {
|
||||
const {state} = cacheItem;
|
||||
this.metrics?.cpStateCache.stateClonedCount.observe(state.clonedCount);
|
||||
return state.clone(opts?.dontTransferCache);
|
||||
return state;
|
||||
}
|
||||
|
||||
return null;
|
||||
@@ -360,14 +356,14 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
|
||||
/**
|
||||
* Searches in-memory state for the latest cached state with a `root` without reload, starting with `epoch` and descending
|
||||
*/
|
||||
getLatest(rootHex: RootHex, maxEpoch: Epoch, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null {
|
||||
getLatest(rootHex: RootHex, maxEpoch: Epoch): CachedBeaconStateAllForks | null {
|
||||
// sort epochs in descending order, only consider epochs lte `epoch`
|
||||
const epochs = Array.from(this.epochIndex.keys())
|
||||
.sort((a, b) => b - a)
|
||||
.filter((e) => e <= maxEpoch);
|
||||
for (const epoch of epochs) {
|
||||
if (this.epochIndex.get(epoch)?.has(rootHex)) {
|
||||
const inMemoryClonedState = this.get({rootHex, epoch}, opts);
|
||||
const inMemoryClonedState = this.get({rootHex, epoch});
|
||||
if (inMemoryClonedState) {
|
||||
return inMemoryClonedState;
|
||||
}
|
||||
@@ -383,11 +379,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
|
||||
* - Get block for processing
|
||||
* - Regen head state
|
||||
*/
|
||||
async getOrReloadLatest(
|
||||
rootHex: RootHex,
|
||||
maxEpoch: Epoch,
|
||||
opts?: StateRegenerationOpts
|
||||
): Promise<CachedBeaconStateAllForks | null> {
|
||||
async getOrReloadLatest(rootHex: RootHex, maxEpoch: Epoch): Promise<CachedBeaconStateAllForks | null> {
|
||||
// sort epochs in descending order, only consider epochs lte `epoch`
|
||||
const epochs = Array.from(this.epochIndex.keys())
|
||||
.sort((a, b) => b - a)
|
||||
@@ -395,9 +387,9 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
|
||||
for (const epoch of epochs) {
|
||||
if (this.epochIndex.get(epoch)?.has(rootHex)) {
|
||||
try {
|
||||
const clonedState = await this.getOrReload({rootHex, epoch}, opts);
|
||||
if (clonedState) {
|
||||
return clonedState;
|
||||
const state = await this.getOrReload({rootHex, epoch});
|
||||
if (state) {
|
||||
return state;
|
||||
}
|
||||
} catch (e) {
|
||||
this.logger.debug("Error get or reload state", {epoch, rootHex}, e as Error);
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import {routes} from "@lodestar/api";
|
||||
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
|
||||
import {Epoch, RootHex, phase0} from "@lodestar/types";
|
||||
import {StateRegenerationOpts} from "../regen/interface.js";
|
||||
|
||||
export type CheckpointHex = {epoch: Epoch; rootHex: RootHex};
|
||||
|
||||
@@ -21,7 +20,7 @@ export type CheckpointHex = {epoch: Epoch; rootHex: RootHex};
|
||||
* The cache key is state root
|
||||
*/
|
||||
export interface BlockStateCache {
|
||||
get(rootHex: RootHex, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null;
|
||||
get(rootHex: RootHex): CachedBeaconStateAllForks | null;
|
||||
add(item: CachedBeaconStateAllForks): void;
|
||||
setHeadState(item: CachedBeaconStateAllForks | null): void;
|
||||
/**
|
||||
@@ -60,16 +59,12 @@ export interface BlockStateCache {
|
||||
*/
|
||||
export interface CheckpointStateCache {
|
||||
init?: () => Promise<void>;
|
||||
getOrReload(cp: CheckpointHex, opts?: StateRegenerationOpts): Promise<CachedBeaconStateAllForks | null>;
|
||||
getOrReload(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | null>;
|
||||
getStateOrBytes(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | Uint8Array | null>;
|
||||
get(cpOrKey: CheckpointHex | string, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null;
|
||||
get(cpOrKey: CheckpointHex | string): CachedBeaconStateAllForks | null;
|
||||
add(cp: phase0.Checkpoint, state: CachedBeaconStateAllForks): void;
|
||||
getLatest(rootHex: RootHex, maxEpoch: Epoch, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null;
|
||||
getOrReloadLatest(
|
||||
rootHex: RootHex,
|
||||
maxEpoch: Epoch,
|
||||
opts?: StateRegenerationOpts
|
||||
): Promise<CachedBeaconStateAllForks | null>;
|
||||
getLatest(rootHex: RootHex, maxEpoch: Epoch): CachedBeaconStateAllForks | null;
|
||||
getOrReloadLatest(rootHex: RootHex, maxEpoch: Epoch): Promise<CachedBeaconStateAllForks | null>;
|
||||
updatePreComputedCheckpoint(rootHex: RootHex, epoch: Epoch): number | null;
|
||||
prune(finalizedEpoch: Epoch, justifiedEpoch: Epoch): void;
|
||||
pruneFinalized(finalizedEpoch: Epoch): void;
|
||||
|
||||
@@ -108,7 +108,7 @@ describe("get proposers api impl", () => {
|
||||
it("should get proposers for historical epoch", async () => {
|
||||
const historicalEpoch = currentEpoch - 2;
|
||||
initializeState(currentSlot - 2 * SLOTS_PER_EPOCH);
|
||||
modules.chain.getStateBySlot.mockResolvedValue({state, executionOptimistic: false, finalized: true});
|
||||
modules.chain.getStateBySlot.mockResolvedValue({state: cachedState, executionOptimistic: false, finalized: true});
|
||||
|
||||
const {data: result} = (await api.getProposerDuties({epoch: historicalEpoch})) as {
|
||||
data: routes.validator.ProposerDutyList;
|
||||
|
||||
@@ -23,9 +23,12 @@ type SubRewardValue = number; // All reward values should be integer
|
||||
export async function computeBlockRewards(
|
||||
config: BeaconConfig,
|
||||
block: BeaconBlock,
|
||||
preState: CachedBeaconStateAllForks,
|
||||
postState?: CachedBeaconStateAllForks
|
||||
preStateIn: CachedBeaconStateAllForks,
|
||||
postStateIn?: CachedBeaconStateAllForks
|
||||
): Promise<rewards.BlockRewards> {
|
||||
const preState = preStateIn.clone();
|
||||
const postState = postStateIn?.clone();
|
||||
|
||||
const fork = config.getForkName(block.slot);
|
||||
const {attestations: cachedAttestationsReward = 0, syncAggregate: cachedSyncAggregateReward = 0} =
|
||||
postState?.proposerRewards ?? {};
|
||||
|
||||
@@ -19,7 +19,7 @@ export async function computeSyncCommitteeRewards(
|
||||
}
|
||||
|
||||
const altairBlock = block as altair.BeaconBlock;
|
||||
const preStateAltair = preState as CachedBeaconStateAltair;
|
||||
const preStateAltair = preState.clone() as CachedBeaconStateAltair;
|
||||
|
||||
// Bound syncCommitteeValidatorIndices in case it goes beyond SYNC_COMMITTEE_SIZE just to be safe
|
||||
const syncCommitteeValidatorIndices = preStateAltair.epochCtx.currentSyncCommitteeIndexed.validatorIndices.slice(
|
||||
|
||||
Reference in New Issue
Block a user