Fix Commitments Check (#14493)

* Fix Commitments Check

* `highestFinalizedEpoch`: Refactor (no functional change).

* `retrieveMissingDataColumnsFromPeers`: Fix logs.

* `VerifyDataColumnSidecarKZGProofs`: Optimise with capacity.

* Save data columns when initial syncing.

* `dataColumnSidecarsByRangeRPCHandler`: Add logs when a request enters.

* Improve logging.

* Improve logging.

* `peersWithDataColumns: Do not filter any more on peer head slot.

* Fix Nishant's comment.

---------

Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>
This commit is contained in:
Nishant Das
2024-10-04 18:27:28 +08:00
committed by GitHub
parent f29c6a9ee8
commit ae626d5bb2
25 changed files with 403 additions and 100 deletions

View File

@@ -233,7 +233,9 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlo
return err
}
}
if err := avs.IsDataAvailable(ctx, s.CurrentSlot(), b); err != nil {
nodeID := s.cfg.P2P.NodeID()
if err := avs.IsDataAvailable(ctx, nodeID, s.CurrentSlot(), b); err != nil {
return errors.Wrapf(err, "could not validate blob data availability at slot %d", b.Block().Slot())
}
args := &forkchoicetypes.BlockAndCheckpoints{Block: b.Block(),

View File

@@ -238,7 +238,9 @@ func (s *Service) handleDA(
if err != nil {
return 0, err
}
if err := avs.IsDataAvailable(ctx, s.CurrentSlot(), rob); err != nil {
nodeID := s.cfg.P2P.NodeID()
if err := avs.IsDataAvailable(ctx, nodeID, s.CurrentSlot(), rob); err != nil {
return 0, errors.Wrap(err, "could not validate blob data availability (AvailabilityStore.IsDataAvailable)")
}
} else {

View File

@@ -104,6 +104,7 @@ func setupBeaconChain(t *testing.T, beaconDB db.Database) *Service {
WithStateGen(stateGen),
WithPayloadIDCache(cache.NewPayloadIDCache()),
WithClockSynchronizer(startup.NewClockSynchronizer()),
WithP2PBroadcaster(&mockAccesser{}),
}
chainService, err := NewService(ctx, opts...)

View File

@@ -134,6 +134,7 @@ func minimalTestService(t *testing.T, opts ...Option) (*Service, *testServiceReq
WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)),
WithSyncChecker(mock.MockChecker{}),
WithExecutionEngineCaller(&mockExecution.EngineClient{}),
WithP2PBroadcaster(&mockAccesser{}),
}
// append the variadic opts so they override the defaults by being processed afterwards
opts = append(defOpts, opts...)

View File

@@ -94,7 +94,7 @@ func CustodyColumnSubnets(nodeId enode.ID, custodySubnetCount uint64) (map[uint6
func CustodyColumns(nodeId enode.ID, custodySubnetCount uint64) (map[uint64]bool, error) {
dataColumnSidecarSubnetCount := params.BeaconConfig().DataColumnSidecarSubnetCount
// Compute the custodied subnets.
// Compute the custody subnets.
subnetIds, err := CustodyColumnSubnets(nodeId, custodySubnetCount)
if err != nil {
return nil, errors.Wrap(err, "custody subnets")
@@ -408,17 +408,23 @@ func DataColumnSidecarsForReconstruct(
// VerifyDataColumnSidecarKZGProofs verifies the provided KZG Proofs for the particular
// data column.
func VerifyDataColumnSidecarKZGProofs(sc blocks.RODataColumn) (bool, error) {
if sc.ColumnIndex >= params.BeaconConfig().NumberOfColumns {
numberOfColumns := params.BeaconConfig().NumberOfColumns
if sc.ColumnIndex >= numberOfColumns {
return false, errIndexTooLarge
}
if len(sc.DataColumn) != len(sc.KzgCommitments) || len(sc.KzgCommitments) != len(sc.KzgProof) {
return false, errMismatchLength
}
var commitments []kzg.Bytes48
var indices []uint64
var cells []kzg.Cell
var proofs []kzg.Bytes48
count := len(sc.DataColumn)
commitments := make([]kzg.Bytes48, 0, count)
indices := make([]uint64, 0, count)
cells := make([]kzg.Cell, 0, count)
proofs := make([]kzg.Bytes48, 0, count)
for i := range sc.DataColumn {
commitments = append(commitments, kzg.Bytes48(sc.KzgCommitments[i]))
indices = append(indices, sc.ColumnIndex)

View File

@@ -12,6 +12,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/das",
visibility = ["//visibility:public"],
deps = [
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/db/filesystem:go_default_library",
"//beacon-chain/verification:go_default_library",
"//config/fieldparams:go_default_library",
@@ -30,6 +31,7 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"availability_columns_test.go",
"availability_test.go",
"cache_test.go",
],
@@ -37,6 +39,7 @@ go_test(
deps = [
"//beacon-chain/db/filesystem:go_default_library",
"//beacon-chain/verification:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
@@ -45,6 +48,7 @@ go_test(
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_pkg_errors//:go_default_library",
],
)

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/ethereum/go-ethereum/p2p/enode"
errors "github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
@@ -80,7 +81,7 @@ func (s *LazilyPersistentStore) Persist(current primitives.Slot, sc ...blocks.RO
// IsDataAvailable returns nil if all the commitments in the given block are persisted to the db and have been verified.
// BlobSidecars already in the db are assumed to have been previously verified against the block.
func (s *LazilyPersistentStore) IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error {
func (s *LazilyPersistentStore) IsDataAvailable(ctx context.Context, _ enode.ID, current primitives.Slot, b blocks.ROBlock) error {
blockCommitments, err := commitmentsToCheck(b, current)
if err != nil {
return errors.Wrapf(err, "could check data availability for block %#x", b.Root())

View File

@@ -6,6 +6,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
errors "github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/config/params"
@@ -75,39 +76,58 @@ func (s *LazilyPersistentStoreColumn) PersistColumns(current primitives.Slot, sc
// IsDataAvailable returns nil if all the commitments in the given block are persisted to the db and have been verified.
// BlobSidecars already in the db are assumed to have been previously verified against the block.
func (s *LazilyPersistentStoreColumn) IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error {
blockCommitments, err := fullCommitmentsToCheck(b, current)
func (s *LazilyPersistentStoreColumn) IsDataAvailable(
ctx context.Context,
nodeID enode.ID,
currentSlot primitives.Slot,
block blocks.ROBlock,
) error {
blockCommitments, err := fullCommitmentsToCheck(nodeID, block, currentSlot)
if err != nil {
return errors.Wrapf(err, "could check data availability for block %#x", b.Root())
return errors.Wrapf(err, "full commitments to check with block root `%#x` and current slot `%d`", block.Root(), currentSlot)
}
// Return early for blocks that are pre-deneb or which do not have any commitments.
// Return early for blocks that do not have any commitments.
if blockCommitments.count() == 0 {
return nil
}
key := keyFromBlock(b)
// Build the cache key for the block.
key := keyFromBlock(block)
// Retrieve the cache entry for the block, or create an empty one if it doesn't exist.
entry := s.cache.ensure(key)
// Delete the cache entry for the block at the end.
defer s.cache.delete(key)
root := b.Root()
sumz, err := s.store.WaitForSummarizer(ctx)
// Get the root of the block.
blockRoot := block.Root()
// Wait for the summarizer to be ready before proceeding.
summarizer, err := s.store.WaitForSummarizer(ctx)
if err != nil {
log.WithField("root", fmt.Sprintf("%#x", b.Root())).
log.
WithField("root", fmt.Sprintf("%#x", blockRoot)).
WithError(err).
Debug("Failed to receive BlobStorageSummarizer within IsDataAvailable")
} else {
entry.setDiskSummary(sumz.Summary(root))
// Get the summary for the block, and set it in the cache entry.
summary := summarizer.Summary(blockRoot)
entry.setDiskSummary(summary)
}
// Verify we have all the expected sidecars, and fail fast if any are missing or inconsistent.
// We don't try to salvage problematic batches because this indicates a misbehaving peer and we'd rather
// ignore their response and decrease their peer score.
sidecars, err := entry.filterColumns(root, &blockCommitments)
sidecars, err := entry.filterColumns(blockRoot, blockCommitments)
if err != nil {
return errors.Wrap(err, "incomplete BlobSidecar batch")
}
// Do thorough verifications of each BlobSidecar for the block.
// Same as above, we don't save BlobSidecars if there are any problems with the batch.
vscs, err := s.verifier.VerifiedRODataColumns(ctx, b, sidecars)
// Do thorough verifications of each RODataColumns for the block.
// Same as above, we don't save DataColumnsSidecars if there are any problems with the batch.
vscs, err := s.verifier.VerifiedRODataColumns(ctx, block, sidecars)
if err != nil {
var me verification.VerificationMultiError
ok := errors.As(err, &me)
@@ -120,33 +140,62 @@ func (s *LazilyPersistentStoreColumn) IsDataAvailable(ctx context.Context, curre
log.WithFields(lf).
Debug("invalid ColumnSidecars received")
}
return errors.Wrapf(err, "invalid ColumnSidecars received for block %#x", root)
return errors.Wrapf(err, "invalid ColumnSidecars received for block %#x", blockRoot)
}
// Ensure that each column sidecar is written to disk.
for i := range vscs {
if err := s.store.SaveDataColumn(vscs[i]); err != nil {
return errors.Wrapf(err, "failed to save ColumnSidecar index %d for block %#x", vscs[i].ColumnIndex, root)
return errors.Wrapf(err, "save data columns for index `%d` for block `%#x`", vscs[i].ColumnIndex, blockRoot)
}
}
// All ColumnSidecars are persisted - da check succeeds.
// All ColumnSidecars are persisted - data availability check succeeds.
return nil
}
func fullCommitmentsToCheck(b blocks.ROBlock, current primitives.Slot) (safeCommitmentsArray, error) {
var ar safeCommitmentsArray
if b.Version() < version.Deneb {
return ar, nil
// fullCommitmentsToCheck returns the commitments to check for a given block.
func fullCommitmentsToCheck(nodeID enode.ID, block blocks.ROBlock, currentSlot primitives.Slot) (*safeCommitmentsArray, error) {
// Return early for blocks that are pre-deneb.
if block.Version() < version.Deneb {
return &safeCommitmentsArray{}, nil
}
// We are only required to check within MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS
if !params.WithinDAPeriod(slots.ToEpoch(b.Block().Slot()), slots.ToEpoch(current)) {
return ar, nil
// Compute the block epoch.
blockSlot := block.Block().Slot()
blockEpoch := slots.ToEpoch(blockSlot)
// Compute the current spoch.
currentEpoch := slots.ToEpoch(currentSlot)
// Return early if the request is out of the MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS window.
if !params.WithinDAPeriod(blockEpoch, currentEpoch) {
return &safeCommitmentsArray{}, nil
}
kc, err := b.Block().Body().BlobKzgCommitments()
// Retrieve the KZG commitments for the block.
kzgCommitments, err := block.Block().Body().BlobKzgCommitments()
if err != nil {
return ar, err
return nil, errors.Wrap(err, "blob KZG commitments")
}
for i := range ar {
copy(ar[i], kc)
// Return early if there are no commitments in the block.
if len(kzgCommitments) == 0 {
return &safeCommitmentsArray{}, nil
}
return ar, nil
// Retrieve the custody columns.
custodySubnetCount := peerdas.CustodySubnetCount()
custodyColumns, err := peerdas.CustodyColumns(nodeID, custodySubnetCount)
if err != nil {
return nil, errors.Wrap(err, "custody columns")
}
// Create a safe commitments array for the custody columns.
commitmentsArray := &safeCommitmentsArray{}
for column := range custodyColumns {
commitmentsArray[column] = kzgCommitments
}
return commitmentsArray, nil
}

View File

@@ -0,0 +1,94 @@
package das
import (
"testing"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v5/testing/require"
"github.com/prysmaticlabs/prysm/v5/testing/util"
"github.com/prysmaticlabs/prysm/v5/time/slots"
)
func TestFullCommitmentsToCheck(t *testing.T) {
windowSlots, err := slots.EpochEnd(params.BeaconConfig().MinEpochsForDataColumnSidecarsRequest)
require.NoError(t, err)
commits := [][]byte{
bytesutil.PadTo([]byte("a"), 48),
bytesutil.PadTo([]byte("b"), 48),
bytesutil.PadTo([]byte("c"), 48),
bytesutil.PadTo([]byte("d"), 48),
}
cases := []struct {
name string
commits [][]byte
block func(*testing.T) blocks.ROBlock
slot primitives.Slot
err error
}{
{
name: "pre deneb",
block: func(t *testing.T) blocks.ROBlock {
bb := util.NewBeaconBlockBellatrix()
sb, err := blocks.NewSignedBeaconBlock(bb)
require.NoError(t, err)
rb, err := blocks.NewROBlock(sb)
require.NoError(t, err)
return rb
},
},
{
name: "commitments within da",
block: func(t *testing.T) blocks.ROBlock {
d := util.NewBeaconBlockDeneb()
d.Block.Body.BlobKzgCommitments = commits
d.Block.Slot = 100
sb, err := blocks.NewSignedBeaconBlock(d)
require.NoError(t, err)
rb, err := blocks.NewROBlock(sb)
require.NoError(t, err)
return rb
},
commits: commits,
slot: 100,
},
{
name: "commitments outside da",
block: func(t *testing.T) blocks.ROBlock {
d := util.NewBeaconBlockDeneb()
// block is from slot 0, "current slot" is window size +1 (so outside the window)
d.Block.Body.BlobKzgCommitments = commits
sb, err := blocks.NewSignedBeaconBlock(d)
require.NoError(t, err)
rb, err := blocks.NewROBlock(sb)
require.NoError(t, err)
return rb
},
slot: windowSlots + 1,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.SubscribeToAllSubnets = true
flags.Init(gFlags)
defer flags.Init(resetFlags)
b := c.block(t)
co, err := fullCommitmentsToCheck(enode.ID{}, b, c.slot)
if c.err != nil {
require.ErrorIs(t, err, c.err)
} else {
require.NoError(t, err)
}
for i := 0; i < len(co); i++ {
require.DeepEqual(t, c.commits, co[i])
}
})
}
}

View File

@@ -5,6 +5,7 @@ import (
"context"
"testing"
"github.com/ethereum/go-ethereum/p2p/enode"
errors "github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
@@ -124,18 +125,18 @@ func TestLazilyPersistent_Missing(t *testing.T) {
// Only one commitment persisted, should return error with other indices
require.NoError(t, as.Persist(1, scs[2]))
err := as.IsDataAvailable(ctx, 1, blk)
err := as.IsDataAvailable(ctx, enode.ID{}, 1, blk)
require.ErrorIs(t, err, errMissingSidecar)
// All but one persisted, return missing idx
require.NoError(t, as.Persist(1, scs[0]))
err = as.IsDataAvailable(ctx, 1, blk)
err = as.IsDataAvailable(ctx, enode.ID{}, 1, blk)
require.ErrorIs(t, err, errMissingSidecar)
// All persisted, return nil
require.NoError(t, as.Persist(1, scs...))
require.NoError(t, as.IsDataAvailable(ctx, 1, blk))
require.NoError(t, as.IsDataAvailable(ctx, enode.ID{}, 1, blk))
}
func TestLazilyPersistent_Mismatch(t *testing.T) {
@@ -150,7 +151,7 @@ func TestLazilyPersistent_Mismatch(t *testing.T) {
// Only one commitment persisted, should return error with other indices
require.NoError(t, as.Persist(1, scs[0]))
err := as.IsDataAvailable(ctx, 1, blk)
err := as.IsDataAvailable(ctx, enode.ID{}, 1, blk)
require.NotNil(t, err)
require.ErrorIs(t, err, errCommitmentMismatch)
}

View File

@@ -134,33 +134,37 @@ func (e *cacheEntry) filter(root [32]byte, kc safeCommitmentArray) ([]blocks.ROB
return scs, nil
}
func (e *cacheEntry) filterColumns(root [32]byte, kc *safeCommitmentsArray) ([]blocks.RODataColumn, error) {
if e.diskSummary.AllAvailable(kc.count()) {
func (e *cacheEntry) filterColumns(root [32]byte, commitmentsArray *safeCommitmentsArray) ([]blocks.RODataColumn, error) {
nonEmptyIndices := commitmentsArray.nonEmptyIndices()
if e.diskSummary.AllDataColumnsAvailable(nonEmptyIndices) {
return nil, nil
}
scs := make([]blocks.RODataColumn, 0, kc.count())
commitmentsCount := commitmentsArray.count()
sidecars := make([]blocks.RODataColumn, 0, commitmentsCount)
for i := uint64(0); i < fieldparams.NumberOfColumns; i++ {
// We already have this blob, we don't need to write it or validate it.
// Skip if we arleady store this data column.
if e.diskSummary.HasIndex(i) {
continue
}
if kc[i] == nil {
if e.colScs[i] != nil {
return nil, errors.Wrapf(errCommitmentMismatch, "root=%#x, index=%#x, commitment=%#x, no block commitment", root, i, e.scs[i].KzgCommitment)
}
if commitmentsArray[i] == nil {
continue
}
if e.colScs[i] == nil {
return nil, errors.Wrapf(errMissingSidecar, "root=%#x, index=%#x", root, i)
}
if !reflect.DeepEqual(kc[i], e.colScs[i].KzgCommitments) {
return nil, errors.Wrapf(errCommitmentMismatch, "root=%#x, index=%#x, commitment=%#x, block commitment=%#x", root, i, e.colScs[i].KzgCommitments, kc[i])
if !reflect.DeepEqual(commitmentsArray[i], e.colScs[i].KzgCommitments) {
return nil, errors.Wrapf(errCommitmentMismatch, "root=%#x, index=%#x, commitment=%#x, block commitment=%#x", root, i, e.colScs[i].KzgCommitments, commitmentsArray[i])
}
scs = append(scs, *e.colScs[i])
sidecars = append(sidecars, *e.colScs[i])
}
return scs, nil
return sidecars, nil
}
// safeCommitmentArray is a fixed size array of commitment byte slices. This is helpful for avoiding
@@ -176,13 +180,32 @@ func (s safeCommitmentArray) count() int {
return fieldparams.MaxBlobsPerBlock
}
// safeCommitmentsArray is a fixed size array of commitments.
// This is helpful for avoiding gratuitous bounds checks.
type safeCommitmentsArray [fieldparams.NumberOfColumns][][]byte
// count returns the number of commitments in the array.
func (s *safeCommitmentsArray) count() int {
count := 0
for i := range s {
if s[i] == nil {
return i
if s[i] != nil {
count++
}
}
return fieldparams.NumberOfColumns
return count
}
// nonEmptyIndices returns a map of indices that are non-nil in the array.
func (s *safeCommitmentsArray) nonEmptyIndices() map[uint64]bool {
columns := make(map[uint64]bool)
for i := range s {
if s[i] != nil {
columns[uint64(i)] = true
}
}
return columns
}

View File

@@ -3,6 +3,7 @@ package das
import (
"context"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
)
@@ -14,6 +15,6 @@ import (
// IsDataAvailable guarantees that all blobs committed to in the block have been
// durably persisted before returning a non-error value.
type AvailabilityStore interface {
IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error
IsDataAvailable(ctx context.Context, nodeID enode.ID, current primitives.Slot, b blocks.ROBlock) error
Persist(current primitives.Slot, sc ...blocks.ROBlob) error
}

View File

@@ -3,6 +3,7 @@ package das
import (
"context"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
)
@@ -16,7 +17,7 @@ type MockAvailabilityStore struct {
var _ AvailabilityStore = &MockAvailabilityStore{}
// IsDataAvailable satisfies the corresponding method of the AvailabilityStore interface in a way that is useful for tests.
func (m *MockAvailabilityStore) IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error {
func (m *MockAvailabilityStore) IsDataAvailable(ctx context.Context, _ enode.ID, current primitives.Slot, b blocks.ROBlock) error {
if m.VerifyAvailabilityCallback != nil {
return m.VerifyAvailabilityCallback(ctx, current, b)
}

View File

@@ -53,7 +53,7 @@ func (f *ForkChoice) ShouldOverrideFCU() (override bool) {
// Only reorg blocks that arrive late
early, err := head.arrivedEarly(f.store.genesisTime)
if err != nil {
log.WithError(err).Error("could not check if block arrived early")
log.WithError(err).Error("Could not check if block arrived early")
return
}
if early {

View File

@@ -54,7 +54,7 @@ func (vs *Server) eth1DataMajorityVote(ctx context.Context, beaconState state.Be
// by ETH1_FOLLOW_DISTANCE. The head state should maintain the same ETH1Data until this condition has passed, so
// trust the existing head for the right eth1 vote until we can get a meaningful value from the deposit contract.
if latestValidTime < genesisTime+followDistanceSeconds {
log.WithField("genesisTime", genesisTime).WithField("latestValidTime", latestValidTime).Warn("voting period before genesis + follow distance, using eth1data from head")
log.WithField("genesisTime", genesisTime).WithField("latestValidTime", latestValidTime).Warn("Voting period before genesis + follow distance, using eth1data from head")
return vs.HeadFetcher.HeadETH1Data(), nil
}

View File

@@ -101,7 +101,7 @@ func (vs *Server) getLocalPayloadFromEngine(
return nil, errors.Wrap(err, "could not get cached payload from execution client")
}
}
log.WithFields(logFields).Debug("payload ID cache miss")
log.WithFields(logFields).Debug("Payload ID cache miss")
parentHash, err := vs.getParentBlockHash(ctx, st, slot)
switch {
case errors.Is(err, errActivationNotReached) || errors.Is(err, errNoTerminalBlockHash):
@@ -190,7 +190,7 @@ func (vs *Server) getLocalPayloadFromEngine(
}
warnIfFeeRecipientDiffers(val.FeeRecipient[:], res.ExecutionData.FeeRecipient())
log.WithField("value", res.Bid).Debug("received execution payload from local engine")
log.WithField("value", res.Bid).Debug("Received execution payload from local engine")
return res, nil
}

View File

@@ -41,6 +41,7 @@ go_library(
"//runtime:go_default_library",
"//runtime/version:go_default_library",
"//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",

View File

@@ -4,6 +4,7 @@ import (
"context"
"sync"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
@@ -88,8 +89,11 @@ func (s *Store) fillBack(ctx context.Context, current primitives.Slot, blocks []
status.LowParentRoot, highest.Root(), status.LowSlot, highest.Block().Slot())
}
// TODO: Use the real node ID when backfill is implemented for data columns.
emptyNodeID := enode.ID{}
for i := range blocks {
if err := store.IsDataAvailable(ctx, current, blocks[i]); err != nil {
if err := store.IsDataAvailable(ctx, emptyNodeID, current, blocks[i]); err != nil {
return nil, err
}
}

View File

@@ -48,6 +48,7 @@ go_library(
"//runtime/version:go_default_library",
"//time:go_default_library",
"//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_paulbellamy_ratecounter//:go_default_library",
"@com_github_pkg_errors//:go_default_library",

View File

@@ -705,11 +705,11 @@ func (f *blocksFetcher) blocksWithMissingDataColumnsBoundaries(
}
// custodyAllNeededColumns filter `inputPeers` that custody all columns in `columns`.
func (f *blocksFetcher) custodyAllNeededColumns(inputPeers []peer.ID, columns map[uint64]bool) ([]peer.ID, error) {
outputPeers := make([]peer.ID, 0, len(inputPeers))
func (f *blocksFetcher) custodyAllNeededColumns(inputPeers map[peer.ID]bool, columns map[uint64]bool) (map[peer.ID]bool, error) {
outputPeers := make(map[peer.ID]bool, len(inputPeers))
loop:
for _, peer := range inputPeers {
for peer := range inputPeers {
// Get the node ID from the peer ID.
nodeID, err := p2p.ConvertPeerIDToNodeID(peer)
if err != nil {
@@ -731,7 +731,7 @@ loop:
}
}
outputPeers = append(outputPeers, peer)
outputPeers[peer] = true
}
return outputPeers, nil
@@ -842,11 +842,16 @@ func maxInt(slice []int) int {
func (f *blocksFetcher) requestDataColumnsFromPeers(
ctx context.Context,
request *p2ppb.DataColumnSidecarsByRangeRequest,
peers []peer.ID,
peers map[peer.ID]bool,
) ([]blocks.RODataColumn, peer.ID, error) {
peersSlice := make([]peer.ID, 0, len(peers))
for peer := range peers {
peersSlice = append(peersSlice, peer)
}
// Shuffle peers to avoid always querying the same peers
f.rand.Shuffle(len(peers), func(i, j int) {
peers[i], peers[j] = peers[j], peers[i]
f.rand.Shuffle(len(peersSlice), func(i, j int) {
peersSlice[i], peersSlice[j] = peersSlice[j], peersSlice[i]
})
var columnsLog interface{} = "all"
@@ -863,7 +868,7 @@ func (f *blocksFetcher) requestDataColumnsFromPeers(
"items": request.Count * columnsCount,
})
for _, peer := range peers {
for _, peer := range peersSlice {
log := log.WithField("peer", peer)
if ctx.Err() != nil {
@@ -1071,7 +1076,7 @@ func (f *blocksFetcher) retrieveMissingDataColumnsFromPeers(
}
// Filter peers.
filteredPeers, err := f.peersWithSlotAndDataColumns(peersToFilter, lastSlot, missingDataColumns)
filteredPeers, descriptions, err := f.peersWithSlotAndDataColumns(peersToFilter, lastSlot, missingDataColumns)
if err != nil {
return errors.Wrap(err, "peers with slot and data columns")
}
@@ -1081,11 +1086,16 @@ func (f *blocksFetcher) retrieveMissingDataColumnsFromPeers(
WithFields(logrus.Fields{
"peers": peersToFilter,
"filteredPeers": filteredPeers,
"delay": delay,
"waitDuration": delay,
"targetSlot": lastSlot,
}).
Warning("No peers available to retrieve missing data columns, retrying later")
// If no peers are available, log the descriptions to help debugging.
for _, description := range descriptions {
log.Debug(description)
}
time.Sleep(delay)
continue
}
@@ -1112,7 +1122,7 @@ func (f *blocksFetcher) retrieveMissingDataColumnsFromPeers(
if len(roDataColumns) == 0 {
log.
WithFields(logrus.Fields{
"peers": peers,
"peers": peersToFilter,
"filteredPeers": filteredPeers,
"delay": delay,
"start": startSlot,

View File

@@ -1370,25 +1370,36 @@ func TestCustodyAllNeededColumns(t *testing.T) {
4 * params.BeaconConfig().CustodyRequirement,
32 * params.BeaconConfig().CustodyRequirement,
4 * params.BeaconConfig().CustodyRequirement,
32 * params.BeaconConfig().CustodyRequirement}
peersID := make([]peer.ID, 0, len(custodyCounts))
for _, custodyCount := range custodyCounts {
peerRecord, peerID := createPeer(t, len(peersID), custodyCount)
peersID = append(peersID, peerID)
p2p.Peers().Add(peerRecord, peerID, nil, network.DirOutbound)
32 * params.BeaconConfig().CustodyRequirement,
}
expected := []peer.ID{peersID[1], peersID[3]}
expected := make(map[peer.ID]bool)
blocksFetcher := newBlocksFetcher(context.Background(), &blocksFetcherConfig{
p2p: p2p,
})
peersID := make(map[peer.ID]bool, len(custodyCounts))
for _, custodyCount := range custodyCounts {
peerRecord, peerID := createPeer(t, len(peersID), custodyCount)
peersID[peerID] = true
p2p.Peers().Add(peerRecord, peerID, nil, network.DirOutbound)
if custodyCount == 32*params.BeaconConfig().CustodyRequirement {
expected[peerID] = true
}
}
blocksFetcher := newBlocksFetcher(
context.Background(),
&blocksFetcherConfig{
p2p: p2p,
},
)
actual, err := blocksFetcher.custodyAllNeededColumns(peersID, dataColumns)
require.NoError(t, err)
require.DeepSSZEqual(t, expected, actual)
require.Equal(t, len(expected), len(actual))
for peerID := range expected {
_, ok := actual[peerID]
require.Equal(t, true, ok)
}
}
func TestCustodyColumns(t *testing.T) {

View File

@@ -374,7 +374,7 @@ func (f *blocksFetcher) peersWithSlotAndDataColumns(
peers []peer.ID,
targetSlot primitives.Slot,
dataColumns map[uint64]bool,
) ([]peer.ID, error) {
) (map[peer.ID]bool, []string, error) {
peersCount := len(peers)
// TODO: Uncomment when we are not in devnet any more.
@@ -390,23 +390,58 @@ func (f *blocksFetcher) peersWithSlotAndDataColumns(
// TODO: Modify to retrieve data columns from all possible peers.
// TODO: If a peer does respond some of the request columns, do not re-request responded columns.
peersWithAdmissibleHeadSlot := make([]peer.ID, 0, peersCount)
// Compute the target epoch from the target slot.
targetEpoch := slots.ToEpoch(targetSlot)
// Filter out peers with head slot lower than the target slot.
peersWithAdmissibleHeadEpoch := make(map[peer.ID]bool, peersCount)
descriptions := make([]string, 0, peersCount)
// Filter out peers with head epoch lower than our target epoch.
// Technically, we should be able to use the head slot from the peer.
// However, our vision of the head slot of the peer is updated twice per epoch
// via P2P messages. So it is likely that we think the peer is lagging behind
// while it is actually not.
// ==> We use the head epoch as a proxy instead.
// However, if the peer is actually lagging for a few slots,
// we may requests some data columns it doesn't have yet.
for _, peer := range peers {
peerChainState, err := f.p2p.Peers().ChainState(peer)
if err != nil || peerChainState == nil || peerChainState.HeadSlot < targetSlot {
if err != nil {
description := fmt.Sprintf("peer %s: error: %s", peer, err)
descriptions = append(descriptions, description)
continue
}
peersWithAdmissibleHeadSlot = append(peersWithAdmissibleHeadSlot, peer)
if peerChainState == nil {
description := fmt.Sprintf("peer %s: chain state is nil", peer)
descriptions = append(descriptions, description)
continue
}
peerHeadEpoch := slots.ToEpoch(peerChainState.HeadSlot)
if peerHeadEpoch < targetEpoch {
description := fmt.Sprintf("peer %s: head epoch %d < target epoch %d", peer, peerHeadEpoch, targetEpoch)
descriptions = append(descriptions, description)
continue
}
peersWithAdmissibleHeadEpoch[peer] = true
}
// Filter out peers that do not have all the data columns needed.
finalPeers, err := f.custodyAllNeededColumns(peersWithAdmissibleHeadSlot, dataColumns)
finalPeers, err := f.custodyAllNeededColumns(peersWithAdmissibleHeadEpoch, dataColumns)
if err != nil {
return nil, errors.Wrap(err, "custody all needed columns")
return nil, nil, errors.Wrap(err, "custody all needed columns")
}
return finalPeers, nil
for peer := range peersWithAdmissibleHeadEpoch {
if _, ok := finalPeers[peer]; !ok {
description := fmt.Sprintf("peer %s: does not custody all needed columns", peer)
descriptions = append(descriptions, description)
}
}
return finalPeers, descriptions, nil
}

View File

@@ -234,12 +234,18 @@ func syncFields(b blocks.ROBlock) logrus.Fields {
}
// highestFinalizedEpoch returns the absolute highest finalized epoch of all connected peers.
// Note this can be lower than our finalized epoch if we have no peers or peers that are all behind us.
// It returns `0` if no peers are connected.
// Note this can be lower than our finalized epoch if our connected peers are all behind us.
func (s *Service) highestFinalizedEpoch() primitives.Epoch {
highest := primitives.Epoch(0)
for _, pid := range s.cfg.P2P.Peers().Connected() {
peerChainState, err := s.cfg.P2P.Peers().ChainState(pid)
if err == nil && peerChainState != nil && peerChainState.FinalizedEpoch > highest {
if err != nil || peerChainState == nil {
continue
}
if peerChainState.FinalizedEpoch > highest {
highest = peerChainState.FinalizedEpoch
}
}

View File

@@ -8,6 +8,7 @@ import (
"fmt"
"time"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/paulbellamy/ratecounter"
"github.com/pkg/errors"
@@ -408,7 +409,10 @@ func (s *Service) fetchOriginBlobs(pids []peer.ID) error {
if err := avs.Persist(current, sidecars...); err != nil {
return err
}
if err := avs.IsDataAvailable(s.ctx, current, rob); err != nil {
// node ID is not used for checking blobs data availability.
emptyNodeID := enode.ID{}
if err := avs.IsDataAvailable(s.ctx, emptyNodeID, current, rob); err != nil {
log.WithField("root", fmt.Sprintf("%#x", r)).WithField("peerID", pids[i]).Warn("Blobs from peer for origin block were unusable")
continue
}
@@ -462,7 +466,9 @@ func (s *Service) fetchOriginColumns(pids []peer.ID) error {
if err := avs.PersistColumns(current, sidecars...); err != nil {
return err
}
if err := avs.IsDataAvailable(s.ctx, current, rob); err != nil {
nodeID := s.cfg.P2P.NodeID()
if err := avs.IsDataAvailable(s.ctx, nodeID, current, rob); err != nil {
log.WithField("root", fmt.Sprintf("%#x", r)).WithField("peerID", pids[i]).Warn("Columns from peer for origin block were unusable")
continue
}

View File

@@ -6,7 +6,7 @@ import (
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
p2ptypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
@@ -15,6 +15,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing"
pb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
@@ -69,12 +70,54 @@ func (s *Service) dataColumnSidecarsByRangeRPCHandler(ctx context.Context, msg i
ctx, cancel := context.WithTimeout(ctx, respTimeout)
defer cancel()
SetRPCStreamDeadlines(stream)
log := log.WithField("handler", p2p.DataColumnSidecarsByRangeName[1:]) // slice the leading slash off the name var
r, ok := msg.(*pb.DataColumnSidecarsByRangeRequest)
if !ok {
return errors.New("message is not type *pb.DataColumnSidecarsByRangeRequest")
}
// Compute custody columns.
nodeID := s.cfg.p2p.NodeID()
numberOfColumns := params.BeaconConfig().NumberOfColumns
custodySubnetCount := peerdas.CustodySubnetCount()
custodyColumns, err := peerdas.CustodyColumns(nodeID, custodySubnetCount)
if err != nil {
s.writeErrorResponseToStream(responseCodeServerError, err.Error(), stream)
return err
}
custodyColumnsCount := uint64(len(custodyColumns))
// Compute requested columns.
requestedColumns := r.Columns
requestedColumnsCount := uint64(len(requestedColumns))
// Format log fields.
var (
custodyColumnsLog interface{} = "all"
requestedColumnsLog interface{} = "all"
)
if custodyColumnsCount != numberOfColumns {
custodyColumnsLog = uint64MapToSortedSlice(custodyColumns)
}
if requestedColumnsCount != numberOfColumns {
requestedColumnsLog = requestedColumns
}
// Get the remote peer.
remotePeer := stream.Conn().RemotePeer()
log.WithFields(logrus.Fields{
"remotePeer": remotePeer,
"custodyColumns": custodyColumnsLog,
"requestedColumns": requestedColumnsLog,
"startSlot": r.StartSlot,
"count": r.Count,
}).Debug("Serving data columns by range request")
if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
return err
}