Add the Ability to Defragment the Beacon State (#13444)

* Defragment head state

* change log level

* change it to be more efficient

* add flag

* add tests and clean up

* fix it

* gosimple

* Update container/multi-value-slice/multi_value_slice.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* radek's review

* unlock it

* remove from fc lock

---------

Co-authored-by: rkapka <rkapka@wp.pl>
Nishant Das
2024-01-13 13:44:02 +08:00
committed by GitHub
parent 0cfbddc980
commit 1ff5a43385
8 changed files with 288 additions and 1 deletion

View File

@@ -6,6 +6,7 @@ go_library(
"chain_info.go",
"chain_info_forkchoice.go",
"currently_syncing_block.go",
"defragment.go",
"error.go",
"execution_engine.go",
"forkchoice_update_execution.go",

View File

@@ -0,0 +1,27 @@
package blockchain

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
    "github.com/prysmaticlabs/prysm/v4/config/features"
    "github.com/prysmaticlabs/prysm/v4/time"
)

var stateDefragmentationTime = promauto.NewSummary(prometheus.SummaryOpts{
    Name: "head_state_defragmentation_milliseconds",
    Help: "Milliseconds it takes to defragment the head state",
})

// defragmentState defragments the head state: any field with a high number of
// fragmented indexes is reallocated to a new, separate slice for that field.
func (s *Service) defragmentState(st state.BeaconState) {
    if !features.Get().EnableExperimentalState {
        return
    }
    startTime := time.Now()
    st.Defragment()
    elapsedTime := time.Since(startTime)
    stateDefragmentationTime.Observe(float64(elapsedTime.Milliseconds()))
}
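A minimal, self-contained sketch of the gate-and-time pattern this new file uses, for readers who want to see it outside the service; the defragmenter interface, fakeState type, and featureEnabled parameter are illustrative stand-ins for state.BeaconState and the feature flag, not code from this commit.

package main

import (
    "fmt"
    "time"
)

// defragmenter is an illustrative stand-in for the subset of
// state.BeaconState that defragmentState relies on.
type defragmenter interface{ Defragment() }

type fakeState struct{ calls int }

func (f *fakeState) Defragment() { f.calls++ }

// defragmentState mirrors the shape of the real helper above: skip when the
// feature is off, otherwise run Defragment and record how long it took.
func defragmentState(st defragmenter, featureEnabled bool) time.Duration {
    if !featureEnabled {
        return 0
    }
    start := time.Now()
    st.Defragment()
    return time.Since(start)
}

func main() {
    st := &fakeState{}
    defragmentState(st, false)
    fmt.Println(st.calls) // 0: gated off, the state is untouched
    elapsed := defragmentState(st, true)
    fmt.Printf("defragmented once (%d call) in %s\n", st.calls, elapsed)
}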

View File

@@ -123,6 +123,9 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
    }
    daWaitedTime := time.Since(daStartTime)

    // Defragment the state before continuing block processing.
    s.defragmentState(postState)

    // The rest of block processing takes a lock on forkchoice.
    s.cfg.ForkChoiceStore.Lock()
    defer s.cfg.ForkChoiceStore.Unlock()

View File

@@ -22,6 +22,7 @@ type BeaconState interface {
    WriteOnlyBeaconState
    Copy() BeaconState
    CopyAllTries()
    Defragment()
    HashTreeRoot(ctx context.Context) ([32]byte, error)
    Prover
    json.Marshaler

View File

@@ -123,6 +123,55 @@ func NewMultiValueValidators(vals []*ethpb.Validator) *MultiValueValidators {
    return mv
}

// Defragment checks whether each individual multi-value field in our state is
// fragmented and, if it is, 'resets' the field by creating a new multi-value object.
func (b *BeaconState) Defragment() {
    b.lock.Lock()
    defer b.lock.Unlock()
    if b.blockRootsMultiValue != nil && b.blockRootsMultiValue.IsFragmented() {
        initialMVslice := b.blockRootsMultiValue
        b.blockRootsMultiValue = b.blockRootsMultiValue.Reset(b)
        initialMVslice.Detach(b)
        multiValueCountGauge.WithLabelValues(types.BlockRoots.String()).Inc()
        runtime.SetFinalizer(b.blockRootsMultiValue, blockRootsFinalizer)
    }
    if b.stateRootsMultiValue != nil && b.stateRootsMultiValue.IsFragmented() {
        initialMVslice := b.stateRootsMultiValue
        b.stateRootsMultiValue = b.stateRootsMultiValue.Reset(b)
        initialMVslice.Detach(b)
        multiValueCountGauge.WithLabelValues(types.StateRoots.String()).Inc()
        runtime.SetFinalizer(b.stateRootsMultiValue, stateRootsFinalizer)
    }
    if b.randaoMixesMultiValue != nil && b.randaoMixesMultiValue.IsFragmented() {
        initialMVslice := b.randaoMixesMultiValue
        b.randaoMixesMultiValue = b.randaoMixesMultiValue.Reset(b)
        initialMVslice.Detach(b)
        multiValueCountGauge.WithLabelValues(types.RandaoMixes.String()).Inc()
        runtime.SetFinalizer(b.randaoMixesMultiValue, randaoMixesFinalizer)
    }
    if b.balancesMultiValue != nil && b.balancesMultiValue.IsFragmented() {
        initialMVslice := b.balancesMultiValue
        b.balancesMultiValue = b.balancesMultiValue.Reset(b)
        initialMVslice.Detach(b)
        multiValueCountGauge.WithLabelValues(types.Balances.String()).Inc()
        runtime.SetFinalizer(b.balancesMultiValue, balancesFinalizer)
    }
    if b.validatorsMultiValue != nil && b.validatorsMultiValue.IsFragmented() {
        initialMVslice := b.validatorsMultiValue
        b.validatorsMultiValue = b.validatorsMultiValue.Reset(b)
        initialMVslice.Detach(b)
        multiValueCountGauge.WithLabelValues(types.Validators.String()).Inc()
        runtime.SetFinalizer(b.validatorsMultiValue, validatorsFinalizer)
    }
    if b.inactivityScoresMultiValue != nil && b.inactivityScoresMultiValue.IsFragmented() {
        initialMVslice := b.inactivityScoresMultiValue
        b.inactivityScoresMultiValue = b.inactivityScoresMultiValue.Reset(b)
        initialMVslice.Detach(b)
        multiValueCountGauge.WithLabelValues(types.InactivityScores.String()).Inc()
        runtime.SetFinalizer(b.inactivityScoresMultiValue, inactivityScoresFinalizer)
    }
}

func randaoMixesFinalizer(m *MultiValueRandaoMixes) {
    multiValueCountGauge.WithLabelValues(types.RandaoMixes.String()).Dec()
}
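All six per-field blocks in Defragment repeat the same check/reset/detach sequence. A hedged sketch of that shared shape follows; the fragmentable interface and fakeField type are invented for illustration and are not the generated MultiValue* types the commit operates on.

package main

import "fmt"

// fragmentable is a hypothetical stand-in for the generated MultiValue* field
// types; the commit spells these steps out once per concrete field instead.
type fragmentable interface {
    IsFragmented() bool
    Reset() fragmentable
    Detach()
}

type fakeField struct{ refs int }

func (f *fakeField) IsFragmented() bool  { return f.refs >= 3 }
func (f *fakeField) Reset() fragmentable { return &fakeField{} }
func (f *fakeField) Detach()             { f.refs = 0 }

// defragmentField mirrors the per-field shape of Defragment above: check
// fragmentation, build a fresh copy, detach the old one; the real code then
// also bumps the per-field gauge and attaches a finalizer.
func defragmentField(f fragmentable) fragmentable {
    if f == nil || !f.IsFragmented() {
        return f
    }
    fresh := f.Reset()
    f.Detach()
    return fresh
}

func main() {
    field := &fakeField{refs: 5}
    fmt.Println(defragmentField(field).IsFragmented()) // false: replaced by a fresh copy
}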

View File

@@ -606,7 +606,7 @@ func TestBlocksFetcher_WaitForBandwidth(t *testing.T) {
    p1.Connect(p2)
    require.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")
    req := &ethpb.BeaconBlocksByRangeRequest{
        Count: 64,
        Count: 64,
    }
    topic := p2pm.RPCBlocksByRangeTopicV1

View File

@@ -96,6 +96,10 @@ import (
"github.com/pkg/errors"
)
// Amount of references beyond which a multivalue object is considered
// fragmented.
const fragmentationLimit = 50000
// Id is an object identifier.
type Id = uint64
@@ -424,6 +428,58 @@ func (s *Slice[V]) MultiValueStatistics() MultiValueStatistics {
    return stats
}

// IsFragmented checks if our multi-value object is fragmented (individual
// references held). If the number of references is higher than our threshold,
// we return true.
func (s *Slice[V]) IsFragmented() bool {
    stats := s.MultiValueStatistics()
    return stats.TotalIndividualElemReferences+stats.TotalAppendedElemReferences >= fragmentationLimit
}

// Reset builds a new multi-value object with respect to the
// provided object's id. The base slice will be based on this
// particular id.
func (s *Slice[V]) Reset(obj Identifiable) *Slice[V] {
    s.lock.RLock()
    defer s.lock.RUnlock()
    l, ok := s.cachedLengths[obj.Id()]
    if !ok {
        l = len(s.sharedItems)
    }
    items := make([]V, l)
    copy(items, s.sharedItems)
    for i, ind := range s.individualItems {
        for _, v := range ind.Values {
            _, found := containsId(v.ids, obj.Id())
            if found {
                items[i] = v.val
                break
            }
        }
    }
    index := len(s.sharedItems)
    for _, app := range s.appendedItems {
        found := true
        for _, v := range app.Values {
            _, found = containsId(v.ids, obj.Id())
            if found {
                items[index] = v.val
                index++
                break
            }
        }
        if !found {
            break
        }
    }
    reset := &Slice[V]{}
    reset.Init(items)
    return reset
}

func (s *Slice[V]) fillOriginalItems(obj Identifiable, items *[]V) {
    for i, item := range s.sharedItems {
        ind, ok := s.individualItems[uint64(i)]
View File

@@ -326,6 +326,156 @@ func TestDetach(t *testing.T) {
    assert.Equal(t, false, ok)
}

func TestReset(t *testing.T) {
    s := setup()
    obj := &testObject{id: 2}
    reset := s.Reset(obj)
    assert.Equal(t, 8, len(reset.sharedItems))
    assert.Equal(t, 123, reset.sharedItems[0])
    assert.Equal(t, 2, reset.sharedItems[1])
    assert.Equal(t, 3, reset.sharedItems[2])
    assert.Equal(t, 123, reset.sharedItems[3])
    assert.Equal(t, 2, reset.sharedItems[4])
    assert.Equal(t, 2, reset.sharedItems[5])
    assert.Equal(t, 3, reset.sharedItems[6])
    assert.Equal(t, 2, reset.sharedItems[7])
    assert.Equal(t, 0, len(reset.individualItems))
    assert.Equal(t, 0, len(reset.appendedItems))
}

func TestFragmentation_IndividualReferences(t *testing.T) {
    s := &Slice[int]{}
    s.Init([]int{123, 123, 123, 123, 123})
    s.individualItems[1] = &MultiValueItem[int]{
        Values: []*Value[int]{
            {
                val: 1,
                ids: []uint64{1},
            },
            {
                val: 2,
                ids: []uint64{2},
            },
        },
    }
    s.individualItems[2] = &MultiValueItem[int]{
        Values: []*Value[int]{
            {
                val: 3,
                ids: []uint64{1, 2},
            },
        },
    }
    numOfRefs := fragmentationLimit / 2
    for i := 3; i < numOfRefs; i++ {
        obj := &testObject{id: uint64(i)}
        s.Copy(&testObject{id: 1}, obj)
    }
    assert.Equal(t, false, s.IsFragmented())
    // Add more references to hit the fragmentation limit. Id 1
    // has 2 references above.
    for i := numOfRefs; i < numOfRefs+3; i++ {
        obj := &testObject{id: uint64(i)}
        s.Copy(&testObject{id: 1}, obj)
    }
    assert.Equal(t, true, s.IsFragmented())
}

func TestFragmentation_AppendedReferences(t *testing.T) {
    s := &Slice[int]{}
    s.Init([]int{123, 123, 123, 123, 123})
    s.appendedItems = []*MultiValueItem[int]{
        {
            Values: []*Value[int]{
                {
                    val: 1,
                    ids: []uint64{1},
                },
                {
                    val: 2,
                    ids: []uint64{2},
                },
            },
        },
        {
            Values: []*Value[int]{
                {
                    val: 3,
                    ids: []uint64{1, 2},
                },
            },
        },
    }
    s.cachedLengths[1] = 7
    s.cachedLengths[2] = 8
    numOfRefs := fragmentationLimit / 2
    for i := 3; i < numOfRefs; i++ {
        obj := &testObject{id: uint64(i)}
        s.Copy(&testObject{id: 1}, obj)
    }
    assert.Equal(t, false, s.IsFragmented())
    // Add more references to hit the fragmentation limit. Id 1
    // has 2 references above.
    for i := numOfRefs; i < numOfRefs+3; i++ {
        obj := &testObject{id: uint64(i)}
        s.Copy(&testObject{id: 1}, obj)
    }
    assert.Equal(t, true, s.IsFragmented())
}

func TestFragmentation_IndividualAndAppendedReferences(t *testing.T) {
    s := &Slice[int]{}
    s.Init([]int{123, 123, 123, 123, 123})
    s.individualItems[2] = &MultiValueItem[int]{
        Values: []*Value[int]{
            {
                val: 3,
                ids: []uint64{1, 2},
            },
        },
    }
    s.appendedItems = []*MultiValueItem[int]{
        {
            Values: []*Value[int]{
                {
                    val: 1,
                    ids: []uint64{1},
                },
                {
                    val: 2,
                    ids: []uint64{2},
                },
            },
        },
    }
    s.cachedLengths[1] = 7
    s.cachedLengths[2] = 8
    numOfRefs := fragmentationLimit / 2
    for i := 3; i < numOfRefs; i++ {
        obj := &testObject{id: uint64(i)}
        s.Copy(&testObject{id: 1}, obj)
    }
    assert.Equal(t, false, s.IsFragmented())
    // Add more references to hit the fragmentation limit. Id 1
    // has 2 references above.
    for i := numOfRefs; i < numOfRefs+3; i++ {
        obj := &testObject{id: uint64(i)}
        s.Copy(&testObject{id: 1}, obj)
    }
    assert.Equal(t, true, s.IsFragmented())
}

// Share the slice between 2 objects.
// Index 0: Shared value
// Index 1: Different individual value