mirror of
https://github.com/ChainSafe/lodestar.git
synced 2026-01-09 15:48:08 -05:00
refactor: update the structure of state regen operation (#8509)
**Motivation** Use smaller and pure functions for complex logic. **Description** - Split the operation into plan, fetch and apply stages **Steps to test or reproduce** Run all tests
This commit is contained in:
@@ -0,0 +1,86 @@
|
||||
import {PubkeyIndexMap} from "@chainsafe/pubkey-index-map";
|
||||
import {BeaconConfig} from "@lodestar/config";
|
||||
import {Logger} from "@lodestar/logger";
|
||||
import {IBeaconDb} from "../../../db/index.ts";
|
||||
import {IStateDiffCodec} from "../interface.ts";
|
||||
import {replayBlocks} from "../utils/replayBlocks.ts";
|
||||
import {StateRegenArtifacts} from "./fetch.ts";
|
||||
import {StateRegenPlan} from "./plan.ts";
|
||||
import {BeaconStateSnapshot} from "./ssz.ts";
|
||||
import {replayStateDifferentials} from "./stateDifferential.ts";
|
||||
import {beaconStateBytesToSnapshot, snapshotToBeaconStateBytes} from "./stateSnapshot.ts";
|
||||
|
||||
export type StateRegenContext = {
|
||||
codec: IStateDiffCodec;
|
||||
config: BeaconConfig;
|
||||
logger?: Logger;
|
||||
pubkey2index: PubkeyIndexMap;
|
||||
db: IBeaconDb;
|
||||
};
|
||||
|
||||
export async function applyStateRegenPlan(
|
||||
ctx: StateRegenContext,
|
||||
plan: StateRegenPlan,
|
||||
artifacts: StateRegenArtifacts
|
||||
): Promise<BeaconStateSnapshot> {
|
||||
// When we start a node from a certain checkpoint which is usually
|
||||
// not the snapshot epoch but we fetch it because of the fallback settings
|
||||
if (plan.snapshotSlot !== artifacts.snapshot.slot) {
|
||||
ctx.logger?.warn("Expected snapshot not found", {
|
||||
expectedSnapshotSlot: plan.snapshotSlot,
|
||||
availableSnapshotSlot: artifacts.snapshot.slot,
|
||||
});
|
||||
}
|
||||
|
||||
// TODO: Need to do further thinking if we fail here with fatal error
|
||||
if (artifacts.missingDiffs.length) {
|
||||
ctx.logger?.warn("Missing some diff states", {
|
||||
snapshotSlot: plan.snapshotSlot,
|
||||
diffPath: plan.diffSlots.join(","),
|
||||
missingDiffs: artifacts.missingDiffs.join(","),
|
||||
});
|
||||
}
|
||||
if (artifacts.diffs.length + artifacts.missingDiffs.length !== plan.diffSlots.length) {
|
||||
throw new Error(`Can not find required state diffs ${plan.diffSlots.join(",")}`);
|
||||
}
|
||||
|
||||
if (plan.blockReplay && artifacts.diffs.at(-1)?.slot !== plan.blockReplay.fromSlot - 1) {
|
||||
throw new Error(`Can not replay blocks due to missing state diffs ${artifacts.missingDiffs.join(",")}`);
|
||||
}
|
||||
|
||||
ctx.logger?.verbose("Replaying state diffs", {
|
||||
snapshotSlot: plan.snapshotSlot,
|
||||
diffPath: plan.diffSlots.join(","),
|
||||
availableDiffs: artifacts.diffs.map((d) => d.slot).join(","),
|
||||
});
|
||||
|
||||
const stateWithDiffApplied = await replayStateDifferentials(
|
||||
{codec: ctx.codec, logger: ctx.logger},
|
||||
{stateDifferentials: artifacts.diffs, stateSnapshot: artifacts.snapshot}
|
||||
);
|
||||
|
||||
if (stateWithDiffApplied.stateBytes.byteLength === 0 || stateWithDiffApplied.balancesBytes.byteLength === 0) {
|
||||
throw new Error(
|
||||
`Invalid state after applying diffs:
|
||||
stateBytesSize=${stateWithDiffApplied.stateBytes.byteLength},
|
||||
balancesBytesSize=${stateWithDiffApplied.balancesBytes.byteLength}`
|
||||
);
|
||||
}
|
||||
|
||||
if (!plan.blockReplay) return stateWithDiffApplied;
|
||||
|
||||
const stateBytes = snapshotToBeaconStateBytes({config: ctx.config}, stateWithDiffApplied);
|
||||
|
||||
ctx.logger?.verbose("Replaying blocks", {
|
||||
fromSlot: plan.blockReplay.fromSlot,
|
||||
tillSlot: plan.blockReplay.tillSlot,
|
||||
});
|
||||
|
||||
const replayed = await replayBlocks(ctx, {
|
||||
stateBytes,
|
||||
fromSlot: plan.blockReplay.fromSlot,
|
||||
toSlot: plan.blockReplay.tillSlot,
|
||||
});
|
||||
|
||||
return beaconStateBytesToSnapshot({config: ctx.config}, plan.blockReplay.tillSlot, replayed);
|
||||
}
|
||||
@@ -1,143 +0,0 @@
|
||||
import {PubkeyIndexMap} from "@chainsafe/pubkey-index-map";
|
||||
import {BeaconConfig} from "@lodestar/config";
|
||||
import {Slot} from "@lodestar/types";
|
||||
import {Logger} from "@lodestar/utils";
|
||||
import {IBeaconDb} from "../../../db/interface.js";
|
||||
import {IStateDiffCodec} from "../interface.js";
|
||||
import {replayBlocks} from "../utils/replayBlocks.js";
|
||||
import {HierarchicalLayers} from "./hierarchicalLayers.js";
|
||||
import {BeaconStateSnapshot} from "./ssz.js";
|
||||
import {getStateDifferentials, replayStateDifferentials} from "./stateDifferential.js";
|
||||
import {beaconStateBytesToSnapshot, getStateSnapshot, snapshotToBeaconStateBytes} from "./stateSnapshot.js";
|
||||
|
||||
type DifferentialStateOperation = {
|
||||
snapshotSlot: Slot;
|
||||
diffSlots: Slot[];
|
||||
blockReplay?: {
|
||||
fromSlot: Slot;
|
||||
tillSlot: Slot;
|
||||
};
|
||||
};
|
||||
|
||||
export async function processDifferentialOperation(
|
||||
modules: {
|
||||
pubkey2index: PubkeyIndexMap;
|
||||
logger?: Logger;
|
||||
db: IBeaconDb;
|
||||
codec: IStateDiffCodec;
|
||||
config: BeaconConfig;
|
||||
},
|
||||
operation: DifferentialStateOperation,
|
||||
opts?: {fallbackSnapshot?: boolean}
|
||||
): Promise<BeaconStateSnapshot | null> {
|
||||
const {logger, db, codec, config} = modules;
|
||||
const {snapshotSlot, diffSlots, blockReplay} = operation;
|
||||
|
||||
logger?.verbose("Processing differential state operation", {
|
||||
snapshotSlot,
|
||||
diffSlots: diffSlots.join(","),
|
||||
blockReplayFrom: blockReplay?.fromSlot,
|
||||
blockReplayTill: blockReplay?.tillSlot,
|
||||
});
|
||||
|
||||
// 1. First step is to fetch the snapshot state
|
||||
const stateSnapshot = await getStateSnapshot({db}, {slot: snapshotSlot, fallback: opts?.fallbackSnapshot ?? false});
|
||||
|
||||
if (!stateSnapshot) {
|
||||
throw new Error(`Can not find state snapshot for slot=${snapshotSlot}`);
|
||||
}
|
||||
|
||||
if (snapshotSlot !== stateSnapshot.slot) {
|
||||
logger?.warn("Expected snapshot not found", {
|
||||
expectedSnapshotSlot: snapshotSlot,
|
||||
availableSnapshotSlot: stateSnapshot.slot,
|
||||
});
|
||||
}
|
||||
|
||||
// We don't have any diffs and block replay
|
||||
if (diffSlots.length === 0 && !blockReplay) {
|
||||
return stateSnapshot;
|
||||
}
|
||||
|
||||
// 2. Fetch all diff states
|
||||
const nonEmptyDiffs = await getStateDifferentials({db}, {slots: diffSlots});
|
||||
if (nonEmptyDiffs.length < diffSlots.length) {
|
||||
logger?.warn("Missing some diff states", {
|
||||
snapshotSlot: stateSnapshot.slot,
|
||||
diffPath: diffSlots.join(","),
|
||||
availableDiffs: nonEmptyDiffs.map((d) => d.slot).join(","),
|
||||
});
|
||||
}
|
||||
|
||||
const lastDiffSlot = nonEmptyDiffs.at(-1)?.slot;
|
||||
if (!lastDiffSlot) {
|
||||
throw new Error(`Can not find any required diffs ${diffSlots.join(",")}`);
|
||||
}
|
||||
|
||||
// 3. Replay state diff on top of snapshot
|
||||
logger?.verbose("Replaying state diffs", {
|
||||
snapshotSlot,
|
||||
diffPath: diffSlots.join(","),
|
||||
availableDiffs: nonEmptyDiffs.map((d) => d.slot).join(","),
|
||||
});
|
||||
|
||||
const stateWithDiffApplied = await replayStateDifferentials(
|
||||
{codec, logger},
|
||||
{stateDifferentials: nonEmptyDiffs, stateSnapshot}
|
||||
);
|
||||
|
||||
if (stateWithDiffApplied.stateBytes.byteLength === 0 || stateWithDiffApplied.balancesBytes.byteLength === 0) {
|
||||
throw new Error(
|
||||
`Invalid state after applying diffs:
|
||||
stateBytesSize=${stateWithDiffApplied.stateBytes.byteLength},
|
||||
balancesBytesSize=${stateWithDiffApplied.balancesBytes.byteLength}`
|
||||
);
|
||||
}
|
||||
|
||||
// There is no blocks to replay
|
||||
if (!blockReplay) return stateWithDiffApplied;
|
||||
|
||||
const stateBytes = snapshotToBeaconStateBytes({config}, stateWithDiffApplied);
|
||||
|
||||
// 4. Replay blocks
|
||||
const stateWithBlockReplay = await replayBlocks(modules, {
|
||||
toSlot: blockReplay.tillSlot,
|
||||
fromSlot: lastDiffSlot,
|
||||
stateBytes,
|
||||
});
|
||||
|
||||
return beaconStateBytesToSnapshot({config}, blockReplay.tillSlot, stateWithBlockReplay);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the operation required to reach a target slot
|
||||
* @internal
|
||||
*/
|
||||
export function getDifferentialOperation(
|
||||
modules: {layers: HierarchicalLayers},
|
||||
slot: Slot
|
||||
): DifferentialStateOperation {
|
||||
const {layers} = modules;
|
||||
|
||||
const path = layers.computeSlotPath(slot);
|
||||
const snapshotSlot = path[0];
|
||||
const diffSlots = path.slice(1);
|
||||
const lastDiffSlot = diffSlots.at(-1);
|
||||
|
||||
if (slot === lastDiffSlot || slot === snapshotSlot) {
|
||||
return {
|
||||
snapshotSlot,
|
||||
diffSlots,
|
||||
blockReplay: undefined,
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
snapshotSlot,
|
||||
diffSlots,
|
||||
blockReplay: {
|
||||
fromSlot: lastDiffSlot ? lastDiffSlot + 1 : snapshotSlot + 1,
|
||||
tillSlot: slot,
|
||||
},
|
||||
};
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
import {BeaconState, Slot} from "@lodestar/types";
|
||||
import {StateRegenContext, applyStateRegenPlan} from "./apply.ts";
|
||||
import {fetchStateRegenArtifacts} from "./fetch.ts";
|
||||
import {HierarchicalLayers} from "./hierarchicalLayers.ts";
|
||||
import {buildStateRegenPlan} from "./plan.ts";
|
||||
import {snapshotToBeaconState} from "./stateSnapshot.ts";
|
||||
|
||||
export async function regenerateState(
|
||||
ctx: StateRegenContext & {layers: HierarchicalLayers},
|
||||
target: Slot,
|
||||
opts?: {fallbackSnapshot?: boolean}
|
||||
): Promise<BeaconState | null> {
|
||||
ctx.logger?.verbose("Regenerating state via state differential", {
|
||||
slot: target,
|
||||
});
|
||||
|
||||
const plan = buildStateRegenPlan(ctx.layers, target);
|
||||
const artifacts = await fetchStateRegenArtifacts(ctx.db, plan, opts);
|
||||
const finalState = await applyStateRegenPlan(ctx, plan, artifacts);
|
||||
|
||||
return snapshotToBeaconState(ctx, finalState);
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
import {Slot} from "@lodestar/types";
|
||||
import {IBeaconDb} from "../../../db/index.ts";
|
||||
import {StateRegenPlan} from "./plan.ts";
|
||||
import {BeaconStateDifferential, BeaconStateSnapshot} from "./ssz.ts";
|
||||
import {getStateDifferential} from "./stateDifferential.ts";
|
||||
import {getStateSnapshot} from "./stateSnapshot.ts";
|
||||
|
||||
export type StateRegenArtifacts = {
|
||||
snapshot: BeaconStateSnapshot;
|
||||
diffs: BeaconStateDifferential[];
|
||||
missingDiffs: Slot[];
|
||||
};
|
||||
|
||||
export async function fetchStateRegenArtifacts(
|
||||
db: IBeaconDb,
|
||||
plan: StateRegenPlan,
|
||||
opts: {fallbackSnapshot?: boolean} = {}
|
||||
): Promise<StateRegenArtifacts> {
|
||||
const snapshot = await getStateSnapshot({db}, {slot: plan.snapshotSlot, fallback: opts.fallbackSnapshot ?? false});
|
||||
|
||||
if (!snapshot) {
|
||||
throw new Error(`Can not find state snapshot for slot=${plan.snapshotSlot}`);
|
||||
}
|
||||
|
||||
const diffs: BeaconStateDifferential[] = [];
|
||||
const missingDiffs: Slot[] = [];
|
||||
|
||||
for (const edge of plan.diffSlots) {
|
||||
const diff = await getStateDifferential({db}, {slot: edge});
|
||||
diff ? diffs.push(diff) : missingDiffs.push(edge);
|
||||
}
|
||||
|
||||
return {snapshot, diffs, missingDiffs};
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
import {Slot} from "@lodestar/types";
|
||||
import {HierarchicalLayers} from "./hierarchicalLayers.ts";
|
||||
|
||||
export type StateRegenPlan = {
|
||||
targetSlot: Slot;
|
||||
snapshotSlot: Slot;
|
||||
diffSlots: Slot[];
|
||||
blockReplay?: {fromSlot: Slot; tillSlot: Slot};
|
||||
};
|
||||
|
||||
export function buildStateRegenPlan(layers: HierarchicalLayers, target: Slot): StateRegenPlan {
|
||||
const path = layers.computeSlotPath(target);
|
||||
const [snapshotSlot, ...diffSlots] = path;
|
||||
const lastDiffSlot = diffSlots.at(-1);
|
||||
|
||||
if (target === lastDiffSlot || target === snapshotSlot) {
|
||||
return {
|
||||
snapshotSlot,
|
||||
diffSlots,
|
||||
blockReplay: undefined,
|
||||
targetSlot: target,
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
snapshotSlot,
|
||||
diffSlots,
|
||||
blockReplay: {
|
||||
fromSlot: lastDiffSlot ? lastDiffSlot + 1 : snapshotSlot + 1,
|
||||
tillSlot: target,
|
||||
},
|
||||
targetSlot: target,
|
||||
};
|
||||
}
|
||||
@@ -47,8 +47,7 @@ export async function replayBlocks(
|
||||
}
|
||||
);
|
||||
|
||||
// Will use this for metrics
|
||||
// biome-ignore lint/correctness/noUnusedVariables: <explanation>
|
||||
// biome-ignore lint/correctness/noUnusedVariables: Will use this for metrics
|
||||
let blockCount = 0;
|
||||
|
||||
for await (const block of db.blockArchive.valuesStream({gt: fromSlot, lte: toSlot})) {
|
||||
@@ -61,8 +60,7 @@ export async function replayBlocks(
|
||||
dataAvailabilityStatus: DataAvailabilityStatus.Available,
|
||||
});
|
||||
} catch (e) {
|
||||
// Add metrics for error
|
||||
// biome-ignore lint/complexity/noUselessCatch: <explanation>
|
||||
// biome-ignore lint/complexity/noUselessCatch: Add metrics for error
|
||||
throw e;
|
||||
}
|
||||
blockCount++;
|
||||
|
||||
@@ -323,7 +323,7 @@ export const nonOverlappingLayersData: LayersTest[] = [
|
||||
path: [0, computeStartSlotAtEpoch(3)],
|
||||
},
|
||||
{
|
||||
title: "after slot of first diff layer",
|
||||
title: "one slot after first diff layer",
|
||||
slot: computeStartSlotAtEpoch(3) + 1,
|
||||
path: [0, computeStartSlotAtEpoch(3)],
|
||||
blockReplay: {
|
||||
@@ -346,7 +346,7 @@ export const nonOverlappingLayersData: LayersTest[] = [
|
||||
path: [0, computeStartSlotAtEpoch(5)],
|
||||
},
|
||||
{
|
||||
title: "after slot of second diff layer",
|
||||
title: "one slot after second diff layer",
|
||||
slot: computeStartSlotAtEpoch(5) + 1,
|
||||
path: [0, computeStartSlotAtEpoch(5)],
|
||||
blockReplay: {
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
import {describe, expect, it} from "vitest";
|
||||
import {getDifferentialOperation} from "../../../../../src/chain/archiveStore/differentialState/differentialOperation.js";
|
||||
import {HierarchicalLayers} from "../../../../../src/chain/archiveStore/differentialState/hierarchicalLayers.js";
|
||||
import {allLayerTests} from "../../../../fixtures/differentialState/hierarchicalLayers.js";
|
||||
|
||||
describe("differential state / operations", () => {
|
||||
it.each(allLayerTests)("$title", ({slot, path, layers, blockReplay}) => {
|
||||
const hLayers = HierarchicalLayers.fromString(layers);
|
||||
|
||||
const snapshotSlot = path[0];
|
||||
const diffSlots = path.slice(1);
|
||||
|
||||
expect(getDifferentialOperation({layers: hLayers}, slot)).toEqual({snapshotSlot, diffSlots, blockReplay});
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,22 @@
|
||||
import {describe, expect, it} from "vitest";
|
||||
import {HierarchicalLayers} from "../../../../../src/chain/archiveStore/differentialState/hierarchicalLayers.ts";
|
||||
import {buildStateRegenPlan} from "../../../../../src/chain/archiveStore/differentialState/plan.ts";
|
||||
import {allLayerTests} from "../../../../fixtures/differentialState/hierarchicalLayers.ts";
|
||||
|
||||
describe("differential state / plan", () => {
|
||||
it.each(allLayerTests)("$title", ({slot, path, layers, blockReplay}) => {
|
||||
const hLayers = HierarchicalLayers.fromString(layers);
|
||||
|
||||
const snapshotSlot = path[0];
|
||||
const diffSlots = path.slice(1);
|
||||
|
||||
const plan = buildStateRegenPlan(hLayers, slot);
|
||||
|
||||
expect(plan).toEqual({
|
||||
snapshotSlot,
|
||||
diffSlots,
|
||||
blockReplay,
|
||||
targetSlot: slot,
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1,9 +1,9 @@
|
||||
import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import {beforeAll, describe, expect, it} from "vitest";
|
||||
import {ForkName} from "@lodestar/params";
|
||||
import {BeaconState, Epoch, RootHex, Slot, phase0, ssz} from "@lodestar/types";
|
||||
import {fromHex} from "@lodestar/utils";
|
||||
import {beforeAll, describe, expect, it} from "vitest";
|
||||
import {IStateDiffCodec} from "../../../../../src/chain/archiveStore/interface.js";
|
||||
import {BinaryDiffCodec} from "../../../../../src/chain/archiveStore/utils/binaryDiffCodec.js";
|
||||
import {generateState} from "../../../../utils/state.js";
|
||||
|
||||
Reference in New Issue
Block a user