fix: emit blob_sidecar event as soon as we receive BlobSidecars (#7967)

**Motivation**

Right now we only emit `blob_sidecar` event during block import when we
have received all blobs but this is not ideal as the event is mostly
used to gather timing information of when blobs are received by nodes in
the network. We should emit the event as soon as possible similar to
https://github.com/ChainSafe/lodestar/pull/7953.

**Description**

Emit `blob_sidecar` event as soon as we receive `BlobSidecar`
- through `publishBlock` api if we are proposer
- on `blob_sidecar` gossip topic from the network
- from engine api via `engine_getBlobsV1` method
- from req/resp via `blob_sidecars_by_root` method
This commit is contained in:
Nico Flaig
2025-06-23 10:16:23 +02:00
committed by GitHub
parent 4a12abd772
commit 855e58594f
7 changed files with 83 additions and 33 deletions

View File

@@ -74,7 +74,7 @@ export enum EventType {
lightClientFinalityUpdate = "light_client_finality_update",
/** Payload attributes for block proposal */
payloadAttributes = "payload_attributes",
/** The node has received a valid blobSidecar (from P2P or API) */
/** The node has received a valid BlobSidecar (from P2P or API) */
blobSidecar = "blob_sidecar",
}

View File

@@ -1,6 +1,12 @@
import {routes} from "@lodestar/api";
import {ApiError, ApplicationMethods} from "@lodestar/api/server";
import {ForkPostBellatrix, SLOTS_PER_HISTORICAL_ROOT, isForkPostBellatrix, isForkPostElectra} from "@lodestar/params";
import {
ForkName,
ForkPostBellatrix,
SLOTS_PER_HISTORICAL_ROOT,
isForkPostBellatrix,
isForkPostElectra,
} from "@lodestar/params";
import {
computeEpochAtSlot,
computeTimeAtSlot,
@@ -16,11 +22,12 @@ import {
deneb,
isSignedBlockContents,
} from "@lodestar/types";
import {fromHex, sleep, toRootHex} from "@lodestar/utils";
import {fromHex, sleep, toHex, toRootHex} from "@lodestar/utils";
import {
BlobsSource,
BlockInput,
BlockInputDataBlobs,
BlockInputType,
BlockSource,
ImportBlockOpts,
getBlockInput,
@@ -31,7 +38,7 @@ import {BlockError, BlockErrorCode, BlockGossipError} from "../../../../chain/er
import {validateGossipBlock} from "../../../../chain/validation/block.js";
import {OpSource} from "../../../../chain/validatorMonitor.js";
import {NetworkEvent} from "../../../../network/index.js";
import {computeBlobSidecars} from "../../../../util/blobs.js";
import {computeBlobSidecars, kzgCommitmentToVersionedHash} from "../../../../util/blobs.js";
import {isOptimisticBlock} from "../../../../util/forkChoice.js";
import {promiseAllMaybeAsync} from "../../../../util/promises.js";
import {ApiModules} from "../../types.js";
@@ -200,8 +207,6 @@ export function getBeaconBlockApi({
await sleep(msToBlockSlot);
}
chain.emitter.emit(routes.events.EventType.blockGossip, {slot, block: blockRoot});
// TODO: Validate block
const delaySec =
seenTimestampSec - (chain.genesisTime + blockForImport.block.message.slot * config.SECONDS_PER_SLOT);
@@ -235,6 +240,29 @@ export function getBeaconBlockApi({
}),
];
await promiseAllMaybeAsync(publishPromises);
if (chain.emitter.listenerCount(routes.events.EventType.blockGossip)) {
chain.emitter.emit(routes.events.EventType.blockGossip, {slot, block: blockRoot});
}
if (
chain.emitter.listenerCount(routes.events.EventType.blobSidecar) &&
blockForImport.type === BlockInputType.availableData &&
(blockForImport.blockData.fork === ForkName.deneb || blockForImport.blockData.fork === ForkName.electra)
) {
const {blobs} = blockForImport.blockData;
for (const blobSidecar of blobs) {
const {index, kzgCommitment} = blobSidecar;
chain.emitter.emit(routes.events.EventType.blobSidecar, {
blockRoot,
slot,
index,
kzgCommitment: toHex(kzgCommitment),
versionedHash: toHex(kzgCommitmentToVersionedHash(kzgCommitment)),
});
}
}
};
const publishBlindedBlock: ApplicationMethods<routes.beacon.block.Endpoints>["publishBlindedBlock"] = async (

View File

@@ -18,9 +18,8 @@ import {
isStateValidatorsNodesPopulated,
} from "@lodestar/state-transition";
import {Attestation, BeaconBlock, altair, capella, electra, phase0, ssz} from "@lodestar/types";
import {isErrorAborted, toHex, toRootHex} from "@lodestar/utils";
import {isErrorAborted, toRootHex} from "@lodestar/utils";
import {ZERO_HASH_HEX} from "../../constants/index.js";
import {kzgCommitmentToVersionedHash} from "../../util/blobs.js";
import {callInNextEventLoop} from "../../util/eventLoop.js";
import {isOptimisticBlock} from "../../util/forkChoice.js";
import {isQueueErrorAborted} from "../../util/queue/index.js";
@@ -433,22 +432,6 @@ export async function importBlock(
this.emitter.emit(routes.events.EventType.proposerSlashing, proposerSlashing);
}
}
if (
blockInput.type === BlockInputType.availableData &&
this.emitter.listenerCount(routes.events.EventType.blobSidecar)
) {
const {blobs} = blockInput.blockData;
for (const blobSidecar of blobs) {
const {index, kzgCommitment} = blobSidecar;
this.emitter.emit(routes.events.EventType.blobSidecar, {
blockRoot: blockRootHex,
slot: blockSlot,
index,
kzgCommitment: toHex(kzgCommitment),
versionedHash: toHex(kzgCommitmentToVersionedHash(kzgCommitment)),
});
}
}
});
}

View File

@@ -13,7 +13,7 @@ import {
ssz,
sszTypesFor,
} from "@lodestar/types";
import {LogLevel, Logger, prettyBytes, toRootHex} from "@lodestar/utils";
import {LogLevel, Logger, prettyBytes, toHex, toRootHex} from "@lodestar/utils";
import {
BlobSidecarValidation,
BlockInput,
@@ -53,6 +53,7 @@ import {validateLightClientFinalityUpdate} from "../../chain/validation/lightCli
import {validateLightClientOptimisticUpdate} from "../../chain/validation/lightClientOptimisticUpdate.js";
import {OpSource} from "../../chain/validatorMonitor.js";
import {Metrics} from "../../metrics/index.js";
import {kzgCommitmentToVersionedHash} from "../../util/blobs.js";
import {INetworkCore} from "../core/index.js";
import {NetworkEvent, NetworkEventBus} from "../events.js";
import {
@@ -168,7 +169,9 @@ function getSequentialHandlers(modules: ValidatorFnsModules, options: GossipHand
logger.debug("Validated gossip block", {...logCtx, recvToValidation, validationTime});
chain.emitter.emit(routes.events.EventType.blockGossip, {slot, block: blockRootHex});
if (chain.emitter.listenerCount(routes.events.EventType.blockGossip)) {
chain.emitter.emit(routes.events.EventType.blockGossip, {slot, block: blockRootHex});
}
return blockInput;
} catch (e) {
@@ -197,8 +200,8 @@ function getSequentialHandlers(modules: ValidatorFnsModules, options: GossipHand
const blobBlockHeader = blobSidecar.signedBlockHeader.message;
const slot = blobBlockHeader.slot;
const fork = config.getForkName(slot);
const blockRoot = ssz.phase0.BeaconBlockHeader.hashTreeRoot(blobBlockHeader);
const blockHex = prettyBytes(blockRoot);
const blockRootHex = toRootHex(ssz.phase0.BeaconBlockHeader.hashTreeRoot(blobBlockHeader));
const blockShortHex = prettyBytes(blockRootHex);
const delaySec = chain.clock.secFromSlot(slot, seenTimestampSec);
const recvToValLatency = Date.now() / 1000 - seenTimestampSec;
@@ -220,9 +223,19 @@ function getSequentialHandlers(modules: ValidatorFnsModules, options: GossipHand
metrics?.gossipBlob.recvToValidation.observe(recvToValidation);
metrics?.gossipBlob.validationTime.observe(validationTime);
if (chain.emitter.listenerCount(routes.events.EventType.blobSidecar)) {
chain.emitter.emit(routes.events.EventType.blobSidecar, {
blockRoot: blockRootHex,
slot,
index: blobSidecar.index,
kzgCommitment: toHex(blobSidecar.kzgCommitment),
versionedHash: toHex(kzgCommitmentToVersionedHash(blobSidecar.kzgCommitment)),
});
}
logger.debug("Received gossip blob", {
slot: slot,
root: blockHex,
root: blockShortHex,
currentSlot: chain.clock.currentSlot,
peerId: peerIdStr,
delaySec,
@@ -238,7 +251,7 @@ function getSequentialHandlers(modules: ValidatorFnsModules, options: GossipHand
if (e instanceof BlobSidecarGossipError) {
// Don't trigger this yet if full block and blobs haven't arrived yet
if (e.type.code === BlobSidecarErrorCode.PARENT_UNKNOWN && blockInput.block !== null) {
logger.debug("Gossip blob has error", {slot, root: blockHex, code: e.type.code});
logger.debug("Gossip blob has error", {slot, root: blockShortHex, code: e.type.code});
events.emit(NetworkEvent.unknownBlockParent, {blockInput, peer: peerIdStr});
}

View File

@@ -1,10 +1,11 @@
import {toHexString} from "@chainsafe/ssz";
import {routes} from "@lodestar/api";
import {ChainForkConfig} from "@lodestar/config";
import {ForkName, ForkSeq} from "@lodestar/params";
import {signedBlockToSignedHeader} from "@lodestar/state-transition";
import {RootHex, SignedBeaconBlock, deneb, phase0} from "@lodestar/types";
import {BlobAndProof} from "@lodestar/types/deneb";
import {fromHex} from "@lodestar/utils";
import {fromHex, toHex} from "@lodestar/utils";
import {
BlobsSource,
BlockInput,
@@ -15,6 +16,7 @@ import {
getBlockInput,
getBlockInputBlobs,
} from "../../chain/blocks/types.js";
import {ChainEventEmitter} from "../../chain/emitter.js";
import {BlockInputAvailabilitySource} from "../../chain/seenCache/seenGossipBlockInput.js";
import {IExecutionEngine} from "../../execution/index.js";
import {Metrics} from "../../metrics/index.js";
@@ -69,12 +71,13 @@ export async function unavailableBeaconBlobsByRoot(
unavailableBlockInput: BlockInput | NullBlockInput,
opts: {
metrics: Metrics | null;
emitter: ChainEventEmitter | null;
executionEngine: IExecutionEngine;
engineGetBlobsCache?: Map<RootHex, BlobAndProof | null>;
blockInputsRetryTrackerCache?: Set<RootHex>;
}
): Promise<BlockInput> {
const {executionEngine, metrics, engineGetBlobsCache, blockInputsRetryTrackerCache} = opts;
const {executionEngine, metrics, emitter, engineGetBlobsCache, blockInputsRetryTrackerCache} = opts;
if (unavailableBlockInput.block !== null && unavailableBlockInput.type !== BlockInputType.dataPromise) {
return unavailableBlockInput;
}
@@ -163,8 +166,9 @@ export async function unavailableBeaconBlobsByRoot(
for (let j = 0; j < versionedHashes.length; j++) {
const blobAndProof = blobAndProofs[j] ?? null;
const versionedHash = versionedHashes[j];
// save to cache for future reference
engineGetBlobsCache?.set(toHexString(versionedHashes[j]), blobAndProof);
engineGetBlobsCache?.set(toHexString(versionedHash), blobAndProof);
if (blobAndProof !== null) {
metrics?.blockInputFetchStats.dataPromiseBlobsEngineGetBlobsApiNotNull.inc();
@@ -180,6 +184,16 @@ export async function unavailableBeaconBlobsByRoot(
// for e.g. a blockInput that might be awaiting blobs promise fullfillment in
// verifyBlocksDataAvailability
blobsCache.set(blobSidecar.index, blobSidecar);
if (emitter?.listenerCount(routes.events.EventType.blobSidecar)) {
emitter.emit(routes.events.EventType.blobSidecar, {
blockRoot: blockRootHex,
slot,
index,
kzgCommitment: toHex(kzgCommitment),
versionedHash: toHex(versionedHash),
});
}
} else {
metrics?.blockInputFetchStats.dataPromiseBlobsDelayedGossipAvailable.inc();
metrics?.blockInputFetchStats.dataPromiseBlobsDelayedGossipAvailableSavedGetBlobsCompute.inc();
@@ -243,6 +257,16 @@ export async function unavailableBeaconBlobsByRoot(
// verifyBlocksDataAvailability
for (const blobSidecar of networkResBlobSidecars) {
blobsCache.set(blobSidecar.index, blobSidecar);
if (emitter?.listenerCount(routes.events.EventType.blobSidecar)) {
emitter.emit(routes.events.EventType.blobSidecar, {
blockRoot: blockRootHex,
slot,
index: blobSidecar.index,
kzgCommitment: toHex(blobSidecar.kzgCommitment),
versionedHash: toHex(kzgCommitmentToVersionedHash(blobSidecar.kzgCommitment)),
});
}
}
// check and see if all blobs are now available and in that case resolve availability

View File

@@ -538,6 +538,7 @@ export class UnknownBlockSync {
try {
const blockInput = await unavailableBeaconBlobsByRoot(this.config, this.network, peer, unavailableBlockInput, {
metrics: this.metrics,
emitter: this.chain.emitter,
executionEngine: this.chain.executionEngine,
engineGetBlobsCache: this.engineGetBlobsCache,
blockInputsRetryTrackerCache: this.blockInputsRetryTrackerCache,

View File

@@ -116,6 +116,7 @@ describe("unavailableBeaconBlobsByRoot", () => {
{
executionEngine: executionEngine as unknown as IExecutionEngine,
metrics: null,
emitter: null,
engineGetBlobsCache,
}
);