Compare commits

...

4 Commits

Author SHA1 Message Date
Preston Van Loon
98e7a3c2ab Add bug. Originally from 4c8e420ab4 2022-05-10 15:36:14 +00:00
Nishant Das
208ae6bbf0 Fix Doppelganger Check (#10582)
Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>
(cherry picked from commit ad03938964)
2022-04-29 15:04:30 -05:00
Nishant Das
cb494c2215 Cleanup Discarded Connections Correctly (#10574)
Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
(cherry picked from commit 58ad800553)
2022-04-29 15:04:15 -05:00
terencechain
844d9623b6 Can retrieve cached initial sync block and db block (#10568)
* Save cached initial sync blocks before getting head block

* Add better abstraction to get block

* Move unlock read to a better location

* Feedbacks

* Add head changed logging

* Harder hasBlock requirement

* Update beacon-chain/blockchain/service.go

Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>

* Update receive_attestation.go

* Don't process head if the block is unknown

* Use a helper method

* Fix test

Co-authored-by: Nishant Das <nishdas93@gmail.com>
Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>
Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
(cherry picked from commit b7a82d0fd1)
2022-04-29 15:04:01 -05:00
16 changed files with 257 additions and 56 deletions

View File

@@ -1,9 +1,15 @@
package blockchain
import (
"context"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/block"
)
var errBlockNotFoundInCacheOrDB = errors.New("block not found in cache or db")
// This saves a beacon block to the initial sync blocks cache.
func (s *Service) saveInitSyncBlock(r [32]byte, b block.SignedBeaconBlock) {
s.initSyncBlocksLock.Lock()
@@ -20,13 +26,33 @@ func (s *Service) hasInitSyncBlock(r [32]byte) bool {
return ok
}
// This retrieves a beacon block from the initial sync blocks cache using the root of
// the block.
func (s *Service) getInitSyncBlock(r [32]byte) block.SignedBeaconBlock {
// Returns true if a block for root `r` exists in the initial sync blocks cache or the DB.
func (s *Service) hasBlockInInitSyncOrDB(ctx context.Context, r [32]byte) bool {
if s.hasInitSyncBlock(r) {
return true
}
return s.cfg.BeaconDB.HasBlock(ctx, r)
}
// Returns block for a given root `r` from either the initial sync blocks cache or the DB.
// Error is returned if the block is not found in either cache or DB.
func (s *Service) getBlock(ctx context.Context, r [32]byte) (block.SignedBeaconBlock, error) {
s.initSyncBlocksLock.RLock()
defer s.initSyncBlocksLock.RUnlock()
b := s.initSyncBlocks[r]
return b
// Check cache first because it's faster.
b, ok := s.initSyncBlocks[r]
s.initSyncBlocksLock.RUnlock()
var err error
if !ok {
b, err = s.cfg.BeaconDB.Block(ctx, r)
if err != nil {
return nil, errors.Wrap(err, "could not retrieve block from db")
}
}
if err := helpers.BeaconBlockIsNil(b); err != nil {
return nil, errBlockNotFoundInCacheOrDB
}
return b, nil
}
// This retrieves all the beacon blocks from the initial sync blocks cache, the returned

View File

@@ -0,0 +1,72 @@
package blockchain
import (
"context"
"testing"
testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
"github.com/prysmaticlabs/prysm/testing/require"
"github.com/prysmaticlabs/prysm/testing/util"
)
func TestService_getBlock(t *testing.T) {
ctx := context.Background()
beaconDB := testDB.SetupDB(t)
s := setupBeaconChain(t, beaconDB)
b1 := util.NewBeaconBlock()
r1, err := b1.Block.HashTreeRoot()
require.NoError(t, err)
b2 := util.NewBeaconBlock()
b2.Block.Slot = 100
r2, err := b2.Block.HashTreeRoot()
require.NoError(t, err)
// block not found
_, err = s.getBlock(ctx, [32]byte{})
require.ErrorIs(t, err, errBlockNotFoundInCacheOrDB)
// block in cache
b, err := wrapper.WrappedSignedBeaconBlock(b1)
require.NoError(t, err)
s.saveInitSyncBlock(r1, b)
got, err := s.getBlock(ctx, r1)
require.NoError(t, err)
require.DeepEqual(t, b, got)
// block in db
b, err = wrapper.WrappedSignedBeaconBlock(b2)
require.NoError(t, err)
require.NoError(t, s.cfg.BeaconDB.SaveBlock(ctx, b))
got, err = s.getBlock(ctx, r2)
require.NoError(t, err)
require.DeepEqual(t, b, got)
}
func TestService_hasBlockInInitSyncOrDB(t *testing.T) {
ctx := context.Background()
beaconDB := testDB.SetupDB(t)
s := setupBeaconChain(t, beaconDB)
b1 := util.NewBeaconBlock()
r1, err := b1.Block.HashTreeRoot()
require.NoError(t, err)
b2 := util.NewBeaconBlock()
b2.Block.Slot = 100
r2, err := b2.Block.HashTreeRoot()
require.NoError(t, err)
// block not found
require.Equal(t, false, s.hasBlockInInitSyncOrDB(ctx, [32]byte{}))
// block in cache
b, err := wrapper.WrappedSignedBeaconBlock(b1)
require.NoError(t, err)
s.saveInitSyncBlock(r1, b)
require.Equal(t, true, s.hasBlockInInitSyncOrDB(ctx, r1))
// block in db
b, err = wrapper.WrappedSignedBeaconBlock(b2)
require.NoError(t, err)
require.NoError(t, s.cfg.BeaconDB.SaveBlock(ctx, b))
require.Equal(t, true, s.hasBlockInInitSyncOrDB(ctx, r2))
}

View File

@@ -46,15 +46,9 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, headState state.Be
if err != nil {
return nil, errors.Wrap(err, "could not get execution payload")
}
finalizedBlock, err := s.cfg.BeaconDB.Block(ctx, s.ensureRootNotZeros(finalizedRoot))
finalizedBlock, err := s.getBlock(ctx, s.ensureRootNotZeros(finalizedRoot))
if err != nil {
return nil, errors.Wrap(err, "could not get finalized block")
}
if finalizedBlock == nil || finalizedBlock.IsNil() {
finalizedBlock = s.getInitSyncBlock(s.ensureRootNotZeros(finalizedRoot))
if finalizedBlock == nil || finalizedBlock.IsNil() {
return nil, errors.Errorf("finalized block with root %#x does not exist in the db or our cache", s.ensureRootNotZeros(finalizedRoot))
}
return nil, err
}
var finalizedHash []byte
if blocks.IsPreBellatrixVersion(finalizedBlock.Block().Version()) {

View File

@@ -76,15 +76,10 @@ func verifyAttTargetEpoch(_ context.Context, genesisTime, nowTime uint64, c *eth
// verifyBeaconBlock verifies beacon head block is known and not from the future.
func (s *Service) verifyBeaconBlock(ctx context.Context, data *ethpb.AttestationData) error {
r := bytesutil.ToBytes32(data.BeaconBlockRoot)
b, err := s.cfg.BeaconDB.Block(ctx, r)
b, err := s.getBlock(ctx, r)
if err != nil {
return err
}
// If the block does not exist in db, check again if block exists in initial sync block cache.
// This could happen as the node first syncs to head.
if (b == nil || b.IsNil()) && s.hasInitSyncBlock(r) {
b = s.getInitSyncBlock(r)
}
if err := helpers.BeaconBlockIsNil(b); err != nil {
return err
}

View File

@@ -438,7 +438,7 @@ func TestVerifyBeaconBlock_NoBlock(t *testing.T) {
require.NoError(t, err)
d := util.HydrateAttestationData(&ethpb.AttestationData{})
assert.ErrorContains(t, "signed beacon block can't be nil", service.verifyBeaconBlock(ctx, d))
require.Equal(t, errBlockNotFoundInCacheOrDB, service.verifyBeaconBlock(ctx, d))
}
func TestVerifyBeaconBlock_futureBlock(t *testing.T) {

View File

@@ -317,17 +317,9 @@ func (s *Service) ancestorByDB(ctx context.Context, r [32]byte, slot types.Slot)
return nil, ctx.Err()
}
signed, err := s.cfg.BeaconDB.Block(ctx, r)
signed, err := s.getBlock(ctx, r)
if err != nil {
return nil, errors.Wrap(err, "could not get ancestor block")
}
if s.hasInitSyncBlock(r) {
signed = s.getInitSyncBlock(r)
}
if signed == nil || signed.IsNil() || signed.Block().IsNil() {
return nil, errors.New("nil block")
return nil, err
}
b := signed.Block()
if b.Slot() == slot || b.Slot() < slot {

View File

@@ -177,14 +177,24 @@ func (s *Service) notifyEngineIfChangedHead(ctx context.Context, newHeadRoot [32
if s.headRoot() == newHeadRoot {
return
}
log.WithFields(logrus.Fields{
"oldHeadRoot": fmt.Sprintf("%#x", s.headRoot()),
"newHeadRoot": fmt.Sprintf("%#x", newHeadRoot),
}).Debug("Head changed due to attestations")
if !s.hasBlockInInitSyncOrDB(ctx, newHeadRoot) {
return // We don't have the block, don't notify the engine and update head.
}
finalized := s.store.FinalizedCheckpt()
if finalized == nil {
log.WithError(errNilFinalizedInStore).Error("could not get finalized checkpoint")
return
}
newHeadBlock, err := s.cfg.BeaconDB.Block(ctx, newHeadRoot)
newHeadBlock, err := s.getBlock(ctx, newHeadRoot)
if err != nil {
log.WithError(err).Error("Could not get block from db")
log.WithError(err).Error("Could not get new head block")
return
}
headState, err := s.cfg.StateGen.StateByRoot(ctx, newHeadRoot)

View File

@@ -140,6 +140,9 @@ func TestNotifyEngineIfChangedHead(t *testing.T) {
finalizedErr := "could not get finalized checkpoint"
require.LogsDoNotContain(t, hook, finalizedErr)
require.LogsDoNotContain(t, hook, hookErr)
gb, err := wrapper.WrappedSignedBeaconBlock(util.NewBeaconBlock())
require.NoError(t, err)
service.saveInitSyncBlock([32]byte{'a'}, gb)
service.notifyEngineIfChangedHead(ctx, [32]byte{'a'})
require.LogsContain(t, hook, finalizedErr)
@@ -149,13 +152,14 @@ func TestNotifyEngineIfChangedHead(t *testing.T) {
block: nil, /* should not panic if notify head uses correct head */
}
// Block in Cache
b := util.NewBeaconBlock()
b.Block.Slot = 2
wsb, err := wrapper.WrappedSignedBeaconBlock(b)
require.NoError(t, err)
require.NoError(t, service.cfg.BeaconDB.SaveBlock(ctx, wsb))
r1, err := b.Block.HashTreeRoot()
require.NoError(t, err)
service.saveInitSyncBlock(r1, wsb)
finalized := &ethpb.Checkpoint{Root: r1[:], Epoch: 0}
st, _ := util.DeterministicGenesisState(t, 1)
service.head = &head{
@@ -169,6 +173,28 @@ func TestNotifyEngineIfChangedHead(t *testing.T) {
service.notifyEngineIfChangedHead(ctx, r1)
require.LogsDoNotContain(t, hook, finalizedErr)
require.LogsDoNotContain(t, hook, hookErr)
// Block in DB
b = util.NewBeaconBlock()
b.Block.Slot = 3
wsb, err = wrapper.WrappedSignedBeaconBlock(b)
require.NoError(t, err)
require.NoError(t, service.cfg.BeaconDB.SaveBlock(ctx, wsb))
r1, err = b.Block.HashTreeRoot()
require.NoError(t, err)
finalized = &ethpb.Checkpoint{Root: r1[:], Epoch: 0}
st, _ = util.DeterministicGenesisState(t, 1)
service.head = &head{
slot: 1,
root: r1,
block: wsb,
state: st,
}
service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(2, 1, [8]byte{1})
service.store.SetFinalizedCheckpt(finalized)
service.notifyEngineIfChangedHead(ctx, r1)
require.LogsDoNotContain(t, hook, finalizedErr)
require.LogsDoNotContain(t, hook, hookErr)
vId, payloadID, has := service.cfg.ProposerSlotIndexCache.GetProposerPayloadIDs(2)
require.Equal(t, true, has)
require.Equal(t, types.ValidatorIndex(1), vId)

View File

@@ -62,11 +62,17 @@ func (s *Service) pollConnectionStatus(ctx context.Context) {
select {
case <-ticker.C:
log.Debugf("Trying to dial endpoint: %s", logs.MaskCredentialsLogging(s.cfg.currHttpEndpoint.Url))
currClient := s.rpcClient
if err := s.setupExecutionClientConnections(ctx, s.cfg.currHttpEndpoint); err != nil {
errorLogger(err, "Could not connect to execution client endpoint")
s.runError = err
s.fallbackToNextEndpoint()
continue
}
// Close previous client, if connection was successful.
currClient.Close()
log.Infof("Connected to new endpoint: %s", logs.MaskCredentialsLogging(s.cfg.currHttpEndpoint.Url))
return
case <-s.ctx.Done():
log.Debug("Received cancelled context,closing existing powchain service")
return
@@ -80,10 +86,13 @@ func (s *Service) retryExecutionClientConnection(ctx context.Context, err error)
s.updateConnectedETH1(false)
// Back off for a while before redialing.
time.Sleep(backOffPeriod)
currClient := s.rpcClient
if err := s.setupExecutionClientConnections(ctx, s.cfg.currHttpEndpoint); err != nil {
s.runError = err
return
}
// Close previous client, if connection was successful.
currClient.Close()
// Reset run error in the event of a successful connection.
s.runError = nil
}
@@ -99,10 +108,13 @@ func (s *Service) checkDefaultEndpoint(ctx context.Context) {
return
}
currClient := s.rpcClient
if err := s.setupExecutionClientConnections(ctx, primaryEndpoint); err != nil {
log.Debugf("Primary endpoint not ready: %v", err)
return
}
// Close previous client, if connection was successful.
currClient.Close()
s.updateCurrHttpEndpoint(primaryEndpoint)
}

View File

@@ -246,7 +246,7 @@ func (s *Service) Start() {
s.isRunning = true
// Poll the execution client connection and fallback if errors occur.
go s.pollConnectionStatus(s.ctx)
s.pollConnectionStatus(s.ctx)
// Check transition configuration for the engine API client in the background.
go s.checkTransitionConfiguration(s.ctx, make(chan *feed.Event, 1))
@@ -355,6 +355,7 @@ func (s *Service) ETH1ConnectionErrors() []error {
for _, ep := range s.cfg.httpEndpoints {
client, err := s.newRPCClientWithAuth(s.ctx, ep)
if err != nil {
client.Close()
errs = append(errs, err)
continue
}
@@ -602,7 +603,9 @@ func (s *Service) initPOWService() {
}
s.chainStartData.GenesisBlock = genBlock
if err := s.savePowchainData(ctx); err != nil {
s.retryExecutionClientConnection(ctx, err)
errorLogger(err, "Unable to save execution client data")
continue
}
}
return
@@ -624,17 +627,19 @@ func (s *Service) run(done <-chan struct{}) {
case <-done:
s.isRunning = false
s.runError = nil
s.rpcClient.Close()
s.updateConnectedETH1(false)
log.Debug("Context closed, exiting goroutine")
return
case <-s.headTicker.C:
head, err := s.eth1DataFetcher.HeaderByNumber(s.ctx, nil)
if err != nil {
s.pollConnectionStatus(s.ctx)
log.WithError(err).Debug("Could not fetch latest eth1 header")
continue
}
if eth1HeadIsBehind(head.Time) {
s.retryExecutionClientConnection(s.ctx, err)
s.pollConnectionStatus(s.ctx)
log.WithError(errFarBehind).Debug("Could not get an up to date eth1 header")
continue
}

View File

@@ -141,7 +141,12 @@ func (vs *Server) CheckDoppelGanger(ctx context.Context, req *ethpb.DoppelGanger
// We request a state 32 slots ago. We are guaranteed to have
// currentSlot > 32 since we assume that we are in Altair's fork.
prevState, err := vs.ReplayerBuilder.ReplayerForSlot(headSlot - params.BeaconConfig().SlotsPerEpoch).ReplayBlocks(ctx)
prevStateSlot := headSlot - params.BeaconConfig().SlotsPerEpoch
prevEpochEnd, err := slots.EpochEnd(slots.ToEpoch(prevStateSlot))
if err != nil {
return nil, status.Error(codes.Internal, "Could not get previous epoch's end")
}
prevState, err := vs.ReplayerBuilder.ReplayerForSlot(prevEpochEnd).ReplayBlocks(ctx)
if err != nil {
return nil, status.Error(codes.Internal, "Could not get previous state")
}
@@ -171,7 +176,7 @@ func (vs *Server) CheckDoppelGanger(ctx context.Context, req *ethpb.DoppelGanger
// ago, the current doppelganger check will not be able to
// identify dopplelgangers since an attestation can take up to
// 31 slots to be included.
if v.Epoch+1 >= currEpoch {
if v.Epoch+2 >= currEpoch {
resp.Responses = append(resp.Responses,
&ethpb.DoppelGangerResponse_ValidatorResponse{
PublicKey: v.PublicKey,
@@ -376,8 +381,8 @@ func checkValidatorsAreRecent(headEpoch types.Epoch, req *ethpb.DoppelGangerRequ
// Due to how balances are reflected for individual
// validators, we can only effectively determine if a
// validator voted or not if we are able to look
// back more than 1 epoch into the past.
if v.Epoch+1 < headEpoch {
// back more than 2 epoch into the past.
if v.Epoch+2 < headEpoch {
validatorsAreRecent = false
// Zero out response if we encounter non-recent validators to
// guard against potential misuse.

View File

@@ -961,7 +961,7 @@ func TestServer_CheckDoppelGanger(t *testing.T) {
svSetup: func(t *testing.T) (*Server, *ethpb.DoppelGangerRequest, *ethpb.DoppelGangerResponse) {
hs, ps, keys := createStateSetupAltair(t, 3)
rb := mockstategen.NewMockReplayerBuilder()
rb.SetMockStateForSlot(ps, 20)
rb.SetMockStateForSlot(ps, 23)
vs := &Server{
HeadFetcher: &mockChain.ChainService{
State: hs,
@@ -993,7 +993,7 @@ func TestServer_CheckDoppelGanger(t *testing.T) {
svSetup: func(t *testing.T) (*Server, *ethpb.DoppelGangerRequest, *ethpb.DoppelGangerResponse) {
hs, ps, keys := createStateSetupAltair(t, 3)
rb := mockstategen.NewMockReplayerBuilder()
rb.SetMockStateForSlot(ps, 20)
rb.SetMockStateForSlot(ps, 23)
currentIndices := make([]byte, 64)
currentIndices[2] = 1
require.NoError(t, hs.SetCurrentParticipationBits(currentIndices))
@@ -1024,7 +1024,7 @@ func TestServer_CheckDoppelGanger(t *testing.T) {
// Add in for duplicate validator
request.ValidatorRequests = append(request.ValidatorRequests, &ethpb.DoppelGangerRequest_ValidatorRequest{
PublicKey: keys[2].PublicKey().Marshal(),
Epoch: 1,
Epoch: 0,
SignedRoot: []byte{'A'},
})
response.Responses = append(response.Responses, &ethpb.DoppelGangerResponse_ValidatorResponse{
@@ -1043,7 +1043,7 @@ func TestServer_CheckDoppelGanger(t *testing.T) {
prevIndices[2] = 1
require.NoError(t, ps.SetPreviousParticipationBits(prevIndices))
rb := mockstategen.NewMockReplayerBuilder()
rb.SetMockStateForSlot(ps, 20)
rb.SetMockStateForSlot(ps, 23)
vs := &Server{
HeadFetcher: &mockChain.ChainService{
@@ -1071,7 +1071,7 @@ func TestServer_CheckDoppelGanger(t *testing.T) {
// Add in for duplicate validator
request.ValidatorRequests = append(request.ValidatorRequests, &ethpb.DoppelGangerRequest_ValidatorRequest{
PublicKey: keys[2].PublicKey().Marshal(),
Epoch: 1,
Epoch: 0,
SignedRoot: []byte{'A'},
})
response.Responses = append(response.Responses, &ethpb.DoppelGangerResponse_ValidatorResponse{
@@ -1091,7 +1091,7 @@ func TestServer_CheckDoppelGanger(t *testing.T) {
currentIndices[11] = 2
require.NoError(t, hs.SetPreviousParticipationBits(currentIndices))
rb := mockstategen.NewMockReplayerBuilder()
rb.SetMockStateForSlot(ps, 20)
rb.SetMockStateForSlot(ps, 23)
prevIndices := make([]byte, 64)
for i := 12; i < 20; i++ {
@@ -1114,7 +1114,7 @@ func TestServer_CheckDoppelGanger(t *testing.T) {
// Add in for duplicate validator
request.ValidatorRequests = append(request.ValidatorRequests, &ethpb.DoppelGangerRequest_ValidatorRequest{
PublicKey: keys[i].PublicKey().Marshal(),
Epoch: 1,
Epoch: 0,
SignedRoot: []byte{'A'},
})
response.Responses = append(response.Responses, &ethpb.DoppelGangerResponse_ValidatorResponse{
@@ -1143,8 +1143,11 @@ func TestServer_CheckDoppelGanger(t *testing.T) {
svSetup: func(t *testing.T) (*Server, *ethpb.DoppelGangerRequest, *ethpb.DoppelGangerResponse) {
hs, ps, keys := createStateSetupAltair(t, 3)
rb := mockstategen.NewMockReplayerBuilder()
rb.SetMockStateForSlot(ps, 20)
rb.SetMockStateForSlot(ps, 23)
currentIndices := make([]byte, 64)
currentIndices[0] = 1
currentIndices[1] = 2
require.NoError(t, hs.SetPreviousParticipationBits(currentIndices))
vs := &Server{
HeadFetcher: &mockChain.ChainService{
State: hs,
@@ -1171,6 +1174,43 @@ func TestServer_CheckDoppelGanger(t *testing.T) {
return vs, request, response
},
},
{
name: "attesters are too recent(previous state)",
wantErr: false,
svSetup: func(t *testing.T) (*Server, *ethpb.DoppelGangerRequest, *ethpb.DoppelGangerResponse) {
hs, ps, keys := createStateSetupAltair(t, 3)
rb := mockstategen.NewMockReplayerBuilder()
rb.SetMockStateForSlot(ps, 23)
currentIndices := make([]byte, 64)
currentIndices[0] = 1
currentIndices[1] = 2
require.NoError(t, ps.SetPreviousParticipationBits(currentIndices))
vs := &Server{
HeadFetcher: &mockChain.ChainService{
State: hs,
},
SyncChecker: &mockSync.Sync{IsSyncing: false},
ReplayerBuilder: rb,
}
request := &ethpb.DoppelGangerRequest{
ValidatorRequests: make([]*ethpb.DoppelGangerRequest_ValidatorRequest, 0),
}
response := &ethpb.DoppelGangerResponse{Responses: make([]*ethpb.DoppelGangerResponse_ValidatorResponse, 0)}
for i := 0; i < 15; i++ {
request.ValidatorRequests = append(request.ValidatorRequests, &ethpb.DoppelGangerRequest_ValidatorRequest{
PublicKey: keys[i].PublicKey().Marshal(),
Epoch: 1,
SignedRoot: []byte{'A'},
})
response.Responses = append(response.Responses, &ethpb.DoppelGangerResponse_ValidatorResponse{
PublicKey: keys[i].PublicKey().Marshal(),
DuplicateExists: false,
})
}
return vs, request, response
},
},
{
name: "exit early for Phase 0",
wantErr: false,

View File

@@ -112,7 +112,7 @@ func (s *State) LoadBlocks(ctx context.Context, startSlot, endSlot types.Slot, e
}
if blockRoots[length-1] != endBlockRoot {
return nil, errors.New("end block roots don't match")
return nil, errors.Errorf("end block roots don't match: got %#x but wanted %#x for %d to %d at length %d", blockRoots[length-1], endBlockRoot, startSlot, endSlot, length)
}
filteredBlocks := []block.SignedBeaconBlock{blocks[length-1]}

View File

@@ -8,7 +8,6 @@ import (
"github.com/pkg/errors"
types "github.com/prysmaticlabs/eth2-types"
"github.com/prysmaticlabs/prysm/async"
"github.com/prysmaticlabs/prysm/beacon-chain/core/signing"
fieldparams "github.com/prysmaticlabs/prysm/config/fieldparams"
"github.com/prysmaticlabs/prysm/config/params"
@@ -21,7 +20,6 @@ import (
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
"github.com/prysmaticlabs/prysm/runtime/version"
prysmTime "github.com/prysmaticlabs/prysm/time"
"github.com/prysmaticlabs/prysm/validator/client/iface"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
"google.golang.org/protobuf/types/known/emptypb"
@@ -46,10 +44,6 @@ func (v *validator) ProposeBlock(ctx context.Context, slot types.Slot, pubKey [f
ctx, span := trace.StartSpan(ctx, "validator.ProposeBlock")
defer span.End()
lock := async.NewMultilock(fmt.Sprint(iface.RoleProposer), string(pubKey[:]))
lock.Lock()
defer lock.Unlock()
fmtKey := fmt.Sprintf("%#x", pubKey[:])
span.AddAttributes(trace.StringAttribute("validator", fmtKey))
log := log.WithField("pubKey", fmt.Sprintf("%#x", bytesutil.Trunc(pubKey[:])))
@@ -131,6 +125,34 @@ func (v *validator) ProposeBlock(ctx context.Context, slot types.Slot, pubKey [f
}
return
}
hasProposerNextSlot := false
hasProposerPrevSlot := false
hasProposerPrevPrevSlot := false
for _, d := range v.duties.Duties {
if slot < 6 {
continue
}
if len(d.ProposerSlots) > 0 && d.ProposerSlots[0] == slot+1 {
hasProposerNextSlot = true
}
if len(d.ProposerSlots) > 0 && d.ProposerSlots[0] == slot-1 {
hasProposerPrevSlot = true
}
if len(d.ProposerSlots) > 0 && d.ProposerSlots[0] == slot-2 {
hasProposerPrevPrevSlot = true
}
}
if hasProposerNextSlot && !hasProposerPrevSlot {
log.Infof("waiting for slot %d", slot+1)
<-v.propChan
log.Infof("received from slot %d", slot+1)
}
if hasProposerPrevSlot && !hasProposerPrevPrevSlot {
log.Infof("sending to slot %d", slot-1)
v.propChan <- true
log.Infof("sent to slot %d", slot-1)
}
ctx = context.Background()
blkResp, err := v.validatorClient.ProposeBeaconBlock(ctx, proposal)
if err != nil {
log.WithError(err).Error("Failed to propose block")

View File

@@ -207,6 +207,7 @@ func (v *ValidatorService) Start() {
Web3SignerConfig: v.Web3SignerConfig,
feeRecipientConfig: v.feeRecipientConfig,
walletIntializedChannel: make(chan *wallet.Wallet, 1),
propChan: make(chan bool, 1),
}
// To resolve a race condition at startup due to the interface
// nature of the abstracted block type. We initialize

View File

@@ -80,6 +80,7 @@ type validator struct {
domainDataCache *ristretto.Cache
highestValidSlot types.Slot
genesisTime uint64
propChan chan bool
blockFeed *event.Feed
interopKeysConfig *local.InteropKeymanagerConfig
wallet *wallet.Wallet