Randomize bls changes at fork (#11923)

* Rate Limit broadcasting of BLS changes

* Rate limit BLS changes broadcasting

* random

* linting

* mock test

Co-authored-by: terencechain <terence@prysmaticlabs.com>
This commit is contained in:
Potuz
2023-01-27 00:13:28 +01:00
committed by GitHub
parent 77d3ccb9ad
commit 56907bb2c6
11 changed files with 142 additions and 30 deletions

View File

@@ -71,6 +71,9 @@ func (mb *mockBroadcaster) BroadcastSyncCommitteeMessage(_ context.Context, _ ui
return nil
}
func (mb *mockBroadcaster) BroadcastBLSChanges(_ context.Context, _ []*ethpb.SignedBLSToExecutionChange) {
}
var _ p2p.Broadcaster = (*mockBroadcaster)(nil)
func setupBeaconChain(t *testing.T, beaconDB db.Database) *Service {

View File

@@ -19,6 +19,8 @@ import (
"google.golang.org/protobuf/proto"
)
const broadcastBLSChangesRateLimit = 128
// ErrMessageNotMapped occurs on a Broadcast attempt when a message has not been defined in the
// GossipTypeMapping.
var ErrMessageNotMapped = errors.New("message type is not mapped to a PubSub topic")
@@ -52,6 +54,36 @@ func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
return s.broadcastObject(ctx, castMsg, fmt.Sprintf(topic, forkDigest))
}
// BroadcastBLSChanges spins up a go routine that rate limits and broadcasts BLS
// to execution changes at prescribed intervals.
func (s *Service) BroadcastBLSChanges(ctx context.Context, changes []*ethpb.SignedBLSToExecutionChange) {
go s.broadcastBLSChanges(ctx, changes)
}
func (s *Service) broadcastBLSChanges(ctx context.Context, changes []*ethpb.SignedBLSToExecutionChange) {
ticker := time.NewTicker(500 * time.Millisecond)
for {
select {
case <-s.ctx.Done():
return
case <-ticker.C:
limit := broadcastBLSChangesRateLimit
if len(changes) < broadcastBLSChangesRateLimit {
limit = len(changes)
}
for _, ch := range changes[:limit] {
if err := s.Broadcast(ctx, ch); err != nil {
log.WithError(err).Error("could not broadcast BLS to execution changes.")
}
}
changes = changes[limit:]
if len(changes) == 0 {
return
}
}
}
}
// BroadcastAttestation broadcasts an attestation to the p2p network, the message is assumed to be
// broadcasted to the current fork.
func (s *Service) BroadcastAttestation(ctx context.Context, subnet uint64, att *ethpb.Attestation) error {

View File

@@ -24,6 +24,7 @@ import (
"github.com/prysmaticlabs/prysm/v3/testing/assert"
"github.com/prysmaticlabs/prysm/v3/testing/require"
"github.com/prysmaticlabs/prysm/v3/testing/util"
logTest "github.com/sirupsen/logrus/hooks/test"
"google.golang.org/protobuf/proto"
)
@@ -439,3 +440,56 @@ func TestService_BroadcastSyncCommittee(t *testing.T) {
t.Error("Failed to receive pubsub within 1s")
}
}
func TestBroadcastBLSChanges(t *testing.T) {
logHook := logTest.NewGlobal()
p1 := p2ptest.NewTestP2P(t)
p2 := p2ptest.NewTestP2P(t)
p1.Connect(p2)
if len(p1.BHost.Network().Peers()) == 0 {
t.Fatal("No peers")
}
p := &Service{
host: p1.BHost,
pubsub: p1.PubSub(),
joinedTopics: map[string]*pubsub.Topic{},
cfg: &Config{},
ctx: context.Background(),
genesisTime: time.Now(),
genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
subnetsLock: make(map[uint64]*sync.RWMutex),
subnetsLockLock: sync.Mutex{},
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
ScorerParams: &scorers.Config{},
}),
}
message := &ethpb.SignedBLSToExecutionChange{
Message: &ethpb.BLSToExecutionChange{
ValidatorIndex: 1,
FromBlsPubkey: bytesutil.PadTo([]byte("pubkey1"), 48),
ToExecutionAddress: bytesutil.PadTo([]byte("address1"), 20),
},
Signature: bytesutil.PadTo([]byte("signature1"), 96),
}
message2 := &ethpb.SignedBLSToExecutionChange{
Message: &ethpb.BLSToExecutionChange{
ValidatorIndex: 1,
FromBlsPubkey: bytesutil.PadTo([]byte("pubkey1"), 48),
ToExecutionAddress: bytesutil.PadTo([]byte("address1"), 20),
},
Signature: bytesutil.PadTo([]byte("signature1"), 96),
}
messages := make([]*ethpb.SignedBLSToExecutionChange, 200)
for i := 0; i < 128; i++ {
messages[i] = message
}
for i := 128; i < 200; i++ {
messages[i] = message2
}
p.broadcastBLSChanges(context.Background(), messages)
require.LogsDoNotContain(t, logHook, "could not")
}

View File

@@ -35,6 +35,7 @@ type Broadcaster interface {
Broadcast(context.Context, proto.Message) error
BroadcastAttestation(ctx context.Context, subnet uint64, att *ethpb.Attestation) error
BroadcastSyncCommitteeMessage(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage) error
BroadcastBLSChanges(context.Context, []*ethpb.SignedBLSToExecutionChange)
}
// SetStreamHandler configures p2p to handle streams of a certain topic ID.

View File

@@ -143,6 +143,10 @@ func (_ *FakeP2P) BroadcastSyncCommitteeMessage(_ context.Context, _ uint64, _ *
return nil
}
// BroadcastBLSChanges mocks a broadcast BLS change ocurred
func (_ *FakeP2P) BroadcastBLSChanges(_ context.Context, _ []*ethpb.SignedBLSToExecutionChange) {
}
// InterceptPeerDial -- fake.
func (_ *FakeP2P) InterceptPeerDial(peer.ID) (allow bool) {
return true

View File

@@ -4,6 +4,7 @@ import (
"context"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)
@@ -33,3 +34,13 @@ func (m *MockBroadcaster) BroadcastSyncCommitteeMessage(_ context.Context, _ uin
m.BroadcastCalled = true
return nil
}
// BroadcastBLSChanges mocks a broadcast BLS change ocurred
func (m *MockBroadcaster) BroadcastBLSChanges(ctx context.Context, changes []*ethpb.SignedBLSToExecutionChange) {
for _, change := range changes {
err := m.Broadcast(ctx, change)
if err != nil {
log.WithError(err).Error("could not broadcast Signed BLS change")
}
}
}

View File

@@ -176,6 +176,14 @@ func (p *TestP2P) BroadcastSyncCommitteeMessage(_ context.Context, _ uint64, _ *
return nil
}
// BroadcastBLSChanges mocks a broadcast BLS change ocurred
func (p *TestP2P) BroadcastBLSChanges(_ context.Context, changes []*ethpb.SignedBLSToExecutionChange) {
if len(changes) > 0 {
p.BroadcastCalled = true
return
}
}
// SetStreamHandler for RPC.
func (p *TestP2P) SetStreamHandler(topic string, handler network.StreamHandler) {
p.BHost.SetStreamHandler(protocol.ID(topic), handler)

View File

@@ -317,6 +317,8 @@ func (bs *Server) SubmitSignedBLSToExecutionChanges(ctx context.Context, req *et
return nil, status.Errorf(codes.Internal, "Could not get head state: %v", err)
}
var failures []*helpers.SingleIndexedVerificationFailure
var toBroadcast []*ethpbalpha.SignedBLSToExecutionChange
for i, change := range req.GetChanges() {
alphaChange := migration.V2SignedBLSToExecutionChangeToV1Alpha1(change)
_, err = blocks.ValidateBLSToExecutionChange(st, alphaChange)
@@ -342,15 +344,10 @@ func (bs *Server) SubmitSignedBLSToExecutionChanges(ctx context.Context, req *et
})
bs.BLSChangesPool.InsertBLSToExecChange(alphaChange)
if st.Version() >= version.Capella {
if err := bs.Broadcaster.Broadcast(ctx, alphaChange); err != nil {
failures = append(failures, &helpers.SingleIndexedVerificationFailure{
Index: i,
Message: "Could not broadcast BLSToExecutionChange: " + err.Error(),
})
continue
}
toBroadcast = append(toBroadcast, alphaChange)
}
}
bs.Broadcaster.BroadcastBLSChanges(ctx, toBroadcast)
if len(failures) > 0 {
failuresContainer := &helpers.IndexedVerificationFailure{Failures: failures}
err := grpc.AppendCustomErrorHeader(ctx, failuresContainer)

View File

@@ -1,29 +1,36 @@
package sync
import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/config/params"
"github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v3/crypto/rand"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/time/slots"
)
// This routine broadcasts all known BLS changes at the Capella fork.
func (s *Service) broadcastBLSChanges(currSlot primitives.Slot) error {
// This routine broadcasts known BLS changes at the Capella fork.
func (s *Service) broadcastBLSChanges(currSlot types.Slot) {
capellaSlotStart, err := slots.EpochStart(params.BeaconConfig().CapellaForkEpoch)
if err != nil {
// only possible error is an overflow, so we exit early from the method
return nil
return
}
if currSlot == capellaSlotStart {
changes, err := s.cfg.blsToExecPool.PendingBLSToExecChanges()
if err != nil {
return errors.Wrap(err, "could not get BLS to execution changes")
}
for _, ch := range changes {
if err := s.cfg.p2p.Broadcast(s.ctx, ch); err != nil {
return errors.Wrap(err, "could not broadcast BLS to execution changes.")
}
}
if currSlot != capellaSlotStart {
return
}
return nil
changes, err := s.cfg.blsToExecPool.PendingBLSToExecChanges()
if err != nil {
log.WithError(err).Error("could not get BLS to execution changes")
}
if len(changes) == 0 {
return
}
source := rand.NewGenerator()
broadcastChanges := make([]*ethpb.SignedBLSToExecutionChange, len(changes))
for i := 0; i < len(changes); i++ {
idx := source.Intn(len(changes))
broadcastChanges[i] = changes[idx]
changes = append(changes[:idx], changes[idx+1:]...)
}
s.cfg.p2p.BroadcastBLSChanges(s.ctx, broadcastChanges)
}

View File

@@ -44,7 +44,5 @@ func TestBroadcastBLSChanges(t *testing.T) {
capellaStart, err := slots.EpochStart(params.BeaconConfig().CapellaForkEpoch)
require.NoError(t, err)
require.NoError(t, s.broadcastBLSChanges(capellaStart))
require.NoError(t, s.broadcastBLSChanges(capellaStart+1))
s.broadcastBLSChanges(capellaStart + 1)
}

View File

@@ -29,10 +29,7 @@ func (s *Service) forkWatcher() {
continue
}
// Broadcast BLS changes at the Capella fork boundary
if err := s.broadcastBLSChanges(currSlot); err != nil {
log.WithError(err).Error("Unable to broadcast BLS to execution changes")
continue
}
s.broadcastBLSChanges(currSlot)
case <-s.ctx.Done():
log.Debug("Context closed, exiting goroutine")