Compare commits

...

10 Commits

Author  SHA1        Message                                            Date
kasey   ce2589aff8  initialize blob queue for test                     2023-03-06 12:55:22 -06:00
kasey   9292dfbc90  handler name consistent with path                  2023-03-03 14:05:14 -06:00
kasey   99c81bfdad  add BroadcastBlob method stubs to fix build        2023-03-02 20:24:29 -06:00
kasey   03bb30b607  rm unused func                                     2023-03-02 20:20:12 -06:00
kasey   fb06e49e75  cleaning up test mess                              2023-03-02 20:20:12 -06:00
kasey   f128b462c6  test blob slot < min req epoch, other fixes        2023-03-02 20:20:12 -06:00
kasey   9e93a7947e  add test for base case of sidecar by root          2023-03-02 20:20:12 -06:00
kasey   2b7508cdeb  rm unused requestBlockAndSidecarByRoot             2023-03-02 20:18:30 -06:00
kasey   0946114849  add db interface to service and use in handler     2023-03-02 20:18:30 -06:00
kasey   4a1bdbbedf  boilerplate for block&block-by-root->blob-by-root  2023-03-02 20:18:30 -06:00
24 changed files with 674 additions and 142 deletions

View File

@@ -43,6 +43,8 @@ const BlobsSidecarsByRangeMessageName = "/blobs_sidecars_by_range"
// BeaconBlockAndBlobsSidecarByRootName is a topic for fetching beacon blocks and sidecar blobs by root.
const BeaconBlockAndBlobsSidecarByRootName = "/beacon_block_and_blobs_sidecar_by_root"
const BlobSidecarsByRootName = "/blob_sidecars_by_root"
const (
// V1 RPC Topics
// RPCStatusTopicV1 defines the v1 topic for the status rpc method.
@@ -59,8 +61,9 @@ const (
RPCMetaDataTopicV1 = protocolPrefix + MetadataMessageName + SchemaVersionV1
// RPCBlobsSidecarsByRangeTopicV1 defines the v1 topic for the blobs sidecars by range rpc method.
RPCBlobsSidecarsByRangeTopicV1 = protocolPrefix + BlobsSidecarsByRangeMessageName + SchemaVersionV1
// RPCBeaconBlockAndBlobsSidecarByRootTopicV1 defines the v1 topic for the beacon block and blobs sidecar by root rpc method.
RPCBeaconBlockAndBlobsSidecarByRootTopicV1 = protocolPrefix + BeaconBlockAndBlobsSidecarByRootName + SchemaVersionV1
// RPCBlobSidecarsByRootTopicV1 defines the v1 topic for requesting blob sidecars by their block root. New in Deneb.
// /eth2/beacon_chain/req/blob_sidecars_by_root/1/
RPCBlobSidecarsByRootTopicV1 = protocolPrefix + BlobSidecarsByRootName + SchemaVersionV1
// V2 RPC Topics
// RPCBlocksByRangeTopicV2 defines the v2 topic for the blocks by range rpc method.
@@ -94,8 +97,9 @@ var RPCTopicMappings = map[string]interface{}{
RPCMetaDataTopicV1: new(interface{}),
RPCMetaDataTopicV2: new(interface{}),
// RPC Blobs Sidecars By Range Message
RPCBlobsSidecarsByRangeTopicV1: new(pb.BlobsSidecarsByRangeRequest),
RPCBeaconBlockAndBlobsSidecarByRootTopicV1: new(p2ptypes.BeaconBlockByRootsReq),
RPCBlobsSidecarsByRangeTopicV1: new(pb.BlobsSidecarsByRangeRequest),
// RPC Blobs Sidecars By Root Message
RPCBlobSidecarsByRootTopicV1: new(p2ptypes.BlobSidecarsByRootReq),
}
// Maps all registered protocol prefixes.
@@ -114,6 +118,7 @@ var messageMapping = map[string]bool{
MetadataMessageName: true,
BlobsSidecarsByRangeMessageName: true,
BeaconBlockAndBlobsSidecarByRootName: true,
BlobSidecarsByRootName: true,
}
// Maps all the RPC messages which are to be updated in altair.
@@ -137,6 +142,7 @@ var PreAltairV1SchemaMapping = map[string]bool{
MetadataMessageName: true,
BlobsSidecarsByRangeMessageName: false,
BeaconBlockAndBlobsSidecarByRootName: false,
BlobSidecarsByRootName: false,
}
// VerifyTopicMapping verifies that the topic and its accompanying
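
For orientation, the protocol ID that peers actually dial is the topic constant plus an encoding suffix. A minimal sketch of how the new by-root topic string is assembled, assuming the standard "/eth2/beacon_chain/req" prefix, a "/1" schema version, and the ssz_snappy encoding suffix (those values are not shown in this hunk):

package main

import "fmt"

// Assumed values for illustration only; in Prysm these constants live in the p2p package.
const (
	protocolPrefix         = "/eth2/beacon_chain/req"
	blobSidecarsByRootName = "/blob_sidecars_by_root"
	schemaVersionV1        = "/1"
)

func main() {
	topic := protocolPrefix + blobSidecarsByRootName + schemaVersionV1
	// The registered stream handler additionally appends the encoding suffix.
	fmt.Println(topic + "/ssz_snappy")
	// Output: /eth2/beacon_chain/req/blob_sidecars_by_root/1/ssz_snappy
}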

View File

@@ -138,6 +138,11 @@ func (_ *FakeP2P) BroadcastAttestation(_ context.Context, _ uint64, _ *ethpb.Att
return nil
}
// BroadcastBlob -- fake.
func (p *FakeP2P) BroadcastBlob(ctx context.Context, subnet uint64, blobSidecar *ethpb.SignedBlobSidecar) error {
return nil
}
// BroadcastSyncCommitteeMessage -- fake.
func (_ *FakeP2P) BroadcastSyncCommitteeMessage(_ context.Context, _ uint64, _ *ethpb.SyncCommitteeMessage) error {
return nil

View File

@@ -170,6 +170,11 @@ func (p *TestP2P) BroadcastAttestation(_ context.Context, _ uint64, _ *ethpb.Att
return nil
}
func (p *TestP2P) BroadcastBlob(ctx context.Context, subnet uint64, blobSidecar *ethpb.SignedBlobSidecar) error {
p.BroadcastCalled = true
return nil
}
// BroadcastSyncCommitteeMessage broadcasts a sync committee message.
func (p *TestP2P) BroadcastSyncCommitteeMessage(_ context.Context, _ uint64, _ *ethpb.SyncCommitteeMessage) error {
p.BroadcastCalled = true

View File

@@ -12,4 +12,5 @@ var (
ErrRateLimited = errors.New("rate limited")
ErrIODeadline = errors.New("i/o deadline exceeded")
ErrInvalidRequest = errors.New("invalid range, step or count")
ErrBlobLTMinRequest = errors.New("blob slot < minimum_request_epoch")
)

View File

@@ -7,6 +7,7 @@ import (
"github.com/pkg/errors"
ssz "github.com/prysmaticlabs/fastssz"
"github.com/prysmaticlabs/prysm/v3/config/params"
eth "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
)
const rootLength = 32
@@ -31,6 +32,7 @@ func (b *SSZBytes) HashTreeRootWith(hh *ssz.Hasher) error {
// BeaconBlockByRootsReq specifies the block by roots request type.
type BeaconBlockByRootsReq [][rootLength]byte
type BlobSidecarsByRootReq []*eth.BlobIdentifier
// MarshalSSZTo marshals the block by roots request with the provided byte slice.
func (r *BeaconBlockByRootsReq) MarshalSSZTo(dst []byte) ([]byte, error) {

View File

@@ -0,0 +1,16 @@
package sync
import eth "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
type BlobSidecarGetter interface {
BlobSidecar(blockRoot [32]byte, index uint64) (*eth.BlobSidecar, error)
}
type BlobSidecarWriter interface {
WriteBlobSidecar(blockRoot [32]byte, index uint64, sidecar *eth.BlobSidecar) error
}
type BlobDB interface {
BlobSidecarGetter
BlobSidecarWriter
}

View File

@@ -0,0 +1,34 @@
package sync
import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/db"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
)
type mockSidecarId struct {
r [32]byte
i uint64
}
type MockBlobDB struct {
storage map[mockSidecarId]*ethpb.BlobSidecar
}
func (m *MockBlobDB) BlobSidecar(r [32]byte, idx uint64) (*ethpb.BlobSidecar, error) {
sc, ok := m.storage[mockSidecarId{r, idx}]
if !ok {
return nil, errors.Wrapf(db.ErrNotFound, "MockBlobsDB.storage does not contain blob: root=%#x, idx=%d", r, idx)
}
return sc, nil
}
func (m *MockBlobDB) WriteBlobSidecar(r [32]byte, idx uint64, s *ethpb.BlobSidecar) error {
if m.storage == nil {
m.storage = make(map[mockSidecarId]*ethpb.BlobSidecar)
}
m.storage[mockSidecarId{r, idx}] = s
return nil
}
var _ BlobDB = &MockBlobDB{}
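
The mock exercises the same contract the handler depends on. A hedged sketch of the write/read round trip, assumed to live alongside the mock in the sync package (db.ErrNotFound is the sentinel the handler checks for):

// Illustrative only: a round trip through the BlobDB contract using the mock.
func blobDBRoundTripSketch() error {
	var bdb BlobDB = &MockBlobDB{}
	root := [32]byte{0x01}

	if err := bdb.WriteBlobSidecar(root, 2, &ethpb.BlobSidecar{BlockRoot: root[:], Index: 2}); err != nil {
		return err
	}
	if _, err := bdb.BlobSidecar(root, 2); err != nil {
		return err // unexpected: the sidecar was just written
	}

	// An unwritten (root, index) pair surfaces as db.ErrNotFound, which the
	// by-root handler treats as "skip this identifier" rather than an error.
	if _, err := bdb.BlobSidecar(root, 3); !errors.Is(err, db.ErrNotFound) {
		return errors.New("expected db.ErrNotFound for an unwritten index")
	}
	return nil
}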

View File

@@ -16,6 +16,7 @@ import (
var responseCodeSuccess = byte(0x00)
var responseCodeInvalidRequest = byte(0x01)
var responseCodeServerError = byte(0x02)
var responseCodeResourceUnavailable = byte(0x03)
func (s *Service) generateErrorResponse(code byte, reason string) ([]byte, error) {
return createErrorResponse(code, reason, s.cfg.p2p)

View File

@@ -429,34 +429,6 @@ func (f *blocksFetcher) requestBlocksByRoot(
return prysmsync.SendBeaconBlocksByRootRequest(ctx, f.chain, f.p2p, pid, req, nil)
}
func (f *blocksFetcher) requestBlockAndSidecarByRoot(
ctx context.Context,
req *p2pTypes.BeaconBlockByRootsReq,
pid peer.ID,
) ([]*p2ppb.SignedBeaconBlockAndBlobsSidecar, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
l := f.peerLock(pid)
l.Lock()
log.WithFields(logrus.Fields{
"peer": pid,
"numRoots": len(*req),
"capacity": f.rateLimiter.Remaining(pid.String()),
"score": f.p2p.Peers().Scorers().BlockProviderScorer().FormatScorePretty(pid),
}).Debug("Requesting blocks (by roots)")
if f.rateLimiter.Remaining(pid.String()) < int64(len(*req)) {
if err := f.waitForBandwidth(pid, uint64(len(*req))); err != nil {
l.Unlock()
return nil, err
}
}
f.rateLimiter.Add(pid.String(), int64(len(*req)))
l.Unlock()
return prysmsync.SendBlocksAndSidecarsByRootRequest(ctx, f.chain, f.p2p, pid, req, nil)
}
// waitForBandwidth blocks up until peer's bandwidth is restored.
func (f *blocksFetcher) waitForBandwidth(pid peer.ID, count uint64) error {
log.WithField("peer", pid).Debug("Slowing down for rate limit")

View File

@@ -18,6 +18,13 @@ import (
type Option func(s *Service) error
func WithBlobDB(bdb BlobDB) Option {
return func(s *Service) error {
s.blobs = bdb
return nil
}
}
func WithAttestationNotifier(notifier operation.Notifier) Option {
return func(s *Service) error {
s.cfg.attestationNotifier = notifier
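
A hedged sketch of how the option is consumed; the real constructor signature is not part of this diff, so a variadic-options constructor in the style of Prysm's other services is assumed:

// Illustrative wiring only: apply Options, including WithBlobDB, to a Service.
func newSyncServiceSketch(bdb BlobDB, opts ...Option) (*Service, error) {
	s := &Service{}
	opts = append(opts, WithBlobDB(bdb))
	for _, o := range opts {
		if err := o(s); err != nil {
			return nil, err
		}
	}
	return s, nil
}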

View File

@@ -554,6 +554,7 @@ func TestRegularSyncBeaconBlockSubscriber_PruneOldPendingBlocks(t *testing.T) {
func TestService_sortedPendingSlots(t *testing.T) {
r := &Service{
slotToPendingBlocks: gcache.New(time.Second, 2*time.Second),
slotToPendingBlobs: gcache.New(time.Second, 2*time.Second),
seenPendingBlocks: make(map[[32]byte]bool),
}
@@ -791,6 +792,7 @@ func TestService_ProcessBadPendingBlocks(t *testing.T) {
stateGen: stategen.New(db, doublylinkedtree.New()),
},
slotToPendingBlocks: gcache.New(time.Second, 2*time.Second),
slotToPendingBlobs: gcache.New(time.Second, 2*time.Second),
seenPendingBlocks: make(map[[32]byte]bool),
}
r.initCaches()

View File

@@ -64,7 +64,7 @@ func newRateLimiter(p2pProvider p2p.P2P) *limiter {
// BlocksByRoots requests
topicMap[addEncoding(p2p.RPCBlocksByRootTopicV1)] = blockCollector
topicMap[addEncoding(p2p.RPCBlocksByRootTopicV2)] = blockCollectorV2
topicMap[addEncoding(p2p.RPCBeaconBlockAndBlobsSidecarByRootTopicV1)] = blobCollector
topicMap[addEncoding(p2p.RPCBlobSidecarsByRootTopicV1)] = blobCollector
// BlockByRange requests
topicMap[addEncoding(p2p.RPCBlocksByRangeTopicV1)] = blockCollector

View File

@@ -103,8 +103,8 @@ func (s *Service) registerRPCHandlersDeneb() {
s.blobsSidecarsByRangeRPCHandler,
)
s.registerRPC(
p2p.RPCBeaconBlockAndBlobsSidecarByRootTopicV1,
s.beaconBlockAndBlobsSidecarByRootRPCHandler,
p2p.RPCBlobSidecarsByRootTopicV1,
s.blobSidecarByRootRPCHandler,
)
}

View File

@@ -1,107 +0,0 @@
package sync
import (
"context"
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v3/config/params"
"github.com/prysmaticlabs/prysm/v3/consensus-types/blocks"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"go.opencensus.io/trace"
)
// beaconBlockAndBlobsSidecarByRootRPCHandler looks up the requested beacon block and blobs in the database by the given roots
func (s *Service) beaconBlockAndBlobsSidecarByRootRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
ctx, span := trace.StartSpan(ctx, "sync.beaconBlockAndBlobsSidecarByRootRPCHandler")
defer span.End()
ctx, cancel := context.WithTimeout(ctx, ttfbTimeout)
defer cancel()
SetRPCStreamDeadlines(stream)
log := log.WithField("handler", "beacon_block_and_blobs_sidecar_by_root")
rawMsg, ok := msg.(*types.BeaconBlockByRootsReq)
if !ok {
return errors.New("message is not type BeaconBlockByRootsReq")
}
blockRoots := *rawMsg
if err := s.rateLimiter.validateRequest(stream, uint64(len(blockRoots))); err != nil {
return err
}
if len(blockRoots) == 0 {
// Add to rate limiter in the event no
// roots are requested.
s.rateLimiter.add(stream, 1)
s.writeErrorResponseToStream(responseCodeInvalidRequest, "no block roots provided in request", stream)
return errors.New("no block roots provided")
}
if uint64(len(blockRoots)) > params.BeaconNetworkConfig().MaxRequestBlocks {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.writeErrorResponseToStream(responseCodeInvalidRequest, "requested more than the max block limit", stream)
return errors.New("requested more than the max block limit")
}
s.rateLimiter.add(stream, int64(len(blockRoots)))
for _, root := range blockRoots {
blk, err := s.cfg.beaconDB.Block(ctx, root)
if err != nil {
log.WithError(err).Debug("Could not fetch block")
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return err
}
sidecar, err := s.cfg.beaconDB.BlobsSidecar(ctx, root)
if err != nil {
log.WithError(err).Debug("Could not fetch sidecar")
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) // TODO(Deneb): Return unavailable?
return err
}
if blk == nil || blk.IsNil() {
s.writeErrorResponseToStream(responseCodeInvalidRequest, "block not found!", stream)
return errors.New("block not found")
}
// TODO(Deneb): Reconstruct blind block
pb, err := blk.PbDenebBlock()
if err != nil {
log.WithError(err).Debug("Could not fetch block")
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return err
}
if err := WriteBlockAndBlobsSidecarChunk(stream, s.cfg.chain, s.cfg.p2p.Encoding(), &ethpb.SignedBeaconBlockAndBlobsSidecar{BeaconBlock: pb, BlobsSidecar: sidecar}); err != nil {
log.WithError(err).Debug("Could not send block and sidecar")
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return err
}
}
closeStream(stream, log)
return nil
}
func (s *Service) sendBlocksAndSidecarsRequest(ctx context.Context, blockRoots *types.BeaconBlockByRootsReq, id peer.ID) error {
ctx, cancel := context.WithTimeout(ctx, respTimeout)
defer cancel()
_, err := SendBlocksAndSidecarsByRootRequest(ctx, s.cfg.chain, s.cfg.p2p, id, blockRoots, func(blkAndSidecar *ethpb.SignedBeaconBlockAndBlobsSidecar) error {
blk, err := blocks.NewSignedBeaconBlock(blkAndSidecar.BeaconBlock)
if err != nil {
return err
}
blkRoot, err := blk.Block().HashTreeRoot()
if err != nil {
return err
}
s.pendingQueueLock.Lock()
defer s.pendingQueueLock.Unlock()
if err := s.insertBlkAndBlobToQueue(blk.Block().Slot(), blk, blkRoot, blkAndSidecar.BlobsSidecar); err != nil {
return err
}
return nil
})
return err
}

View File

@@ -0,0 +1,103 @@
package sync
import (
"context"
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v3/config/params"
"github.com/prysmaticlabs/prysm/v3/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v3/monitoring/tracing"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/time/slots"
"go.opencensus.io/trace"
)
func minimumRequestEpoch(finalized, current primitives.Epoch) primitives.Epoch {
// max(finalized_epoch, current_epoch - MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, DENEB_FORK_EPOCH)
denebFork := params.BeaconConfig().DenebForkEpoch
reqWindow := current - params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest
if finalized >= reqWindow && finalized >= denebFork {
return finalized
}
if reqWindow >= finalized && reqWindow >= denebFork {
return reqWindow
}
return denebFork
}
// blobSidecarByRootRPCHandler handles the /eth2/beacon_chain/req/blob_sidecars_by_root/1/ RPC request.
// spec: https://github.com/ethereum/consensus-specs/blob/a7e45db9ac2b60a33e144444969ad3ac0aae3d4c/specs/deneb/p2p-interface.md#blobsidecarsbyroot-v1
func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
ctx, span := trace.StartSpan(ctx, "sync.blobSidecarByRootRPCHandler")
defer span.End()
ctx, cancel := context.WithTimeout(ctx, ttfbTimeout)
defer cancel()
SetRPCStreamDeadlines(stream)
log := log.WithField("handler", p2p.BlobSidecarsByRootName[1:]) // slice the leading slash off the name var
ref, ok := msg.(*types.BlobSidecarsByRootReq)
if !ok {
return errors.New("message is not type BlobSidecarsByRootReq")
}
minReqEpoch := minimumRequestEpoch(s.cfg.chain.FinalizedCheckpt().Epoch, slots.ToEpoch(s.cfg.chain.CurrentSlot()))
blobIdents := *ref
for i := range blobIdents {
root, idx := bytesutil.ToBytes32(blobIdents[i].BlockRoot), blobIdents[i].Index
sc, err := s.blobs.BlobSidecar(root, idx)
if err != nil {
if errors.Is(err, db.ErrNotFound) {
continue
}
log.WithError(err).Debugf("error retrieving BlobSidecar, root=%x, index=%d", root, idx)
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return err
}
// If any root in the request content references a block earlier than minimum_request_epoch,
// peers MAY respond with error code 3: ResourceUnavailable or not include the blob in the response.
if slots.ToEpoch(sc.Slot) < minReqEpoch {
s.writeErrorResponseToStream(responseCodeResourceUnavailable, types.ErrBlobLTMinRequest.Error(), stream)
log.WithError(types.ErrBlobLTMinRequest).
Debugf("requested blob for block %#x before minimum_request_epoch", blobIdents[i].BlockRoot)
return types.ErrBlobLTMinRequest
}
SetStreamWriteDeadline(stream, defaultWriteDuration)
if chunkErr := WriteBlobSidecarChunk(stream, s.cfg.chain, s.cfg.p2p.Encoding(), sc); chunkErr != nil {
log.WithError(chunkErr).Debug("Could not send a chunked response")
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
tracing.AnnotateError(span, chunkErr)
return chunkErr
}
s.rateLimiter.add(stream, 1)
}
return nil
}
func (s *Service) sendBlocksAndSidecarsRequest(ctx context.Context, blockRoots *types.BeaconBlockByRootsReq, id peer.ID) error {
ctx, cancel := context.WithTimeout(ctx, respTimeout)
defer cancel()
_, err := SendBlocksAndSidecarsByRootRequest(ctx, s.cfg.chain, s.cfg.p2p, id, blockRoots, func(blkAndSidecar *ethpb.SignedBeaconBlockAndBlobsSidecar) error {
blk, err := blocks.NewSignedBeaconBlock(blkAndSidecar.BeaconBlock)
if err != nil {
return err
}
blkRoot, err := blk.Block().HashTreeRoot()
if err != nil {
return err
}
s.pendingQueueLock.Lock()
defer s.pendingQueueLock.Unlock()
if err := s.insertBlkAndBlobToQueue(blk.Block().Slot(), blk, blkRoot, blkAndSidecar.BlobsSidecar); err != nil {
return err
}
return nil
})
return err
}
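
To make the retention bound in minimumRequestEpoch above concrete, a worked example with hypothetical parameters (DENEB_FORK_EPOCH = 100, MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS = 4096; not real network values):

// max(finalized_epoch, current_epoch - MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, DENEB_FORK_EPOCH)
//
//   finalized=9000, current=10000 -> window = 10000-4096 = 5904 -> max(9000, 5904, 100) = 9000 (finalized wins)
//   finalized=5000, current=10000 -> window = 5904              -> max(5000, 5904, 100) = 5904 (retention window wins)
//   finalized=50,   current=4150  -> window = 4150-4096 = 54    -> max(50, 54, 100)     = 100  (Deneb fork epoch wins)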

View File

@@ -0,0 +1,373 @@
package sync
import (
"context"
"encoding/binary"
"io"
"math"
"math/big"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
gethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
ssz "github.com/prysmaticlabs/fastssz"
mock "github.com/prysmaticlabs/prysm/v3/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/encoder"
p2ptest "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/testing"
p2pTypes "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/types"
fieldparams "github.com/prysmaticlabs/prysm/v3/config/fieldparams"
"github.com/prysmaticlabs/prysm/v3/config/params"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v3/network/forks"
enginev1 "github.com/prysmaticlabs/prysm/v3/proto/engine/v1"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/testing/require"
"github.com/prysmaticlabs/prysm/v3/testing/util"
"github.com/prysmaticlabs/prysm/v3/time/slots"
)
type sidecarsTestCase struct {
name string
nblocks int // how many blocks to loop through in setting up test fixtures & requests
missing map[int]map[int]bool // skip this blob index, so that we can test different custody scenarios
expired map[int]bool // mark block expired to test scenarios where requests are outside retention window
chain *mock.ChainService // allow tests to control retention window via current slot and finalized checkpoint
total *int // allow a test to specify the total number of responses received
err error
}
func generateTestBlock(t *testing.T, slot types.Slot) *ethpb.SignedBeaconBlockDeneb {
// Build a minimal Deneb block with a single-transaction execution payload for use as a test fixture.
parent := bytesutil.PadTo([]byte("parentHash"), fieldparams.RootLength)
stateRoot := bytesutil.PadTo([]byte("stateRoot"), fieldparams.RootLength)
receiptsRoot := bytesutil.PadTo([]byte("receiptsRoot"), fieldparams.RootLength)
logsBloom := bytesutil.PadTo([]byte("logs"), fieldparams.LogsBloomLength)
tx := gethTypes.NewTransaction(
0,
common.HexToAddress("095e7baea6a6c7c4c2dfeb977efac326af552d87"),
big.NewInt(0), 0, big.NewInt(0),
nil,
)
txs := []*gethTypes.Transaction{tx}
encodedBinaryTxs := make([][]byte, 1)
var err error
encodedBinaryTxs[0], err = txs[0].MarshalBinary()
require.NoError(t, err)
blockHash := bytesutil.ToBytes32([]byte("foo"))
payload := &enginev1.ExecutionPayloadDeneb{
ParentHash: parent,
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
StateRoot: stateRoot,
ReceiptsRoot: receiptsRoot,
LogsBloom: logsBloom,
PrevRandao: blockHash[:],
BlockNumber: 0,
GasLimit: 0,
GasUsed: 0,
Timestamp: 0,
ExtraData: make([]byte, 0),
BaseFeePerGas: bytesutil.PadTo([]byte("baseFeePerGas"), fieldparams.RootLength),
ExcessDataGas: bytesutil.PadTo([]byte("excessDataGas"), fieldparams.RootLength),
BlockHash: blockHash[:],
Transactions: encodedBinaryTxs,
}
block := util.NewBeaconBlockDeneb()
block.Block.Body.ExecutionPayload = payload
block.Block.Slot = slot
return block
}
func generateTestSidecar(root [32]byte, block *ethpb.SignedBeaconBlockDeneb, index int) *ethpb.BlobSidecar {
blob := &enginev1.Blob{
Data: make([]byte, fieldparams.BlobSize),
}
binary.LittleEndian.PutUint64(blob.Data, uint64(index))
return &ethpb.BlobSidecar{
BlockRoot: root[:],
Index: uint64(index),
Slot: block.Block.Slot,
BlockParentRoot: block.Block.ParentRoot[:],
ProposerIndex: block.Block.ProposerIndex,
Blob: blob,
KzgCommitment: make([]byte, 48),
KzgProof: make([]byte, 48),
}
}
type expectedResponse struct {
sidecar *ethpb.BlobSidecar
code uint8
message string
skipped bool
}
type streamDecoder func(io.Reader, ssz.Unmarshaler) error
func (r *expectedResponse) requireExpected(t *testing.T, d streamDecoder, stream network.Stream) {
if r.skipped {
return
}
code, _, err := ReadStatusCode(stream, &encoder.SszNetworkEncoder{})
require.NoError(t, err)
require.Equal(t, r.code, code, "unexpected response code")
//require.Equal(t, r.message, msg, "unexpected error message")
if r.sidecar == nil {
return
}
sc := &ethpb.BlobSidecar{}
require.NoError(t, d(stream, sc))
require.Equal(t, bytesutil.ToBytes32(sc.BlockRoot), bytesutil.ToBytes32(r.sidecar.BlockRoot))
require.Equal(t, sc.Index, r.sidecar.Index)
}
func (c sidecarsTestCase) run(t *testing.T) {
cfg := params.BeaconConfig()
repositionFutureEpochs(cfg)
undo, err := params.SetActiveWithUndo(cfg)
require.NoError(t, err)
defer func() {
require.NoError(t, undo())
}()
maxBlobs := int(params.BeaconConfig().MaxBlobsPerBlock)
if c.chain == nil {
c.chain = defaultMockChain(t)
}
db := &MockBlobDB{}
var req p2pTypes.BlobSidecarsByRootReq
var expect []*expectedResponse
oldest, err := slots.EpochStart(minimumRequestEpoch(c.chain.FinalizedCheckPoint.Epoch, slots.ToEpoch(c.chain.CurrentSlot())))
require.NoError(t, err)
streamTerminated := false
for i := 0; i < c.nblocks; i++ {
// check if there is a slot override for this index
// ie to create a block outside the minimum_request_epoch
var bs types.Slot
if c.expired[i] {
// oldest is the start slot of the minimum request epoch, so make sure
// the slot of an expired block falls at least one slot before that boundary.
bs = oldest - 1 - types.Slot(i)
} else {
bs = oldest + types.Slot(i)
}
block := generateTestBlock(t, bs+types.Slot(i))
root, err := block.HashTreeRoot()
require.NoError(t, err)
binary.LittleEndian.PutUint64(root[:], uint64(i))
for bi := 0; bi < maxBlobs; bi++ {
ubi := uint64(bi)
req = append(req, &ethpb.BlobIdentifier{BlockRoot: root[:], Index: ubi})
if streamTerminated {
// once we know there is a bad response in the sequence, we want to filter out any subsequent
// expected responses, because an error response terminates the stream.
continue
}
// skip sidecars that are supposed to be missing
if missed, ok := c.missing[i]; ok && missed[bi] {
continue
}
sc := generateTestSidecar(root, block, bi)
require.NoError(t, db.WriteBlobSidecar(root, ubi, sc))
// if a sidecar is expired, we'll expect an error for the *first* index, and after that
// we'll expect no further chunks in the stream, so filter out any further expected responses.
// we don't need to check what index this is because we work through them in order and the first one
// will set streamTerminated = true and skip everything else in the test case.
if c.expired[i] {
expect = append(expect, &expectedResponse{
code: responseCodeResourceUnavailable,
message: p2pTypes.ErrBlobLTMinRequest.Error(),
})
streamTerminated = true
continue
}
expect = append(expect, &expectedResponse{
sidecar: sc,
code: responseCodeSuccess,
message: "",
})
}
}
rate := params.BeaconNetworkConfig().MaxRequestBlobsSidecars * params.BeaconConfig().MaxBlobsPerBlock
client := p2ptest.NewTestP2P(t)
s := &Service{
cfg: &config{p2p: client, chain: c.chain},
blobs: db,
rateLimiter: newRateLimiter(client)}
s.setRateCollector(p2p.RPCBlobSidecarsByRootTopicV1, leakybucket.NewCollector(0.000001, int64(rate), time.Second, false))
dec := s.cfg.p2p.Encoding().DecodeWithMaxLength
if c.total != nil {
require.Equal(t, *c.total, len(expect))
}
nh := func(stream network.Stream) {
for _, ex := range expect {
ex.requireExpected(t, dec, stream)
}
}
rht := &rpcHandlerTest{t: t, client: client, topic: p2p.RPCBlocksByRootTopicV1, timeout: time.Second * 10, err: c.err}
rht.testHandler(nh, s.blobSidecarByRootRPCHandler, &req)
}
type rpcHandlerTest struct {
t *testing.T
client *p2ptest.TestP2P
topic protocol.ID
timeout time.Duration
err error
}
func (rt *rpcHandlerTest) testHandler(nh network.StreamHandler, rh rpcHandler, rhi interface{}) {
ctx, cancel := context.WithTimeout(context.Background(), rt.timeout)
defer func() {
cancel()
}()
w := util.NewWaiter()
server := p2ptest.NewTestP2P(rt.t)
rt.client.Connect(server)
defer func() {
require.NoError(rt.t, rt.client.Disconnect(server.PeerID()))
}()
require.Equal(rt.t, 1, len(rt.client.BHost.Network().Peers()), "Expected peers to be connected")
h := func(stream network.Stream) {
defer w.Done()
nh(stream)
}
server.BHost.SetStreamHandler(protocol.ID(rt.topic), h)
stream, err := rt.client.BHost.NewStream(ctx, server.BHost.ID(), protocol.ID(rt.topic))
require.NoError(rt.t, err)
err = rh(ctx, rhi, stream)
if rt.err == nil {
require.NoError(rt.t, err)
} else {
require.ErrorIs(rt.t, err, rt.err)
}
w.RequireDoneBeforeCancel(rt.t, ctx)
}
// we use max uints for future forks, but this causes overflows when computing slots
// so it is helpful in tests to temporarily reposition the epochs to give room for some math.
func repositionFutureEpochs(cfg *params.BeaconChainConfig) {
if cfg.CapellaForkEpoch == math.MaxUint64 {
cfg.CapellaForkEpoch = cfg.BellatrixForkEpoch + 100
}
if cfg.DenebForkEpoch == math.MaxUint64 {
cfg.DenebForkEpoch = cfg.CapellaForkEpoch + 100
}
}
func defaultMockChain(t *testing.T) *mock.ChainService {
df, err := forks.Fork(params.BeaconConfig().DenebForkEpoch)
require.NoError(t, err)
ce := params.BeaconConfig().DenebForkEpoch + params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest
fe := ce - 2
cs, err := slots.EpochStart(ce)
require.NoError(t, err)
return &mock.ChainService{
ValidatorsRoot: [32]byte{},
Slot: &cs,
FinalizedCheckPoint: &ethpb.Checkpoint{Epoch: fe},
Fork: df}
}
func TestSidecarByRootValidation(t *testing.T) {
cfg := params.BeaconConfig()
repositionFutureEpochs(cfg)
undo, err := params.SetActiveWithUndo(cfg)
require.NoError(t, err)
defer func() {
require.NoError(t, undo())
}()
capellaSlot, err := slots.EpochStart(params.BeaconConfig().CapellaForkEpoch)
require.NoError(t, err)
dmc := defaultMockChain(t)
dmc.Slot = &capellaSlot
dmc.FinalizedCheckPoint = &ethpb.Checkpoint{Epoch: params.BeaconConfig().CapellaForkEpoch}
cases := []sidecarsTestCase{
{
name: "block before minimum_request_epoch",
nblocks: 1,
expired: map[int]bool{0: true},
chain: dmc,
err: p2pTypes.ErrBlobLTMinRequest,
},
{
name: "blocks before and after minimum_request_epoch",
nblocks: 2,
expired: map[int]bool{0: true},
chain: dmc,
err: p2pTypes.ErrBlobLTMinRequest,
},
{
name: "one after minimum_request_epoch then one before",
nblocks: 2,
expired: map[int]bool{1: true},
chain: dmc,
err: p2pTypes.ErrBlobLTMinRequest,
},
{
name: "one missing index, one after minimum_request_epoch then one before",
nblocks: 3,
missing: map[int]map[int]bool{0: map[int]bool{0: true}},
expired: map[int]bool{1: true},
chain: dmc,
err: p2pTypes.ErrBlobLTMinRequest,
},
{
name: "2 missing indices from 2 different blocks",
nblocks: 3,
missing: map[int]map[int]bool{0: map[int]bool{0: true}, 2: map[int]bool{3: true}},
total: func(i int) *int { return &i }(3*int(params.BeaconConfig().MaxBlobsPerBlock) - 2), // aka 10
},
{
name: "all indices missing",
nblocks: 1,
missing: map[int]map[int]bool{0: map[int]bool{0: true, 1: true, 2: true, 3: true}},
total: func(i int) *int { return &i }(0), // aka 0
},
{
name: "block with all indices missing between 2 full blocks",
nblocks: 3,
missing: map[int]map[int]bool{1: map[int]bool{0: true, 1: true, 2: true, 3: true}},
total: func(i int) *int { return &i }(8), // aka 8
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
c.run(t)
})
}
}
func TestSidecarsByRootOK(t *testing.T) {
cases := []sidecarsTestCase{
{
name: "0 blob",
nblocks: 0,
},
{
name: "1 blob",
nblocks: 1,
},
{
name: "2 blob",
nblocks: 2,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
c.run(t)
})
}
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/prysmaticlabs/prysm/v3/network/forks"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/runtime/version"
"github.com/prysmaticlabs/prysm/v3/time/slots"
)
// chunkBlockWriter writes the given message as a chunked response to the given network
@@ -107,6 +108,25 @@ func WriteBlobsSidecarChunk(stream libp2pcore.Stream, chain blockchain.ChainInfo
return err
}
// WriteBlobSidecarChunk writes blob chunk object to stream.
// response_chunk ::= <result> | <context-bytes> | <encoding-dependent-header> | <encoded-payload>
func WriteBlobSidecarChunk(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher, encoding encoder.NetworkEncoding, sidecar *ethpb.BlobSidecar) error {
if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil {
return err
}
valRoot := chain.GenesisValidatorsRoot()
ctxBytes, err := forks.ForkDigestFromEpoch(slots.ToEpoch(sidecar.GetSlot()), valRoot[:])
if err != nil {
return err
}
if err := writeContextToStream(ctxBytes[:], stream, chain); err != nil {
return err
}
_, err = encoding.EncodeWithMaxLength(stream, sidecar)
return err
}
func WriteBlockAndBlobsSidecarChunk(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher, encoding encoder.NetworkEncoding, b *ethpb.SignedBeaconBlockAndBlobsSidecar) error {
SetStreamWriteDeadline(stream, defaultWriteDuration)
if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil {

View File

@@ -132,6 +132,50 @@ func SendBeaconBlocksByRootRequest(
return blocks, nil
}
func SendSidecarsByRoot(
ctx context.Context, ci blockchain.ChainInfoFetcher, p2pApi p2p.P2P, pid peer.ID,
req *p2ptypes.BlobSidecarsByRootReq, processor BlobsSidecarProcessor,
) ([]*pb.BlobsSidecar, error) {
topic, err := p2p.TopicFromMessage(p2p.BlobSidecarsByRootName, slots.ToEpoch(ci.CurrentSlot()))
if err != nil {
return nil, err
}
stream, err := p2pApi.Send(ctx, req, topic, pid)
if err != nil {
return nil, err
}
defer closeStream(stream, log)
sidecars := make([]*pb.BlobsSidecar, 0, len(*req))
process := func(s *pb.BlobsSidecar) error {
sidecars = append(sidecars, s)
if processor != nil {
return processor(s)
}
return nil
}
max := params.BeaconNetworkConfig().MaxRequestBlobsSidecars * params.BeaconConfig().MaxBlobsPerBlock
for i := 0; i < len(*req); i++ {
// Stop reading if the peer sends more than MAX_REQUEST_BLOBS_SIDECARS * MAX_BLOBS_PER_BLOCK chunks.
if uint64(i) >= max {
break
}
isFirstChunk := i == 0
sc, err := ReadChunkedBlobsSidecar(stream, ci, p2pApi, isFirstChunk)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return nil, err
}
if err := process(sc); err != nil {
return nil, err
}
}
return sidecars, nil
}
func SendBlocksAndSidecarsByRootRequest(
ctx context.Context, chain blockchain.ChainInfoFetcher, p2pProvider p2p.P2P, pid peer.ID,
req *p2ptypes.BeaconBlockByRootsReq, processor BeaconBlockAndSidecarProcessor,
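
A hedged caller-side sketch for SendSidecarsByRoot, assuming the chain info, p2p provider, and peer ID are already wired up by the caller (names here are illustrative):

// Illustrative only: request the first two blob indices for a single block root.
func requestBlobsForRootSketch(ctx context.Context, ci blockchain.ChainInfoFetcher, p2pApi p2p.P2P, pid peer.ID, root [32]byte) (int, error) {
	req := &p2ptypes.BlobSidecarsByRootReq{
		{BlockRoot: root[:], Index: 0},
		{BlockRoot: root[:], Index: 1},
	}
	received := 0
	_, err := SendSidecarsByRoot(ctx, ci, p2pApi, pid, req, func(_ *pb.BlobsSidecar) error {
		received++ // count every chunk the peer returns
		return nil
	})
	return received, err
}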

View File

@@ -34,6 +34,7 @@ import (
"github.com/prysmaticlabs/prysm/v3/beacon-chain/state/stategen"
lruwrpr "github.com/prysmaticlabs/prysm/v3/cache/lru"
"github.com/prysmaticlabs/prysm/v3/config/params"
leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/runtime"
prysmTime "github.com/prysmaticlabs/prysm/v3/time"
@@ -139,6 +140,7 @@ type Service struct {
syncContributionBitsOverlapLock sync.RWMutex
syncContributionBitsOverlapCache *lru.Cache
signatureChan chan *signatureVerifier
blobs BlobDB
}
// NewService initializes new regular sync service.
@@ -289,6 +291,10 @@ func (s *Service) registerHandlers() {
}
}
func (s *Service) setRateCollector(topic string, c *leakybucket.Collector) {
s.rateLimiter.limiterMap[topic] = c
}
// marks the chain as having started.
func (s *Service) markForChainStart() {
s.chainStarted.Set()

View File

@@ -26,4 +26,5 @@ const (
SyncCommitteeAggregationBytesLength = 16 // SyncCommitteeAggregationBytesLength defines the length of sync committee aggregate bytes.
SyncAggregateSyncCommitteeBytesLength = 64 // SyncAggregateSyncCommitteeBytesLength defines the length of sync committee bytes in a sync aggregate.
MaxWithdrawalsPerPayload = 16 // MaxWithdrawalsPerPayloadLength defines the maximum number of withdrawals that can be included in a payload.
BlobSize = 131072 // defined to match blobs.size in bazel ssz codegen
)

View File

@@ -26,4 +26,5 @@ const (
SyncCommitteeAggregationBytesLength = 1 // SyncCommitteeAggregationBytesLength defines the sync committee aggregate bytes.
SyncAggregateSyncCommitteeBytesLength = 4 // SyncAggregateSyncCommitteeBytesLength defines the length of sync committee bytes in a sync aggregate.
MaxWithdrawalsPerPayload = 4 // MaxWithdrawalsPerPayloadLength defines the maximum number of withdrawals that can be included in a payload.
BlobSize = 131072 // defined to match blobs.size in bazel ssz codegen
)

View File

@@ -213,6 +213,9 @@ type BeaconChainConfig struct {
// Execution engine timeout value
ExecutionEngineTimeoutValue uint64 // ExecutionEngineTimeoutValue defines the seconds to wait before timing out engine endpoints with execution payload execution semantics (newPayload, forkchoiceUpdated).
// Deneb presets
MaxBlobsPerBlock uint64 // MaxBlobsPerBlock defines the maximum number of blob sidecars that can be included in a block.
}
// InitializeForkSchedule initializes the schedules forks baked into the config.

View File

@@ -268,6 +268,9 @@ var mainnetBeaconConfig = &BeaconChainConfig{
// Execution engine timeout value
ExecutionEngineTimeoutValue: 8, // 8 seconds default based on: https://github.com/ethereum/execution-apis/blob/main/src/engine/specification.md#core
// Deneb presets
MaxBlobsPerBlock: 4,
}
// MainnetTestConfig provides a version of the mainnet config that has a different name

View File

@@ -1,7 +1,9 @@
package util
import (
"context"
"sync"
"testing"
"time"
)
@@ -20,3 +22,35 @@ func WaitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
return true
}
}
type Waiter struct {
c chan struct{}
}
func NewWaiter() *Waiter {
return &Waiter{
c: make(chan struct{}),
}
}
func (w *Waiter) Done() {
close(w.c)
}
func (w *Waiter) RequireDoneAfter(t *testing.T, timeout time.Duration) {
select {
case <-w.c:
return
case <-time.After(timeout):
t.Fatalf("timeout after %s", timeout)
}
}
func (w *Waiter) RequireDoneBeforeCancel(t *testing.T, ctx context.Context) {
select {
case <-w.c:
return
case <-ctx.Done():
t.Fatalf("context canceled before Done with error=%s", ctx.Err())
}
}
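
A minimal usage sketch for the Waiter helper (hypothetical test body, not part of this change):

// waiterUsageSketch shows the intended pattern: signal Done from the
// asynchronous path under test and fail the test if it never arrives.
func waiterUsageSketch(t *testing.T) {
	w := NewWaiter()
	go func() {
		// ... exercise the code under test ...
		w.Done()
	}()
	w.RequireDoneAfter(t, 2*time.Second)
}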