diff --git a/beacon-chain/blockchain/service_test.go b/beacon-chain/blockchain/service_test.go index 22503a203f..16c4bfba8b 100644 --- a/beacon-chain/blockchain/service_test.go +++ b/beacon-chain/blockchain/service_test.go @@ -71,6 +71,9 @@ func (mb *mockBroadcaster) BroadcastSyncCommitteeMessage(_ context.Context, _ ui return nil } +func (mb *mockBroadcaster) BroadcastBLSChanges(_ context.Context, _ []*ethpb.SignedBLSToExecutionChange) { +} + var _ p2p.Broadcaster = (*mockBroadcaster)(nil) func setupBeaconChain(t *testing.T, beaconDB db.Database) *Service { diff --git a/beacon-chain/p2p/broadcaster.go b/beacon-chain/p2p/broadcaster.go index a2d739d93e..41b0eb52ca 100644 --- a/beacon-chain/p2p/broadcaster.go +++ b/beacon-chain/p2p/broadcaster.go @@ -19,6 +19,8 @@ import ( "google.golang.org/protobuf/proto" ) +const broadcastBLSChangesRateLimit = 128 + // ErrMessageNotMapped occurs on a Broadcast attempt when a message has not been defined in the // GossipTypeMapping. var ErrMessageNotMapped = errors.New("message type is not mapped to a PubSub topic") @@ -52,6 +54,36 @@ func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error { return s.broadcastObject(ctx, castMsg, fmt.Sprintf(topic, forkDigest)) } +// BroadcastBLSChanges spins up a go routine that rate limits and broadcasts BLS +// to execution changes at prescribed intervals. +func (s *Service) BroadcastBLSChanges(ctx context.Context, changes []*ethpb.SignedBLSToExecutionChange) { + go s.broadcastBLSChanges(ctx, changes) +} + +func (s *Service) broadcastBLSChanges(ctx context.Context, changes []*ethpb.SignedBLSToExecutionChange) { + ticker := time.NewTicker(500 * time.Millisecond) + for { + select { + case <-s.ctx.Done(): + return + case <-ticker.C: + limit := broadcastBLSChangesRateLimit + if len(changes) < broadcastBLSChangesRateLimit { + limit = len(changes) + } + for _, ch := range changes[:limit] { + if err := s.Broadcast(ctx, ch); err != nil { + log.WithError(err).Error("could not broadcast BLS to execution changes.") + } + } + changes = changes[limit:] + if len(changes) == 0 { + return + } + } + } +} + // BroadcastAttestation broadcasts an attestation to the p2p network, the message is assumed to be // broadcasted to the current fork. func (s *Service) BroadcastAttestation(ctx context.Context, subnet uint64, att *ethpb.Attestation) error { diff --git a/beacon-chain/p2p/broadcaster_test.go b/beacon-chain/p2p/broadcaster_test.go index 548c7cec53..42e8a1d64a 100644 --- a/beacon-chain/p2p/broadcaster_test.go +++ b/beacon-chain/p2p/broadcaster_test.go @@ -24,6 +24,7 @@ import ( "github.com/prysmaticlabs/prysm/v3/testing/assert" "github.com/prysmaticlabs/prysm/v3/testing/require" "github.com/prysmaticlabs/prysm/v3/testing/util" + logTest "github.com/sirupsen/logrus/hooks/test" "google.golang.org/protobuf/proto" ) @@ -439,3 +440,56 @@ func TestService_BroadcastSyncCommittee(t *testing.T) { t.Error("Failed to receive pubsub within 1s") } } + +func TestBroadcastBLSChanges(t *testing.T) { + logHook := logTest.NewGlobal() + p1 := p2ptest.NewTestP2P(t) + p2 := p2ptest.NewTestP2P(t) + p1.Connect(p2) + if len(p1.BHost.Network().Peers()) == 0 { + t.Fatal("No peers") + } + + p := &Service{ + host: p1.BHost, + pubsub: p1.PubSub(), + joinedTopics: map[string]*pubsub.Topic{}, + cfg: &Config{}, + ctx: context.Background(), + genesisTime: time.Now(), + genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32), + subnetsLock: make(map[uint64]*sync.RWMutex), + subnetsLockLock: sync.Mutex{}, + peers: peers.NewStatus(context.Background(), &peers.StatusConfig{ + ScorerParams: &scorers.Config{}, + }), + } + + message := ðpb.SignedBLSToExecutionChange{ + Message: ðpb.BLSToExecutionChange{ + ValidatorIndex: 1, + FromBlsPubkey: bytesutil.PadTo([]byte("pubkey1"), 48), + ToExecutionAddress: bytesutil.PadTo([]byte("address1"), 20), + }, + Signature: bytesutil.PadTo([]byte("signature1"), 96), + } + message2 := ðpb.SignedBLSToExecutionChange{ + Message: ðpb.BLSToExecutionChange{ + ValidatorIndex: 1, + FromBlsPubkey: bytesutil.PadTo([]byte("pubkey1"), 48), + ToExecutionAddress: bytesutil.PadTo([]byte("address1"), 20), + }, + Signature: bytesutil.PadTo([]byte("signature1"), 96), + } + messages := make([]*ethpb.SignedBLSToExecutionChange, 200) + for i := 0; i < 128; i++ { + messages[i] = message + } + for i := 128; i < 200; i++ { + messages[i] = message2 + } + + p.broadcastBLSChanges(context.Background(), messages) + require.LogsDoNotContain(t, logHook, "could not") + +} diff --git a/beacon-chain/p2p/interfaces.go b/beacon-chain/p2p/interfaces.go index a51cbc6434..e47a13e0ef 100644 --- a/beacon-chain/p2p/interfaces.go +++ b/beacon-chain/p2p/interfaces.go @@ -35,6 +35,7 @@ type Broadcaster interface { Broadcast(context.Context, proto.Message) error BroadcastAttestation(ctx context.Context, subnet uint64, att *ethpb.Attestation) error BroadcastSyncCommitteeMessage(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage) error + BroadcastBLSChanges(context.Context, []*ethpb.SignedBLSToExecutionChange) } // SetStreamHandler configures p2p to handle streams of a certain topic ID. diff --git a/beacon-chain/p2p/testing/fuzz_p2p.go b/beacon-chain/p2p/testing/fuzz_p2p.go index f2809ed7db..5c6998fb66 100644 --- a/beacon-chain/p2p/testing/fuzz_p2p.go +++ b/beacon-chain/p2p/testing/fuzz_p2p.go @@ -143,6 +143,10 @@ func (_ *FakeP2P) BroadcastSyncCommitteeMessage(_ context.Context, _ uint64, _ * return nil } +// BroadcastBLSChanges mocks a broadcast BLS change ocurred +func (_ *FakeP2P) BroadcastBLSChanges(_ context.Context, _ []*ethpb.SignedBLSToExecutionChange) { +} + // InterceptPeerDial -- fake. func (_ *FakeP2P) InterceptPeerDial(peer.ID) (allow bool) { return true diff --git a/beacon-chain/p2p/testing/mock_broadcaster.go b/beacon-chain/p2p/testing/mock_broadcaster.go index a9cb87d45f..23f2762b57 100644 --- a/beacon-chain/p2p/testing/mock_broadcaster.go +++ b/beacon-chain/p2p/testing/mock_broadcaster.go @@ -4,6 +4,7 @@ import ( "context" ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" + log "github.com/sirupsen/logrus" "google.golang.org/protobuf/proto" ) @@ -33,3 +34,13 @@ func (m *MockBroadcaster) BroadcastSyncCommitteeMessage(_ context.Context, _ uin m.BroadcastCalled = true return nil } + +// BroadcastBLSChanges mocks a broadcast BLS change ocurred +func (m *MockBroadcaster) BroadcastBLSChanges(ctx context.Context, changes []*ethpb.SignedBLSToExecutionChange) { + for _, change := range changes { + err := m.Broadcast(ctx, change) + if err != nil { + log.WithError(err).Error("could not broadcast Signed BLS change") + } + } +} diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go index 79e99fc9d3..f466ffe527 100644 --- a/beacon-chain/p2p/testing/p2p.go +++ b/beacon-chain/p2p/testing/p2p.go @@ -176,6 +176,14 @@ func (p *TestP2P) BroadcastSyncCommitteeMessage(_ context.Context, _ uint64, _ * return nil } +// BroadcastBLSChanges mocks a broadcast BLS change ocurred +func (p *TestP2P) BroadcastBLSChanges(_ context.Context, changes []*ethpb.SignedBLSToExecutionChange) { + if len(changes) > 0 { + p.BroadcastCalled = true + return + } +} + // SetStreamHandler for RPC. func (p *TestP2P) SetStreamHandler(topic string, handler network.StreamHandler) { p.BHost.SetStreamHandler(protocol.ID(topic), handler) diff --git a/beacon-chain/rpc/eth/beacon/pool.go b/beacon-chain/rpc/eth/beacon/pool.go index c54fc3c15b..16f098d586 100644 --- a/beacon-chain/rpc/eth/beacon/pool.go +++ b/beacon-chain/rpc/eth/beacon/pool.go @@ -317,6 +317,8 @@ func (bs *Server) SubmitSignedBLSToExecutionChanges(ctx context.Context, req *et return nil, status.Errorf(codes.Internal, "Could not get head state: %v", err) } var failures []*helpers.SingleIndexedVerificationFailure + var toBroadcast []*ethpbalpha.SignedBLSToExecutionChange + for i, change := range req.GetChanges() { alphaChange := migration.V2SignedBLSToExecutionChangeToV1Alpha1(change) _, err = blocks.ValidateBLSToExecutionChange(st, alphaChange) @@ -342,15 +344,10 @@ func (bs *Server) SubmitSignedBLSToExecutionChanges(ctx context.Context, req *et }) bs.BLSChangesPool.InsertBLSToExecChange(alphaChange) if st.Version() >= version.Capella { - if err := bs.Broadcaster.Broadcast(ctx, alphaChange); err != nil { - failures = append(failures, &helpers.SingleIndexedVerificationFailure{ - Index: i, - Message: "Could not broadcast BLSToExecutionChange: " + err.Error(), - }) - continue - } + toBroadcast = append(toBroadcast, alphaChange) } } + bs.Broadcaster.BroadcastBLSChanges(ctx, toBroadcast) if len(failures) > 0 { failuresContainer := &helpers.IndexedVerificationFailure{Failures: failures} err := grpc.AppendCustomErrorHeader(ctx, failuresContainer) diff --git a/beacon-chain/sync/broadcast_bls_changes.go b/beacon-chain/sync/broadcast_bls_changes.go index e87541649b..25fdfeff62 100644 --- a/beacon-chain/sync/broadcast_bls_changes.go +++ b/beacon-chain/sync/broadcast_bls_changes.go @@ -1,29 +1,36 @@ package sync import ( - "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v3/config/params" - "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" + types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v3/crypto/rand" + ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v3/time/slots" ) -// This routine broadcasts all known BLS changes at the Capella fork. -func (s *Service) broadcastBLSChanges(currSlot primitives.Slot) error { +// This routine broadcasts known BLS changes at the Capella fork. +func (s *Service) broadcastBLSChanges(currSlot types.Slot) { capellaSlotStart, err := slots.EpochStart(params.BeaconConfig().CapellaForkEpoch) if err != nil { // only possible error is an overflow, so we exit early from the method - return nil + return } - if currSlot == capellaSlotStart { - changes, err := s.cfg.blsToExecPool.PendingBLSToExecChanges() - if err != nil { - return errors.Wrap(err, "could not get BLS to execution changes") - } - for _, ch := range changes { - if err := s.cfg.p2p.Broadcast(s.ctx, ch); err != nil { - return errors.Wrap(err, "could not broadcast BLS to execution changes.") - } - } + if currSlot != capellaSlotStart { + return } - return nil + changes, err := s.cfg.blsToExecPool.PendingBLSToExecChanges() + if err != nil { + log.WithError(err).Error("could not get BLS to execution changes") + } + if len(changes) == 0 { + return + } + source := rand.NewGenerator() + broadcastChanges := make([]*ethpb.SignedBLSToExecutionChange, len(changes)) + for i := 0; i < len(changes); i++ { + idx := source.Intn(len(changes)) + broadcastChanges[i] = changes[idx] + changes = append(changes[:idx], changes[idx+1:]...) + } + s.cfg.p2p.BroadcastBLSChanges(s.ctx, broadcastChanges) } diff --git a/beacon-chain/sync/broadcast_bls_changes_test.go b/beacon-chain/sync/broadcast_bls_changes_test.go index ee0d20dd56..e2be411c1d 100644 --- a/beacon-chain/sync/broadcast_bls_changes_test.go +++ b/beacon-chain/sync/broadcast_bls_changes_test.go @@ -44,7 +44,5 @@ func TestBroadcastBLSChanges(t *testing.T) { capellaStart, err := slots.EpochStart(params.BeaconConfig().CapellaForkEpoch) require.NoError(t, err) - require.NoError(t, s.broadcastBLSChanges(capellaStart)) - require.NoError(t, s.broadcastBLSChanges(capellaStart+1)) - + s.broadcastBLSChanges(capellaStart + 1) } diff --git a/beacon-chain/sync/fork_watcher.go b/beacon-chain/sync/fork_watcher.go index 731e02eb5c..4408b99586 100644 --- a/beacon-chain/sync/fork_watcher.go +++ b/beacon-chain/sync/fork_watcher.go @@ -29,10 +29,7 @@ func (s *Service) forkWatcher() { continue } // Broadcast BLS changes at the Capella fork boundary - if err := s.broadcastBLSChanges(currSlot); err != nil { - log.WithError(err).Error("Unable to broadcast BLS to execution changes") - continue - } + s.broadcastBLSChanges(currSlot) case <-s.ctx.Done(): log.Debug("Context closed, exiting goroutine")