feat: cache serialized data column sidecars (#8627)

**Motivation**

- https://github.com/ChainSafe/lodestar/issues/8624

**Description**

- Cache serialized data column sidecars (from gossip and reqresp)
- Use serialized data column sidecars (if available) when persisting to
the db. Columns produced by the engine (~10 per slot) will not have
cached bytes, so those will still be reserialized.
This commit is contained in:
Cayman
2025-11-22 14:23:20 -05:00
committed by GitHub
parent 3e80b7391e
commit c2cf1aac27
15 changed files with 113 additions and 77 deletions

View File

@@ -95,6 +95,9 @@ export async function importBlock(
await writeBlockInputToDb.call(this, [blockInput]);
}
// Without forcefully clearing this cache, we would rely on the WeakMap to evict memory, which is not reliable
this.serializedCache.clear();
// 2. Import block to fork choice
// Should compute checkpoint balances before forkchoice.onBlock

View File

@@ -60,7 +60,19 @@ export async function writeBlockInputToDb(this: BeaconChain, blocksInputs: IBloc
);
}
fnPromises.push(this.db.dataColumnSidecar.putMany(blockRoot, dataColumnSidecars));
const binaryPuts = [];
const nonbinaryPuts = [];
for (const dataColumnSidecar of dataColumnSidecars) {
// skip reserializing the column if its serialized bytes are already cached
const serialized = this.serializedCache.get(dataColumnSidecar);
if (serialized) {
binaryPuts.push({key: dataColumnSidecar.index, value: serialized});
} else {
nonbinaryPuts.push(dataColumnSidecar);
}
}
fnPromises.push(this.db.dataColumnSidecar.putManyBinary(blockRoot, binaryPuts));
fnPromises.push(this.db.dataColumnSidecar.putMany(blockRoot, nonbinaryPuts));
this.logger.debug("Persisted dataColumnSidecars to hot DB", {
slot: block.message.slot,
root: blockRootHex,

View File

@@ -27,7 +27,6 @@ import {
Slot,
SlotRootHex,
SubnetID,
WithBytes,
altair,
capella,
deneb,
@@ -69,14 +68,8 @@ export interface INetwork extends INetworkCorePublic {
reStatusPeers(peers: PeerIdStr[]): Promise<void>;
searchUnknownSlotRoot(slotRoot: SlotRootHex, source: BlockInputSource, peer?: PeerIdStr): void;
// ReqResp
sendBeaconBlocksByRange(
peerId: PeerIdStr,
request: phase0.BeaconBlocksByRangeRequest
): Promise<WithBytes<SignedBeaconBlock>[]>;
sendBeaconBlocksByRoot(
peerId: PeerIdStr,
request: BeaconBlocksByRootRequest
): Promise<WithBytes<SignedBeaconBlock>[]>;
sendBeaconBlocksByRange(peerId: PeerIdStr, request: phase0.BeaconBlocksByRangeRequest): Promise<SignedBeaconBlock[]>;
sendBeaconBlocksByRoot(peerId: PeerIdStr, request: BeaconBlocksByRootRequest): Promise<SignedBeaconBlock[]>;
sendBlobSidecarsByRange(peerId: PeerIdStr, request: deneb.BlobSidecarsByRangeRequest): Promise<deneb.BlobSidecar[]>;
sendBlobSidecarsByRoot(peerId: PeerIdStr, request: BlobSidecarsByRootRequest): Promise<deneb.BlobSidecar[]>;
sendDataColumnSidecarsByRange(

View File

@@ -20,7 +20,6 @@ import {
SingleAttestation,
SlotRootHex,
SubnetID,
WithBytes,
altair,
capella,
deneb,
@@ -513,7 +512,7 @@ export class Network implements INetwork {
async sendBeaconBlocksByRange(
peerId: PeerIdStr,
request: phase0.BeaconBlocksByRangeRequest
): Promise<WithBytes<SignedBeaconBlock>[]> {
): Promise<SignedBeaconBlock[]> {
return collectSequentialBlocksInRange(
this.sendReqRespRequest(
peerId,
@@ -526,10 +525,7 @@ export class Network implements INetwork {
);
}
async sendBeaconBlocksByRoot(
peerId: PeerIdStr,
request: BeaconBlocksByRootRequest
): Promise<WithBytes<SignedBeaconBlock>[]> {
async sendBeaconBlocksByRoot(peerId: PeerIdStr, request: BeaconBlocksByRootRequest): Promise<SignedBeaconBlock[]> {
return collectMaxResponseTypedWithBytes(
this.sendReqRespRequest(
peerId,
@@ -539,7 +535,8 @@ export class Network implements INetwork {
request
),
request.length,
responseSszTypeByMethod[ReqRespMethod.BeaconBlocksByRoot]
responseSszTypeByMethod[ReqRespMethod.BeaconBlocksByRoot],
this.chain.serializedCache
);
}
@@ -592,7 +589,8 @@ export class Network implements INetwork {
return collectMaxResponseTyped(
this.sendReqRespRequest(peerId, ReqRespMethod.BlobSidecarsByRoot, [Version.V1], request),
request.length,
responseSszTypeByMethod[ReqRespMethod.BlobSidecarsByRoot]
responseSszTypeByMethod[ReqRespMethod.BlobSidecarsByRoot],
this.chain.serializedCache
);
}
@@ -614,7 +612,8 @@ export class Network implements INetwork {
return collectMaxResponseTyped(
this.sendReqRespRequest(peerId, ReqRespMethod.DataColumnSidecarsByRoot, [Version.V1], request),
request.reduce((total, {columns}) => total + columns.length, 0),
responseSszTypeByMethod[ReqRespMethod.DataColumnSidecarsByRoot]
responseSszTypeByMethod[ReqRespMethod.DataColumnSidecarsByRoot],
this.chain.serializedCache
);
}

View File

@@ -516,6 +516,7 @@ function getSequentialHandlers(modules: ValidatorFnsModules, options: GossipHand
throw new GossipActionError(GossipAction.REJECT, {code: "PRE_DENEB_BLOCK"});
}
const blockInput = await validateBeaconBlob(blobSidecar, topic.subnet, peerIdStr, seenTimestampSec);
chain.serializedCache.set(blobSidecar, serializedData);
if (!blockInput.hasBlockAndAllData()) {
const cutoffTimeMs = getCutoffTimeMs(chain, blobSlot, BLOCK_AVAILABILITY_CUTOFF_MS);
chain.logger.debug("Received gossip blob, waiting for full data availability", {
@@ -562,6 +563,7 @@ function getSequentialHandlers(modules: ValidatorFnsModules, options: GossipHand
peerIdStr,
seenTimestampSec
);
chain.serializedCache.set(dataColumnSidecar, serializedData);
const blockInputMeta = blockInput.getLogMeta();
const {receivedColumns} = blockInputMeta;
// it's not helpful to track every single column received

View File

@@ -1,6 +1,6 @@
import {Type} from "@chainsafe/ssz";
import {RequestError, RequestErrorCode, ResponseIncoming} from "@lodestar/reqresp";
import {WithBytes} from "@lodestar/types";
import {SerializedCache} from "../../../util/serializedCache.js";
import {ResponseTypeGetter} from "../types.js";
/**
@@ -32,13 +32,16 @@ export async function collectExactOneTyped<T>(
export async function collectMaxResponseTyped<T>(
source: AsyncIterable<ResponseIncoming>,
maxResponses: number,
typeFn: ResponseTypeGetter<T>
typeFn: ResponseTypeGetter<T>,
serializedCache?: SerializedCache
): Promise<T[]> {
// else: zero or more responses
const responses: T[] = [];
for await (const chunk of source) {
const type = typeFn(chunk.fork, chunk.protocolVersion);
const response = sszDeserializeResponse(type, chunk.data);
// optionally cache the serialized response if the cache is available
serializedCache?.set(response as object, chunk.data);
responses.push(response);
if (maxResponses !== undefined && responses.length >= maxResponses) {
@@ -58,14 +61,17 @@ export async function collectMaxResponseTyped<T>(
export async function collectMaxResponseTypedWithBytes<T>(
source: AsyncIterable<ResponseIncoming>,
maxResponses: number,
typeFn: ResponseTypeGetter<T>
): Promise<WithBytes<T>[]> {
typeFn: ResponseTypeGetter<T>,
serializedCache?: SerializedCache
): Promise<T[]> {
// else: zero or more responses
const responses: WithBytes<T>[] = [];
const responses: T[] = [];
for await (const chunk of source) {
const type = typeFn(chunk.fork, chunk.protocolVersion);
const data = sszDeserializeResponse(type, chunk.data);
responses.push({data, bytes: chunk.data});
responses.push(data);
// optionally cache the serialized response if the cache is available
serializedCache?.set(data as object, chunk.data);
if (maxResponses !== undefined && responses.length >= maxResponses) {
break;

View File

@@ -1,6 +1,7 @@
import {ResponseIncoming} from "@lodestar/reqresp";
import {SignedBeaconBlock, WithBytes, phase0} from "@lodestar/types";
import {SignedBeaconBlock, phase0} from "@lodestar/types";
import {LodestarError} from "@lodestar/utils";
import {SerializedCache} from "../../../util/serializedCache.js";
import {ReqRespMethod, responseSszTypeByMethod} from "../types.js";
import {sszDeserializeResponse} from "./collect.js";
@@ -10,9 +11,10 @@ import {sszDeserializeResponse} from "./collect.js";
*/
export async function collectSequentialBlocksInRange(
blockStream: AsyncIterable<ResponseIncoming>,
{count, startSlot}: Pick<phase0.BeaconBlocksByRangeRequest, "count" | "startSlot">
): Promise<WithBytes<SignedBeaconBlock>[]> {
const blocks: WithBytes<SignedBeaconBlock>[] = [];
{count, startSlot}: Pick<phase0.BeaconBlocksByRangeRequest, "count" | "startSlot">,
serializedCache?: SerializedCache
): Promise<SignedBeaconBlock[]> {
const blocks: SignedBeaconBlock[] = [];
for await (const chunk of blockStream) {
const blockType = responseSszTypeByMethod[ReqRespMethod.BeaconBlocksByRange](chunk.fork, chunk.protocolVersion);
@@ -30,11 +32,13 @@ export async function collectSequentialBlocksInRange(
}
const prevBlock = blocks.at(-1);
if (prevBlock && prevBlock.data.message.slot >= blockSlot) {
if (prevBlock && prevBlock.message.slot >= blockSlot) {
throw new BlocksByRangeError({code: BlocksByRangeErrorCode.BAD_SEQUENCE});
}
blocks.push({data: block, bytes: chunk.data});
blocks.push(block);
// optionally cache the serialized response if the cache is available
serializedCache?.set(block, chunk.data);
if (blocks.length >= count) {
break; // Done, collected all blocks
}

View File

@@ -749,27 +749,33 @@ export class BackfillSync extends (EventEmitter as {new (): BackfillSyncEmitter}
const anchorBlock = res[0];
// GENESIS_SLOT doesn't have a valid signature
if (anchorBlock.data.message.slot === GENESIS_SLOT) return;
if (anchorBlock.message.slot === GENESIS_SLOT) return;
await verifyBlockProposerSignature(this.chain.bls, this.chain.getHeadState(), [anchorBlock]);
// We can write to the disk if this is ahead of prevFinalizedCheckpointBlock otherwise
// we will need to go make checks on the top of sync loop before writing as it might
// override prevFinalizedCheckpointBlock
if (this.prevFinalizedCheckpointBlock.slot < anchorBlock.data.message.slot)
await this.db.blockArchive.putBinary(anchorBlock.data.message.slot, anchorBlock.bytes);
if (this.prevFinalizedCheckpointBlock.slot < anchorBlock.message.slot) {
const serialized = this.chain.serializedCache.get(anchorBlock);
if (serialized) {
await this.db.blockArchive.putBinary(anchorBlock.message.slot, serialized);
} else {
await this.db.blockArchive.put(anchorBlock.message.slot, anchorBlock);
}
}
this.syncAnchor = {
anchorBlock: anchorBlock.data,
anchorBlock: anchorBlock,
anchorBlockRoot,
anchorSlot: anchorBlock.data.message.slot,
lastBackSyncedBlock: {root: anchorBlockRoot, slot: anchorBlock.data.message.slot, block: anchorBlock.data},
anchorSlot: anchorBlock.message.slot,
lastBackSyncedBlock: {root: anchorBlockRoot, slot: anchorBlock.message.slot, block: anchorBlock},
};
this.metrics?.backfillSync.totalBlocks.inc({method: BackfillSyncMethod.blockbyroot});
this.logger.verbose("Fetched new anchorBlock", {
root: toRootHex(anchorBlockRoot),
slot: anchorBlock.data.message.slot,
slot: anchorBlock.message.slot,
});
return;
@@ -825,15 +831,33 @@ export class BackfillSync extends (EventEmitter as {new (): BackfillSyncEmitter}
nextAnchor.slot > this.prevFinalizedCheckpointBlock.slot
? verifiedBlocks
: verifiedBlocks.slice(0, verifiedBlocks.length - 1);
await this.db.blockArchive.batchPutBinary(
blocksToPut.map((block) => ({
key: block.data.message.slot,
value: block.bytes,
slot: block.data.message.slot,
blockRoot: this.config.getForkTypes(block.data.message.slot).BeaconBlock.hashTreeRoot(block.data.message),
parentRoot: block.data.message.parentRoot,
}))
);
const binaryPuts = [];
const nonBinaryPuts = [];
for (const block of blocksToPut) {
const serialized = this.chain.serializedCache.get(block);
const item = {
key: block.message.slot,
slot: block.message.slot,
blockRoot: this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message),
parentRoot: block.message.parentRoot,
};
if (serialized) {
binaryPuts.push({...item, value: serialized});
} else {
nonBinaryPuts.push({...item, value: block});
}
}
if (binaryPuts.length > 0) {
await this.db.blockArchive.batchPutBinary(binaryPuts);
}
if (nonBinaryPuts.length > 0) {
await this.db.blockArchive.batchPut(nonBinaryPuts);
}
this.metrics?.backfillSync.totalBlocks.inc({method: BackfillSyncMethod.rangesync}, verifiedBlocks.length);
}

View File

@@ -1,7 +1,7 @@
import {BeaconConfig} from "@lodestar/config";
import {GENESIS_SLOT} from "@lodestar/params";
import {CachedBeaconStateAllForks, ISignatureSet, getBlockProposerSignatureSet} from "@lodestar/state-transition";
import {Root, SignedBeaconBlock, Slot, WithBytes, ssz} from "@lodestar/types";
import {Root, SignedBeaconBlock, Slot, ssz} from "@lodestar/types";
import {IBlsVerifier} from "../../chain/bls/index.js";
import {BackfillSyncError, BackfillSyncErrorCode} from "./errors.js";
@@ -14,19 +14,19 @@ export type BackfillBlock = BackfillBlockHeader & {block: SignedBeaconBlock};
export function verifyBlockSequence(
config: BeaconConfig,
blocks: WithBytes<SignedBeaconBlock>[],
blocks: SignedBeaconBlock[],
anchorRoot: Root
): {
nextAnchor: BackfillBlock | null;
verifiedBlocks: WithBytes<SignedBeaconBlock>[];
verifiedBlocks: SignedBeaconBlock[];
error?: BackfillSyncErrorCode.NOT_LINEAR;
} {
let nextRoot: Root = anchorRoot;
let nextAnchor: BackfillBlock | null = null;
const verifiedBlocks: WithBytes<SignedBeaconBlock>[] = [];
const verifiedBlocks: SignedBeaconBlock[] = [];
for (const block of blocks.reverse()) {
const blockRoot = config.getForkTypes(block.data.message.slot).BeaconBlock.hashTreeRoot(block.data.message);
const blockRoot = config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message);
if (!ssz.Root.equals(blockRoot, nextRoot)) {
if (ssz.Root.equals(nextRoot, anchorRoot)) {
throw new BackfillSyncError({code: BackfillSyncErrorCode.NOT_ANCHORED});
@@ -34,8 +34,8 @@ export function verifyBlockSequence(
return {nextAnchor, verifiedBlocks, error: BackfillSyncErrorCode.NOT_LINEAR};
}
verifiedBlocks.push(block);
nextAnchor = {block: block.data, slot: block.data.message.slot, root: nextRoot};
nextRoot = block.data.message.parentRoot;
nextAnchor = {block: block, slot: block.message.slot, root: nextRoot};
nextRoot = block.message.parentRoot;
}
return {nextAnchor, verifiedBlocks};
}
@@ -43,12 +43,12 @@ export function verifyBlockSequence(
export async function verifyBlockProposerSignature(
bls: IBlsVerifier,
state: CachedBeaconStateAllForks,
blocks: WithBytes<SignedBeaconBlock>[]
blocks: SignedBeaconBlock[]
): Promise<void> {
if (blocks.length === 1 && blocks[0].data.message.slot === GENESIS_SLOT) return;
if (blocks.length === 1 && blocks[0].message.slot === GENESIS_SLOT) return;
const signatures = blocks.reduce((sigs: ISignatureSet[], block) => {
// genesis block doesn't have a valid signature
if (block.data.message.slot !== GENESIS_SLOT) sigs.push(getBlockProposerSignatureSet(state, block.data));
if (block.message.slot !== GENESIS_SLOT) sigs.push(getBlockProposerSignatureSet(state, block));
return sigs;
}, []);

View File

@@ -254,7 +254,7 @@ export async function requestByRange({
if (blocksRequest) {
requests.push(
network.sendBeaconBlocksByRange(peerIdStr, blocksRequest).then((blockResponse) => {
blocks = blockResponse.map(({data}) => data);
blocks = blockResponse;
})
);
}

View File

@@ -314,7 +314,7 @@ export async function fetchAndValidateBlock({
blockRoot,
}: Omit<FetchByRootAndValidateBlockProps, "chain">): Promise<SignedBeaconBlock> {
const response = await network.sendBeaconBlocksByRoot(peerIdStr, [blockRoot]);
const block = response.at(0)?.data;
const block = response.at(0);
if (!block) {
throw new DownloadByRootError({
code: DownloadByRootErrorCode.MISSING_BLOCK_RESPONSE,

View File

@@ -102,7 +102,7 @@ function runTests({useWorker}: {useWorker: boolean}): void {
expect(returnedBlocks).toHaveLength(req.count);
for (const [i, returnedBlock] of returnedBlocks.entries()) {
expect(ssz.phase0.SignedBeaconBlock.equals(returnedBlock.data, blocks[i])).toBe(true);
expect(ssz.phase0.SignedBeaconBlock.equals(returnedBlock, blocks[i])).toBe(true);
}
});

View File

@@ -4,8 +4,7 @@ import {fileURLToPath} from "node:url";
import {describe, expect, it} from "vitest";
import {createBeaconConfig} from "@lodestar/config";
import {config} from "@lodestar/config/default";
import {WithBytes, phase0, ssz} from "@lodestar/types";
import {ZERO_HASH} from "../../../../src/constants/constants.js";
import {phase0, ssz} from "@lodestar/types";
import {BackfillSyncError, BackfillSyncErrorCode} from "./../../../../src/sync/backfill/errors.js";
import {verifyBlockSequence} from "../../../../src/sync/backfill/verify.js";
@@ -23,9 +22,7 @@ describe("backfill sync - verify block sequence", () => {
it("should verify valid chain of blocks", () => {
const blocks = getBlocks();
expect(() =>
verifyBlockSequence(beaconConfig, blocks.slice(0, 2), blocks[2].data.message.parentRoot)
).not.toThrow();
expect(() => verifyBlockSequence(beaconConfig, blocks.slice(0, 2), blocks[2].message.parentRoot)).not.toThrow();
});
it("should fail with sequence not anchored", () => {
@@ -42,20 +39,20 @@ describe("backfill sync - verify block sequence", () => {
beaconConfig,
// remove middle block
blocks
.filter((b) => b.data.message.slot !== 2)
.filter((b) => b.message.slot !== 2)
.slice(0, blocks.length - 2),
// biome-ignore lint/style/noNonNullAssertion: using .at
blocks.at(-1)?.data.message.parentRoot!
blocks.at(-1)?.message.parentRoot!
);
if (error != null) throw new BackfillSyncError({code: error});
}).toThrow(BackfillSyncErrorCode.NOT_LINEAR);
});
//first 4 mainnet blocks
function getBlocks(): WithBytes<phase0.SignedBeaconBlock>[] {
function getBlocks(): phase0.SignedBeaconBlock[] {
const json = JSON.parse(fs.readFileSync(path.join(__dirname, "./blocks.json"), "utf-8")) as unknown[];
return json.map((b) => {
return {data: ssz.phase0.SignedBeaconBlock.fromJson(b), bytes: ZERO_HASH};
return ssz.phase0.SignedBeaconBlock.fromJson(b);
});
}
});

View File

@@ -12,7 +12,6 @@ import {BlockInputSource} from "../../../src/chain/blocks/blockInput/types.js";
import {BlockError, BlockErrorCode} from "../../../src/chain/errors/blockError.js";
import {ChainEvent, IBeaconChain} from "../../../src/chain/index.js";
import {SeenBlockProposers} from "../../../src/chain/seenCache/seenBlockProposers.js";
import {ZERO_HASH} from "../../../src/constants/constants.js";
import {INetwork, NetworkEventBus, PeerAction} from "../../../src/network/index.js";
import {PeerSyncMeta} from "../../../src/network/peers/peersData.js";
import {defaultSyncOptions} from "../../../src/sync/options.js";
@@ -146,11 +145,8 @@ describe.skip(
sendBeaconBlocksByRootResolveFn([_peerId, roots]);
const correctBlocks = Array.from(roots)
.map((root) => blocksByRoot.get(toHexString(root)))
.filter(notNullish)
.map((data) => ({data, bytes: ZERO_HASH}));
return wrongBlockRoot
? [{data: ssz.phase0.SignedBeaconBlock.defaultValue(), bytes: ZERO_HASH}]
: correctBlocks;
.filter(notNullish);
return wrongBlockRoot ? [ssz.phase0.SignedBeaconBlock.defaultValue()] : correctBlocks;
},
reportPeer: async (peerId, action, actionName) => reportPeerResolveFn([peerId, action, actionName]),

View File

@@ -44,7 +44,7 @@ describe("downloadByRoot.ts", () => {
it("should successfully fetch and validate block with matching root", async () => {
network = {
sendBeaconBlocksByRoot: vi.fn(() => [{data: capellaBlock.block}]),
sendBeaconBlocksByRoot: vi.fn(() => [capellaBlock.block]),
} as unknown as INetwork;
const response = await fetchAndValidateBlock({
@@ -74,7 +74,7 @@ describe("downloadByRoot.ts", () => {
it("should throw error when block root doesn't match requested root", async () => {
network = {
sendBeaconBlocksByRoot: vi.fn(() => [{data: capellaBlock.block}]),
sendBeaconBlocksByRoot: vi.fn(() => [capellaBlock.block]),
} as unknown as INetwork;
const invalidRoot = randomBytes(ROOT_SIZE);