Compare commits

...

3 Commits

Author · SHA1 · Message · Date
Preston Van Loon · cc8d5e4d58 · Changelog fragment · 2025-07-01 14:01:23 -05:00
Preston Van Loon · 2d38f7559c · Update beacon-chain/execution/engine_client.go to return the underlying error message from the EL too. · 2025-07-01 13:57:39 -05:00
Manu NALEPA · bc7664321b · Implement the new Fulu Metadata. (#15440) · 2025-07-01 07:07:32 +00:00
7 changed files with 294 additions and 167 deletions

View File

@@ -2,6 +2,7 @@ package execution
import (
"context"
"errors"
"fmt"
"math/big"
"strings"
@@ -26,7 +27,6 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
gethRPC "github.com/ethereum/go-ethereum/rpc"
"github.com/holiman/uint256"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)
@@ -158,7 +158,7 @@ func (s *Service) NewPayload(ctx context.Context, payload interfaces.ExecutionDa
} else {
flattenedRequests, err := pb.EncodeExecutionRequests(executionRequests)
if err != nil {
- return nil, errors.Wrap(err, "failed to encode execution requests")
+ return nil, fmt.Errorf("failed to encode execution requests: %w", err)
}
err = s.rpcClient.CallContext(ctx, result, NewPayloadMethodV4, payloadPb, versionedHashes, parentBlockRoot, flattenedRequests)
if err != nil {
@@ -346,14 +346,14 @@ func (s *Service) GetTerminalBlockHash(ctx context.Context, transitionTime uint6
ttd.SetString(params.BeaconConfig().TerminalTotalDifficulty, 10)
terminalTotalDifficulty, overflows := uint256.FromBig(ttd)
if overflows {
- return nil, false, errors.New("could not convert terminal total difficulty to uint256")
+ return nil, false, fmt.Errorf("could not convert terminal total difficulty to uint256")
}
blk, err := s.LatestExecutionBlock(ctx)
if err != nil {
- return nil, false, errors.Wrap(err, "could not get latest execution block")
+ return nil, false, fmt.Errorf("could not get latest execution block: %w", err)
}
if blk == nil {
- return nil, false, errors.New("latest execution block is nil")
+ return nil, false, fmt.Errorf("latest execution block is nil")
}
for {
@@ -362,7 +362,7 @@ func (s *Service) GetTerminalBlockHash(ctx context.Context, transitionTime uint6
}
currentTotalDifficulty, err := tDStringToUint256(blk.TotalDifficulty)
if err != nil {
- return nil, false, errors.Wrap(err, "could not convert total difficulty to uint256")
+ return nil, false, fmt.Errorf("could not convert total difficulty to uint256: %w", err)
}
blockReachedTTD := currentTotalDifficulty.Cmp(terminalTotalDifficulty) >= 0
@@ -372,16 +372,16 @@ func (s *Service) GetTerminalBlockHash(ctx context.Context, transitionTime uint6
}
parentBlk, err := s.ExecutionBlockByHash(ctx, parentHash, false /* no txs */)
if err != nil {
- return nil, false, errors.Wrap(err, "could not get parent execution block")
+ return nil, false, fmt.Errorf("could not get parent execution block: %w", err)
}
if parentBlk == nil {
- return nil, false, errors.New("parent execution block is nil")
+ return nil, false, fmt.Errorf("parent execution block is nil")
}
if blockReachedTTD {
parentTotalDifficulty, err := tDStringToUint256(parentBlk.TotalDifficulty)
if err != nil {
- return nil, false, errors.Wrap(err, "could not convert total difficulty to uint256")
+ return nil, false, fmt.Errorf("could not convert total difficulty to uint256: %w", err)
}
// If the terminal block has the same timestamp or greater than the transition time,
@@ -515,7 +515,7 @@ func (s *Service) ReconstructFullBlock(
return nil, err
}
if len(reconstructed) != 1 {
- return nil, errors.Errorf("could not retrieve the correct number of payload bodies: wanted 1 but got %d", len(reconstructed))
+ return nil, fmt.Errorf("could not retrieve the correct number of payload bodies: wanted 1 but got %d", len(reconstructed))
}
return reconstructed[0], nil
}
@@ -544,7 +544,7 @@ func (s *Service) ReconstructBlobSidecars(ctx context.Context, block interfaces.
blockBody := block.Block().Body()
kzgCommitments, err := blockBody.BlobKzgCommitments()
if err != nil {
- return nil, errors.Wrap(err, "could not get blob KZG commitments")
+ return nil, fmt.Errorf("could not get blob KZG commitments: %w", err)
}
// Collect KZG hashes for non-existing blobs
@@ -563,7 +563,7 @@ func (s *Service) ReconstructBlobSidecars(ctx context.Context, block interfaces.
// Fetch blobs from EL
blobs, err := s.GetBlobs(ctx, kzgHashes)
if err != nil {
- return nil, errors.Wrap(err, "could not get blobs")
+ return nil, fmt.Errorf("could not get blobs: %w", err)
}
if len(blobs) == 0 {
return nil, nil
@@ -571,7 +571,7 @@ func (s *Service) ReconstructBlobSidecars(ctx context.Context, block interfaces.
header, err := block.Header()
if err != nil {
- return nil, errors.Wrap(err, "could not get header")
+ return nil, fmt.Errorf("could not get header: %w", err)
}
// Reconstruct verified blob sidecars
@@ -619,17 +619,17 @@ func fullPayloadFromPayloadBody(
header interfaces.ExecutionData, body *pb.ExecutionPayloadBody, bVersion int,
) (interfaces.ExecutionData, error) {
if header == nil || header.IsNil() || body == nil {
- return nil, errors.New("execution block and header cannot be nil")
+ return nil, fmt.Errorf("execution block and header cannot be nil")
}
if bVersion >= version.Deneb {
ebg, err := header.ExcessBlobGas()
if err != nil {
- return nil, errors.Wrap(err, "unable to extract ExcessBlobGas attribute from execution payload header")
+ return nil, fmt.Errorf("unable to extract ExcessBlobGas attribute from execution payload header: %w", err)
}
bgu, err := header.BlobGasUsed()
if err != nil {
- return nil, errors.Wrap(err, "unable to extract BlobGasUsed attribute from execution payload header")
+ return nil, fmt.Errorf("unable to extract BlobGasUsed attribute from execution payload header: %w", err)
}
return blocks.WrappedExecutionPayloadDeneb(
&pb.ExecutionPayloadDeneb{
@@ -713,45 +713,45 @@ func handleRPCError(err error) error {
"here https://docs.prylabs.network/docs/execution-node/authentication")
return fmt.Errorf("could not authenticate connection to execution client: %w", err)
}
- return errors.Wrapf(err, "got an unexpected error in JSON-RPC response")
+ return fmt.Errorf("got an unexpected error in JSON-RPC response: %w", err)
}
switch e.ErrorCode() {
case -32700:
errParseCount.Inc()
- return ErrParse
+ return errors.Join(errors.New(e.Error()), ErrParse)
case -32600:
errInvalidRequestCount.Inc()
- return ErrInvalidRequest
+ return errors.Join(errors.New(e.Error()), ErrInvalidRequest)
case -32601:
errMethodNotFoundCount.Inc()
- return ErrMethodNotFound
+ return errors.Join(errors.New(e.Error()), ErrMethodNotFound)
case -32602:
errInvalidParamsCount.Inc()
- return ErrInvalidParams
+ return errors.Join(errors.New(e.Error()), ErrInvalidParams)
case -32603:
errInternalCount.Inc()
- return ErrInternal
+ return errors.Join(errors.New(e.Error()), ErrInternal)
case -38001:
errUnknownPayloadCount.Inc()
- return ErrUnknownPayload
+ return errors.Join(errors.New(e.Error()), ErrUnknownPayload)
case -38002:
errInvalidForkchoiceStateCount.Inc()
- return ErrInvalidForkchoiceState
+ return errors.Join(errors.New(e.Error()), ErrInvalidForkchoiceState)
case -38003:
errInvalidPayloadAttributesCount.Inc()
- return ErrInvalidPayloadAttributes
+ return errors.Join(errors.New(e.Error()), ErrInvalidPayloadAttributes)
case -38004:
errRequestTooLargeCount.Inc()
- return ErrRequestTooLarge
+ return errors.Join(errors.New(e.Error()), ErrRequestTooLarge)
case -32000:
errServerErrorCount.Inc()
// Only -32000 status codes are data errors in the RPC specification.
var errWithData gethRPC.DataError
ok := errors.As(err, &errWithData)
if !ok {
- return errors.Wrapf(err, "got an unexpected error in JSON-RPC response")
+ return fmt.Errorf("got an unexpected error in JSON-RPC response: %w", err)
}
- return errors.Wrapf(ErrServer, "%v", errWithData.Error())
+ return fmt.Errorf("%v: %w", errWithData.Error(), ErrServer)
default:
return err
}
@@ -778,7 +778,7 @@ func tDStringToUint256(td string) (*uint256.Int, error) {
}
i, overflows := uint256.FromBig(b)
if overflows {
- return nil, errors.New("total difficulty overflowed")
+ return nil, fmt.Errorf("total difficulty overflowed")
}
return i, nil
}
@@ -831,7 +831,7 @@ func EmptyExecutionPayload(v int) (proto.Message, error) {
}, nil
}
- return nil, errors.Wrapf(ErrUnsupportedVersion, "version=%s", version.String(v))
+ return nil, fmt.Errorf("version=%s: %w", version.String(v), ErrUnsupportedVersion)
}
func EmptyExecutionPayloadHeader(v int) (proto.Message, error) {
@@ -881,7 +881,7 @@ func EmptyExecutionPayloadHeader(v int) (proto.Message, error) {
}, nil
}
- return nil, errors.Wrapf(ErrUnsupportedVersion, "version=%s", version.String(v))
+ return nil, fmt.Errorf("version=%s: %w", version.String(v), ErrUnsupportedVersion)
}
func toBlockNumArg(number *big.Int) string {

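Two stdlib mechanics carry this migration off github.com/pkg/errors, and both preserve error matching: fmt.Errorf with %w wraps a cause so errors.Is and errors.As can still find it, while errors.Join (used for the sentinel returns in handleRPCError) attaches the engine client's own message without breaking callers that compare against the sentinel. A minimal, self-contained sketch, with an illustrative sentinel and EL message rather than the package's actual declarations:

```go
package main

import (
	"errors"
	"fmt"
)

// Illustrative stand-in for one of the sentinel errors in this file.
var ErrUnknownPayload = errors.New("payload does not exist or is not available")

func main() {
	// Stand-in for e.Error(), the EL's own response text.
	elErr := errors.New("engine_getPayloadV4: payload unknown to this node")

	// Before: the bare sentinel, EL detail discarded.
	before := ErrUnknownPayload
	// After: the EL's message joined with the sentinel.
	after := errors.Join(elErr, ErrUnknownPayload)

	// Callers matching on the sentinel keep working either way.
	fmt.Println(errors.Is(before, ErrUnknownPayload)) // true
	fmt.Println(errors.Is(after, ErrUnknownPayload))  // true
	fmt.Println(after)                                // now includes the EL's text

	// fmt.Errorf with %w gives the same guarantee for wrapped causes.
	wrapped := fmt.Errorf("could not get blobs: %w", elErr)
	fmt.Println(errors.Is(wrapped, elErr)) // true
}
```

That would explain the choice of errors.Join over fmt.Errorf for the sentinel cases: the sentinel remains the match target, and the EL's text rides along for logging.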
View File

@@ -55,9 +55,10 @@ func newRateLimiter(p2pProvider p2p.P2P) *limiter {
topicMap := make(map[string]*leakybucket.Collector, len(p2p.RPCTopicMappings))
// Goodbye Message
topicMap[addEncoding(p2p.RPCGoodByeTopicV1)] = leakybucket.NewCollector(1, 1, leakyBucketPeriod, false /* deleteEmptyBuckets */)
- // MetadataV0 Message
+ // Metadata Message
topicMap[addEncoding(p2p.RPCMetaDataTopicV1)] = leakybucket.NewCollector(1, defaultBurstLimit, leakyBucketPeriod, false /* deleteEmptyBuckets */)
topicMap[addEncoding(p2p.RPCMetaDataTopicV2)] = leakybucket.NewCollector(1, defaultBurstLimit, leakyBucketPeriod, false /* deleteEmptyBuckets */)
+ topicMap[addEncoding(p2p.RPCMetaDataTopicV3)] = leakybucket.NewCollector(1, defaultBurstLimit, leakyBucketPeriod, false /* deleteEmptyBuckets */)
// Ping Message
topicMap[addEncoding(p2p.RPCPingTopicV1)] = leakybucket.NewCollector(1, defaultBurstLimit, leakyBucketPeriod, false /* deleteEmptyBuckets */)
// Status Message

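Each topic in this map owns an independent leaky bucket keyed by the full topic string, so the new V3 metadata topic needs its own collector; the test below accordingly bumps the expected topic count from 18 to 19. A rough sketch of the registration pattern, assuming the NewCollector(rate, capacity, period, deleteEmptyBuckets) signature used above, with stand-in values for defaultBurstLimit and leakyBucketPeriod and illustrative topic strings:

```go
package main

import (
	"fmt"
	"time"

	leakybucket "github.com/OffchainLabs/prysm/v6/container/leaky-bucket"
)

func main() {
	const (
		burstLimit = 64              // stand-in for defaultBurstLimit
		period     = 5 * time.Second // stand-in for leakyBucketPeriod
	)
	// One collector per topic string: a request on the V3 topic drains
	// only the V3 bucket, leaving the V1/V2 budgets untouched.
	topicMap := make(map[string]*leakybucket.Collector)
	for _, topic := range []string{
		"/eth2/beacon_chain/req/metadata/1/ssz_snappy",
		"/eth2/beacon_chain/req/metadata/2/ssz_snappy",
		"/eth2/beacon_chain/req/metadata/3/ssz_snappy", // new in this change
	} {
		topicMap[topic] = leakybucket.NewCollector(1, burstLimit, period, false /* deleteEmptyBuckets */)
	}
	fmt.Println(len(topicMap)) // 3: each version is rate-limited independently
}
```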
View File

@@ -17,7 +17,7 @@ import (
func TestNewRateLimiter(t *testing.T) {
rlimiter := newRateLimiter(mockp2p.NewTestP2P(t))
- assert.Equal(t, len(rlimiter.limiterMap), 18, "correct number of topics not registered")
+ assert.Equal(t, len(rlimiter.limiterMap), 19, "correct number of topics not registered")
}
func TestNewRateLimiter_FreeCorrectly(t *testing.T) {

View File

@@ -17,7 +17,7 @@ import (
"github.com/prysmaticlabs/go-bitfield"
)
- // metaDataHandler reads the incoming metadata rpc request from the peer.
+ // metaDataHandler reads the incoming metadata RPC request from the peer.
func (s *Service) metaDataHandler(_ context.Context, _ interface{}, stream libp2pcore.Stream) error {
SetRPCStreamDeadlines(stream)
@@ -70,7 +70,9 @@ func (s *Service) metaDataHandler(_ context.Context, _ interface{}, stream libp2
switch streamVersion {
case p2p.SchemaVersionV1:
switch metadataVersion {
- case version.Altair, version.Deneb:
+ case version.Altair, version.Fulu:
+ // If the stream version corresponds to Phase 0 but our metadata
+ // corresponds to Altair or Fulu, convert our metadata to the Phase 0 one.
metadata = wrapper.WrappedMetadataV0(
&pb.MetaDataV0{
Attnets: metadata.AttnetsBitfield(),
@@ -81,13 +83,18 @@ func (s *Service) metaDataHandler(_ context.Context, _ interface{}, stream libp2
case p2p.SchemaVersionV2:
switch metadataVersion {
case version.Phase0:
+ // If the stream version corresponds to Altair but our metadata
+ // corresponds to Phase 0, convert our metadata to the Altair one,
+ // and use a zeroed syncnets bitfield.
metadata = wrapper.WrappedMetadataV1(
&pb.MetaDataV1{
Attnets: metadata.AttnetsBitfield(),
SeqNumber: metadata.SequenceNumber(),
Syncnets: bitfield.Bitvector4{byte(0x00)},
})
- case version.Deneb:
+ case version.Fulu:
+ // If the stream version corresponds to Altair but our metadata
+ // corresponds to Fulu, convert our metadata to the Altair one.
metadata = wrapper.WrappedMetadataV1(
&pb.MetaDataV1{
Attnets: metadata.AttnetsBitfield(),
@@ -95,6 +102,32 @@ func (s *Service) metaDataHandler(_ context.Context, _ interface{}, stream libp2
Syncnets: metadata.SyncnetsBitfield(),
})
}
+ case p2p.SchemaVersionV3:
+ switch metadataVersion {
+ case version.Phase0:
+ // If the stream version corresponds to Fulu but our metadata
+ // corresponds to Phase 0, convert our metadata to the Fulu one,
+ // and use a zeroed syncnets bitfield and custody group count.
+ metadata = wrapper.WrappedMetadataV2(
+ &pb.MetaDataV2{
+ Attnets: metadata.AttnetsBitfield(),
+ SeqNumber: metadata.SequenceNumber(),
+ Syncnets: bitfield.Bitvector4{byte(0x00)},
+ CustodyGroupCount: 0,
+ })
+ case version.Altair:
+ // If the stream version corresponds to Fulu but our metadata
+ // corresponds to Altair, convert our metadata to the Fulu one and
+ // use a zeroed custody group count.
+ metadata = wrapper.WrappedMetadataV2(
+ &pb.MetaDataV2{
+ Attnets: metadata.AttnetsBitfield(),
+ SeqNumber: metadata.SequenceNumber(),
+ Syncnets: metadata.SyncnetsBitfield(),
+ CustodyGroupCount: 0,
+ })
+ }
}
// Write the METADATA response into the stream.
@@ -164,12 +197,14 @@ func (s *Service) sendMetaDataRequest(ctx context.Context, peerID peer.ID) (meta
}
// Defensive check to ensure valid objects are being sent.
- topicVersion := ""
+ var topicVersion string
switch msg.Version() {
case version.Phase0:
topicVersion = p2p.SchemaVersionV1
case version.Altair:
topicVersion = p2p.SchemaVersionV2
+ case version.Fulu:
+ topicVersion = p2p.SchemaVersionV3
}
// Validate the version of the topic.

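The rule encoded by these switches is that the response shape follows the stream's schema version, not the fork the local node happens to be on: fields the local metadata lacks are zero-filled (syncnets before Altair, custody group count before Fulu), and extra fields are dropped when answering on an older topic. A standalone sketch of that conversion rule, using simplified stand-in types rather than the pb/wrapper types above:

```go
package main

import "fmt"

// Simplified stand-in for the node's local metadata.
type localMetadata struct {
	SeqNumber         uint64
	Attnets           []byte
	Syncnets          []byte // not tracked before Altair
	CustodyGroupCount uint64 // not tracked before Fulu
}

// respondFor mirrors the handler's conversions: V1 streams get the
// MetaDataV0 shape, V2 the MetaDataV1 shape, V3 the MetaDataV2 shape,
// with missing fields zero-filled.
func respondFor(schemaVersion string, md localMetadata) map[string]any {
	resp := map[string]any{
		"seq_number": md.SeqNumber,
		"attnets":    md.Attnets,
	}
	switch schemaVersion {
	case "v2", "v3":
		resp["syncnets"] = orZero(md.Syncnets)
	}
	if schemaVersion == "v3" {
		resp["custody_group_count"] = md.CustodyGroupCount // zero if pre-Fulu
	}
	return resp
}

func orZero(b []byte) []byte {
	if len(b) == 0 {
		return []byte{0x00} // zeroed Bitvector4, as in the handler
	}
	return b
}

func main() {
	// A Phase 0 node answering on the Fulu (v3) topic reports zeroed
	// syncnets and a zero custody group count, as in SchemaVersionV3 above.
	phase0 := localMetadata{SeqNumber: 2, Attnets: []byte{'A', 'B'}}
	fmt.Println(respondFor("v3", phase0))
}
```

Read this way, the handler never advertises capabilities the node does not have; it only reshapes what it already tracks.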
View File

@@ -1,6 +1,7 @@
package sync
import (
"context"
"sync"
"testing"
"time"
@@ -15,6 +16,7 @@ import (
leakybucket "github.com/OffchainLabs/prysm/v6/container/leaky-bucket"
"github.com/OffchainLabs/prysm/v6/encoding/ssz/equality"
pb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1/metadata"
"github.com/OffchainLabs/prysm/v6/testing/assert"
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/OffchainLabs/prysm/v6/testing/util"
@@ -22,6 +24,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"
"github.com/prysmaticlabs/go-bitfield"
)
func TestMetaDataRPCHandler_ReceivesMetadata(t *testing.T) {
@@ -76,158 +79,241 @@ func TestMetaDataRPCHandler_ReceivesMetadata(t *testing.T) {
}
}
- func TestMetadataRPCHandler_SendsMetadata(t *testing.T) {
- p1 := p2ptest.NewTestP2P(t)
- p2 := p2ptest.NewTestP2P(t)
- p1.Connect(p2)
- assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")
- bitfield := [8]byte{'A', 'B'}
- p2.LocalMetadata = wrapper.WrappedMetadataV0(&pb.MetaDataV0{
- SeqNumber: 2,
- Attnets: bitfield[:],
- })
- // Set up a head state in the database with data we expect.
- chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
- d := db.SetupDB(t)
- r := &Service{
+ func createService(peer p2p.P2P, chain *mock.ChainService) *Service {
+ return &Service{
cfg: &config{
- beaconDB: d,
- p2p: p1,
- chain: chain,
- clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
+ p2p: peer,
+ chain: chain,
+ clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
},
- rateLimiter: newRateLimiter(p1),
}
- r2 := &Service{
- cfg: &config{
- beaconDB: d,
- p2p: p2,
- chain: &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}},
- },
- rateLimiter: newRateLimiter(p2),
- }
- // Setup streams
- pcl := protocol.ID(p2p.RPCMetaDataTopicV1 + r.cfg.p2p.Encoding().ProtocolSuffix())
- topic := string(pcl)
- r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
- r2.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
- var wg sync.WaitGroup
- wg.Add(1)
- p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
- defer wg.Done()
- assert.NoError(t, r2.metaDataHandler(t.Context(), new(interface{}), stream))
- })
- md, err := r.sendMetaDataRequest(t.Context(), p2.BHost.ID())
- assert.NoError(t, err)
- if !equality.DeepEqual(md.InnerObject(), p2.LocalMetadata.InnerObject()) {
- t.Fatalf("MetadataV0 unequal, received %v but wanted %v", md, p2.LocalMetadata)
- }
- if util.WaitTimeout(&wg, 1*time.Second) {
- t.Fatal("Did not receive stream within 1 sec")
- }
- conns := p1.BHost.Network().ConnsToPeer(p2.BHost.ID())
- if len(conns) == 0 {
- t.Error("Peer is disconnected despite receiving a valid ping")
+ rateLimiter: newRateLimiter(peer),
}
}
- func TestMetadataRPCHandler_SendsMetadataAltair(t *testing.T) {
+ func TestMetadataRPCHandler_SendMetadataRequest(t *testing.T) {
+ const (
+ requestTimeout = 1 * time.Second
+ seqNumber = 2
+ custodyGroupCount = 4
+ )
+ attnets := []byte{'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H'}
+ syncnets := []byte{0x4}
+ // Configure the test beacon chain.
params.SetupTestConfigCleanup(t)
- bCfg := params.BeaconConfig().Copy()
- bCfg.AltairForkEpoch = 5
- params.OverrideBeaconConfig(bCfg)
+ beaconChainConfig := params.BeaconConfig().Copy()
+ beaconChainConfig.AltairForkEpoch = 5
+ beaconChainConfig.FuluForkEpoch = 15
+ params.OverrideBeaconConfig(beaconChainConfig)
params.BeaconConfig().InitializeForkSchedule()
- p1 := p2ptest.NewTestP2P(t)
- p2 := p2ptest.NewTestP2P(t)
- p1.Connect(p2)
- assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")
- bitfield := [8]byte{'A', 'B'}
- p2.LocalMetadata = wrapper.WrappedMetadataV0(&pb.MetaDataV0{
- SeqNumber: 2,
- Attnets: bitfield[:],
- })
+ // Compute the number of seconds in an epoch.
+ secondsPerEpoch := oneEpoch()
- // Set up a head state in the database with data we expect.
- d := db.SetupDB(t)
- chain := &mock.ChainService{Genesis: time.Now().Add(-5 * oneEpoch()), ValidatorsRoot: [32]byte{}}
- r := &Service{
- cfg: &config{
- beaconDB: d,
- p2p: p1,
- chain: chain,
- clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
+ testCases := []struct {
+ name string
+ topic string
+ epochsSinceGenesisPeer1, epochsSinceGenesisPeer2 int
+ metadataPeer2, expected metadata.Metadata
+ }{
+ {
+ name: "Phase0-Phase0",
+ topic: p2p.RPCMetaDataTopicV1,
+ epochsSinceGenesisPeer1: 0,
+ epochsSinceGenesisPeer2: 0,
+ metadataPeer2: wrapper.WrappedMetadataV0(&pb.MetaDataV0{
+ SeqNumber: seqNumber,
+ Attnets: attnets,
+ }),
+ expected: wrapper.WrappedMetadataV0(&pb.MetaDataV0{
+ SeqNumber: seqNumber,
+ Attnets: attnets,
+ }),
+ },
- rateLimiter: newRateLimiter(p1),
- }
- chain2 := &mock.ChainService{Genesis: time.Now().Add(-5 * oneEpoch()), ValidatorsRoot: [32]byte{}}
- r2 := &Service{
- cfg: &config{
- beaconDB: d,
- p2p: p2,
- chain: chain2,
- clock: startup.NewClock(chain2.Genesis, chain2.ValidatorsRoot),
+ {
+ name: "Phase0-Altair",
+ topic: p2p.RPCMetaDataTopicV1,
+ epochsSinceGenesisPeer1: 0,
+ epochsSinceGenesisPeer2: 5,
+ metadataPeer2: wrapper.WrappedMetadataV1(&pb.MetaDataV1{
+ SeqNumber: seqNumber,
+ Attnets: attnets,
+ Syncnets: syncnets,
+ }),
+ expected: wrapper.WrappedMetadataV0(&pb.MetaDataV0{
+ SeqNumber: seqNumber,
+ Attnets: attnets,
+ }),
+ },
+ {
+ name: "Phase0-Fulu",
+ topic: p2p.RPCMetaDataTopicV1,
+ epochsSinceGenesisPeer1: 0,
+ epochsSinceGenesisPeer2: 15,
+ metadataPeer2: wrapper.WrappedMetadataV2(&pb.MetaDataV2{
+ SeqNumber: seqNumber,
+ Attnets: attnets,
+ Syncnets: syncnets,
+ CustodyGroupCount: custodyGroupCount,
+ }),
+ expected: wrapper.WrappedMetadataV0(&pb.MetaDataV0{
+ SeqNumber: seqNumber,
+ Attnets: attnets,
+ }),
+ },
+ {
+ name: "Altair-Phase0",
+ topic: p2p.RPCMetaDataTopicV2,
+ epochsSinceGenesisPeer1: 5,
+ epochsSinceGenesisPeer2: 0,
+ metadataPeer2: wrapper.WrappedMetadataV0(&pb.MetaDataV0{
+ SeqNumber: seqNumber,
+ Attnets: attnets,
+ }),
+ expected: wrapper.WrappedMetadataV1(&pb.MetaDataV1{
+ SeqNumber: seqNumber,
+ Attnets: attnets,
+ Syncnets: bitfield.Bitvector4{byte(0x00)},
+ }),
+ },
+ {
+ name: "Altair-Altair",
+ topic: p2p.RPCMetaDataTopicV2,
+ epochsSinceGenesisPeer1: 5,
+ epochsSinceGenesisPeer2: 5,
+ metadataPeer2: wrapper.WrappedMetadataV1(&pb.MetaDataV1{
+ SeqNumber: seqNumber,
+ Attnets: attnets,
+ Syncnets: syncnets,
+ }),
+ expected: wrapper.WrappedMetadataV1(&pb.MetaDataV1{
+ SeqNumber: seqNumber,
+ Attnets: attnets,
+ Syncnets: syncnets,
+ }),
+ },
+ {
+ name: "Altair-Fulu",
+ topic: p2p.RPCMetaDataTopicV2,
+ epochsSinceGenesisPeer1: 5,
+ epochsSinceGenesisPeer2: 15,
+ metadataPeer2: wrapper.WrappedMetadataV2(&pb.MetaDataV2{
+ SeqNumber: seqNumber,
+ Attnets: attnets,
+ Syncnets: syncnets,
+ CustodyGroupCount: custodyGroupCount,
+ }),
+ expected: wrapper.WrappedMetadataV1(&pb.MetaDataV1{
+ SeqNumber: seqNumber,
+ Attnets: attnets,
+ Syncnets: syncnets,
+ }),
+ },
+ {
+ name: "Fulu-Phase0",
+ topic: p2p.RPCMetaDataTopicV3,
+ epochsSinceGenesisPeer1: 15,
+ epochsSinceGenesisPeer2: 0,
+ metadataPeer2: wrapper.WrappedMetadataV0(&pb.MetaDataV0{
+ SeqNumber: seqNumber,
+ Attnets: attnets,
+ }),
+ expected: wrapper.WrappedMetadataV2(&pb.MetaDataV2{
+ SeqNumber: seqNumber,
+ Attnets: attnets,
+ Syncnets: bitfield.Bitvector4{byte(0x00)},
+ CustodyGroupCount: 0,
+ }),
+ },
+ {
+ name: "Fulu-Altair",
+ topic: p2p.RPCMetaDataTopicV3,
+ epochsSinceGenesisPeer1: 15,
+ epochsSinceGenesisPeer2: 5,
+ metadataPeer2: wrapper.WrappedMetadataV1(&pb.MetaDataV1{
+ SeqNumber: seqNumber,
+ Attnets: attnets,
+ Syncnets: syncnets,
+ }),
+ expected: wrapper.WrappedMetadataV2(&pb.MetaDataV2{
+ SeqNumber: seqNumber,
+ Attnets: attnets,
+ Syncnets: syncnets,
+ CustodyGroupCount: 0,
+ }),
+ },
+ {
+ name: "Fulu-Fulu",
+ topic: p2p.RPCMetaDataTopicV3,
+ epochsSinceGenesisPeer1: 15,
+ epochsSinceGenesisPeer2: 15,
+ metadataPeer2: wrapper.WrappedMetadataV2(&pb.MetaDataV2{
+ SeqNumber: seqNumber,
+ Attnets: attnets,
+ Syncnets: syncnets,
+ CustodyGroupCount: custodyGroupCount,
+ }),
+ expected: wrapper.WrappedMetadataV2(&pb.MetaDataV2{
+ SeqNumber: seqNumber,
+ Attnets: attnets,
+ Syncnets: syncnets,
+ CustodyGroupCount: custodyGroupCount,
+ }),
+ },
- rateLimiter: newRateLimiter(p2),
- }
- // Setup streams
- pcl := protocol.ID(p2p.RPCMetaDataTopicV2 + r.cfg.p2p.Encoding().ProtocolSuffix())
- topic := string(pcl)
- r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(2, 2, time.Second, false)
- r2.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(2, 2, time.Second, false)
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
- var wg sync.WaitGroup
+ var wg sync.WaitGroup
- wg.Add(1)
- p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
- defer wg.Done()
- err := r2.metaDataHandler(t.Context(), new(interface{}), stream)
- assert.NoError(t, err)
- })
+ ctx := context.Background()
- _, err := r.sendMetaDataRequest(t.Context(), p2.BHost.ID())
- assert.NoError(t, err)
+ // Setup and connect peers.
+ peer1, peer2 := p2ptest.NewTestP2P(t), p2ptest.NewTestP2P(t)
+ peer1.Connect(peer2)
- if util.WaitTimeout(&wg, 1*time.Second) {
- t.Fatal("Did not receive stream within 1 sec")
- }
+ // Ensure the peers are connected.
+ peersCount := len(peer1.BHost.Network().Peers())
+ require.Equal(t, 1, peersCount, "Expected peers to be connected")
- // Fix up peer with the correct metadata.
- p2.LocalMetadata = wrapper.WrappedMetadataV1(&pb.MetaDataV1{
- SeqNumber: 2,
- Attnets: bitfield[:],
- Syncnets: []byte{0x0},
- })
+ // Setup sync services.
+ genesisPeer1 := time.Now().Add(-time.Duration(tc.epochsSinceGenesisPeer1) * secondsPerEpoch)
+ genesisPeer2 := time.Now().Add(-time.Duration(tc.epochsSinceGenesisPeer2) * secondsPerEpoch)
- wg.Add(1)
- p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
- defer wg.Done()
- assert.NoError(t, r2.metaDataHandler(t.Context(), new(interface{}), stream))
- })
+ chainPeer1 := &mock.ChainService{Genesis: genesisPeer1, ValidatorsRoot: [32]byte{}}
+ chainPeer2 := &mock.ChainService{Genesis: genesisPeer2, ValidatorsRoot: [32]byte{}}
- md, err := r.sendMetaDataRequest(t.Context(), p2.BHost.ID())
- assert.NoError(t, err)
+ servicePeer1 := createService(peer1, chainPeer1)
+ servicePeer2 := createService(peer2, chainPeer2)
- if !equality.DeepEqual(md.InnerObject(), p2.LocalMetadata.InnerObject()) {
- t.Fatalf("MetadataV1 unequal, received %v but wanted %v", md, p2.LocalMetadata)
- }
+ // Define the behavior of peer2 when receiving a METADATA request.
+ protocolSuffix := servicePeer2.cfg.p2p.Encoding().ProtocolSuffix()
+ protocolID := protocol.ID(tc.topic + protocolSuffix)
+ peer2.LocalMetadata = tc.metadataPeer2
- if util.WaitTimeout(&wg, 1*time.Second) {
- t.Fatal("Did not receive stream within 1 sec")
- }
+ wg.Add(1)
+ peer2.BHost.SetStreamHandler(protocolID, func(stream network.Stream) {
+ defer wg.Done()
+ err := servicePeer2.metaDataHandler(ctx, new(interface{}), stream)
+ require.NoError(t, err)
+ })
- conns := p1.BHost.Network().ConnsToPeer(p2.BHost.ID())
- if len(conns) == 0 {
- t.Error("Peer is disconnected despite receiving a valid ping")
+ // Send a METADATA request from peer1 to peer2.
+ actual, err := servicePeer1.sendMetaDataRequest(ctx, peer2.BHost.ID())
+ require.NoError(t, err)
+ // Wait until the METADATA request is received by peer2 or timeout.
+ timeOutReached := util.WaitTimeout(&wg, requestTimeout)
+ require.Equal(t, false, timeOutReached, "Did not receive METADATA request within timeout")
+ // Compare the received METADATA object with the expected METADATA object.
+ require.DeepSSZEqual(t, tc.expected.InnerObject(), actual.InnerObject(), "Metadata unequal")
+ // Ensure the peers are still connected.
+ peersCount = len(peer1.BHost.Network().Peers())
+ assert.Equal(t, 1, peersCount, "Expected peers to be connected")
+ })
}
}

View File

@@ -0,0 +1,2 @@
+ ### Added
+ - PeerDAS: Implement the new Fulu Metadata.

View File

@@ -0,0 +1,3 @@
+ ### Changed
+ - Update beacon-chain/execution/engine_client.go to return the underlying error message from the EL