mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 05:47:59 -05:00
Compare commits
10 Commits
test-blob-
...
blob-by-ro
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ce2589aff8 | ||
|
|
9292dfbc90 | ||
|
|
99c81bfdad | ||
|
|
03bb30b607 | ||
|
|
fb06e49e75 | ||
|
|
f128b462c6 | ||
|
|
9e93a7947e | ||
|
|
2b7508cdeb | ||
|
|
0946114849 | ||
|
|
4a1bdbbedf |
@@ -43,6 +43,8 @@ const BlobsSidecarsByRangeMessageName = "/blobs_sidecars_by_range"
|
||||
// BeaconBlockAndBlobsSidecarByRootName is a topic for fetching beacon blocks and sidecar blobs by root.
|
||||
const BeaconBlockAndBlobsSidecarByRootName = "/beacon_block_and_blobs_sidecar_by_root"
|
||||
|
||||
const BlobSidecarsByRootName = "/blob_sidecars_by_root"
|
||||
|
||||
const (
|
||||
// V1 RPC Topics
|
||||
// RPCStatusTopicV1 defines the v1 topic for the status rpc method.
|
||||
@@ -59,8 +61,9 @@ const (
|
||||
RPCMetaDataTopicV1 = protocolPrefix + MetadataMessageName + SchemaVersionV1
|
||||
// RPCBlobsSidecarsByRangeTopicV1 defines the v1 topic for the blobs sidecars by range rpc method.
|
||||
RPCBlobsSidecarsByRangeTopicV1 = protocolPrefix + BlobsSidecarsByRangeMessageName + SchemaVersionV1
|
||||
// RPCBeaconBlockAndBlobsSidecarByRootTopicV1 defines the v1 topic for the beacon block and blobs sidecar by root rpc method.
|
||||
RPCBeaconBlockAndBlobsSidecarByRootTopicV1 = protocolPrefix + BeaconBlockAndBlobsSidecarByRootName + SchemaVersionV1
|
||||
// RPCBlobSidecarsByRootTopicV1 is a topic for requestion blob sidecars by their block root. New in deneb.
|
||||
// /eth2/beacon_chain/req/blob_sidecars_by_root/1/
|
||||
RPCBlobSidecarsByRootTopicV1 = protocolPrefix + BlobSidecarsByRootName + SchemaVersionV1
|
||||
|
||||
// V2 RPC Topics
|
||||
// RPCBlocksByRangeTopicV2 defines v2 the topic for the blocks by range rpc method.
|
||||
@@ -94,8 +97,9 @@ var RPCTopicMappings = map[string]interface{}{
|
||||
RPCMetaDataTopicV1: new(interface{}),
|
||||
RPCMetaDataTopicV2: new(interface{}),
|
||||
// RPC Blobs Sidecars By Range Message
|
||||
RPCBlobsSidecarsByRangeTopicV1: new(pb.BlobsSidecarsByRangeRequest),
|
||||
RPCBeaconBlockAndBlobsSidecarByRootTopicV1: new(p2ptypes.BeaconBlockByRootsReq),
|
||||
RPCBlobsSidecarsByRangeTopicV1: new(pb.BlobsSidecarsByRangeRequest),
|
||||
// RPC Blobs Sidecars By Root Message
|
||||
RPCBlobSidecarsByRootTopicV1: new(p2ptypes.BlobSidecarsByRootReq),
|
||||
}
|
||||
|
||||
// Maps all registered protocol prefixes.
|
||||
@@ -114,6 +118,7 @@ var messageMapping = map[string]bool{
|
||||
MetadataMessageName: true,
|
||||
BlobsSidecarsByRangeMessageName: true,
|
||||
BeaconBlockAndBlobsSidecarByRootName: true,
|
||||
BlobSidecarsByRootName: true,
|
||||
}
|
||||
|
||||
// Maps all the RPC messages which are to updated in altair.
|
||||
@@ -137,6 +142,7 @@ var PreAltairV1SchemaMapping = map[string]bool{
|
||||
MetadataMessageName: true,
|
||||
BlobsSidecarsByRangeMessageName: false,
|
||||
BeaconBlockAndBlobsSidecarByRootName: false,
|
||||
BlobSidecarsByRootName: false,
|
||||
}
|
||||
|
||||
// VerifyTopicMapping verifies that the topic and its accompanying
|
||||
|
||||
@@ -138,6 +138,11 @@ func (_ *FakeP2P) BroadcastAttestation(_ context.Context, _ uint64, _ *ethpb.Att
|
||||
return nil
|
||||
}
|
||||
|
||||
// BroadcastBlob -- fake.
|
||||
func (p *FakeP2P) BroadcastBlob(ctx context.Context, subnet uint64, blobSidecar *ethpb.SignedBlobSidecar) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// BroadcastSyncCommitteeMessage -- fake.
|
||||
func (_ *FakeP2P) BroadcastSyncCommitteeMessage(_ context.Context, _ uint64, _ *ethpb.SyncCommitteeMessage) error {
|
||||
return nil
|
||||
|
||||
@@ -170,6 +170,11 @@ func (p *TestP2P) BroadcastAttestation(_ context.Context, _ uint64, _ *ethpb.Att
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *TestP2P) BroadcastBlob(ctx context.Context, subnet uint64, blobSidecar *ethpb.SignedBlobSidecar) error {
|
||||
p.BroadcastCalled = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// BroadcastSyncCommitteeMessage broadcasts a sync committee message.
|
||||
func (p *TestP2P) BroadcastSyncCommitteeMessage(_ context.Context, _ uint64, _ *ethpb.SyncCommitteeMessage) error {
|
||||
p.BroadcastCalled = true
|
||||
|
||||
@@ -12,4 +12,5 @@ var (
|
||||
ErrRateLimited = errors.New("rate limited")
|
||||
ErrIODeadline = errors.New("i/o deadline exceeded")
|
||||
ErrInvalidRequest = errors.New("invalid range, step or count")
|
||||
ErrBlobLTMinRequest = errors.New("blob slot < minimum_request_epoch")
|
||||
)
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
ssz "github.com/prysmaticlabs/fastssz"
|
||||
"github.com/prysmaticlabs/prysm/v3/config/params"
|
||||
eth "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
|
||||
)
|
||||
|
||||
const rootLength = 32
|
||||
@@ -31,6 +32,7 @@ func (b *SSZBytes) HashTreeRootWith(hh *ssz.Hasher) error {
|
||||
|
||||
// BeaconBlockByRootsReq specifies the block by roots request type.
|
||||
type BeaconBlockByRootsReq [][rootLength]byte
|
||||
type BlobSidecarsByRootReq []*eth.BlobIdentifier
|
||||
|
||||
// MarshalSSZTo marshals the block by roots request with the provided byte slice.
|
||||
func (r *BeaconBlockByRootsReq) MarshalSSZTo(dst []byte) ([]byte, error) {
|
||||
|
||||
16
beacon-chain/sync/database.go
Normal file
16
beacon-chain/sync/database.go
Normal file
@@ -0,0 +1,16 @@
|
||||
package sync
|
||||
|
||||
import eth "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
|
||||
|
||||
type BlobSidecarGetter interface {
|
||||
BlobSidecar(blockRoot [32]byte, index uint64) (*eth.BlobSidecar, error)
|
||||
}
|
||||
|
||||
type BlobSidecarWriter interface {
|
||||
WriteBlobSidecar(blockRoot [32]byte, index uint64, sidecar *eth.BlobSidecar) error
|
||||
}
|
||||
|
||||
type BlobDB interface {
|
||||
BlobSidecarGetter
|
||||
BlobSidecarWriter
|
||||
}
|
||||
34
beacon-chain/sync/database_test.go
Normal file
34
beacon-chain/sync/database_test.go
Normal file
@@ -0,0 +1,34 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v3/beacon-chain/db"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
|
||||
)
|
||||
|
||||
type mockSidecarId struct {
|
||||
r [32]byte
|
||||
i uint64
|
||||
}
|
||||
|
||||
type MockBlobDB struct {
|
||||
storage map[mockSidecarId]*ethpb.BlobSidecar
|
||||
}
|
||||
|
||||
func (m *MockBlobDB) BlobSidecar(r [32]byte, idx uint64) (*ethpb.BlobSidecar, error) {
|
||||
sc, ok := m.storage[mockSidecarId{r, idx}]
|
||||
if !ok {
|
||||
return nil, errors.Wrapf(db.ErrNotFound, "MockBlobsDB.storage does not contain blob: root=%#x, idx=%d", r, idx)
|
||||
}
|
||||
return sc, nil
|
||||
}
|
||||
|
||||
func (m *MockBlobDB) WriteBlobSidecar(r [32]byte, idx uint64, s *ethpb.BlobSidecar) error {
|
||||
if m.storage == nil {
|
||||
m.storage = make(map[mockSidecarId]*ethpb.BlobSidecar)
|
||||
}
|
||||
m.storage[mockSidecarId{r, idx}] = s
|
||||
return nil
|
||||
}
|
||||
|
||||
var _ BlobDB = &MockBlobDB{}
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
var responseCodeSuccess = byte(0x00)
|
||||
var responseCodeInvalidRequest = byte(0x01)
|
||||
var responseCodeServerError = byte(0x02)
|
||||
var responseCodeResourceUnavailable = byte(0x03)
|
||||
|
||||
func (s *Service) generateErrorResponse(code byte, reason string) ([]byte, error) {
|
||||
return createErrorResponse(code, reason, s.cfg.p2p)
|
||||
|
||||
@@ -429,34 +429,6 @@ func (f *blocksFetcher) requestBlocksByRoot(
|
||||
return prysmsync.SendBeaconBlocksByRootRequest(ctx, f.chain, f.p2p, pid, req, nil)
|
||||
}
|
||||
|
||||
func (f *blocksFetcher) requestBlockAndSidecarByRoot(
|
||||
ctx context.Context,
|
||||
req *p2pTypes.BeaconBlockByRootsReq,
|
||||
pid peer.ID,
|
||||
) ([]*p2ppb.SignedBeaconBlockAndBlobsSidecar, error) {
|
||||
if ctx.Err() != nil {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
l := f.peerLock(pid)
|
||||
l.Lock()
|
||||
log.WithFields(logrus.Fields{
|
||||
"peer": pid,
|
||||
"numRoots": len(*req),
|
||||
"capacity": f.rateLimiter.Remaining(pid.String()),
|
||||
"score": f.p2p.Peers().Scorers().BlockProviderScorer().FormatScorePretty(pid),
|
||||
}).Debug("Requesting blocks (by roots)")
|
||||
if f.rateLimiter.Remaining(pid.String()) < int64(len(*req)) {
|
||||
if err := f.waitForBandwidth(pid, uint64(len(*req))); err != nil {
|
||||
l.Unlock()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
f.rateLimiter.Add(pid.String(), int64(len(*req)))
|
||||
l.Unlock()
|
||||
|
||||
return prysmsync.SendBlocksAndSidecarsByRootRequest(ctx, f.chain, f.p2p, pid, req, nil)
|
||||
}
|
||||
|
||||
// waitForBandwidth blocks up until peer's bandwidth is restored.
|
||||
func (f *blocksFetcher) waitForBandwidth(pid peer.ID, count uint64) error {
|
||||
log.WithField("peer", pid).Debug("Slowing down for rate limit")
|
||||
|
||||
@@ -18,6 +18,13 @@ import (
|
||||
|
||||
type Option func(s *Service) error
|
||||
|
||||
func WithBlobDB(bdb BlobDB) Option {
|
||||
return func(s *Service) error {
|
||||
s.blobs = bdb
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithAttestationNotifier(notifier operation.Notifier) Option {
|
||||
return func(s *Service) error {
|
||||
s.cfg.attestationNotifier = notifier
|
||||
|
||||
@@ -554,6 +554,7 @@ func TestRegularSyncBeaconBlockSubscriber_PruneOldPendingBlocks(t *testing.T) {
|
||||
func TestService_sortedPendingSlots(t *testing.T) {
|
||||
r := &Service{
|
||||
slotToPendingBlocks: gcache.New(time.Second, 2*time.Second),
|
||||
slotToPendingBlobs: gcache.New(time.Second, 2*time.Second),
|
||||
seenPendingBlocks: make(map[[32]byte]bool),
|
||||
}
|
||||
|
||||
@@ -791,6 +792,7 @@ func TestService_ProcessBadPendingBlocks(t *testing.T) {
|
||||
stateGen: stategen.New(db, doublylinkedtree.New()),
|
||||
},
|
||||
slotToPendingBlocks: gcache.New(time.Second, 2*time.Second),
|
||||
slotToPendingBlobs: gcache.New(time.Second, 2*time.Second),
|
||||
seenPendingBlocks: make(map[[32]byte]bool),
|
||||
}
|
||||
r.initCaches()
|
||||
|
||||
@@ -64,7 +64,7 @@ func newRateLimiter(p2pProvider p2p.P2P) *limiter {
|
||||
// BlocksByRoots requests
|
||||
topicMap[addEncoding(p2p.RPCBlocksByRootTopicV1)] = blockCollector
|
||||
topicMap[addEncoding(p2p.RPCBlocksByRootTopicV2)] = blockCollectorV2
|
||||
topicMap[addEncoding(p2p.RPCBeaconBlockAndBlobsSidecarByRootTopicV1)] = blobCollector
|
||||
topicMap[addEncoding(p2p.RPCBlobSidecarsByRootTopicV1)] = blobCollector
|
||||
|
||||
// BlockByRange requests
|
||||
topicMap[addEncoding(p2p.RPCBlocksByRangeTopicV1)] = blockCollector
|
||||
|
||||
@@ -103,8 +103,8 @@ func (s *Service) registerRPCHandlersDeneb() {
|
||||
s.blobsSidecarsByRangeRPCHandler,
|
||||
)
|
||||
s.registerRPC(
|
||||
p2p.RPCBeaconBlockAndBlobsSidecarByRootTopicV1,
|
||||
s.beaconBlockAndBlobsSidecarByRootRPCHandler,
|
||||
p2p.RPCBlobSidecarsByRootTopicV1,
|
||||
s.blobSidecarByRootRPCHandler,
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -1,107 +0,0 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
libp2pcore "github.com/libp2p/go-libp2p/core"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/types"
|
||||
"github.com/prysmaticlabs/prysm/v3/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v3/consensus-types/blocks"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
// beaconBlockAndBlobsSidecarByRootRPCHandler looks up the request beacon block and blobs from the database from a given root
|
||||
func (s *Service) beaconBlockAndBlobsSidecarByRootRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
|
||||
ctx, span := trace.StartSpan(ctx, "sync.beaconBlockAndBlobsSidecarByRootRPCHandler")
|
||||
defer span.End()
|
||||
ctx, cancel := context.WithTimeout(ctx, ttfbTimeout)
|
||||
defer cancel()
|
||||
SetRPCStreamDeadlines(stream)
|
||||
|
||||
log := log.WithField("handler", "beacon_block_and_blobs_sidecar_by_root")
|
||||
|
||||
rawMsg, ok := msg.(*types.BeaconBlockByRootsReq)
|
||||
if !ok {
|
||||
return errors.New("message is not type BeaconBlockByRootsReq")
|
||||
}
|
||||
blockRoots := *rawMsg
|
||||
if err := s.rateLimiter.validateRequest(stream, uint64(len(blockRoots))); err != nil {
|
||||
return err
|
||||
}
|
||||
if len(blockRoots) == 0 {
|
||||
// Add to rate limiter in the event no
|
||||
// roots are requested.
|
||||
s.rateLimiter.add(stream, 1)
|
||||
s.writeErrorResponseToStream(responseCodeInvalidRequest, "no block roots provided in request", stream)
|
||||
return errors.New("no block roots provided")
|
||||
}
|
||||
|
||||
if uint64(len(blockRoots)) > params.BeaconNetworkConfig().MaxRequestBlocks {
|
||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
|
||||
s.writeErrorResponseToStream(responseCodeInvalidRequest, "requested more than the max block limit", stream)
|
||||
return errors.New("requested more than the max block limit")
|
||||
}
|
||||
s.rateLimiter.add(stream, int64(len(blockRoots)))
|
||||
|
||||
for _, root := range blockRoots {
|
||||
blk, err := s.cfg.beaconDB.Block(ctx, root)
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("Could not fetch block")
|
||||
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
|
||||
return err
|
||||
}
|
||||
|
||||
sidecar, err := s.cfg.beaconDB.BlobsSidecar(ctx, root)
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("Could not fetch sidecar")
|
||||
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) // TODO(Deneb): Return unavailable?
|
||||
return err
|
||||
}
|
||||
|
||||
if blk == nil || blk.IsNil() {
|
||||
s.writeErrorResponseToStream(responseCodeInvalidRequest, "block not found!", stream)
|
||||
return errors.New("block not found")
|
||||
}
|
||||
|
||||
// TODO(Deneb): Reconstruct blind block
|
||||
pb, err := blk.PbDenebBlock()
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("Could not fetch block")
|
||||
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
|
||||
return err
|
||||
}
|
||||
if err := WriteBlockAndBlobsSidecarChunk(stream, s.cfg.chain, s.cfg.p2p.Encoding(), ðpb.SignedBeaconBlockAndBlobsSidecar{BeaconBlock: pb, BlobsSidecar: sidecar}); err != nil {
|
||||
log.WithError(err).Debug("Could not send block and sidecar")
|
||||
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
|
||||
return err
|
||||
}
|
||||
}
|
||||
closeStream(stream, log)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) sendBlocksAndSidecarsRequest(ctx context.Context, blockRoots *types.BeaconBlockByRootsReq, id peer.ID) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, respTimeout)
|
||||
defer cancel()
|
||||
|
||||
_, err := SendBlocksAndSidecarsByRootRequest(ctx, s.cfg.chain, s.cfg.p2p, id, blockRoots, func(blkAndSidecar *ethpb.SignedBeaconBlockAndBlobsSidecar) error {
|
||||
blk, err := blocks.NewSignedBeaconBlock(blkAndSidecar.BeaconBlock)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
blkRoot, err := blk.Block().HashTreeRoot()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.pendingQueueLock.Lock()
|
||||
defer s.pendingQueueLock.Unlock()
|
||||
if err := s.insertBlkAndBlobToQueue(blk.Block().Slot(), blk, blkRoot, blkAndSidecar.BlobsSidecar); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return err
|
||||
}
|
||||
103
beacon-chain/sync/rpc_blob_sidecars_by_root.go
Normal file
103
beacon-chain/sync/rpc_blob_sidecars_by_root.go
Normal file
@@ -0,0 +1,103 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
libp2pcore "github.com/libp2p/go-libp2p/core"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v3/beacon-chain/db"
|
||||
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p"
|
||||
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/types"
|
||||
"github.com/prysmaticlabs/prysm/v3/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v3/consensus-types/blocks"
|
||||
"github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/v3/monitoring/tracing"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v3/time/slots"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
func minimumRequestEpoch(finalized, current primitives.Epoch) primitives.Epoch {
|
||||
// max(finalized_epoch, current_epoch - MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, DENEB_FORK_EPOCH)
|
||||
denebFork := params.BeaconConfig().DenebForkEpoch
|
||||
reqWindow := current - params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest
|
||||
if finalized >= reqWindow && finalized >= denebFork {
|
||||
return finalized
|
||||
}
|
||||
if reqWindow >= finalized && reqWindow >= denebFork {
|
||||
return reqWindow
|
||||
}
|
||||
return denebFork
|
||||
}
|
||||
|
||||
// blobSidecarByRootRPCHandler handles the /eth2/beacon_chain/req/blob_sidecars_by_root/1/ RPC request.
|
||||
// spec: https://github.com/ethereum/consensus-specs/blob/a7e45db9ac2b60a33e144444969ad3ac0aae3d4c/specs/deneb/p2p-interface.md#blobsidecarsbyroot-v1
|
||||
func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
|
||||
ctx, span := trace.StartSpan(ctx, "sync.blobSidecarByRootRPCHandler")
|
||||
defer span.End()
|
||||
ctx, cancel := context.WithTimeout(ctx, ttfbTimeout)
|
||||
defer cancel()
|
||||
SetRPCStreamDeadlines(stream)
|
||||
log := log.WithField("handler", p2p.BlobSidecarsByRootName[1:]) // slice the leading slash off the name var
|
||||
ref, ok := msg.(*types.BlobSidecarsByRootReq)
|
||||
if !ok {
|
||||
return errors.New("message is not type BeaconBlockByRootsReq")
|
||||
}
|
||||
minReqEpoch := minimumRequestEpoch(s.cfg.chain.FinalizedCheckpt().Epoch, slots.ToEpoch(s.cfg.chain.CurrentSlot()))
|
||||
blobIdents := *ref
|
||||
for i := range blobIdents {
|
||||
root, idx := bytesutil.ToBytes32(blobIdents[i].BlockRoot), blobIdents[i].Index
|
||||
sc, err := s.blobs.BlobSidecar(root, idx)
|
||||
if err != nil {
|
||||
if errors.Is(err, db.ErrNotFound) {
|
||||
continue
|
||||
}
|
||||
log.WithError(err).Debugf("error retrieving BlobSidecar, root=%x, idnex=%d", root, idx)
|
||||
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
|
||||
return err
|
||||
}
|
||||
|
||||
// If any root in the request content references a block earlier than minimum_request_epoch,
|
||||
// peers MAY respond with error code 3: ResourceUnavailable or not include the blob in the response.
|
||||
if slots.ToEpoch(sc.Slot) < minReqEpoch {
|
||||
s.writeErrorResponseToStream(responseCodeResourceUnavailable, types.ErrBlobLTMinRequest.Error(), stream)
|
||||
log.WithError(types.ErrBlobLTMinRequest).
|
||||
Debugf("requested blob for block %#x before minimum_request_epoch", blobIdents[i].BlockRoot)
|
||||
return types.ErrBlobLTMinRequest
|
||||
}
|
||||
SetStreamWriteDeadline(stream, defaultWriteDuration)
|
||||
if chunkErr := WriteBlobSidecarChunk(stream, s.cfg.chain, s.cfg.p2p.Encoding(), sc); chunkErr != nil {
|
||||
log.WithError(chunkErr).Debug("Could not send a chunked response")
|
||||
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
|
||||
tracing.AnnotateError(span, chunkErr)
|
||||
return chunkErr
|
||||
}
|
||||
s.rateLimiter.add(stream, 1)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) sendBlocksAndSidecarsRequest(ctx context.Context, blockRoots *types.BeaconBlockByRootsReq, id peer.ID) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, respTimeout)
|
||||
defer cancel()
|
||||
|
||||
_, err := SendBlocksAndSidecarsByRootRequest(ctx, s.cfg.chain, s.cfg.p2p, id, blockRoots, func(blkAndSidecar *ethpb.SignedBeaconBlockAndBlobsSidecar) error {
|
||||
blk, err := blocks.NewSignedBeaconBlock(blkAndSidecar.BeaconBlock)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
blkRoot, err := blk.Block().HashTreeRoot()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.pendingQueueLock.Lock()
|
||||
defer s.pendingQueueLock.Unlock()
|
||||
if err := s.insertBlkAndBlobToQueue(blk.Block().Slot(), blk, blkRoot, blkAndSidecar.BlobsSidecar); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return err
|
||||
}
|
||||
373
beacon-chain/sync/rpc_blob_sidecars_by_root_test.go
Normal file
373
beacon-chain/sync/rpc_blob_sidecars_by_root_test.go
Normal file
@@ -0,0 +1,373 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"io"
|
||||
"math"
|
||||
"math/big"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
gethTypes "github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/protocol"
|
||||
ssz "github.com/prysmaticlabs/fastssz"
|
||||
mock "github.com/prysmaticlabs/prysm/v3/beacon-chain/blockchain/testing"
|
||||
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p"
|
||||
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/encoder"
|
||||
p2ptest "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/testing"
|
||||
p2pTypes "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/types"
|
||||
fieldparams "github.com/prysmaticlabs/prysm/v3/config/fieldparams"
|
||||
"github.com/prysmaticlabs/prysm/v3/config/params"
|
||||
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
|
||||
leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket"
|
||||
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/v3/network/forks"
|
||||
enginev1 "github.com/prysmaticlabs/prysm/v3/proto/engine/v1"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v3/testing/require"
|
||||
"github.com/prysmaticlabs/prysm/v3/testing/util"
|
||||
"github.com/prysmaticlabs/prysm/v3/time/slots"
|
||||
)
|
||||
|
||||
type sidecarsTestCase struct {
|
||||
name string
|
||||
nblocks int // how many blocks to loop through in setting up test fixtures & requests
|
||||
missing map[int]map[int]bool // skip this blob index, so that we can test different custody scenarios
|
||||
expired map[int]bool // mark block expired to test scenarios where requests are outside retention window
|
||||
chain *mock.ChainService // allow tests to control retention window via current slot and finalized checkpoint
|
||||
total *int // allow a test to specify the total number of responses received
|
||||
err error
|
||||
}
|
||||
|
||||
func generateTestBlock(t *testing.T, slot types.Slot) *ethpb.SignedBeaconBlockDeneb {
|
||||
// Start service with 160 as allowed blocks capacity (and almost zero capacity recovery).
|
||||
parent := bytesutil.PadTo([]byte("parentHash"), fieldparams.RootLength)
|
||||
stateRoot := bytesutil.PadTo([]byte("stateRoot"), fieldparams.RootLength)
|
||||
receiptsRoot := bytesutil.PadTo([]byte("receiptsRoot"), fieldparams.RootLength)
|
||||
logsBloom := bytesutil.PadTo([]byte("logs"), fieldparams.LogsBloomLength)
|
||||
tx := gethTypes.NewTransaction(
|
||||
0,
|
||||
common.HexToAddress("095e7baea6a6c7c4c2dfeb977efac326af552d87"),
|
||||
big.NewInt(0), 0, big.NewInt(0),
|
||||
nil,
|
||||
)
|
||||
txs := []*gethTypes.Transaction{tx}
|
||||
encodedBinaryTxs := make([][]byte, 1)
|
||||
var err error
|
||||
encodedBinaryTxs[0], err = txs[0].MarshalBinary()
|
||||
require.NoError(t, err)
|
||||
blockHash := bytesutil.ToBytes32([]byte("foo"))
|
||||
payload := &enginev1.ExecutionPayloadDeneb{
|
||||
ParentHash: parent,
|
||||
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
|
||||
StateRoot: stateRoot,
|
||||
ReceiptsRoot: receiptsRoot,
|
||||
LogsBloom: logsBloom,
|
||||
PrevRandao: blockHash[:],
|
||||
BlockNumber: 0,
|
||||
GasLimit: 0,
|
||||
GasUsed: 0,
|
||||
Timestamp: 0,
|
||||
ExtraData: make([]byte, 0),
|
||||
BaseFeePerGas: bytesutil.PadTo([]byte("baseFeePerGas"), fieldparams.RootLength),
|
||||
ExcessDataGas: bytesutil.PadTo([]byte("excessDataGas"), fieldparams.RootLength),
|
||||
BlockHash: blockHash[:],
|
||||
Transactions: encodedBinaryTxs,
|
||||
}
|
||||
block := util.NewBeaconBlockDeneb()
|
||||
block.Block.Body.ExecutionPayload = payload
|
||||
block.Block.Slot = slot
|
||||
return block
|
||||
}
|
||||
|
||||
func generateTestSidecar(root [32]byte, block *ethpb.SignedBeaconBlockDeneb, index int) *ethpb.BlobSidecar {
|
||||
blob := &enginev1.Blob{
|
||||
Data: make([]byte, fieldparams.BlobSize),
|
||||
}
|
||||
binary.LittleEndian.PutUint64(blob.Data, uint64(index))
|
||||
return ðpb.BlobSidecar{
|
||||
BlockRoot: root[:],
|
||||
Index: uint64(index),
|
||||
Slot: block.Block.Slot,
|
||||
BlockParentRoot: block.Block.ParentRoot[:],
|
||||
ProposerIndex: block.Block.ProposerIndex,
|
||||
Blob: blob,
|
||||
KzgCommitment: make([]byte, 48),
|
||||
KzgProof: make([]byte, 48),
|
||||
}
|
||||
}
|
||||
|
||||
type expectedResponse struct {
|
||||
sidecar *ethpb.BlobSidecar
|
||||
code uint8
|
||||
message string
|
||||
skipped bool
|
||||
}
|
||||
|
||||
type streamDecoder func(io.Reader, ssz.Unmarshaler) error
|
||||
|
||||
func (r *expectedResponse) requireExpected(t *testing.T, d streamDecoder, stream network.Stream) {
|
||||
if r.skipped {
|
||||
return
|
||||
}
|
||||
code, _, err := ReadStatusCode(stream, &encoder.SszNetworkEncoder{})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, r.code, code, "unexpected response code")
|
||||
//require.Equal(t, r.message, msg, "unexpected error message")
|
||||
if r.sidecar == nil {
|
||||
return
|
||||
}
|
||||
sc := ðpb.BlobSidecar{}
|
||||
require.NoError(t, d(stream, sc))
|
||||
require.Equal(t, bytesutil.ToBytes32(sc.BlockRoot), bytesutil.ToBytes32(r.sidecar.BlockRoot))
|
||||
require.Equal(t, sc.Index, r.sidecar.Index)
|
||||
}
|
||||
|
||||
func (c sidecarsTestCase) run(t *testing.T) {
|
||||
cfg := params.BeaconConfig()
|
||||
repositionFutureEpochs(cfg)
|
||||
undo, err := params.SetActiveWithUndo(cfg)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
require.NoError(t, undo())
|
||||
}()
|
||||
maxBlobs := int(params.BeaconConfig().MaxBlobsPerBlock)
|
||||
if c.chain == nil {
|
||||
c.chain = defaultMockChain(t)
|
||||
}
|
||||
|
||||
db := &MockBlobDB{}
|
||||
var req p2pTypes.BlobSidecarsByRootReq
|
||||
var expect []*expectedResponse
|
||||
oldest, err := slots.EpochStart(minimumRequestEpoch(c.chain.FinalizedCheckPoint.Epoch, slots.ToEpoch(c.chain.CurrentSlot())))
|
||||
require.NoError(t, err)
|
||||
streamTerminated := false
|
||||
for i := 0; i < c.nblocks; i++ {
|
||||
// check if there is a slot override for this index
|
||||
// ie to create a block outside the minimum_request_epoch
|
||||
var bs types.Slot
|
||||
if c.expired[i] {
|
||||
// the lowest possible bound of the retention period is the deneb epoch, so make sure
|
||||
// the slot of an expired block is at least one slot less than the deneb epoch.
|
||||
bs = oldest - 1 - types.Slot(i)
|
||||
} else {
|
||||
bs = oldest + types.Slot(i)
|
||||
}
|
||||
block := generateTestBlock(t, bs+types.Slot(i))
|
||||
root, err := block.HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
binary.LittleEndian.PutUint64(root[:], uint64(i))
|
||||
for bi := 0; bi < maxBlobs; bi++ {
|
||||
ubi := uint64(bi)
|
||||
req = append(req, ðpb.BlobIdentifier{BlockRoot: root[:], Index: ubi})
|
||||
|
||||
if streamTerminated {
|
||||
// once we know there is a bad response in the sequence, we want to filter out any subsequent
|
||||
// expected responses, because an error response terminates the stream.
|
||||
continue
|
||||
}
|
||||
// skip sidecars that are supposed to be missing
|
||||
if missed, ok := c.missing[i]; ok && missed[bi] {
|
||||
continue
|
||||
}
|
||||
sc := generateTestSidecar(root, block, bi)
|
||||
require.NoError(t, db.WriteBlobSidecar(root, ubi, sc))
|
||||
// if a sidecar is expired, we'll expect an error for the *first* index, and after that
|
||||
// we'll expect no further chunks in the stream, so filter out any further expected responses.
|
||||
// we don't need to check what index this is because we work through them in order and the first one
|
||||
// will set streamTerminated = true and skip everything else in the test case.
|
||||
if c.expired[i] {
|
||||
expect = append(expect, &expectedResponse{
|
||||
code: responseCodeResourceUnavailable,
|
||||
message: p2pTypes.ErrBlobLTMinRequest.Error(),
|
||||
})
|
||||
streamTerminated = true
|
||||
continue
|
||||
}
|
||||
|
||||
expect = append(expect, &expectedResponse{
|
||||
sidecar: sc,
|
||||
code: responseCodeSuccess,
|
||||
message: "",
|
||||
})
|
||||
}
|
||||
}
|
||||
rate := params.BeaconNetworkConfig().MaxRequestBlobsSidecars * params.BeaconConfig().MaxBlobsPerBlock
|
||||
client := p2ptest.NewTestP2P(t)
|
||||
s := &Service{
|
||||
cfg: &config{p2p: client, chain: c.chain},
|
||||
blobs: db,
|
||||
rateLimiter: newRateLimiter(client)}
|
||||
s.setRateCollector(p2p.RPCBlobSidecarsByRootTopicV1, leakybucket.NewCollector(0.000001, int64(rate), time.Second, false))
|
||||
|
||||
dec := s.cfg.p2p.Encoding().DecodeWithMaxLength
|
||||
if c.total != nil {
|
||||
require.Equal(t, *c.total, len(expect))
|
||||
}
|
||||
nh := func(stream network.Stream) {
|
||||
for _, ex := range expect {
|
||||
ex.requireExpected(t, dec, stream)
|
||||
}
|
||||
}
|
||||
rht := &rpcHandlerTest{t: t, client: client, topic: p2p.RPCBlocksByRootTopicV1, timeout: time.Second * 10, err: c.err}
|
||||
rht.testHandler(nh, s.blobSidecarByRootRPCHandler, &req)
|
||||
}
|
||||
|
||||
type rpcHandlerTest struct {
|
||||
t *testing.T
|
||||
client *p2ptest.TestP2P
|
||||
topic protocol.ID
|
||||
timeout time.Duration
|
||||
err error
|
||||
}
|
||||
|
||||
func (rt *rpcHandlerTest) testHandler(nh network.StreamHandler, rh rpcHandler, rhi interface{}) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), rt.timeout)
|
||||
defer func() {
|
||||
cancel()
|
||||
}()
|
||||
|
||||
w := util.NewWaiter()
|
||||
server := p2ptest.NewTestP2P(rt.t)
|
||||
rt.client.Connect(server)
|
||||
defer func() {
|
||||
require.NoError(rt.t, rt.client.Disconnect(server.PeerID()))
|
||||
}()
|
||||
require.Equal(rt.t, 1, len(rt.client.BHost.Network().Peers()), "Expected peers to be connected")
|
||||
h := func(stream network.Stream) {
|
||||
defer w.Done()
|
||||
nh(stream)
|
||||
}
|
||||
server.BHost.SetStreamHandler(protocol.ID(rt.topic), h)
|
||||
stream, err := rt.client.BHost.NewStream(ctx, server.BHost.ID(), protocol.ID(rt.topic))
|
||||
require.NoError(rt.t, err)
|
||||
|
||||
err = rh(ctx, rhi, stream)
|
||||
if rt.err == nil {
|
||||
require.NoError(rt.t, err)
|
||||
} else {
|
||||
require.ErrorIs(rt.t, err, rt.err)
|
||||
}
|
||||
|
||||
w.RequireDoneBeforeCancel(rt.t, ctx)
|
||||
}
|
||||
|
||||
// we use max uints for future forks, but this causes overflows when computing slots
|
||||
// so it is helpful in tests to temporarily reposition the epochs to give room for some math.
|
||||
func repositionFutureEpochs(cfg *params.BeaconChainConfig) {
|
||||
if cfg.CapellaForkEpoch == math.MaxUint64 {
|
||||
cfg.CapellaForkEpoch = cfg.BellatrixForkEpoch + 100
|
||||
}
|
||||
if cfg.DenebForkEpoch == math.MaxUint64 {
|
||||
cfg.DenebForkEpoch = cfg.CapellaForkEpoch + 100
|
||||
}
|
||||
}
|
||||
|
||||
func defaultMockChain(t *testing.T) *mock.ChainService {
|
||||
df, err := forks.Fork(params.BeaconConfig().DenebForkEpoch)
|
||||
require.NoError(t, err)
|
||||
ce := params.BeaconConfig().DenebForkEpoch + params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest
|
||||
fe := ce - 2
|
||||
cs, err := slots.EpochStart(ce)
|
||||
require.NoError(t, err)
|
||||
|
||||
return &mock.ChainService{
|
||||
ValidatorsRoot: [32]byte{},
|
||||
Slot: &cs,
|
||||
FinalizedCheckPoint: ðpb.Checkpoint{Epoch: fe},
|
||||
Fork: df}
|
||||
}
|
||||
|
||||
func TestSidecarByRootValidation(t *testing.T) {
|
||||
cfg := params.BeaconConfig()
|
||||
repositionFutureEpochs(cfg)
|
||||
undo, err := params.SetActiveWithUndo(cfg)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
require.NoError(t, undo())
|
||||
}()
|
||||
capellaSlot, err := slots.EpochStart(params.BeaconConfig().CapellaForkEpoch)
|
||||
require.NoError(t, err)
|
||||
dmc := defaultMockChain(t)
|
||||
dmc.Slot = &capellaSlot
|
||||
dmc.FinalizedCheckPoint = ðpb.Checkpoint{Epoch: params.BeaconConfig().CapellaForkEpoch}
|
||||
cases := []sidecarsTestCase{
|
||||
{
|
||||
name: "block before minimum_request_epoch",
|
||||
nblocks: 1,
|
||||
expired: map[int]bool{0: true},
|
||||
chain: dmc,
|
||||
err: p2pTypes.ErrBlobLTMinRequest,
|
||||
},
|
||||
{
|
||||
name: "blocks before and after minimum_request_epoch",
|
||||
nblocks: 2,
|
||||
expired: map[int]bool{0: true},
|
||||
chain: dmc,
|
||||
err: p2pTypes.ErrBlobLTMinRequest,
|
||||
},
|
||||
{
|
||||
name: "one after minimum_request_epoch then one before",
|
||||
nblocks: 2,
|
||||
expired: map[int]bool{1: true},
|
||||
chain: dmc,
|
||||
err: p2pTypes.ErrBlobLTMinRequest,
|
||||
},
|
||||
{
|
||||
name: "one missing index, one after minimum_request_epoch then one before",
|
||||
nblocks: 3,
|
||||
missing: map[int]map[int]bool{0: map[int]bool{0: true}},
|
||||
expired: map[int]bool{1: true},
|
||||
chain: dmc,
|
||||
err: p2pTypes.ErrBlobLTMinRequest,
|
||||
},
|
||||
{
|
||||
name: "2 missing indices from 2 different blocks",
|
||||
nblocks: 3,
|
||||
missing: map[int]map[int]bool{0: map[int]bool{0: true}, 2: map[int]bool{3: true}},
|
||||
total: func(i int) *int { return &i }(3*int(params.BeaconConfig().MaxBlobsPerBlock) - 2), // aka 10
|
||||
},
|
||||
{
|
||||
name: "all indices missing",
|
||||
nblocks: 1,
|
||||
missing: map[int]map[int]bool{0: map[int]bool{0: true, 1: true, 2: true, 3: true}},
|
||||
total: func(i int) *int { return &i }(0), // aka 10
|
||||
},
|
||||
{
|
||||
name: "block with all indices missing between 2 full blocks",
|
||||
nblocks: 3,
|
||||
missing: map[int]map[int]bool{1: map[int]bool{0: true, 1: true, 2: true, 3: true}},
|
||||
total: func(i int) *int { return &i }(8), // aka 10
|
||||
},
|
||||
}
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
c.run(t)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSidecarsByRootOK(t *testing.T) {
|
||||
cases := []sidecarsTestCase{
|
||||
{
|
||||
name: "0 blob",
|
||||
nblocks: 0,
|
||||
},
|
||||
{
|
||||
name: "1 blob",
|
||||
nblocks: 1,
|
||||
},
|
||||
{
|
||||
name: "2 blob",
|
||||
nblocks: 2,
|
||||
},
|
||||
}
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
c.run(t)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v3/network/forks"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v3/runtime/version"
|
||||
"github.com/prysmaticlabs/prysm/v3/time/slots"
|
||||
)
|
||||
|
||||
// chunkBlockWriter writes the given message as a chunked response to the given network
|
||||
@@ -107,6 +108,25 @@ func WriteBlobsSidecarChunk(stream libp2pcore.Stream, chain blockchain.ChainInfo
|
||||
return err
|
||||
}
|
||||
|
||||
// WriteBlobSidecarChunk writes blob chunk object to stream.
|
||||
// response_chunk ::= <result> | <context-bytes> | <encoding-dependent-header> | <encoded-payload>
|
||||
func WriteBlobSidecarChunk(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher, encoding encoder.NetworkEncoding, sidecar *ethpb.BlobSidecar) error {
|
||||
if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil {
|
||||
return err
|
||||
}
|
||||
valRoot := chain.GenesisValidatorsRoot()
|
||||
ctxBytes, err := forks.ForkDigestFromEpoch(slots.ToEpoch(sidecar.GetSlot()), valRoot[:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := writeContextToStream(ctxBytes[:], stream, chain); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = encoding.EncodeWithMaxLength(stream, sidecar)
|
||||
return err
|
||||
}
|
||||
|
||||
func WriteBlockAndBlobsSidecarChunk(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher, encoding encoder.NetworkEncoding, b *ethpb.SignedBeaconBlockAndBlobsSidecar) error {
|
||||
SetStreamWriteDeadline(stream, defaultWriteDuration)
|
||||
if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil {
|
||||
|
||||
@@ -132,6 +132,50 @@ func SendBeaconBlocksByRootRequest(
|
||||
return blocks, nil
|
||||
}
|
||||
|
||||
func SendSidecarsByRoot(
|
||||
ctx context.Context, ci blockchain.ChainInfoFetcher, p2pApi p2p.P2P, pid peer.ID,
|
||||
req *p2ptypes.BlobSidecarsByRootReq, processor BlobsSidecarProcessor,
|
||||
) ([]*pb.BlobsSidecar, error) {
|
||||
topic, err := p2p.TopicFromMessage(p2p.BlobSidecarsByRootName, slots.ToEpoch(ci.CurrentSlot()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
stream, err := p2pApi.Send(ctx, req, topic, pid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer closeStream(stream, log)
|
||||
|
||||
sidecars := make([]*pb.BlobsSidecar, 0, len(*req))
|
||||
process := func(s *pb.BlobsSidecar) error {
|
||||
sidecars = append(sidecars, s)
|
||||
if processor != nil {
|
||||
return processor(s)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
max := params.BeaconNetworkConfig().MaxRequestBlobsSidecars * params.BeaconConfig().MaxBlobsPerBlock
|
||||
for i := 0; i < len(*req); i++ {
|
||||
// Exit if peer sends more than MAX_REQUEST_BLOBS_SIDECARS.
|
||||
if uint64(i) >= max {
|
||||
break
|
||||
}
|
||||
isFirstChunk := i == 0
|
||||
sc, err := ReadChunkedBlobsSidecar(stream, ci, p2pApi, isFirstChunk)
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := process(sc); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return sidecars, nil
|
||||
}
|
||||
|
||||
func SendBlocksAndSidecarsByRootRequest(
|
||||
ctx context.Context, chain blockchain.ChainInfoFetcher, p2pProvider p2p.P2P, pid peer.ID,
|
||||
req *p2ptypes.BeaconBlockByRootsReq, processor BeaconBlockAndSidecarProcessor,
|
||||
|
||||
@@ -34,6 +34,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v3/beacon-chain/state/stategen"
|
||||
lruwrpr "github.com/prysmaticlabs/prysm/v3/cache/lru"
|
||||
"github.com/prysmaticlabs/prysm/v3/config/params"
|
||||
leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v3/runtime"
|
||||
prysmTime "github.com/prysmaticlabs/prysm/v3/time"
|
||||
@@ -139,6 +140,7 @@ type Service struct {
|
||||
syncContributionBitsOverlapLock sync.RWMutex
|
||||
syncContributionBitsOverlapCache *lru.Cache
|
||||
signatureChan chan *signatureVerifier
|
||||
blobs BlobDB
|
||||
}
|
||||
|
||||
// NewService initializes new regular sync service.
|
||||
@@ -289,6 +291,10 @@ func (s *Service) registerHandlers() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) setRateCollector(topic string, c *leakybucket.Collector) {
|
||||
s.rateLimiter.limiterMap[topic] = c
|
||||
}
|
||||
|
||||
// marks the chain as having started.
|
||||
func (s *Service) markForChainStart() {
|
||||
s.chainStarted.Set()
|
||||
|
||||
@@ -26,4 +26,5 @@ const (
|
||||
SyncCommitteeAggregationBytesLength = 16 // SyncCommitteeAggregationBytesLength defines the length of sync committee aggregate bytes.
|
||||
SyncAggregateSyncCommitteeBytesLength = 64 // SyncAggregateSyncCommitteeBytesLength defines the length of sync committee bytes in a sync aggregate.
|
||||
MaxWithdrawalsPerPayload = 16 // MaxWithdrawalsPerPayloadLength defines the maximum number of withdrawals that can be included in a payload.
|
||||
BlobSize = 131072 // defined to match blobs.size in bazel ssz codegen
|
||||
)
|
||||
|
||||
@@ -26,4 +26,5 @@ const (
|
||||
SyncCommitteeAggregationBytesLength = 1 // SyncCommitteeAggregationBytesLength defines the sync committee aggregate bytes.
|
||||
SyncAggregateSyncCommitteeBytesLength = 4 // SyncAggregateSyncCommitteeBytesLength defines the length of sync committee bytes in a sync aggregate.
|
||||
MaxWithdrawalsPerPayload = 4 // MaxWithdrawalsPerPayloadLength defines the maximum number of withdrawals that can be included in a payload.
|
||||
BlobSize = 131072 // defined to match blobs.size in bazel ssz codegen
|
||||
)
|
||||
|
||||
@@ -213,6 +213,9 @@ type BeaconChainConfig struct {
|
||||
|
||||
// Execution engine timeout value
|
||||
ExecutionEngineTimeoutValue uint64 // ExecutionEngineTimeoutValue defines the seconds to wait before timing out engine endpoints with execution payload execution semantics (newPayload, forkchoiceUpdated).
|
||||
|
||||
// Deneb presets
|
||||
MaxBlobsPerBlock uint64
|
||||
}
|
||||
|
||||
// InitializeForkSchedule initializes the schedules forks baked into the config.
|
||||
|
||||
@@ -268,6 +268,9 @@ var mainnetBeaconConfig = &BeaconChainConfig{
|
||||
|
||||
// Execution engine timeout value
|
||||
ExecutionEngineTimeoutValue: 8, // 8 seconds default based on: https://github.com/ethereum/execution-apis/blob/main/src/engine/specification.md#core
|
||||
|
||||
// deneb presets
|
||||
MaxBlobsPerBlock: 4,
|
||||
}
|
||||
|
||||
// MainnetTestConfig provides a version of the mainnet config that has a different name
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package util
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -20,3 +22,35 @@ func WaitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
type Waiter struct {
|
||||
c chan struct{}
|
||||
}
|
||||
|
||||
func NewWaiter() *Waiter {
|
||||
return &Waiter{
|
||||
c: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Waiter) Done() {
|
||||
close(w.c)
|
||||
}
|
||||
|
||||
func (w *Waiter) RequireDoneAfter(t *testing.T, timeout time.Duration) {
|
||||
select {
|
||||
case <-w.c:
|
||||
return
|
||||
case <-time.After(timeout):
|
||||
t.Fatalf("timeout after %s", timeout)
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Waiter) RequireDoneBeforeCancel(t *testing.T, ctx context.Context) {
|
||||
select {
|
||||
case <-w.c:
|
||||
return
|
||||
case <-ctx.Done():
|
||||
t.Fatalf("context canceled before Done with error=%s", ctx.Err())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user