mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 15:37:56 -05:00
Remove BLS change broadcasting at the fork (#15659)
* Remove BLS change broadcasting at the fork * Changelog
This commit is contained in:
@@ -5,7 +5,6 @@ go_library(
|
||||
srcs = [
|
||||
"batch_verifier.go",
|
||||
"block_batcher.go",
|
||||
"broadcast_bls_changes.go",
|
||||
"context.go",
|
||||
"custody.go",
|
||||
"data_column_sidecars.go",
|
||||
@@ -165,7 +164,6 @@ go_test(
|
||||
"batch_verifier_test.go",
|
||||
"blobs_test.go",
|
||||
"block_batcher_test.go",
|
||||
"broadcast_bls_changes_test.go",
|
||||
"context_test.go",
|
||||
"custody_test.go",
|
||||
"data_column_sidecars_test.go",
|
||||
|
||||
@@ -1,88 +0,0 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/blocks"
|
||||
"github.com/OffchainLabs/prysm/v6/config/params"
|
||||
types "github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||
"github.com/OffchainLabs/prysm/v6/crypto/rand"
|
||||
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v6/time/slots"
|
||||
)
|
||||
|
||||
const broadcastBLSChangesRateLimit = 128
|
||||
|
||||
// This routine broadcasts known BLS changes at the Capella fork.
|
||||
func (s *Service) broadcastBLSChanges(currSlot types.Slot) {
|
||||
capellaSlotStart, err := slots.EpochStart(params.BeaconConfig().CapellaForkEpoch)
|
||||
if err != nil {
|
||||
// only possible error is an overflow, so we exit early from the method
|
||||
return
|
||||
}
|
||||
if currSlot != capellaSlotStart {
|
||||
return
|
||||
}
|
||||
changes, err := s.cfg.blsToExecPool.PendingBLSToExecChanges()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("could not get BLS to execution changes")
|
||||
}
|
||||
if len(changes) == 0 {
|
||||
return
|
||||
}
|
||||
source := rand.NewGenerator()
|
||||
length := len(changes)
|
||||
broadcastChanges := make([]*ethpb.SignedBLSToExecutionChange, length)
|
||||
for i := 0; i < length; i++ {
|
||||
idx := source.Intn(len(changes))
|
||||
broadcastChanges[i] = changes[idx]
|
||||
changes = append(changes[:idx], changes[idx+1:]...)
|
||||
}
|
||||
|
||||
go s.rateBLSChanges(s.ctx, broadcastChanges)
|
||||
}
|
||||
|
||||
func (s *Service) broadcastBLSBatch(ctx context.Context, ptr *[]*ethpb.SignedBLSToExecutionChange) {
|
||||
limit := broadcastBLSChangesRateLimit
|
||||
if len(*ptr) < broadcastBLSChangesRateLimit {
|
||||
limit = len(*ptr)
|
||||
}
|
||||
st, err := s.cfg.chain.HeadStateReadOnly(ctx)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("could not get head state")
|
||||
return
|
||||
}
|
||||
for _, ch := range (*ptr)[:limit] {
|
||||
if ch != nil {
|
||||
_, err := blocks.ValidateBLSToExecutionChange(st, ch)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("could not validate BLS to execution change")
|
||||
continue
|
||||
}
|
||||
if err := s.cfg.p2p.Broadcast(ctx, ch); err != nil {
|
||||
log.WithError(err).Error("could not broadcast BLS to execution changes.")
|
||||
}
|
||||
}
|
||||
}
|
||||
*ptr = (*ptr)[limit:]
|
||||
}
|
||||
|
||||
func (s *Service) rateBLSChanges(ctx context.Context, changes []*ethpb.SignedBLSToExecutionChange) {
|
||||
s.broadcastBLSBatch(ctx, &changes)
|
||||
if len(changes) == 0 {
|
||||
return
|
||||
}
|
||||
ticker := time.NewTicker(500 * time.Millisecond)
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
s.broadcastBLSBatch(ctx, &changes)
|
||||
if len(changes) == 0 {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,157 +0,0 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
mockChain "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/signing"
|
||||
testingdb "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing"
|
||||
doublylinkedtree "github.com/OffchainLabs/prysm/v6/beacon-chain/forkchoice/doubly-linked-tree"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/operations/blstoexec"
|
||||
mockp2p "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/state/stategen"
|
||||
mockSync "github.com/OffchainLabs/prysm/v6/beacon-chain/sync/initial-sync/testing"
|
||||
"github.com/OffchainLabs/prysm/v6/config/params"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
|
||||
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v6/testing/assert"
|
||||
"github.com/OffchainLabs/prysm/v6/testing/require"
|
||||
"github.com/OffchainLabs/prysm/v6/testing/util"
|
||||
"github.com/OffchainLabs/prysm/v6/time/slots"
|
||||
logTest "github.com/sirupsen/logrus/hooks/test"
|
||||
)
|
||||
|
||||
func TestBroadcastBLSChanges(t *testing.T) {
|
||||
params.SetupTestConfigCleanup(t)
|
||||
c := params.BeaconConfig()
|
||||
c.CapellaForkEpoch = c.BellatrixForkEpoch.Add(2)
|
||||
params.OverrideBeaconConfig(c)
|
||||
chainService := &mockChain.ChainService{
|
||||
Genesis: time.Now(),
|
||||
ValidatorsRoot: [32]byte{'A'},
|
||||
}
|
||||
s := NewService(t.Context(),
|
||||
WithP2P(mockp2p.NewTestP2P(t)),
|
||||
WithInitialSync(&mockSync.Sync{IsSyncing: false}),
|
||||
WithChainService(chainService),
|
||||
WithOperationNotifier(chainService.OperationNotifier()),
|
||||
WithBlsToExecPool(blstoexec.NewPool()),
|
||||
)
|
||||
var emptySig [96]byte
|
||||
s.cfg.blsToExecPool.InsertBLSToExecChange(ðpb.SignedBLSToExecutionChange{
|
||||
Message: ðpb.BLSToExecutionChange{
|
||||
ValidatorIndex: 10,
|
||||
FromBlsPubkey: make([]byte, 48),
|
||||
ToExecutionAddress: make([]byte, 20),
|
||||
},
|
||||
Signature: emptySig[:],
|
||||
})
|
||||
|
||||
capellaStart, err := slots.EpochStart(params.BeaconConfig().CapellaForkEpoch)
|
||||
require.NoError(t, err)
|
||||
s.broadcastBLSChanges(capellaStart + 1)
|
||||
}
|
||||
|
||||
func TestRateBLSChanges(t *testing.T) {
|
||||
logHook := logTest.NewGlobal()
|
||||
params.SetupTestConfigCleanup(t)
|
||||
c := params.BeaconConfig()
|
||||
c.CapellaForkEpoch = c.BellatrixForkEpoch.Add(2)
|
||||
params.OverrideBeaconConfig(c)
|
||||
chainService := &mockChain.ChainService{
|
||||
Genesis: time.Now(),
|
||||
ValidatorsRoot: [32]byte{'A'},
|
||||
}
|
||||
p1 := mockp2p.NewTestP2P(t)
|
||||
s := NewService(t.Context(),
|
||||
WithP2P(p1),
|
||||
WithInitialSync(&mockSync.Sync{IsSyncing: false}),
|
||||
WithChainService(chainService),
|
||||
WithOperationNotifier(chainService.OperationNotifier()),
|
||||
WithBlsToExecPool(blstoexec.NewPool()),
|
||||
)
|
||||
beaconDB := testingdb.SetupDB(t)
|
||||
s.cfg.stateGen = stategen.New(beaconDB, doublylinkedtree.New())
|
||||
s.cfg.beaconDB = beaconDB
|
||||
s.initCaches()
|
||||
st, keys := util.DeterministicGenesisStateCapella(t, 256)
|
||||
s.cfg.chain = &mockChain.ChainService{
|
||||
ValidatorsRoot: [32]byte{'A'},
|
||||
Genesis: time.Now().Add(-time.Second * time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Duration(10)),
|
||||
State: st,
|
||||
}
|
||||
|
||||
for i := 0; i < 200; i++ {
|
||||
message := ðpb.BLSToExecutionChange{
|
||||
ValidatorIndex: primitives.ValidatorIndex(i),
|
||||
FromBlsPubkey: keys[i+1].PublicKey().Marshal(),
|
||||
ToExecutionAddress: bytesutil.PadTo([]byte("address"), 20),
|
||||
}
|
||||
epoch := params.BeaconConfig().CapellaForkEpoch + 1
|
||||
domain, err := signing.Domain(st.Fork(), epoch, params.BeaconConfig().DomainBLSToExecutionChange, st.GenesisValidatorsRoot())
|
||||
assert.NoError(t, err)
|
||||
htr, err := signing.Data(message.HashTreeRoot, domain)
|
||||
assert.NoError(t, err)
|
||||
signed := ðpb.SignedBLSToExecutionChange{
|
||||
Message: message,
|
||||
Signature: keys[i+1].Sign(htr[:]).Marshal(),
|
||||
}
|
||||
|
||||
s.cfg.blsToExecPool.InsertBLSToExecChange(signed)
|
||||
}
|
||||
|
||||
require.Equal(t, false, p1.BroadcastCalled.Load())
|
||||
slot, err := slots.EpochStart(params.BeaconConfig().CapellaForkEpoch)
|
||||
require.NoError(t, err)
|
||||
s.broadcastBLSChanges(slot)
|
||||
time.Sleep(100 * time.Millisecond) // Need a sleep for the go routine to be ready
|
||||
require.Equal(t, true, p1.BroadcastCalled.Load())
|
||||
require.LogsDoNotContain(t, logHook, "could not")
|
||||
|
||||
p1.BroadcastCalled.Store(false)
|
||||
time.Sleep(500 * time.Millisecond) // Need a sleep for the second batch to be broadcast
|
||||
require.Equal(t, true, p1.BroadcastCalled.Load())
|
||||
require.LogsDoNotContain(t, logHook, "could not")
|
||||
}
|
||||
|
||||
func TestBroadcastBLSBatch_changes_slice(t *testing.T) {
|
||||
message := ðpb.BLSToExecutionChange{
|
||||
FromBlsPubkey: make([]byte, 48),
|
||||
ToExecutionAddress: make([]byte, 20),
|
||||
}
|
||||
signed := ðpb.SignedBLSToExecutionChange{
|
||||
Message: message,
|
||||
Signature: make([]byte, 96),
|
||||
}
|
||||
changes := make([]*ethpb.SignedBLSToExecutionChange, 200)
|
||||
for i := 0; i < len(changes); i++ {
|
||||
changes[i] = signed
|
||||
}
|
||||
p1 := mockp2p.NewTestP2P(t)
|
||||
chainService := &mockChain.ChainService{
|
||||
Genesis: time.Now(),
|
||||
ValidatorsRoot: [32]byte{'A'},
|
||||
}
|
||||
s := NewService(t.Context(),
|
||||
WithP2P(p1),
|
||||
WithInitialSync(&mockSync.Sync{IsSyncing: false}),
|
||||
WithChainService(chainService),
|
||||
WithOperationNotifier(chainService.OperationNotifier()),
|
||||
WithBlsToExecPool(blstoexec.NewPool()),
|
||||
)
|
||||
beaconDB := testingdb.SetupDB(t)
|
||||
s.cfg.stateGen = stategen.New(beaconDB, doublylinkedtree.New())
|
||||
s.cfg.beaconDB = beaconDB
|
||||
s.initCaches()
|
||||
st, _ := util.DeterministicGenesisStateCapella(t, 32)
|
||||
s.cfg.chain = &mockChain.ChainService{
|
||||
ValidatorsRoot: [32]byte{'A'},
|
||||
Genesis: time.Now().Add(-time.Second * time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Duration(10)),
|
||||
State: st,
|
||||
}
|
||||
|
||||
s.broadcastBLSBatch(s.ctx, &changes)
|
||||
require.Equal(t, 200-128, len(changes))
|
||||
}
|
||||
@@ -28,9 +28,6 @@ func (s *Service) forkWatcher() {
|
||||
log.WithError(err).Error("Unable to check for fork in the previous epoch")
|
||||
continue
|
||||
}
|
||||
// Broadcast BLS changes at the Capella fork boundary
|
||||
s.broadcastBLSChanges(currSlot)
|
||||
|
||||
case <-s.ctx.Done():
|
||||
log.Debug("Context closed, exiting goroutine")
|
||||
slotTicker.Done()
|
||||
|
||||
3
changelog/potuz_remove_bls_broadcast.md
Normal file
3
changelog/potuz_remove_bls_broadcast.md
Normal file
@@ -0,0 +1,3 @@
|
||||
### Ignored
|
||||
|
||||
- Remove broadcast of BLS changes at the Capella fork.
|
||||
Reference in New Issue
Block a user