Move broadcast of BLS changes to the forkwatcher (#11878)

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
Potuz
2023-01-16 12:00:11 -03:00
committed by GitHub
parent 047cae5e8b
commit ad680d3417
7 changed files with 86 additions and 50 deletions

View File

@@ -3,7 +3,6 @@ load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"broadcast_bls_changes.go",
"chain_info.go",
"error.go",
"execution_engine.go",

View File

@@ -1,48 +0,0 @@
package blockchain
import (
"github.com/prysmaticlabs/prysm/v3/async/event"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/feed"
"github.com/prysmaticlabs/prysm/v3/config/params"
"github.com/prysmaticlabs/prysm/v3/time/slots"
)
// This routine broadcasts all known BLS changes at the Capella fork.
func (s *Service) spawnBroadcastBLSChangesRoutine(stateFeed *event.Feed) {
// Wait for state to be initialized.
stateChannel := make(chan *feed.Event, 1)
stateSub := stateFeed.Subscribe(stateChannel)
go func() {
select {
case <-s.ctx.Done():
stateSub.Unsubscribe()
return
case <-stateChannel:
stateSub.Unsubscribe()
break
}
st := slots.NewSlotTicker(s.genesisTime, params.BeaconConfig().SecondsPerSlot)
for {
select {
case <-s.ctx.Done():
return
case <-st.C():
if slots.ToEpoch(s.CurrentSlot()) >= params.BeaconConfig().CapellaForkEpoch {
changes, err := s.cfg.BLSToExecPool.PendingBLSToExecChanges()
if err != nil {
log.WithError(err).Error("Could not get BLS to execution changes.")
return
}
for _, ch := range changes {
if err := s.cfg.P2p.Broadcast(s.ctx, ch); err != nil {
log.WithError(err).Error("Could not broadcast BLS to execution changes.")
return
}
}
return
}
}
}
}()
}

View File

@@ -130,7 +130,6 @@ func (s *Service) Start() {
}
}
s.spawnProcessAttestationsRoutine(s.cfg.StateNotifier.StateFeed())
s.spawnBroadcastBLSChangesRoutine(s.cfg.StateNotifier.StateFeed())
s.fillMissingPayloadIDRoutine(s.ctx, s.cfg.StateNotifier.StateFeed())
}

View File

@@ -4,6 +4,7 @@ go_library(
name = "go_default_library",
srcs = [
"batch_verifier.go",
"broadcast_bls_changes.go",
"context.go",
"deadlines.go",
"decode_pubsub.go",
@@ -133,6 +134,7 @@ go_test(
size = "small",
srcs = [
"batch_verifier_test.go",
"broadcast_bls_changes_test.go",
"context_test.go",
"decode_pubsub_test.go",
"error_test.go",

View File

@@ -0,0 +1,28 @@
package sync
import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/config/params"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v3/time/slots"
)
// This routine broadcasts all known BLS changes at the Capella fork.
func (s *Service) broadcastBLSChanges(currSlot types.Slot) error {
capellaSlotStart, err := slots.EpochStart(params.BeaconConfig().CapellaForkEpoch)
if err != nil {
return errors.Wrap(err, "unable to compute Capella slot start")
}
if currSlot == capellaSlotStart {
changes, err := s.cfg.blsToExecPool.PendingBLSToExecChanges()
if err != nil {
return errors.Wrap(err, "could not get BLS to execution changes")
}
for _, ch := range changes {
if err := s.cfg.p2p.Broadcast(s.ctx, ch); err != nil {
return errors.Wrap(err, "could not broadcast BLS to execution changes.")
}
}
}
return nil
}

View File

@@ -0,0 +1,50 @@
package sync
import (
"context"
"testing"
"time"
mockChain "github.com/prysmaticlabs/prysm/v3/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/operations/blstoexec"
mockp2p "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/testing"
mockSync "github.com/prysmaticlabs/prysm/v3/beacon-chain/sync/initial-sync/testing"
"github.com/prysmaticlabs/prysm/v3/config/params"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/testing/require"
"github.com/prysmaticlabs/prysm/v3/time/slots"
)
func TestBroadcastBLSChanges(t *testing.T) {
params.SetupTestConfigCleanup(t)
c := params.BeaconConfig()
c.CapellaForkEpoch = c.BellatrixForkEpoch.Add(2)
params.OverrideBeaconConfig(c)
chainService := &mockChain.ChainService{
Genesis: time.Now(),
ValidatorsRoot: [32]byte{'A'},
}
s := NewService(context.Background(),
WithP2P(mockp2p.NewTestP2P(t)),
WithInitialSync(&mockSync.Sync{IsSyncing: false}),
WithChainService(chainService),
WithStateNotifier(chainService.StateNotifier()),
WithOperationNotifier(chainService.OperationNotifier()),
WithBlsToExecPool(blstoexec.NewPool()),
)
var emptySig [96]byte
s.cfg.blsToExecPool.InsertBLSToExecChange(&ethpb.SignedBLSToExecutionChange{
Message: &ethpb.BLSToExecutionChange{
ValidatorIndex: 10,
FromBlsPubkey: make([]byte, 48),
ToExecutionAddress: make([]byte, 20),
},
Signature: emptySig[:],
})
capellaStart, err := slots.EpochStart(params.BeaconConfig().CapellaForkEpoch)
require.NoError(t, err)
require.NoError(t, s.broadcastBLSChanges(capellaStart))
require.NoError(t, s.broadcastBLSChanges(capellaStart+1))
}

View File

@@ -28,6 +28,12 @@ func (s *Service) forkWatcher() {
log.WithError(err).Error("Unable to check for fork in the previous epoch")
continue
}
// Broadcast BLS changes at the Capella fork boundary
if err := s.broadcastBLSChanges(currSlot); err != nil {
log.WithError(err).Error("Unable to broadcast BLS to execution changes")
continue
}
case <-s.ctx.Done():
log.Debug("Context closed, exiting goroutine")
slotTicker.Done()