Compare commits


2 Commits

Author SHA1 Message Date
kasey
a0071826c5 improve peer scoring code in range sync (#15173)
* separate block/blob peer scoring

* Preston's test coverage feedback

* test to ensure we don't combine distinct errors

---------

Co-authored-by: Kasey <kasey@users.noreply.github.com>
(cherry picked from commit 8418157f8a)
2025-04-17 13:43:11 -05:00
james-prysm
3d3f336d16 fix expected withdrawals (#15191)
* fixed underflow with expected withdrawals

* update comment

* Revert "update comment"

This reverts commit e07da541ac.

* attempting to fix comment indents

* fixing another missed tab in comments

* trying tabs one more time for fixing tabs

* adding underflow safety

* fixing error typo

* missed wrapping the error

(cherry picked from commit 9c00b06966)
2025-04-17 13:43:00 -05:00
20 changed files with 401 additions and 91 deletions

View File

@@ -102,6 +102,9 @@ func (s *BadResponsesScorer) countNoLock(pid peer.ID) (int, error) {
// Increment increments the number of bad responses we have received from the given remote peer.
// If the peer doesn't exist, this method is a no-op.
func (s *BadResponsesScorer) Increment(pid peer.ID) {
if pid == "" {
return
}
s.store.Lock()
defer s.store.Unlock()

View File

@@ -124,6 +124,9 @@ func (s *BlockProviderScorer) Params() *BlockProviderScorerConfig {
// IncrementProcessedBlocks increments the number of blocks that have been successfully processed.
func (s *BlockProviderScorer) IncrementProcessedBlocks(pid peer.ID, cnt uint64) {
if pid == "" {
return
}
s.store.Lock()
defer s.store.Unlock()
defer s.touchNoLock(pid)
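Both of the new guards in this commit (here and in BadResponsesScorer.Increment above) exist for the same reason: now that block and blob attribution are tracked separately, a fetch result can legitimately carry an empty peer ID, e.g. blobsFrom stays unset when no blob request was ever made, and the scorers must treat such calls as no-ops instead of creating entries for the empty ID. A minimal in-package sketch of the calling pattern, with a hypothetical helper name (not part of the diff):

// penalizeBlobPeer is a hypothetical caller illustrating why the guard is needed:
// blobsFrom may be "" when the fetcher never issued a blob request, and the scorer
// silently ignores the call rather than recording a score for the empty ID.
func penalizeBlobPeer(scorer *BadResponsesScorer, blobsFrom peer.ID) {
	scorer.Increment(blobsFrom) // no-op when blobsFrom == ""
}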

View File

@@ -62,9 +62,11 @@ func (b *BeaconState) NextWithdrawalValidatorIndex() (primitives.ValidatorIndex,
//
// validator = state.validators[withdrawal.index]
// has_sufficient_effective_balance = validator.effective_balance >= MIN_ACTIVATION_BALANCE
// has_excess_balance = state.balances[withdrawal.index] > MIN_ACTIVATION_BALANCE
// total_withdrawn = sum(w.amount for w in withdrawals if w.validator_index == withdrawal.validator_index)
// balance = state.balances[withdrawal.validator_index] - total_withdrawn
// has_excess_balance = balance > MIN_ACTIVATION_BALANCE
// if validator.exit_epoch == FAR_FUTURE_EPOCH and has_sufficient_effective_balance and has_excess_balance:
// withdrawable_balance = min(state.balances[withdrawal.index] - MIN_ACTIVATION_BALANCE, withdrawal.amount)
// withdrawable_balance = min(balance - MIN_ACTIVATION_BALANCE, withdrawal.amount)
// withdrawals.append(Withdrawal(
// index=withdrawal_index,
// validator_index=withdrawal.index,
@@ -132,9 +134,19 @@ func (b *BeaconState) ExpectedWithdrawals() ([]*enginev1.Withdrawal, uint64, err
return nil, 0, fmt.Errorf("could not retrieve balance at index %d: %w", w.Index, err)
}
hasSufficientEffectiveBalance := v.EffectiveBalance() >= params.BeaconConfig().MinActivationBalance
hasExcessBalance := vBal > params.BeaconConfig().MinActivationBalance
var totalWithdrawn uint64
for _, wi := range withdrawals {
if wi.ValidatorIndex == w.Index {
totalWithdrawn += wi.Amount
}
}
balance, err := mathutil.Sub64(vBal, totalWithdrawn)
if err != nil {
return nil, 0, errors.Wrapf(err, "failed to subtract balance %d with total withdrawn %d", vBal, totalWithdrawn)
}
hasExcessBalance := balance > params.BeaconConfig().MinActivationBalance
if v.ExitEpoch() == params.BeaconConfig().FarFutureEpoch && hasSufficientEffectiveBalance && hasExcessBalance {
amount := min(vBal-params.BeaconConfig().MinActivationBalance, w.Amount)
amount := min(balance-params.BeaconConfig().MinActivationBalance, w.Amount)
withdrawals = append(withdrawals, &enginev1.Withdrawal{
Index: withdrawalIndex,
ValidatorIndex: w.Index,
@@ -165,7 +177,10 @@ func (b *BeaconState) ExpectedWithdrawals() ([]*enginev1.Withdrawal, uint64, err
partiallyWithdrawnBalance += w.Amount
}
}
balance = balance - partiallyWithdrawnBalance
balance, err = mathutil.Sub64(balance, partiallyWithdrawnBalance)
if err != nil {
return nil, 0, errors.Wrapf(err, "could not subtract balance %d with partial withdrawn balance %d", balance, partiallyWithdrawnBalance)
}
}
if helpers.IsFullyWithdrawableValidator(val, balance, epoch, b.version) {
withdrawals = append(withdrawals, &enginev1.Withdrawal{
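The hunk above replaces the raw vBal arithmetic with a running balance and a checked subtraction: pending partial withdrawals are summed per validator and subtracted safely, so a leaking validator whose pending withdrawals exceed its balance can no longer wrap a uint64 around. A self-contained sketch of the hazard, using a local safeSub helper as a stand-in for the checked subtraction (mathutil.Sub64) used in the diff:

package main

import "fmt"

// safeSub stands in for the checked subtraction used in the diff; it returns an
// error instead of silently wrapping around when b > a.
func safeSub(a, b uint64) (uint64, error) {
	if b > a {
		return 0, fmt.Errorf("underflow: %d - %d", a, b)
	}
	return a - b, nil
}

func main() {
	balance := uint64(2_015_000_000_000)        // 2,015 ETH in Gwei
	totalWithdrawn := uint64(2_016_000_000_000) // pending withdrawals exceed the balance

	fmt.Println(balance - totalWithdrawn) // naive subtraction wraps to ~1.8e19 Gwei

	if _, err := safeSub(balance, totalWithdrawn); err != nil {
		fmt.Println(err) // the checked version surfaces the underflow instead
	}
}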

View File

@@ -416,3 +416,37 @@ func TestExpectedWithdrawals(t *testing.T) {
require.DeepEqual(t, withdrawalFull, expected[1])
})
}
func TestExpectedWithdrawals_underflow_electra(t *testing.T) {
s, err := state_native.InitializeFromProtoUnsafeElectra(&ethpb.BeaconStateElectra{})
require.NoError(t, err)
vals := make([]*ethpb.Validator, 1)
balances := make([]uint64, 1)
balances[0] = 2015_000_000_000 // Validator A begins leaking ETH due to inactivity, and over time, its balance decreases to 2,015 ETH
val := &ethpb.Validator{
WithdrawalCredentials: make([]byte, 32),
EffectiveBalance: params.BeaconConfig().MaxEffectiveBalanceElectra,
WithdrawableEpoch: primitives.Epoch(0),
ExitEpoch: params.BeaconConfig().FarFutureEpoch,
}
val.WithdrawalCredentials[0] = params.BeaconConfig().CompoundingWithdrawalPrefixByte
val.WithdrawalCredentials[31] = byte(0)
vals[0] = val
require.NoError(t, s.SetValidators(vals))
require.NoError(t, s.SetBalances(balances))
require.NoError(t, s.AppendPendingPartialWithdrawal(&ethpb.PendingPartialWithdrawal{
Amount: 1008_000_000_000,
WithdrawableEpoch: primitives.Epoch(0),
}))
require.NoError(t, s.AppendPendingPartialWithdrawal(&ethpb.PendingPartialWithdrawal{
Amount: 1008_000_000_000,
WithdrawableEpoch: primitives.Epoch(0),
}))
expected, _, err := s.ExpectedWithdrawals()
require.NoError(t, err)
require.Equal(t, 3, len(expected)) // is a fully withdrawable validator
require.Equal(t, uint64(1008_000_000_000), expected[0].Amount)
require.Equal(t, uint64(975_000_000_000), expected[1].Amount)
require.Equal(t, uint64(32_000_000_000), expected[2].Amount)
}
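For reference, the expected amounts in this test can be checked by hand (a 2,015 ETH balance, a 32 ETH minimum activation balance, and two pending partial withdrawals of 1,008 ETH each):

first partial:  min(2015 - 32, 1008) = 1008 ETH
second partial: running balance is 2015 - 1008 = 1007, so min(1007 - 32, 1008) = 975 ETH
full sweep:     2015 - 1008 - 975 = 32 ETH (the validator is fully withdrawable at epoch 0)

Without the fix, the second partial withdrawal was computed against the stale 2,015 ETH balance, the two partials summed to 2,016 ETH, and the later balance - partiallyWithdrawnBalance step underflowed.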

View File

@@ -60,6 +60,7 @@ go_test(
"blocks_fetcher_test.go",
"blocks_fetcher_utils_test.go",
"blocks_queue_test.go",
"downscore_test.go",
"fsm_benchmark_test.go",
"fsm_test.go",
"initial_sync_test.go",
@@ -70,6 +71,7 @@ go_test(
tags = ["CI_race_detection"],
deps = [
"//async/abool:go_default_library",
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/das:go_default_library",
"//beacon-chain/db:go_default_library",
@@ -78,6 +80,7 @@ go_test(
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/peerdata:go_default_library",
"//beacon-chain/p2p/peers/scorers:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//beacon-chain/p2p/types:go_default_library",
@@ -105,6 +108,7 @@ go_test(
"@com_github_libp2p_go_libp2p//core/network:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_paulbellamy_ratecounter//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
],

View File

@@ -120,11 +120,20 @@ type fetchRequestParams struct {
// fetchRequestResponse is a combined type that holds the results of both successful executions and errors.
// The valid usage pattern is to check whether the result's `err` is nil before using `bwb`.
type fetchRequestResponse struct {
pid peer.ID
start primitives.Slot
count uint64
bwb []blocks.BlockWithROBlobs
err error
blocksFrom peer.ID
blobsFrom peer.ID
start primitives.Slot
count uint64
bwb []blocks.BlockWithROBlobs
err error
}
func (r *fetchRequestResponse) blocksQueueFetchedData() *blocksQueueFetchedData {
return &blocksQueueFetchedData{
blocksFrom: r.blocksFrom,
blobsFrom: r.blobsFrom,
bwb: r.bwb,
}
}
// newBlocksFetcher creates a ready-to-use fetcher.
@@ -314,13 +323,14 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start primitives.Slot
}
}
response.bwb, response.pid, response.err = f.fetchBlocksFromPeer(ctx, start, count, peers)
response.bwb, response.blocksFrom, response.err = f.fetchBlocksFromPeer(ctx, start, count, peers)
if response.err == nil {
bwb, err := f.fetchBlobsFromPeer(ctx, response.bwb, response.pid, peers)
pid, bwb, err := f.fetchBlobsFromPeer(ctx, response.bwb, response.blocksFrom, peers)
if err != nil {
response.err = err
}
response.bwb = bwb
response.blobsFrom = pid
}
return response
}
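With the fields above, a fetch result records which peer served the blocks and which served the blobs, and the usage pattern stays the same: check err before touching bwb, then attribute any penalty to the peer that produced the bad data. A minimal in-package sketch with a hypothetical helper (imports of scorers and verification are assumed; the real consumer is the blocks queue later in this diff):

// attributeAndUnpack is a hypothetical consumer of fetchRequestResponse.
func attributeAndUnpack(resp *fetchRequestResponse, scorer *scorers.BadResponsesScorer) ([]blocks.BlockWithROBlobs, error) {
	if resp.err != nil {
		// Penalize the peer that is actually at fault: the blob provider for blob
		// verification failures, the block provider for invalid block data.
		if errors.Is(resp.err, verification.ErrBlobInvalid) {
			scorer.Increment(resp.blobsFrom)
		} else if errors.Is(resp.err, prysmsync.ErrInvalidFetchedData) {
			scorer.Increment(resp.blocksFrom)
		}
		return nil, resp.err
	}
	return resp.bwb, nil
}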
@@ -537,20 +547,20 @@ func missingCommitError(root [32]byte, slot primitives.Slot, missing [][]byte) e
}
// fetchBlobsFromPeer fetches blobs from a single randomly selected peer.
func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks.BlockWithROBlobs, pid peer.ID, peers []peer.ID) ([]blocks.BlockWithROBlobs, error) {
func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks.BlockWithROBlobs, pid peer.ID, peers []peer.ID) (peer.ID, []blocks.BlockWithROBlobs, error) {
ctx, span := trace.StartSpan(ctx, "initialsync.fetchBlobsFromPeer")
defer span.End()
if slots.ToEpoch(f.clock.CurrentSlot()) < params.BeaconConfig().DenebForkEpoch {
return bwb, nil
return "", bwb, nil
}
blobWindowStart, err := prysmsync.BlobRPCMinValidSlot(f.clock.CurrentSlot())
if err != nil {
return nil, err
return "", nil, err
}
// Construct request message based on observed interval of blocks in need of blobs.
req := countCommitments(bwb, blobWindowStart).blobRange(f.bs).Request()
if req == nil {
return bwb, nil
return "", bwb, nil
}
peers = f.filterPeers(ctx, peers, peersPercentagePerRequest)
// We dial the initial peer first to ensure that we get the desired set of blobs.
@@ -573,9 +583,9 @@ func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks.Blo
log.WithField("peer", p).WithError(err).Debug("Invalid BeaconBlobsByRange response")
continue
}
return robs, err
return p, robs, err
}
return nil, errNoPeersAvailable
return "", nil, errNoPeersAvailable
}
// requestBlocks is a wrapper for handling BeaconBlocksByRangeRequest requests/streams.

View File

@@ -22,8 +22,9 @@ import (
// Blocks are stored in ascending slot order. The first block is guaranteed to have a parent
// either in the DB or in the initial sync cache.
type forkData struct {
peer peer.ID
bwb []blocks.BlockWithROBlobs
blocksFrom peer.ID
blobsFrom peer.ID
bwb []blocks.BlockWithROBlobs
}
// nonSkippedSlotAfter checks slots after the given one in an attempt to find a non-empty future slot.
@@ -280,13 +281,13 @@ func (f *blocksFetcher) findForkWithPeer(ctx context.Context, pid peer.ID, slot
}
// We need to fetch the blobs for the given alt-chain if any exist, so that we can try to verify and import
// the blocks.
bwb, err := f.fetchBlobsFromPeer(ctx, altBlocks, pid, []peer.ID{pid})
bpid, bwb, err := f.fetchBlobsFromPeer(ctx, altBlocks, pid, []peer.ID{pid})
if err != nil {
return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findForkWithPeer")
}
// The caller will use the BlocksWith VerifiedBlobs in bwb as the starting point for
// round-robin syncing the alternate chain.
return &forkData{peer: pid, bwb: bwb}, nil
return &forkData{blocksFrom: pid, blobsFrom: bpid, bwb: bwb}, nil
}
return nil, errNoAlternateBlocks
}
@@ -302,13 +303,15 @@ func (f *blocksFetcher) findAncestor(ctx context.Context, pid peer.ID, b interfa
if err != nil {
return nil, errors.Wrap(err, "received invalid blocks in findAncestor")
}
bwb, err = f.fetchBlobsFromPeer(ctx, bwb, pid, []peer.ID{pid})
var bpid peer.ID
bpid, bwb, err = f.fetchBlobsFromPeer(ctx, bwb, pid, []peer.ID{pid})
if err != nil {
return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findAncestor")
}
return &forkData{
peer: pid,
bwb: bwb,
blocksFrom: pid,
bwb: bwb,
blobsFrom: bpid,
}, nil
}
// Request block's parent.

View File

@@ -263,7 +263,7 @@ func TestBlocksFetcher_findFork(t *testing.T) {
reqEnd := testForkStartSlot(t, 251) + primitives.Slot(findForkReqRangeSize())
require.Equal(t, primitives.Slot(len(chain1)), fork.bwb[0].Block.Block().Slot())
require.Equal(t, int(reqEnd-forkSlot1b), len(fork.bwb))
require.Equal(t, curForkMoreBlocksPeer, fork.peer)
require.Equal(t, curForkMoreBlocksPeer, fork.blocksFrom)
// Save all chain1b blocks (so that they do not interfere with alternative fork)
for _, blk := range chain1b {
util.SaveBlock(t, ctx, beaconDB, blk)
@@ -283,7 +283,7 @@ func TestBlocksFetcher_findFork(t *testing.T) {
alternativePeer := connectPeerHavingBlocks(t, p2p, chain2, finalizedSlot, p2p.Peers())
fork, err = fetcher.findFork(ctx, 251)
require.NoError(t, err)
assert.Equal(t, alternativePeer, fork.peer)
assert.Equal(t, alternativePeer, fork.blocksFrom)
assert.Equal(t, 65, len(fork.bwb))
ind := forkSlot
for _, blk := range fork.bwb {

View File

@@ -10,6 +10,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
beaconsync "github.com/OffchainLabs/prysm/v6/beacon-chain/sync"
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/time/slots"
@@ -93,8 +94,9 @@ type blocksQueue struct {
// blocksQueueFetchedData is a data container that is returned from a queue on each step.
type blocksQueueFetchedData struct {
pid peer.ID
bwb []blocks.BlockWithROBlobs
blocksFrom peer.ID
blobsFrom peer.ID
bwb []blocks.BlockWithROBlobs
}
// newBlocksQueue creates an initialized priority queue.
@@ -337,13 +339,15 @@ func (q *blocksQueue) onDataReceivedEvent(ctx context.Context) eventHandlerFn {
}
if errors.Is(response.err, beaconsync.ErrInvalidFetchedData) {
// Peer returned invalid data, penalize.
q.blocksFetcher.p2p.Peers().Scorers().BadResponsesScorer().Increment(m.pid)
log.WithField("pid", response.pid).Debug("Peer is penalized for invalid blocks")
q.blocksFetcher.p2p.Peers().Scorers().BadResponsesScorer().Increment(response.blocksFrom)
log.WithField("pid", response.blocksFrom).Debug("Peer is penalized for invalid blocks")
} else if errors.Is(response.err, verification.ErrBlobInvalid) {
q.blocksFetcher.p2p.Peers().Scorers().BadResponsesScorer().Increment(response.blobsFrom)
log.WithField("pid", response.blobsFrom).Debug("Peer is penalized for invalid blob response")
}
return m.state, response.err
}
m.pid = response.pid
m.bwb = response.bwb
m.fetched = *response
return stateDataParsed, nil
}
}
@@ -358,19 +362,15 @@ func (q *blocksQueue) onReadyToSendEvent(ctx context.Context) eventHandlerFn {
return m.state, errInvalidInitialState
}
if len(m.bwb) == 0 {
if m.numFetched() == 0 {
return stateSkipped, nil
}
send := func() (stateID, error) {
data := &blocksQueueFetchedData{
pid: m.pid,
bwb: m.bwb,
}
select {
case <-ctx.Done():
return m.state, ctx.Err()
case q.fetchedData <- data:
case q.fetchedData <- m.fetched.blocksQueueFetchedData():
}
return stateSent, nil
}

View File

@@ -472,8 +472,8 @@ func TestBlocksQueue_onDataReceivedEvent(t *testing.T) {
updatedState, err := handlerFn(&stateMachine{
state: stateScheduled,
}, &fetchRequestResponse{
pid: "abc",
err: errSlotIsTooHigh,
blocksFrom: "abc",
err: errSlotIsTooHigh,
})
assert.ErrorContains(t, errSlotIsTooHigh.Error(), err)
assert.Equal(t, stateScheduled, updatedState)
@@ -495,9 +495,9 @@ func TestBlocksQueue_onDataReceivedEvent(t *testing.T) {
updatedState, err := handlerFn(&stateMachine{
state: stateScheduled,
}, &fetchRequestResponse{
pid: "abc",
err: errSlotIsTooHigh,
start: 256,
blocksFrom: "abc",
err: errSlotIsTooHigh,
start: 256,
})
assert.ErrorContains(t, errSlotIsTooHigh.Error(), err)
assert.Equal(t, stateScheduled, updatedState)
@@ -517,8 +517,8 @@ func TestBlocksQueue_onDataReceivedEvent(t *testing.T) {
updatedState, err := handlerFn(&stateMachine{
state: stateScheduled,
}, &fetchRequestResponse{
pid: "abc",
err: beaconsync.ErrInvalidFetchedData,
blocksFrom: "abc",
err: beaconsync.ErrInvalidFetchedData,
})
assert.ErrorContains(t, beaconsync.ErrInvalidFetchedData.Error(), err)
assert.Equal(t, stateScheduled, updatedState)
@@ -537,7 +537,7 @@ func TestBlocksQueue_onDataReceivedEvent(t *testing.T) {
wsbCopy, err := wsb.Copy()
require.NoError(t, err)
response := &fetchRequestResponse{
pid: "abc",
blocksFrom: "abc",
bwb: []blocks.BlockWithROBlobs{
{Block: blocks.ROBlock{ReadOnlySignedBeaconBlock: wsb}},
{Block: blocks.ROBlock{ReadOnlySignedBeaconBlock: wsbCopy}},
@@ -546,13 +546,15 @@ func TestBlocksQueue_onDataReceivedEvent(t *testing.T) {
fsm := &stateMachine{
state: stateScheduled,
}
assert.Equal(t, peer.ID(""), fsm.pid)
assert.Equal(t, 0, len(fsm.bwb))
assert.Equal(t, peer.ID(""), fsm.fetched.blocksFrom)
assert.Equal(t, peer.ID(""), fsm.fetched.blobsFrom)
assert.Equal(t, 0, fsm.numFetched())
updatedState, err := handlerFn(fsm, response)
assert.NoError(t, err)
assert.Equal(t, stateDataParsed, updatedState)
assert.Equal(t, response.pid, fsm.pid)
assert.DeepSSZEqual(t, response.bwb, fsm.bwb)
assert.Equal(t, response.blocksFrom, fsm.fetched.blocksFrom)
assert.Equal(t, response.blobsFrom, fsm.fetched.blobsFrom)
assert.DeepSSZEqual(t, response.bwb, fsm.fetched.bwb)
})
}
@@ -635,10 +637,10 @@ func TestBlocksQueue_onReadyToSendEvent(t *testing.T) {
queue.smm.addStateMachine(256)
queue.smm.addStateMachine(320)
queue.smm.machines[256].state = stateDataParsed
queue.smm.machines[256].pid = pidDataParsed
queue.smm.machines[256].fetched.blocksFrom = pidDataParsed
rwsb, err := blocks.NewROBlock(wsb)
require.NoError(t, err)
queue.smm.machines[256].bwb = []blocks.BlockWithROBlobs{
queue.smm.machines[256].fetched.bwb = []blocks.BlockWithROBlobs{
{Block: rwsb},
}
@@ -669,10 +671,10 @@ func TestBlocksQueue_onReadyToSendEvent(t *testing.T) {
queue.smm.machines[256].state = stateDataParsed
queue.smm.addStateMachine(320)
queue.smm.machines[320].state = stateDataParsed
queue.smm.machines[320].pid = pidDataParsed
queue.smm.machines[320].fetched.blocksFrom = pidDataParsed
rwsb, err := blocks.NewROBlock(wsb)
require.NoError(t, err)
queue.smm.machines[320].bwb = []blocks.BlockWithROBlobs{
queue.smm.machines[320].fetched.bwb = []blocks.BlockWithROBlobs{
{Block: rwsb},
}
@@ -700,10 +702,10 @@ func TestBlocksQueue_onReadyToSendEvent(t *testing.T) {
queue.smm.machines[256].state = stateSkipped
queue.smm.addStateMachine(320)
queue.smm.machines[320].state = stateDataParsed
queue.smm.machines[320].pid = pidDataParsed
queue.smm.machines[320].fetched.blocksFrom = pidDataParsed
rwsb, err := blocks.NewROBlock(wsb)
require.NoError(t, err)
queue.smm.machines[320].bwb = []blocks.BlockWithROBlobs{
queue.smm.machines[320].fetched.bwb = []blocks.BlockWithROBlobs{
{Block: rwsb},
}
@@ -1199,17 +1201,17 @@ func TestBlocksQueue_stuckInUnfavourableFork(t *testing.T) {
firstFSM, ok := queue.smm.findStateMachine(forkedSlot)
require.Equal(t, true, ok)
require.Equal(t, stateDataParsed, firstFSM.state)
require.Equal(t, forkedPeer, firstFSM.pid)
require.Equal(t, forkedPeer, firstFSM.fetched.blocksFrom)
reqEnd := testForkStartSlot(t, 251) + primitives.Slot(findForkReqRangeSize())
require.Equal(t, int(reqEnd-forkedSlot), len(firstFSM.bwb))
require.Equal(t, forkedSlot, firstFSM.bwb[0].Block.Block().Slot())
require.Equal(t, int(reqEnd-forkedSlot), len(firstFSM.fetched.bwb))
require.Equal(t, forkedSlot, firstFSM.fetched.bwb[0].Block.Block().Slot())
// Assert that forked data from chain2 is available (within 64 fetched blocks).
for i, blk := range chain2[forkedSlot:] {
if i >= len(firstFSM.bwb) {
if i >= len(firstFSM.fetched.bwb) {
break
}
rootFromFSM := firstFSM.bwb[i].Block.Root()
rootFromFSM := firstFSM.fetched.bwb[i].Block.Root()
blkRoot, err := blk.Block.HashTreeRoot()
require.NoError(t, err)
assert.Equal(t, blkRoot, rootFromFSM)
@@ -1217,7 +1219,7 @@ func TestBlocksQueue_stuckInUnfavourableFork(t *testing.T) {
// Assert that machines are in the expected state.
startSlot = forkedEpochStartSlot.Add(1 + blocksPerRequest)
require.Equal(t, int(blocksPerRequest)-int(forkedSlot-(forkedEpochStartSlot+1)), len(firstFSM.bwb))
require.Equal(t, int(blocksPerRequest)-int(forkedSlot-(forkedEpochStartSlot+1)), len(firstFSM.fetched.bwb))
for i := startSlot; i < startSlot.Add(blocksPerRequest*(lookaheadSteps-1)); i += primitives.Slot(blocksPerRequest) {
fsm, ok := queue.smm.findStateMachine(i)
require.Equal(t, true, ok)

View File

@@ -24,8 +24,8 @@ func (q *blocksQueue) resetFromFork(fork *forkData) error {
return err
}
fsm := q.smm.addStateMachine(firstBlock.Slot())
fsm.pid = fork.peer
fsm.bwb = fork.bwb
fsm.fetched.bwb = fork.bwb
fsm.fetched.blocksFrom, fsm.fetched.blobsFrom = fork.blocksFrom, fork.blobsFrom
fsm.state = stateDataParsed
// The rest of machines are in skipped state.

View File

@@ -0,0 +1,219 @@
package initialsync
import (
"context"
"testing"
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain"
mock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/peerdata"
p2pt "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
"github.com/OffchainLabs/prysm/v6/beacon-chain/sync"
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/testing/assert"
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
)
type testDownscorePeer int
const (
testDownscoreNeither testDownscorePeer = iota
testDownscoreBlock
testDownscoreBlob
)
func peerIDForTestDownscore(w testDownscorePeer, name string) peer.ID {
switch w {
case testDownscoreBlock:
return peer.ID("block" + name)
case testDownscoreBlob:
return peer.ID("blob" + name)
default:
return ""
}
}
func TestUpdatePeerScorerStats(t *testing.T) {
cases := []struct {
name string
err error
processed uint64
downPeer testDownscorePeer
}{
{
name: "invalid block",
err: blockchain.ErrInvalidPayload,
downPeer: testDownscoreBlock,
processed: 10,
},
{
name: "invalid blob",
err: verification.ErrBlobIndexInvalid,
downPeer: testDownscoreBlob,
processed: 3,
},
{
name: "not validity error",
err: errors.New("test"),
processed: 32,
},
{
name: "no error",
processed: 32,
},
}
s := &Service{
cfg: &Config{
P2P: p2pt.NewTestP2P(t),
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
data := &blocksQueueFetchedData{
blocksFrom: peerIDForTestDownscore(testDownscoreBlock, c.name),
blobsFrom: peerIDForTestDownscore(testDownscoreBlob, c.name),
}
s.updatePeerScorerStats(data, c.processed, c.err)
if c.err != nil && c.downPeer != testDownscoreNeither {
switch c.downPeer {
case testDownscoreBlock:
// block should be downscored
blocksCount, err := s.cfg.P2P.Peers().Scorers().BadResponsesScorer().Count(data.blocksFrom)
require.NoError(t, err)
require.Equal(t, 1, blocksCount)
// blob should not be downscored - also we expect a not found error since peer scoring did not interact with blobs
blobCount, err := s.cfg.P2P.Peers().Scorers().BadResponsesScorer().Count(data.blobsFrom)
require.ErrorIs(t, err, peerdata.ErrPeerUnknown)
require.Equal(t, -1, blobCount)
case testDownscoreBlob:
// block should not be downscored - also we expect a not found error since peer scoring did not interact with blocks
blocksCount, err := s.cfg.P2P.Peers().Scorers().BadResponsesScorer().Count(data.blocksFrom)
require.ErrorIs(t, err, peerdata.ErrPeerUnknown)
require.Equal(t, -1, blocksCount)
// blob should be downscored
blobCount, err := s.cfg.P2P.Peers().Scorers().BadResponsesScorer().Count(data.blobsFrom)
require.NoError(t, err)
require.Equal(t, 1, blobCount)
}
assert.Equal(t, uint64(0), s.cfg.P2P.Peers().Scorers().BlockProviderScorer().ProcessedBlocks(data.blocksFrom))
return
}
// block should not be downscored - also we expect a not found error since peer scoring did not interact with blocks
blocksCount, err := s.cfg.P2P.Peers().Scorers().BadResponsesScorer().Count(data.blocksFrom)
// The scorer will know about the block peer because it will have a processed blocks count
require.NoError(t, err)
require.Equal(t, 0, blocksCount)
// no downscore, so scorer doesn't know the peer
blobCount, err := s.cfg.P2P.Peers().Scorers().BadResponsesScorer().Count(data.blobsFrom)
require.ErrorIs(t, err, peerdata.ErrPeerUnknown)
require.Equal(t, -1, blobCount)
assert.Equal(t, c.processed, s.cfg.P2P.Peers().Scorers().BlockProviderScorer().ProcessedBlocks(data.blocksFrom))
})
}
}
func TestOnDataReceivedDownscore(t *testing.T) {
cases := []struct {
name string
err error
downPeer testDownscorePeer
}{
{
name: "invalid block",
err: sync.ErrInvalidFetchedData,
downPeer: testDownscoreBlock,
},
{
name: "invalid blob",
err: errors.Wrap(verification.ErrBlobInvalid, "test"),
downPeer: testDownscoreBlob,
},
{
name: "not validity error",
err: errors.New("test"),
},
{
name: "no error",
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
data := &fetchRequestResponse{
blocksFrom: peerIDForTestDownscore(testDownscoreBlock, c.name),
blobsFrom: peerIDForTestDownscore(testDownscoreBlob, c.name),
err: c.err,
}
if c.downPeer == testDownscoreBlob {
require.Equal(t, true, verification.IsBlobValidationFailure(c.err))
}
ctx := context.Background()
p2p := p2pt.NewTestP2P(t)
mc := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
chain: mc,
p2p: p2p,
clock: startup.NewClock(mc.Genesis, mc.ValidatorsRoot),
})
q := newBlocksQueue(ctx, &blocksQueueConfig{
p2p: p2p,
blocksFetcher: fetcher,
highestExpectedSlot: primitives.Slot(32),
chain: mc})
sm := q.smm.addStateMachine(0)
sm.state = stateScheduled
handle := q.onDataReceivedEvent(context.Background())
endState, err := handle(sm, data)
if c.err != nil {
require.ErrorIs(t, err, c.err)
} else {
require.NoError(t, err)
}
// state machine should stay in "scheduled" if there's an error
// and transition to "data parsed" if there's no error
if c.err != nil {
require.Equal(t, stateScheduled, endState)
} else {
require.Equal(t, stateDataParsed, endState)
}
if c.err != nil && c.downPeer != testDownscoreNeither {
switch c.downPeer {
case testDownscoreBlock:
// block should be downscored
blocksCount, err := p2p.Peers().Scorers().BadResponsesScorer().Count(data.blocksFrom)
require.NoError(t, err)
require.Equal(t, 1, blocksCount)
// blob should not be downscored - also we expect a not found error since peer scoring did not interact with blobs
blobCount, err := p2p.Peers().Scorers().BadResponsesScorer().Count(data.blobsFrom)
require.ErrorIs(t, err, peerdata.ErrPeerUnknown)
require.Equal(t, -1, blobCount)
case testDownscoreBlob:
// block should not be downscored - also we expect a not found error since peer scoring did not interact with blocks
blocksCount, err := p2p.Peers().Scorers().BadResponsesScorer().Count(data.blocksFrom)
require.ErrorIs(t, err, peerdata.ErrPeerUnknown)
require.Equal(t, -1, blocksCount)
// blob should be downscored
blobCount, err := p2p.Peers().Scorers().BadResponsesScorer().Count(data.blobsFrom)
require.NoError(t, err)
require.Equal(t, 1, blobCount)
}
assert.Equal(t, uint64(0), p2p.Peers().Scorers().BlockProviderScorer().ProcessedBlocks(data.blocksFrom))
return
}
// block should not be downscored - also we expect a not found error since peer scoring did not interact with blocks
blocksCount, err := p2p.Peers().Scorers().BadResponsesScorer().Count(data.blocksFrom)
// no downscore, so scorer doesn't know the peer
require.ErrorIs(t, err, peerdata.ErrPeerUnknown)
require.Equal(t, -1, blocksCount)
blobCount, err := p2p.Peers().Scorers().BadResponsesScorer().Count(data.blobsFrom)
// no downscore, so scorer doesn't know the peer
require.ErrorIs(t, err, peerdata.ErrPeerUnknown)
require.Equal(t, -1, blobCount)
})
}
}

View File

@@ -6,11 +6,9 @@ import (
"sort"
"time"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
prysmTime "github.com/OffchainLabs/prysm/v6/time"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/libp2p/go-libp2p/core/peer"
)
const (
@@ -45,8 +43,7 @@ type stateMachine struct {
smm *stateMachineManager
start primitives.Slot
state stateID
pid peer.ID
bwb []blocks.BlockWithROBlobs
fetched fetchRequestResponse
updated time.Time
}
@@ -78,7 +75,7 @@ func (smm *stateMachineManager) addStateMachine(startSlot primitives.Slot) *stat
smm: smm,
start: startSlot,
state: stateNew,
bwb: []blocks.BlockWithROBlobs{},
fetched: fetchRequestResponse{},
updated: prysmTime.Now(),
}
smm.recalculateMachineAttribs()
@@ -90,7 +87,7 @@ func (smm *stateMachineManager) removeStateMachine(startSlot primitives.Slot) er
if _, ok := smm.machines[startSlot]; !ok {
return fmt.Errorf("state for machine %v is not found", startSlot)
}
smm.machines[startSlot].bwb = nil
smm.machines[startSlot].fetched = fetchRequestResponse{}
delete(smm.machines, startSlot)
smm.recalculateMachineAttribs()
return nil
@@ -187,6 +184,10 @@ func (m *stateMachine) isLast() bool {
return m.start == m.smm.keys[len(m.smm.keys)-1]
}
func (m *stateMachine) numFetched() int {
return len(m.fetched.bwb)
}
// String returns a human-readable representation of an FSM state.
func (m *stateMachine) String() string {
return fmt.Sprintf("{%d:%s}", slots.ToEpoch(m.start), m.state)

View File

@@ -14,7 +14,6 @@ import (
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/paulbellamy/ratecounter"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@@ -127,7 +126,7 @@ func (s *Service) syncToNonFinalizedEpoch(ctx context.Context) error {
}
for data := range queue.fetchedData {
count, err := s.processFetchedDataRegSync(ctx, data)
s.updatePeerScorerStats(data.pid, count, err)
s.updatePeerScorerStats(data, count, err)
}
log.WithFields(logrus.Fields{
"syncedSlot": s.cfg.Chain.HeadSlot(),
@@ -147,7 +146,7 @@ func (s *Service) processFetchedData(ctx context.Context, data *blocksQueueFetch
if err != nil {
log.WithError(err).Warn("Skip processing batched blocks")
}
s.updatePeerScorerStats(data.pid, count, err)
s.updatePeerScorerStats(data, count, err)
}
// processFetchedDataRegSync processes data received from queue.
@@ -339,18 +338,19 @@ func isPunishableError(err error) bool {
}
// updatePeerScorerStats adjusts monitored metrics for a peer.
func (s *Service) updatePeerScorerStats(pid peer.ID, count uint64, err error) {
if pid == "" {
return
}
func (s *Service) updatePeerScorerStats(data *blocksQueueFetchedData, count uint64, err error) {
if isPunishableError(err) {
log.WithError(err).WithField("peer_id", pid).Warn("Incrementing peers bad response count")
s.cfg.P2P.Peers().Scorers().BadResponsesScorer().Increment(pid)
if verification.IsBlobValidationFailure(err) {
log.WithError(err).WithField("peer_id", data.blobsFrom).Warn("Downscoring peer for invalid blobs")
s.cfg.P2P.Peers().Scorers().BadResponsesScorer().Increment(data.blobsFrom)
} else {
log.WithError(err).WithField("peer_id", data.blocksFrom).Warn("Downscoring peer for invalid blocks")
s.cfg.P2P.Peers().Scorers().BadResponsesScorer().Increment(data.blocksFrom)
}
// If the error is punishable, exit here so that we don't give them credit for providing bad blocks.
return
}
scorer := s.cfg.P2P.Peers().Scorers().BlockProviderScorer()
scorer.IncrementProcessedBlocks(pid, count)
s.cfg.P2P.Peers().Scorers().BlockProviderScorer().IncrementProcessedBlocks(data.blocksFrom, count)
}
// isProcessedBlock checks DB and local cache for presence of a given block, to avoid duplicates.

View File

@@ -156,7 +156,7 @@ func readChunkEncodedBlobsLowMax(t *testing.T, s *Service, expect []*expectedBlo
}
return func(stream network.Stream) {
_, err := readChunkEncodedBlobs(stream, encoding, ctxMap, vf, 1)
require.ErrorIs(t, err, ErrInvalidFetchedData)
require.ErrorIs(t, err, errMaxRequestBlobSidecarsExceeded)
}
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder"
p2ptypes "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
@@ -30,14 +31,14 @@ var errBlobUnmarshal = errors.New("Could not unmarshal chunk-encoded blob")
var (
// ErrInvalidFetchedData is used to signal that an error occurred which should result in peer downscoring.
ErrInvalidFetchedData = errors.New("invalid data returned from peer")
errBlobIndexOutOfBounds = errors.Wrap(ErrInvalidFetchedData, "blob index out of range")
errMaxRequestBlobSidecarsExceeded = errors.Wrap(ErrInvalidFetchedData, "peer exceeded req blob chunk tx limit")
errChunkResponseSlotNotAsc = errors.Wrap(ErrInvalidFetchedData, "blob slot not higher than previous block root")
errChunkResponseIndexNotAsc = errors.Wrap(ErrInvalidFetchedData, "blob indices for a block must start at 0 and increase by 1")
errUnrequested = errors.Wrap(ErrInvalidFetchedData, "received BlobSidecar in response that was not requested")
errBlobResponseOutOfBounds = errors.Wrap(ErrInvalidFetchedData, "received BlobSidecar with slot outside BlobSidecarsByRangeRequest bounds")
errChunkResponseBlockMismatch = errors.Wrap(ErrInvalidFetchedData, "blob block details do not match")
errChunkResponseParentMismatch = errors.Wrap(ErrInvalidFetchedData, "parent root for response element doesn't match previous element root")
errBlobIndexOutOfBounds = errors.Wrap(verification.ErrBlobInvalid, "blob index out of range")
errMaxRequestBlobSidecarsExceeded = errors.Wrap(verification.ErrBlobInvalid, "peer exceeded req blob chunk tx limit")
errChunkResponseSlotNotAsc = errors.Wrap(verification.ErrBlobInvalid, "blob slot not higher than previous block root")
errChunkResponseIndexNotAsc = errors.Wrap(verification.ErrBlobInvalid, "blob indices for a block must start at 0 and increase by 1")
errUnrequested = errors.Wrap(verification.ErrBlobInvalid, "received BlobSidecar in response that was not requested")
errBlobResponseOutOfBounds = errors.Wrap(verification.ErrBlobInvalid, "received BlobSidecar with slot outside BlobSidecarsByRangeRequest bounds")
errChunkResponseBlockMismatch = errors.Wrap(verification.ErrBlobInvalid, "blob block details do not match")
errChunkResponseParentMismatch = errors.Wrap(verification.ErrBlobInvalid, "parent root for response element doesn't match previous element root")
)
// BeaconBlockProcessor defines a block processing function, which allows to start utilizing

View File

@@ -12,6 +12,7 @@ import (
p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
p2pTypes "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
@@ -877,3 +878,7 @@ func TestSendBlobsByRangeRequest(t *testing.T) {
assert.Equal(t, int(totalElectraBlobs), len(blobs))
})
}
func TestErrInvalidFetchedDataDistinction(t *testing.T) {
require.Equal(t, false, errors.Is(ErrInvalidFetchedData, verification.ErrBlobInvalid))
}

View File

@@ -16,6 +16,11 @@ func AsVerificationFailure(err error) error {
return errors.Join(ErrInvalid, err)
}
// IsBlobValidationFailure checks if the given error is a blob validation failure.
func IsBlobValidationFailure(err error) bool {
return errors.Is(err, ErrBlobInvalid)
}
var (
// ErrBlobInvalid is joined with all other blob verification errors. This enables other packages to check for any sort of
// verification error at one point, like sync code checking for peer scoring purposes.
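The helper works because every blob verification error is created by wrapping or joining ErrBlobInvalid, so errors.Is still matches after further wrapping, and callers such as the sync queue can classify a failure without knowing the concrete error. A small standalone sketch of the pattern, using local stand-ins for the two sentinels:

package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the sentinel errors referenced in this diff.
var (
	errBlobInvalid        = errors.New("blob failed verification")
	errInvalidFetchedData = errors.New("invalid data returned from peer")
)

func main() {
	// A specific failure wrapped around the blob sentinel, as the chunk-decoding errors are.
	err := fmt.Errorf("blob index out of range: %w", errBlobInvalid)

	fmt.Println(errors.Is(err, errBlobInvalid))        // true: classified as a blob failure
	fmt.Println(errors.Is(err, errInvalidFetchedData)) // false: the two sentinels stay distinct
}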

View File

@@ -0,0 +1,3 @@
### Fixed
- Fixed an underflow with balances in a leaking-validator edge case with expected withdrawals.

View File

@@ -0,0 +1,2 @@
### Fixed
- Attribute block and blob issues to the correct peers during range syncing.