mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 15:37:56 -05:00
Retry fetch origin data column sidecars. (#15634)
* `convertToAddrInfo`: Add peer details on error. * `fetchOriginColumns`: Remove unused argument. * `fetchOriginColumns`: Fix typo * `isSidecarIndexRequested`: Log requested indices. * `fetchOriginColumns`: Retry. * Add changelog. * Fix Preston comment. * `custodyGroupCountFromPeerENR`: Add agent on error messages. * Update beacon-chain/sync/initial-sync/service.go Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com> * Fix `TestCustodyGroupCountFromPeer`. * `s.fetchOriginColumns`: Use `maxAttempts` and `delay` as parameters to ease unit testing. * Implement `TestFetchOriginColumns`. * `SendDataColumnSidecarsByRangeRequest` and `SendDataColumnSidecarsByRootRequest`: Add option to downscore the peer on RPC error. * `fetchOriginColumns`: Remove max attempts, and downscore peers on RPC fault. --------- Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>
This commit is contained in:
@@ -155,6 +155,7 @@ func (s *Service) custodyGroupCountFromPeerENR(pid peer.ID) uint64 {
|
||||
log := log.WithFields(logrus.Fields{
|
||||
"peerID": pid,
|
||||
"defaultValue": custodyRequirement,
|
||||
"agent": agentString(pid, s.Host()),
|
||||
})
|
||||
|
||||
// Retrieve the ENR of the peer.
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers"
|
||||
testp2p "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
|
||||
"github.com/OffchainLabs/prysm/v6/config/params"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/wrapper"
|
||||
@@ -269,6 +270,7 @@ func TestCustodyGroupCountFromPeer(t *testing.T) {
|
||||
service := &Service{
|
||||
peers: peers,
|
||||
metaData: tc.metadata,
|
||||
host: testp2p.NewTestP2P(t).Host(),
|
||||
}
|
||||
|
||||
// Retrieve the custody count from the remote peer.
|
||||
@@ -329,6 +331,7 @@ func TestCustodyGroupCountFromPeerENR(t *testing.T) {
|
||||
|
||||
service := &Service{
|
||||
peers: peers,
|
||||
host: testp2p.NewTestP2P(t).Host(),
|
||||
}
|
||||
|
||||
actual := service.custodyGroupCountFromPeerENR(pid)
|
||||
|
||||
@@ -684,7 +684,7 @@ func (s *Service) filterPeer(node *enode.Node) bool {
|
||||
|
||||
peerData, multiAddrs, err := convertToAddrInfo(node)
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("Could not convert to peer data")
|
||||
log.WithError(err).WithField("node", node.String()).Debug("Could not convert to peer data")
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -851,7 +851,7 @@ func convertToMultiAddr(nodes []*enode.Node) []ma.Multiaddr {
|
||||
func convertToAddrInfo(node *enode.Node) (*peer.AddrInfo, []ma.Multiaddr, error) {
|
||||
multiAddrs, err := retrieveMultiAddrsFromNode(node)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, nil, errors.Wrap(err, "retrieve multiaddrs from node")
|
||||
}
|
||||
|
||||
if len(multiAddrs) == 0 {
|
||||
|
||||
@@ -63,6 +63,7 @@ type TestP2P struct {
|
||||
custodyInfoMut sync.RWMutex // protects custodyGroupCount and earliestAvailableSlot
|
||||
earliestAvailableSlot primitives.Slot
|
||||
custodyGroupCount uint64
|
||||
enr *enr.Record
|
||||
}
|
||||
|
||||
// NewTestP2P initializes a new p2p test service.
|
||||
@@ -103,6 +104,7 @@ func NewTestP2P(t *testing.T, userOptions ...config.Option) *TestP2P {
|
||||
pubsub: ps,
|
||||
joinedTopics: map[string]*pubsub.Topic{},
|
||||
peers: peerStatuses,
|
||||
enr: new(enr.Record),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -310,8 +312,8 @@ func (p *TestP2P) Host() host.Host {
|
||||
}
|
||||
|
||||
// ENR returns the enr of the local peer.
|
||||
func (*TestP2P) ENR() *enr.Record {
|
||||
return new(enr.Record)
|
||||
func (p *TestP2P) ENR() *enr.Record {
|
||||
return p.enr
|
||||
}
|
||||
|
||||
// NodeID returns the node id of the local peer.
|
||||
|
||||
@@ -29,13 +29,14 @@ import (
|
||||
// DataColumnSidecarsParams stores the common parameters needed to
|
||||
// fetch data column sidecars from peers.
|
||||
type DataColumnSidecarsParams struct {
|
||||
Ctx context.Context // Context
|
||||
Tor blockchain.TemporalOracle // Temporal oracle, useful to get the current slot
|
||||
P2P prysmP2P.P2P // P2P network interface
|
||||
RateLimiter *leakybucket.Collector // Rate limiter for outgoing requests
|
||||
CtxMap ContextByteVersions // Context map, useful to know if a message is mapped to the correct fork
|
||||
Storage filesystem.DataColumnStorageReader // Data columns storage
|
||||
NewVerifier verification.NewDataColumnsVerifier // Data columns verifier to check to conformity of incoming data column sidecars
|
||||
Ctx context.Context // Context
|
||||
Tor blockchain.TemporalOracle // Temporal oracle, useful to get the current slot
|
||||
P2P prysmP2P.P2P // P2P network interface
|
||||
RateLimiter *leakybucket.Collector // Rate limiter for outgoing requests
|
||||
CtxMap ContextByteVersions // Context map, useful to know if a message is mapped to the correct fork
|
||||
Storage filesystem.DataColumnStorageReader // Data columns storage
|
||||
NewVerifier verification.NewDataColumnsVerifier // Data columns verifier to check to conformity of incoming data column sidecars
|
||||
DownscorePeerOnRPCFault bool // Downscore a peer if it commits an RPC fault. Not responding sidecars at all is considered as a fault.
|
||||
}
|
||||
|
||||
// FetchDataColumnSidecars retrieves data column sidecars from storage and peers for the given
|
||||
|
||||
@@ -34,6 +34,7 @@ go_library(
|
||||
"//beacon-chain/verification:go_default_library",
|
||||
"//cmd/beacon-chain/flags:go_default_library",
|
||||
"//config/features:go_default_library",
|
||||
"//config/fieldparams:go_default_library",
|
||||
"//config/params:go_default_library",
|
||||
"//consensus-types/blocks:go_default_library",
|
||||
"//consensus-types/interfaces:go_default_library",
|
||||
@@ -108,7 +109,9 @@ go_test(
|
||||
"//time:go_default_library",
|
||||
"//time/slots:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p//:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p//core:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p//core/crypto:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p//core/network:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
|
||||
"@com_github_paulbellamy_ratecounter//:go_default_library",
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/sync"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
|
||||
"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
|
||||
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
|
||||
"github.com/OffchainLabs/prysm/v6/config/params"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v6/crypto/rand"
|
||||
@@ -209,6 +210,8 @@ func (s *Service) Start() {
|
||||
|
||||
// fetchOriginSidecars fetches origin sidecars
|
||||
func (s *Service) fetchOriginSidecars(peers []peer.ID) error {
|
||||
const delay = 10 * time.Second // The delay between each attempt to fetch origin data column sidecars
|
||||
|
||||
blockRoot, err := s.cfg.DB.OriginCheckpointBlockRoot(s.ctx)
|
||||
if errors.Is(err, db.ErrNotFoundOriginBlockRoot) {
|
||||
return nil
|
||||
@@ -234,7 +237,7 @@ func (s *Service) fetchOriginSidecars(peers []peer.ID) error {
|
||||
blockVersion := roBlock.Version()
|
||||
|
||||
if blockVersion >= version.Fulu {
|
||||
if err := s.fetchOriginColumns(peers, roBlock); err != nil {
|
||||
if err := s.fetchOriginColumns(roBlock, delay); err != nil {
|
||||
return errors.Wrap(err, "fetch origin columns")
|
||||
}
|
||||
return nil
|
||||
@@ -391,7 +394,11 @@ func (s *Service) fetchOriginBlobs(pids []peer.ID, rob blocks.ROBlock) error {
|
||||
return fmt.Errorf("no connected peer able to provide blobs for checkpoint sync block %#x", r)
|
||||
}
|
||||
|
||||
func (s *Service) fetchOriginColumns(pids []peer.ID, roBlock blocks.ROBlock) error {
|
||||
func (s *Service) fetchOriginColumns(roBlock blocks.ROBlock, delay time.Duration) error {
|
||||
const (
|
||||
errorMessage = "Failed to fetch origin data column sidecars"
|
||||
warningIteration = 10
|
||||
)
|
||||
samplesPerSlot := params.BeaconConfig().SamplesPerSlot
|
||||
|
||||
// Return early if the origin block has no blob commitments.
|
||||
@@ -420,21 +427,40 @@ func (s *Service) fetchOriginColumns(pids []peer.ID, roBlock blocks.ROBlock) err
|
||||
root := roBlock.Root()
|
||||
|
||||
params := sync.DataColumnSidecarsParams{
|
||||
Ctx: s.ctx,
|
||||
Tor: s.clock,
|
||||
P2P: s.cfg.P2P,
|
||||
CtxMap: s.ctxMap,
|
||||
Storage: s.cfg.DataColumnStorage,
|
||||
NewVerifier: s.newDataColumnsVerifier,
|
||||
Ctx: s.ctx,
|
||||
Tor: s.clock,
|
||||
P2P: s.cfg.P2P,
|
||||
CtxMap: s.ctxMap,
|
||||
Storage: s.cfg.DataColumnStorage,
|
||||
NewVerifier: s.newDataColumnsVerifier,
|
||||
DownscorePeerOnRPCFault: true,
|
||||
}
|
||||
|
||||
verfifiedRoDataColumnsByRoot, err := sync.FetchDataColumnSidecars(params, []blocks.ROBlock{roBlock}, info.CustodyColumns)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "fetch data column sidecars")
|
||||
var verifiedRoDataColumnsByRoot map[[fieldparams.RootLength]byte][]blocks.VerifiedRODataColumn
|
||||
for attempt := uint64(0); ; attempt++ {
|
||||
verifiedRoDataColumnsByRoot, err = sync.FetchDataColumnSidecars(params, []blocks.ROBlock{roBlock}, info.CustodyColumns)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
||||
log := log.WithError(err).WithFields(logrus.Fields{
|
||||
"attempt": attempt,
|
||||
"delay": delay,
|
||||
})
|
||||
|
||||
if attempt%warningIteration == 0 && attempt > 0 {
|
||||
log.Warning(errorMessage)
|
||||
time.Sleep(delay)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
log.Debug(errorMessage)
|
||||
time.Sleep(delay)
|
||||
}
|
||||
|
||||
// Save origin data columns to disk.
|
||||
verifiedRoDataColumnsSidecars, ok := verfifiedRoDataColumnsByRoot[root]
|
||||
verifiedRoDataColumnsSidecars, ok := verifiedRoDataColumnsByRoot[root]
|
||||
if !ok {
|
||||
return fmt.Errorf("cannot extract origins data column sidecars for block root %#x - should never happen", root)
|
||||
}
|
||||
@@ -447,7 +473,7 @@ func (s *Service) fetchOriginColumns(pids []peer.ID, roBlock blocks.ROBlock) err
|
||||
"blockRoot": fmt.Sprintf("%#x", roBlock.Root()),
|
||||
"blobCount": len(commitments),
|
||||
"columnCount": len(verifiedRoDataColumnsSidecars),
|
||||
}).Info("Successfully downloaded data columns for checkpoint sync block")
|
||||
}).Info("Successfully downloaded data column sidecars for checkpoint sync block")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package initialsync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -13,8 +14,12 @@ import (
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/kv"
|
||||
dbtest "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
|
||||
p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
|
||||
testp2p "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
|
||||
prysmSync "github.com/OffchainLabs/prysm/v6/beacon-chain/sync"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
|
||||
"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
|
||||
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
|
||||
@@ -22,10 +27,14 @@ import (
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||
eth "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
|
||||
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v6/testing/assert"
|
||||
"github.com/OffchainLabs/prysm/v6/testing/require"
|
||||
"github.com/OffchainLabs/prysm/v6/testing/util"
|
||||
"github.com/OffchainLabs/prysm/v6/time/slots"
|
||||
"github.com/libp2p/go-libp2p"
|
||||
"github.com/libp2p/go-libp2p/core/crypto"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/paulbellamy/ratecounter"
|
||||
logTest "github.com/sirupsen/logrus/hooks/test"
|
||||
@@ -663,3 +672,147 @@ func TestFetchOriginSidecars(t *testing.T) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestFetchOriginColumns(t *testing.T) {
|
||||
// Load the trusted setup.
|
||||
err := kzg.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Setup test environment
|
||||
params.SetupTestConfigCleanup(t)
|
||||
cfg := params.BeaconConfig().Copy()
|
||||
cfg.FuluForkEpoch = 0
|
||||
params.OverrideBeaconConfig(cfg)
|
||||
|
||||
const (
|
||||
delay = 0
|
||||
blobCount = 1
|
||||
)
|
||||
|
||||
t.Run("block has no commitments", func(t *testing.T) {
|
||||
service := new(Service)
|
||||
|
||||
// Create a block with no blob commitments
|
||||
block := util.NewBeaconBlockFulu()
|
||||
signedBlock, err := blocks.NewSignedBeaconBlock(block)
|
||||
require.NoError(t, err)
|
||||
roBlock, err := blocks.NewROBlock(signedBlock)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = service.fetchOriginColumns(roBlock, delay)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("FetchDataColumnSidecars succeeds immediately", func(t *testing.T) {
|
||||
storage := filesystem.NewEphemeralDataColumnStorage(t)
|
||||
p2p := p2ptest.NewTestP2P(t)
|
||||
|
||||
service := &Service{
|
||||
cfg: &Config{
|
||||
P2P: p2p,
|
||||
DataColumnStorage: storage,
|
||||
},
|
||||
}
|
||||
|
||||
// Create a block with blob commitments and sidecars
|
||||
roBlock, _, verifiedSidecars := util.GenerateTestFuluBlockWithSidecars(t, blobCount)
|
||||
|
||||
// Store all sidecars in advance so FetchDataColumnSidecars succeeds immediately
|
||||
err := storage.Save(verifiedSidecars)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = service.fetchOriginColumns(roBlock, delay)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("first attempt to FetchDataColumnSidecars fails but second attempt succeeds", func(t *testing.T) {
|
||||
numberOfCustodyGroups := params.BeaconConfig().NumberOfCustodyGroups
|
||||
storage := filesystem.NewEphemeralDataColumnStorage(t)
|
||||
|
||||
// Custody columns with this private key and 4-cgc: 31, 81, 97, 105
|
||||
privateKeyBytes := [32]byte{1}
|
||||
privateKey, err := crypto.UnmarshalSecp256k1PrivateKey(privateKeyBytes[:])
|
||||
require.NoError(t, err)
|
||||
|
||||
protocol := fmt.Sprintf("%s/ssz_snappy", p2p.RPCDataColumnSidecarsByRangeTopicV1)
|
||||
|
||||
p2p, other := testp2p.NewTestP2P(t), testp2p.NewTestP2P(t, libp2p.Identity(privateKey))
|
||||
p2p.Peers().SetConnectionState(other.PeerID(), peers.Connected)
|
||||
p2p.Connect(other)
|
||||
|
||||
p2p.Peers().SetChainState(other.PeerID(), ðpb.StatusV2{
|
||||
HeadSlot: 5,
|
||||
})
|
||||
|
||||
other.ENR().Set(peerdas.Cgc(numberOfCustodyGroups))
|
||||
p2p.Peers().UpdateENR(other.ENR(), other.PeerID())
|
||||
|
||||
expectedRequest := ðpb.DataColumnSidecarsByRangeRequest{
|
||||
StartSlot: 0,
|
||||
Count: 1,
|
||||
Columns: []uint64{1, 17, 19, 42, 75, 87, 102, 117},
|
||||
}
|
||||
|
||||
clock := startup.NewClock(time.Now(), [fieldparams.RootLength]byte{})
|
||||
|
||||
gs := startup.NewClockSynchronizer()
|
||||
err = gs.SetClock(startup.NewClock(time.Unix(4113849600, 0), [fieldparams.RootLength]byte{}))
|
||||
require.NoError(t, err)
|
||||
|
||||
waiter := verification.NewInitializerWaiter(gs, nil, nil)
|
||||
initializer, err := waiter.WaitForInitializer(t.Context())
|
||||
require.NoError(t, err)
|
||||
|
||||
newDataColumnsVerifier := newDataColumnsVerifierFromInitializer(initializer)
|
||||
|
||||
// Create a block with blob commitments and sidecars
|
||||
roBlock, _, verifiedRoSidecars := util.GenerateTestFuluBlockWithSidecars(t, blobCount)
|
||||
|
||||
ctxMap, err := prysmSync.ContextByteVersionsForValRoot(params.BeaconConfig().GenesisValidatorsRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
service := &Service{
|
||||
ctx: t.Context(),
|
||||
clock: clock,
|
||||
newDataColumnsVerifier: newDataColumnsVerifier,
|
||||
cfg: &Config{
|
||||
P2P: p2p,
|
||||
DataColumnStorage: storage,
|
||||
},
|
||||
ctxMap: ctxMap,
|
||||
}
|
||||
|
||||
// Do not respond any sidecar on the first attempt, and respond everything requested on the second one.
|
||||
firstAttempt := true
|
||||
other.SetStreamHandler(protocol, func(stream network.Stream) {
|
||||
actualRequest := new(ethpb.DataColumnSidecarsByRangeRequest)
|
||||
err := other.Encoding().DecodeWithMaxLength(stream, actualRequest)
|
||||
assert.NoError(t, err)
|
||||
assert.DeepEqual(t, expectedRequest, actualRequest)
|
||||
|
||||
if firstAttempt {
|
||||
firstAttempt = false
|
||||
err = stream.CloseWrite()
|
||||
assert.NoError(t, err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, column := range actualRequest.Columns {
|
||||
err = prysmSync.WriteDataColumnSidecarChunk(stream, clock, other.Encoding(), verifiedRoSidecars[column].DataColumnSidecar)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
err = stream.CloseWrite()
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
err = service.fetchOriginColumns(roBlock, delay)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Check all corresponding sidecars are saved in the store.
|
||||
summary := storage.Summary(roBlock.Root())
|
||||
for _, index := range expectedRequest.Columns {
|
||||
require.Equal(t, true, summary.HasIndex(index))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -453,6 +453,10 @@ func SendDataColumnSidecarsByRangeRequest(
|
||||
// Send the request.
|
||||
stream, err := p.P2P.Send(p.Ctx, request, topic, pid)
|
||||
if err != nil {
|
||||
if p.DownscorePeerOnRPCFault {
|
||||
downscorePeer(p.P2P, pid, "cannotSendDataColumnSidecarsByRangeRequest")
|
||||
}
|
||||
|
||||
return nil, errors.Wrap(err, "p2p send")
|
||||
}
|
||||
defer closeStream(stream, log)
|
||||
@@ -467,6 +471,10 @@ func SendDataColumnSidecarsByRangeRequest(
|
||||
|
||||
validatorSlotWithinBounds, err := isSidecarSlotWithinBounds(request)
|
||||
if err != nil {
|
||||
if p.DownscorePeerOnRPCFault {
|
||||
downscorePeer(p.P2P, pid, "servedSidecarSlotOutOfBounds")
|
||||
}
|
||||
|
||||
return nil, errors.Wrap(err, "is sidecar slot within bounds")
|
||||
}
|
||||
|
||||
@@ -476,9 +484,17 @@ func SendDataColumnSidecarsByRangeRequest(
|
||||
isSidecarIndexRequested(request),
|
||||
)
|
||||
if errors.Is(err, io.EOF) {
|
||||
if p.DownscorePeerOnRPCFault && len(roDataColumns) == 0 {
|
||||
downscorePeer(p.P2P, pid, "noReturnedSidecar")
|
||||
}
|
||||
|
||||
return roDataColumns, nil
|
||||
}
|
||||
if err != nil {
|
||||
if p.DownscorePeerOnRPCFault {
|
||||
downscorePeer(p.P2P, pid, "readChunkedDataColumnSidecarError")
|
||||
}
|
||||
|
||||
return nil, errors.Wrap(err, "read chunked data column sidecar")
|
||||
}
|
||||
|
||||
@@ -491,6 +507,10 @@ func SendDataColumnSidecarsByRangeRequest(
|
||||
|
||||
// All requested sidecars were delivered by the peer. Expecting EOF.
|
||||
if _, err := readChunkedDataColumnSidecar(stream, p.P2P, p.CtxMap); !errors.Is(err, io.EOF) {
|
||||
if p.DownscorePeerOnRPCFault {
|
||||
downscorePeer(p.P2P, pid, "tooManyResponseDataColumnSidecars")
|
||||
}
|
||||
|
||||
return nil, errors.Wrapf(errMaxResponseDataColumnSidecarsExceeded, "requestedCount=%d", totalCount)
|
||||
}
|
||||
|
||||
@@ -528,7 +548,8 @@ func isSidecarIndexRequested(request *ethpb.DataColumnSidecarsByRangeRequest) Da
|
||||
return func(sidecar blocks.RODataColumn) error {
|
||||
columnIndex := sidecar.Index
|
||||
if !requestedIndices[columnIndex] {
|
||||
return errors.Errorf("data column sidecar index %d not found in requested indices", columnIndex)
|
||||
requested := sortedSliceFromMap(requestedIndices)
|
||||
return errors.Errorf("data column sidecar index %d returned by the peer but not found in requested indices %v", columnIndex, requested)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -566,6 +587,10 @@ func SendDataColumnSidecarsByRootRequest(p DataColumnSidecarsParams, peer goPeer
|
||||
// Send the request to the peer.
|
||||
stream, err := p.P2P.Send(p.Ctx, identifiers, topic, peer)
|
||||
if err != nil {
|
||||
if p.DownscorePeerOnRPCFault {
|
||||
downscorePeer(p.P2P, peer, "cannotSendDataColumnSidecarsByRootRequest")
|
||||
}
|
||||
|
||||
return nil, errors.Wrap(err, "p2p api send")
|
||||
}
|
||||
defer closeStream(stream, log)
|
||||
@@ -577,9 +602,17 @@ func SendDataColumnSidecarsByRootRequest(p DataColumnSidecarsParams, peer goPeer
|
||||
for range count {
|
||||
roDataColumn, err := readChunkedDataColumnSidecar(stream, p.P2P, p.CtxMap, isSidecarIndexRootRequested(identifiers))
|
||||
if errors.Is(err, io.EOF) {
|
||||
if p.DownscorePeerOnRPCFault && len(roDataColumns) == 0 {
|
||||
downscorePeer(p.P2P, peer, "noReturnedSidecar")
|
||||
}
|
||||
|
||||
return roDataColumns, nil
|
||||
}
|
||||
if err != nil {
|
||||
if p.DownscorePeerOnRPCFault {
|
||||
downscorePeer(p.P2P, peer, "readChunkedDataColumnSidecarError")
|
||||
}
|
||||
|
||||
return nil, errors.Wrap(err, "read chunked data column sidecar")
|
||||
}
|
||||
|
||||
@@ -592,6 +625,10 @@ func SendDataColumnSidecarsByRootRequest(p DataColumnSidecarsParams, peer goPeer
|
||||
|
||||
// All requested sidecars were delivered by the peer. Expecting EOF.
|
||||
if _, err := readChunkedDataColumnSidecar(stream, p.P2P, p.CtxMap); !errors.Is(err, io.EOF) {
|
||||
if p.DownscorePeerOnRPCFault {
|
||||
downscorePeer(p.P2P, peer, "tooManyResponseDataColumnSidecars")
|
||||
}
|
||||
|
||||
return nil, errors.Wrapf(errMaxResponseDataColumnSidecarsExceeded, "requestedCount=%d", count)
|
||||
}
|
||||
|
||||
@@ -689,3 +726,13 @@ func readChunkedDataColumnSidecar(
|
||||
|
||||
return &roDataColumn, nil
|
||||
}
|
||||
|
||||
func downscorePeer(p2p p2p.P2P, peerID peer.ID, reason string, fields ...logrus.Fields) {
|
||||
log := log
|
||||
for _, field := range fields {
|
||||
log = log.WithFields(field)
|
||||
}
|
||||
|
||||
newScore := p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
|
||||
log.WithFields(logrus.Fields{"peerID": peerID, "reason": reason, "newScore": newScore}).Debug("Downscore peer")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user