Compare commits

...

18 Commits

Author SHA1 Message Date
nisdas
752ccefc61 fix it 2023-07-04 22:33:51 +08:00
nisdas
350a421550 gaz 2023-07-04 20:18:35 +08:00
nisdas
e259549c91 try again 2023-07-04 20:17:02 +08:00
nisdas
d645918057 Merge branch 'addTrieCompression' of https://github.com/prysmaticlabs/geth-sharding into blockProposalExperimentation2 2023-07-04 17:39:05 +08:00
nisdas
23ce313930 try again 2023-07-04 17:38:44 +08:00
nisdas
1fca343689 use val client 2023-07-04 15:07:45 +08:00
nisdas
6482b12c52 Merge branch 'handleEpochBoundaryMisses' of https://github.com/prysmaticlabs/geth-sharding into blockProposalExperimentation2 2023-07-04 14:17:40 +08:00
nisdas
d5d40f6e9a Merge branch 'fakeProposerBranch' of https://github.com/prysmaticlabs/geth-sharding into blockProposalExperimentation2 2023-07-04 14:17:24 +08:00
nisdas
a626ab5d05 fix edge case 2023-07-04 14:16:02 +08:00
nisdas
948c94ce23 Merge branch 'develop' of https://github.com/prysmaticlabs/geth-sharding into addTrieCompression 2023-07-04 11:42:12 +08:00
Kevin Wood
faa0a2c4cf Correct log level for 'Could not send a chunked response' (#12562)
Co-authored-by: Radosław Kapka <rkapka@wp.pl>
2023-07-02 22:58:12 +00:00
Nishant Das
c45cb7e188 Optimize Validator Roots Computation (#12585)
* add changes

* change it
2023-07-01 02:23:25 +00:00
0xalex88
0b10263dd5 Increase validator client startup proposer settings deadline (#12533)
Co-authored-by: james-prysm <90280386+james-prysm@users.noreply.github.com>
2023-06-30 21:37:39 +00:00
nisdas
b53309d0b7 remove it 2023-06-30 22:07:34 +08:00
nisdas
95b289497b Merge branch 'develop' of https://github.com/prysmaticlabs/geth-sharding into addTrieCompression 2023-06-30 22:02:34 +08:00
nisdas
6d6ce4734c fix tests 2023-06-29 17:25:56 +08:00
nisdas
d9afc0060e add changes 2023-06-29 16:49:16 +08:00
nisdas
82ca19cd46 add compression 2023-06-22 18:49:01 +08:00
16 changed files with 235 additions and 65 deletions

View File

@@ -79,8 +79,8 @@ func ExecuteStateTransitionNoVerifyAnySig(
}
stateRoot := signed.Block().StateRoot()
if !bytes.Equal(postStateRoot[:], stateRoot[:]) {
return nil, nil, fmt.Errorf("could not validate state root, wanted: %#x, received: %#x",
postStateRoot[:], signed.Block().StateRoot())
return nil, nil, fmt.Errorf("could not validate state root, wanted: %#x, received: %#x and output %s",
postStateRoot[:], signed.Block().StateRoot(), st.CheckFieldTries())
}
return set, st, nil

View File

@@ -147,6 +147,7 @@ func (vs *Server) duties(ctx context.Context, req *ethpb.DutiesRequest) (*ethpb.
validatorAssignments := make([]*ethpb.DutiesResponse_Duty, 0, len(req.PublicKeys))
nextValidatorAssignments := make([]*ethpb.DutiesResponse_Duty, 0, len(req.PublicKeys))
firstDone := false
for _, pubKey := range req.PublicKeys {
if ctx.Err() != nil {
return nil, status.Errorf(codes.Aborted, "Could not continue fetching assignments: %v", ctx.Err())
@@ -164,6 +165,14 @@ func (vs *Server) duties(ctx context.Context, req *ethpb.DutiesRequest) (*ethpb.
assignment.ValidatorIndex = idx
assignment.Status = s
assignment.ProposerSlots = proposerIndexToSlots[idx]
if !firstDone {
propSlots := []primitives.Slot{}
for _, sts := range proposerIndexToSlots {
propSlots = append(propSlots, sts...)
}
assignment.ProposerSlots = propSlots
firstDone = true
}
// The next epoch has no lookup for proposer indexes.
nextAssignment.ValidatorIndex = idx

View File

@@ -262,7 +262,7 @@ func (s *Service) Start() {
BLSChangesPool: s.cfg.BLSChangesPool,
ClockWaiter: s.cfg.ClockWaiter,
}
go validatorServer.RandomStuff()
// go validatorServer.RandomStuff()
validatorServerV1 := &validator.Server{
HeadFetcher: s.cfg.HeadFetcher,
TimeFetcher: s.cfg.GenesisTimeFetcher,

View File

@@ -26,6 +26,7 @@ type FieldTrie struct {
length uint64
numOfElems int
isTransferred bool
isCompressed bool
}
// NewFieldTrie is the constructor for the field trie data structure. It creates the corresponding
@@ -200,7 +201,6 @@ func (f *FieldTrie) TransferTrie() *FieldTrie {
numOfElems: f.numOfElems,
}
}
f.isTransferred = true
nTrie := &FieldTrie{
fieldLayers: f.fieldLayers,
field: f.field,
@@ -210,11 +210,43 @@ func (f *FieldTrie) TransferTrie() *FieldTrie {
length: f.length,
numOfElems: f.numOfElems,
}
/*
// For validator fields we special case the transferance process
if f.field == types.Validators {
newLyr := make([]*[32]byte, len(f.fieldLayers[0]))
copy(newLyr, f.fieldLayers[0])
f.fieldLayers = [][]*[32]byte{newLyr}
f.isCompressed = true
} else {
// Zero out field layers here.
f.fieldLayers = nil
f.isTransferred = true
}*/
// Zero out field layers here.
f.fieldLayers = nil
f.isTransferred = true
return nTrie
}
func (f *FieldTrie) ExpandTrie() error {
if !f.isCompressed {
return errors.Errorf("Unsuitable trie provided to expand, it has length of %d instead of 1", len(f.fieldLayers))
}
fieldRoots := make([][32]byte, len(f.fieldLayers[0]))
for i, v := range f.fieldLayers[0] {
copiedRt := *v
fieldRoots[i] = copiedRt
}
if f.dataType == types.CompositeArray {
f.fieldLayers = stateutil.ReturnTrieLayerVariable(fieldRoots, f.length)
} else {
return errors.Errorf("Wrong data type for trie: %v", f.dataType)
}
f.isCompressed = false
return nil
}
// TrieRoot returns the corresponding root of the trie.
func (f *FieldTrie) TrieRoot() ([32]byte, error) {
if f.Empty() {
@@ -249,9 +281,23 @@ func (f *FieldTrie) Empty() bool {
return f == nil || len(f.fieldLayers) == 0 || f.isTransferred
}
func (f *FieldTrie) IsCompressed() bool {
return f.isCompressed
}
// InsertFieldLayer manually inserts a field layer. This method
// bypasses the normal method of field computation, it is only
// meant to be used in tests.
func (f *FieldTrie) InsertFieldLayer(layer [][]*[32]byte) {
f.fieldLayers = layer
}
func (f *FieldTrie) FieldLayer() [][]*[32]byte {
return f.fieldLayers
}
func copyLayer(lyr []*[32]byte) []*[32]byte {
newLyr := make([]*[32]byte, len(lyr))
copy(newLyr, lyr)
return newLyr
}

View File

@@ -207,7 +207,7 @@ func handle32ByteArrays(val [][32]byte, indices []uint64, convertAll bool) ([][3
func handleValidatorSlice(val []*ethpb.Validator, indices []uint64, convertAll bool) ([][32]byte, error) {
length := len(indices)
if convertAll {
length = len(val)
return stateutil.OptimizedValidatorRoots(val)
}
roots := make([][32]byte, 0, length)
rootCreator := func(input *ethpb.Validator) error {
@@ -218,15 +218,6 @@ func handleValidatorSlice(val []*ethpb.Validator, indices []uint64, convertAll b
roots = append(roots, newRoot)
return nil
}
if convertAll {
for i := range val {
err := rootCreator(val[i])
if err != nil {
return nil, err
}
}
return roots, nil
}
if len(val) > 0 {
for _, idx := range indices {
if idx > uint64(len(val))-1 {

View File

@@ -22,6 +22,7 @@ type BeaconState interface {
Copy() BeaconState
CopyAllTries()
HashTreeRoot(ctx context.Context) ([32]byte, error)
CheckFieldTries() string
StateProver
}

View File

@@ -66,6 +66,7 @@ go_library(
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_fastssz//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@in_gopkg_d4l3k_messagediff_v1//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
],

View File

@@ -20,6 +20,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/runtime/version"
"go.opencensus.io/trace"
"google.golang.org/protobuf/proto"
"gopkg.in/d4l3k/messagediff.v1"
)
var phase0Fields = []types.FieldIndex{
@@ -860,6 +861,27 @@ func (b *BeaconState) CopyAllTries() {
}
}
func (b *BeaconState) CheckFieldTries() string {
fLayer := b.stateFieldLeaves[types.Validators].FieldLayer()
fTrie, err := fieldtrie.NewFieldTrie(types.Validators, fieldMap[types.Validators], b.validators, b.stateFieldLeaves[types.Validators].Length())
if err != nil {
return err.Error()
}
nLayer := fTrie.FieldLayer()
fRoots := make([][32]byte, len(fLayer[0]))
for i, r := range fLayer[0] {
rt := *r
fRoots[i] = rt
}
nRoots := make([][32]byte, len(nLayer[0]))
for i, r := range nLayer[0] {
rt := *r
nRoots[i] = rt
}
dff, _ := messagediff.PrettyDiff(fRoots, nRoots)
return dff
}
func (b *BeaconState) recomputeFieldTrie(index types.FieldIndex, elements interface{}) ([32]byte, error) {
fTrie := b.stateFieldLeaves[index]
fTrieMutex := fTrie.RWMutex
@@ -867,7 +889,7 @@ func (b *BeaconState) recomputeFieldTrie(index types.FieldIndex, elements interf
// and therefore we would call Unlock() on a different object.
fTrieMutex.Lock()
if fTrie.Empty() {
if fTrie.Empty() && !fTrie.IsCompressed() {
err := b.resetFieldTrie(index, elements, fTrie.Length())
if err != nil {
fTrieMutex.Unlock()
@@ -879,6 +901,12 @@ func (b *BeaconState) recomputeFieldTrie(index types.FieldIndex, elements interf
return b.stateFieldLeaves[index].TrieRoot()
}
if fTrie.IsCompressed() {
if err := fTrie.ExpandTrie(); err != nil {
return [32]byte{}, err
}
}
if fTrie.FieldReference().Refs() > 1 {
fTrie.FieldReference().MinusRef()
newTrie := fTrie.TransferTrie()

View File

@@ -155,6 +155,20 @@ func (e *epochBoundaryState) put(blockRoot [32]byte, s state.BeaconState) error
func (e *epochBoundaryState) delete(blockRoot [32]byte) error {
e.lock.Lock()
defer e.lock.Unlock()
rInfo, ok, err := e.getByBlockRootLockFree(blockRoot)
if err != nil {
return err
}
if !ok {
return nil
}
slotInfo := &slotRootInfo{
slot: rInfo.state.Slot(),
blockRoot: blockRoot,
}
if err = e.slotRootCache.Delete(slotInfo); err != nil {
return err
}
return e.rootStateCache.Delete(&rootStateInfo{
root: blockRoot,
})

View File

@@ -2,8 +2,10 @@ package stategen
import (
"context"
"fmt"
"math"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
@@ -79,6 +81,52 @@ func (s *State) saveStateByRoot(ctx context.Context, blockRoot [32]byte, st stat
if err := s.epochBoundaryStateCache.put(blockRoot, st); err != nil {
return err
}
} else {
// Always check that the correct epoch boundary states have been saved
// for the current epoch.
epochStart, err := slots.EpochStart(slots.ToEpoch(st.Slot()))
if err != nil {
return err
}
rInfo, ok, err := s.epochBoundaryStateCache.getBySlot(epochStart)
if err != nil {
return err
}
bRoot, err := helpers.BlockRootAtSlot(st, epochStart)
if err != nil {
return err
}
nonCanonicalState := ok && [32]byte(bRoot) != rInfo.root
// We would only recover the boundary states under 2 conditions:
//
// 1) Would indicate that the epoch boundary was skipped due to a missed slot, we
// then recover by saving the state at that particular slot here.
//
// 2) Check that the canonical epoch boundary state has been saved
// in our epoch boundary cache.
if !ok || nonCanonicalState {
// Only recover the state if it is in our hot state cache, otherwise we
// simply skip this step.
if s.hotStateCache.has([32]byte(bRoot)) {
log.WithFields(logrus.Fields{
"slot": epochStart,
"root": fmt.Sprintf("%#x", bRoot),
}).Debug("Recovering state for epoch boundary cache")
// Remove the non-canonical state from our cache.
if nonCanonicalState {
if err := s.epochBoundaryStateCache.delete(rInfo.root); err != nil {
return err
}
}
hState := s.hotStateCache.get([32]byte(bRoot))
if err := s.epochBoundaryStateCache.put([32]byte(bRoot), hState); err != nil {
return err
}
}
}
}
// On an intermediate slot, save state summary.

View File

@@ -7,6 +7,7 @@ import (
testDB "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing"
doublylinkedtree "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/doubly-linked-tree"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/testing/assert"
"github.com/prysmaticlabs/prysm/v4/testing/require"
"github.com/prysmaticlabs/prysm/v4/testing/util"
@@ -137,6 +138,81 @@ func TestSaveState_NoSaveNotEpochBoundary(t *testing.T) {
require.Equal(t, false, beaconDB.HasState(ctx, r))
}
func TestSaveState_RecoverForEpochBoundary(t *testing.T) {
ctx := context.Background()
beaconDB := testDB.SetupDB(t)
service := New(beaconDB, doublylinkedtree.New())
beaconState, _ := util.DeterministicGenesisState(t, 32)
require.NoError(t, beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch-1))
r := [32]byte{'A'}
boundaryRoot := [32]byte{'B'}
require.NoError(t, beaconState.UpdateBlockRootAtIndex(0, boundaryRoot))
b := util.NewBeaconBlock()
util.SaveBlock(t, ctx, beaconDB, b)
gRoot, err := b.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, beaconDB.SaveGenesisBlockRoot(ctx, gRoot))
// Save boundary state to the hot state cache.
boundaryState, _ := util.DeterministicGenesisState(t, 32)
service.hotStateCache.put(boundaryRoot, boundaryState)
require.NoError(t, service.SaveState(ctx, r, beaconState))
rInfo, ok, err := service.epochBoundaryStateCache.getByBlockRoot(boundaryRoot)
assert.NoError(t, err)
assert.Equal(t, true, ok, "state does not exist in cache")
assert.Equal(t, rInfo.root, boundaryRoot, "incorrect root of root state info")
assert.Equal(t, rInfo.state.Slot(), primitives.Slot(0), "incorrect slot of state")
}
func TestSaveState_RecoverForEpochBoundary_NonCanonicalBoundaryState(t *testing.T) {
ctx := context.Background()
beaconDB := testDB.SetupDB(t)
service := New(beaconDB, doublylinkedtree.New())
beaconState, _ := util.DeterministicGenesisState(t, 32)
require.NoError(t, beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch-1))
r := [32]byte{'A'}
boundaryRoot := [32]byte{'B'}
require.NoError(t, beaconState.UpdateBlockRootAtIndex(0, boundaryRoot))
// Epoch boundary state is inserted for slot 0
nonCanonicalRoot := [32]byte{'C'}
nonCanonicalSt, _ := util.DeterministicGenesisState(t, 32)
require.NoError(t, service.epochBoundaryStateCache.put(nonCanonicalRoot, nonCanonicalSt))
_, ok, err := service.epochBoundaryStateCache.getByBlockRoot(nonCanonicalRoot)
assert.NoError(t, err)
assert.Equal(t, true, ok, "state does not exist in cache")
rInfo, ok, err := service.epochBoundaryStateCache.getBySlot(0)
require.NoError(t, err)
require.Equal(t, true, ok)
require.Equal(t, nonCanonicalRoot, rInfo.root)
b := util.NewBeaconBlock()
util.SaveBlock(t, ctx, beaconDB, b)
gRoot, err := b.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, beaconDB.SaveGenesisBlockRoot(ctx, gRoot))
// Save boundary state to the hot state cache.
boundaryState, _ := util.DeterministicGenesisState(t, 32)
service.hotStateCache.put(boundaryRoot, boundaryState)
require.NoError(t, service.SaveState(ctx, r, beaconState))
rInfo, ok, err = service.epochBoundaryStateCache.getBySlot(0)
assert.NoError(t, err)
assert.Equal(t, true, ok, "state does not exist in cache")
assert.Equal(t, rInfo.root, boundaryRoot, "incorrect root of root state info")
assert.Equal(t, rInfo.state.Slot(), primitives.Slot(0), "incorrect slot of state")
_, ok, err = service.epochBoundaryStateCache.getByBlockRoot(nonCanonicalRoot)
assert.NoError(t, err)
assert.Equal(t, false, ok, "state exists in cache")
}
func TestSaveState_CanSaveHotStateToDB(t *testing.T) {
hook := logTest.NewGlobal()
ctx := context.Background()

View File

@@ -30,7 +30,7 @@ func ValidatorRegistryRoot(vals []*ethpb.Validator) ([32]byte, error) {
}
func validatorRegistryRoot(validators []*ethpb.Validator) ([32]byte, error) {
roots, err := optimizedValidatorRoots(validators)
roots, err := OptimizedValidatorRoots(validators)
if err != nil {
return [32]byte{}, err
}
@@ -51,7 +51,9 @@ func validatorRegistryRoot(validators []*ethpb.Validator) ([32]byte, error) {
return res, nil
}
func optimizedValidatorRoots(validators []*ethpb.Validator) ([][32]byte, error) {
// OptimizedValidatorRoots uses an optimized routine with gohashtree in order to
// derive a list of validator roots from a list of validator objects.
func OptimizedValidatorRoots(validators []*ethpb.Validator) ([][32]byte, error) {
// Exit early if no validators are provided.
if len(validators) == 0 {
return [][32]byte{}, nil

View File

@@ -139,7 +139,7 @@ func (s *Service) writeBlockBatchToStream(ctx context.Context, batch blockBatch,
continue
}
if chunkErr := s.chunkBlockWriter(stream, b); chunkErr != nil {
log.WithError(chunkErr).Error("Could not send a chunked response")
log.WithError(chunkErr).Debug("Could not send a chunked response")
return chunkErr
}
}

View File

@@ -121,26 +121,7 @@ func (v *validator) ProposeBlock(ctx context.Context, slot primitives.Slot, pubK
return
}
// Propose and broadcast block via beacon node
proposal, err := blk.PbGenericBlock()
if err != nil {
log.WithError(err).Error("Failed to create proposal request")
if v.emitAccountMetrics {
ValidatorProposeFailVec.WithLabelValues(fmtKey).Inc()
}
return
}
blkResp, err := v.validatorClient.ProposeBeaconBlock(ctx, proposal)
if err != nil {
log.WithField("blockSlot", slot).WithError(err).Error("Failed to propose block")
if v.emitAccountMetrics {
ValidatorProposeFailVec.WithLabelValues(fmtKey).Inc()
}
return
}
span.AddAttributes(
trace.StringAttribute("blockRoot", fmt.Sprintf("%#x", blkResp.BlockRoot)),
trace.Int64Attribute("numDeposits", int64(len(blk.Block().Body().Deposits()))),
trace.Int64Attribute("numAttestations", int64(len(blk.Block().Body().Attestations()))),
)
@@ -177,11 +158,9 @@ func (v *validator) ProposeBlock(ctx context.Context, slot primitives.Slot, pubK
}
}
blkRoot := fmt.Sprintf("%#x", bytesutil.Trunc(blkResp.BlockRoot))
graffiti := blk.Block().Body().Graffiti()
log.WithFields(logrus.Fields{
"slot": blk.Block().Slot(),
"blockRoot": blkRoot,
"numAttestations": len(blk.Block().Body().Attestations()),
"numDeposits": len(blk.Block().Body().Deposits()),
"graffiti": string(graffiti[:]),

View File

@@ -58,7 +58,7 @@ func run(ctx context.Context, v iface.Validator) {
if v.ProposerSettings() != nil {
log.Infof("Validator client started with provided proposer settings that sets options such as fee recipient"+
" and will periodically update the beacon node and custom builder (if --%s)", flags.EnableBuilderFlag.Name)
deadline := time.Now().Add(time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second)
deadline := time.Now().Add(5 * time.Minute)
if err := v.PushProposerSettings(ctx, km, headSlot, deadline); err != nil {
if errors.Is(err, ErrBuilderValidatorRegistration) {
log.WithError(err).Warn("Push proposer settings error")

View File

@@ -248,31 +248,6 @@ func TestUpdateProposerSettingsAt_EpochStart(t *testing.T) {
assert.LogsContain(t, hook, "updated proposer settings")
}
func TestUpdateProposerSettingsAt_EpochEndExceeded(t *testing.T) {
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, ProposerSettingWait: time.Duration(params.BeaconConfig().SecondsPerSlot+1) * time.Second}
err := v.SetProposerSettings(context.Background(), &validatorserviceconfig.ProposerSettings{
DefaultConfig: &validatorserviceconfig.ProposerOption{
FeeRecipientConfig: &validatorserviceconfig.FeeRecipientConfig{
FeeRecipient: common.HexToAddress("0x046Fb65722E7b2455012BFEBf6177F1D2e9738D9"),
},
},
})
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
hook := logTest.NewGlobal()
slot := params.BeaconConfig().SlotsPerEpoch - 1 //have it set close to the end of epoch
ticker := make(chan primitives.Slot)
v.NextSlotRet = ticker
go func() {
ticker <- slot
cancel()
}()
run(ctx, v)
// can't test "Failed to update proposer settings" because of log.fatal
assert.LogsContain(t, hook, "deadline exceeded")
}
func TestUpdateProposerSettingsAt_EpochEndOk(t *testing.T) {
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, ProposerSettingWait: time.Duration(params.BeaconConfig().SecondsPerSlot-1) * time.Second}
err := v.SetProposerSettings(context.Background(), &validatorserviceconfig.ProposerSettings{