Mirror of https://github.com/OffchainLabs/prysm.git (synced 2026-01-10 13:58:09 -05:00)

Compare commits: 4 commits, v6.1.3-rc. ... e2e-hack-2
| Author | SHA1 | Date |
|---|---|---|
| | 98e7a3c2ab | |
| | 208ae6bbf0 | |
| | cb494c2215 | |
| | 844d9623b6 | |
```diff
@@ -1,9 +1,15 @@
 package blockchain
 
 import (
+	"context"
+
+	"github.com/pkg/errors"
+	"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
 	"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/block"
 )
 
+var errBlockNotFoundInCacheOrDB = errors.New("block not found in cache or db")
+
 // This saves a beacon block to the initial sync blocks cache.
 func (s *Service) saveInitSyncBlock(r [32]byte, b block.SignedBeaconBlock) {
 	s.initSyncBlocksLock.Lock()
@@ -20,13 +26,33 @@ func (s *Service) hasInitSyncBlock(r [32]byte) bool {
 	return ok
 }
 
-// This retrieves a beacon block from the initial sync blocks cache using the root of
-// the block.
-func (s *Service) getInitSyncBlock(r [32]byte) block.SignedBeaconBlock {
+// Returns true if a block for root `r` exists in the initial sync blocks cache or the DB.
+func (s *Service) hasBlockInInitSyncOrDB(ctx context.Context, r [32]byte) bool {
+	if s.hasInitSyncBlock(r) {
+		return true
+	}
+	return s.cfg.BeaconDB.HasBlock(ctx, r)
+}
+
+// Returns block for a given root `r` from either the initial sync blocks cache or the DB.
+// Error is returned if the block is not found in either cache or DB.
+func (s *Service) getBlock(ctx context.Context, r [32]byte) (block.SignedBeaconBlock, error) {
 	s.initSyncBlocksLock.RLock()
-	defer s.initSyncBlocksLock.RUnlock()
-	b := s.initSyncBlocks[r]
-	return b
+
+	// Check cache first because it's faster.
+	b, ok := s.initSyncBlocks[r]
+	s.initSyncBlocksLock.RUnlock()
+	var err error
+	if !ok {
+		b, err = s.cfg.BeaconDB.Block(ctx, r)
+		if err != nil {
+			return nil, errors.Wrap(err, "could not retrieve block from db")
+		}
+	}
+	if err := helpers.BeaconBlockIsNil(b); err != nil {
+		return nil, errBlockNotFoundInCacheOrDB
+	}
+	return b, nil
 }
 
 // This retrieves all the beacon blocks from the initial sync blocks cache, the returned
```
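The two helpers above centralize a lookup pattern that the later hunks reuse: consult the in-memory initial-sync cache under a read lock, fall back to the database on a miss, and collapse "not found" into a single sentinel error. Below is a minimal, self-contained sketch of that pattern; the `store` type, its string payload, and the map standing in for the database are illustrative only, not Prysm APIs.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errNotFound = errors.New("block not found in cache or db")

// store mimics the cache-then-DB lookup used by getBlock: the cache is
// consulted first under a read lock because it is cheaper than a DB read.
type store struct {
	mu    sync.RWMutex
	cache map[[32]byte]string // illustrative payload type
	db    map[[32]byte]string // stand-in for the on-disk database
}

func (s *store) getBlock(root [32]byte) (string, error) {
	s.mu.RLock()
	b, ok := s.cache[root]
	s.mu.RUnlock()
	if !ok {
		// Cache miss: fall back to the (slower) database.
		b, ok = s.db[root]
	}
	if !ok {
		return "", errNotFound
	}
	return b, nil
}

func main() {
	s := &store{cache: map[[32]byte]string{}, db: map[[32]byte]string{}}
	s.cache[[32]byte{1}] = "cached block"
	s.db[[32]byte{2}] = "db block"

	for _, r := range [][32]byte{{1}, {2}, {3}} {
		b, err := s.getBlock(r)
		fmt.Println(b, err)
	}
}
```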
beacon-chain/blockchain/init_sync_process_block_test.go (new file, 72 lines)
@@ -0,0 +1,72 @@
```go
package blockchain

import (
	"context"
	"testing"

	testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
	"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
	"github.com/prysmaticlabs/prysm/testing/require"
	"github.com/prysmaticlabs/prysm/testing/util"
)

func TestService_getBlock(t *testing.T) {
	ctx := context.Background()
	beaconDB := testDB.SetupDB(t)
	s := setupBeaconChain(t, beaconDB)
	b1 := util.NewBeaconBlock()
	r1, err := b1.Block.HashTreeRoot()
	require.NoError(t, err)
	b2 := util.NewBeaconBlock()
	b2.Block.Slot = 100
	r2, err := b2.Block.HashTreeRoot()
	require.NoError(t, err)

	// block not found
	_, err = s.getBlock(ctx, [32]byte{})
	require.ErrorIs(t, err, errBlockNotFoundInCacheOrDB)

	// block in cache
	b, err := wrapper.WrappedSignedBeaconBlock(b1)
	require.NoError(t, err)
	s.saveInitSyncBlock(r1, b)
	got, err := s.getBlock(ctx, r1)
	require.NoError(t, err)
	require.DeepEqual(t, b, got)

	// block in db
	b, err = wrapper.WrappedSignedBeaconBlock(b2)
	require.NoError(t, err)
	require.NoError(t, s.cfg.BeaconDB.SaveBlock(ctx, b))
	got, err = s.getBlock(ctx, r2)
	require.NoError(t, err)
	require.DeepEqual(t, b, got)
}

func TestService_hasBlockInInitSyncOrDB(t *testing.T) {
	ctx := context.Background()
	beaconDB := testDB.SetupDB(t)
	s := setupBeaconChain(t, beaconDB)
	b1 := util.NewBeaconBlock()
	r1, err := b1.Block.HashTreeRoot()
	require.NoError(t, err)
	b2 := util.NewBeaconBlock()
	b2.Block.Slot = 100
	r2, err := b2.Block.HashTreeRoot()
	require.NoError(t, err)

	// block not found
	require.Equal(t, false, s.hasBlockInInitSyncOrDB(ctx, [32]byte{}))

	// block in cache
	b, err := wrapper.WrappedSignedBeaconBlock(b1)
	require.NoError(t, err)
	s.saveInitSyncBlock(r1, b)
	require.Equal(t, true, s.hasBlockInInitSyncOrDB(ctx, r1))

	// block in db
	b, err = wrapper.WrappedSignedBeaconBlock(b2)
	require.NoError(t, err)
	require.NoError(t, s.cfg.BeaconDB.SaveBlock(ctx, b))
	require.Equal(t, true, s.hasBlockInInitSyncOrDB(ctx, r2))
}
```
```diff
@@ -46,15 +46,9 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, headState state.Be
 	if err != nil {
 		return nil, errors.Wrap(err, "could not get execution payload")
 	}
-	finalizedBlock, err := s.cfg.BeaconDB.Block(ctx, s.ensureRootNotZeros(finalizedRoot))
+	finalizedBlock, err := s.getBlock(ctx, s.ensureRootNotZeros(finalizedRoot))
 	if err != nil {
 		return nil, errors.Wrap(err, "could not get finalized block")
 	}
-	if finalizedBlock == nil || finalizedBlock.IsNil() {
-		finalizedBlock = s.getInitSyncBlock(s.ensureRootNotZeros(finalizedRoot))
-		if finalizedBlock == nil || finalizedBlock.IsNil() {
-			return nil, errors.Errorf("finalized block with root %#x does not exist in the db or our cache", s.ensureRootNotZeros(finalizedRoot))
-		}
-		return nil, err
-	}
 	var finalizedHash []byte
 	if blocks.IsPreBellatrixVersion(finalizedBlock.Block().Version()) {
```
```diff
@@ -76,15 +76,10 @@ func verifyAttTargetEpoch(_ context.Context, genesisTime, nowTime uint64, c *eth
 // verifyBeaconBlock verifies beacon head block is known and not from the future.
 func (s *Service) verifyBeaconBlock(ctx context.Context, data *ethpb.AttestationData) error {
 	r := bytesutil.ToBytes32(data.BeaconBlockRoot)
-	b, err := s.cfg.BeaconDB.Block(ctx, r)
+	b, err := s.getBlock(ctx, r)
 	if err != nil {
 		return err
 	}
-	// If the block does not exist in db, check again if block exists in initial sync block cache.
-	// This could happen as the node first syncs to head.
-	if (b == nil || b.IsNil()) && s.hasInitSyncBlock(r) {
-		b = s.getInitSyncBlock(r)
-	}
 	if err := helpers.BeaconBlockIsNil(b); err != nil {
 		return err
 	}
```
```diff
@@ -438,7 +438,7 @@ func TestVerifyBeaconBlock_NoBlock(t *testing.T) {
 	require.NoError(t, err)
 
 	d := util.HydrateAttestationData(&ethpb.AttestationData{})
-	assert.ErrorContains(t, "signed beacon block can't be nil", service.verifyBeaconBlock(ctx, d))
+	require.Equal(t, errBlockNotFoundInCacheOrDB, service.verifyBeaconBlock(ctx, d))
 }
 
 func TestVerifyBeaconBlock_futureBlock(t *testing.T) {
```
```diff
@@ -317,17 +317,9 @@ func (s *Service) ancestorByDB(ctx context.Context, r [32]byte, slot types.Slot)
 		return nil, ctx.Err()
 	}
 
-	signed, err := s.cfg.BeaconDB.Block(ctx, r)
+	signed, err := s.getBlock(ctx, r)
 	if err != nil {
-		return nil, errors.Wrap(err, "could not get ancestor block")
-	}
-
-	if s.hasInitSyncBlock(r) {
-		signed = s.getInitSyncBlock(r)
-	}
-
-	if signed == nil || signed.IsNil() || signed.Block().IsNil() {
-		return nil, errors.New("nil block")
+		return nil, err
 	}
 	b := signed.Block()
 	if b.Slot() == slot || b.Slot() < slot {
```
```diff
@@ -177,14 +177,24 @@ func (s *Service) notifyEngineIfChangedHead(ctx context.Context, newHeadRoot [32
 	if s.headRoot() == newHeadRoot {
 		return
 	}
 	log.WithFields(logrus.Fields{
 		"oldHeadRoot": fmt.Sprintf("%#x", s.headRoot()),
 		"newHeadRoot": fmt.Sprintf("%#x", newHeadRoot),
 	}).Debug("Head changed due to attestations")
+
+	if !s.hasBlockInInitSyncOrDB(ctx, newHeadRoot) {
+		return // We don't have the block, don't notify the engine and update head.
+	}
+
 	finalized := s.store.FinalizedCheckpt()
 	if finalized == nil {
 		log.WithError(errNilFinalizedInStore).Error("could not get finalized checkpoint")
 		return
 	}
-	newHeadBlock, err := s.cfg.BeaconDB.Block(ctx, newHeadRoot)
+
+	newHeadBlock, err := s.getBlock(ctx, newHeadRoot)
 	if err != nil {
-		log.WithError(err).Error("Could not get block from db")
+		log.WithError(err).Error("Could not get new head block")
 		return
 	}
 	headState, err := s.cfg.StateGen.StateByRoot(ctx, newHeadRoot)
```
```diff
@@ -140,6 +140,9 @@ func TestNotifyEngineIfChangedHead(t *testing.T) {
 	finalizedErr := "could not get finalized checkpoint"
 	require.LogsDoNotContain(t, hook, finalizedErr)
 	require.LogsDoNotContain(t, hook, hookErr)
+	gb, err := wrapper.WrappedSignedBeaconBlock(util.NewBeaconBlock())
+	require.NoError(t, err)
+	service.saveInitSyncBlock([32]byte{'a'}, gb)
 	service.notifyEngineIfChangedHead(ctx, [32]byte{'a'})
 	require.LogsContain(t, hook, finalizedErr)
```
```diff
@@ -149,13 +152,14 @@ func TestNotifyEngineIfChangedHead(t *testing.T) {
 		block: nil, /* should not panic if notify head uses correct head */
 	}
 
+	// Block in Cache
 	b := util.NewBeaconBlock()
 	b.Block.Slot = 2
 	wsb, err := wrapper.WrappedSignedBeaconBlock(b)
 	require.NoError(t, err)
-	require.NoError(t, service.cfg.BeaconDB.SaveBlock(ctx, wsb))
 	r1, err := b.Block.HashTreeRoot()
 	require.NoError(t, err)
+	service.saveInitSyncBlock(r1, wsb)
 	finalized := &ethpb.Checkpoint{Root: r1[:], Epoch: 0}
 	st, _ := util.DeterministicGenesisState(t, 1)
 	service.head = &head{
```
```diff
@@ -169,6 +173,28 @@ func TestNotifyEngineIfChangedHead(t *testing.T) {
 	service.notifyEngineIfChangedHead(ctx, r1)
 	require.LogsDoNotContain(t, hook, finalizedErr)
 	require.LogsDoNotContain(t, hook, hookErr)
+
+	// Block in DB
+	b = util.NewBeaconBlock()
+	b.Block.Slot = 3
+	wsb, err = wrapper.WrappedSignedBeaconBlock(b)
+	require.NoError(t, err)
+	require.NoError(t, service.cfg.BeaconDB.SaveBlock(ctx, wsb))
+	r1, err = b.Block.HashTreeRoot()
+	require.NoError(t, err)
+	finalized = &ethpb.Checkpoint{Root: r1[:], Epoch: 0}
+	st, _ = util.DeterministicGenesisState(t, 1)
+	service.head = &head{
+		slot:  1,
+		root:  r1,
+		block: wsb,
+		state: st,
+	}
+	service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(2, 1, [8]byte{1})
+	service.store.SetFinalizedCheckpt(finalized)
+	service.notifyEngineIfChangedHead(ctx, r1)
+	require.LogsDoNotContain(t, hook, finalizedErr)
+	require.LogsDoNotContain(t, hook, hookErr)
 	vId, payloadID, has := service.cfg.ProposerSlotIndexCache.GetProposerPayloadIDs(2)
 	require.Equal(t, true, has)
 	require.Equal(t, types.ValidatorIndex(1), vId)
```
```diff
@@ -62,11 +62,17 @@ func (s *Service) pollConnectionStatus(ctx context.Context) {
 	select {
 	case <-ticker.C:
 		log.Debugf("Trying to dial endpoint: %s", logs.MaskCredentialsLogging(s.cfg.currHttpEndpoint.Url))
+		currClient := s.rpcClient
 		if err := s.setupExecutionClientConnections(ctx, s.cfg.currHttpEndpoint); err != nil {
 			errorLogger(err, "Could not connect to execution client endpoint")
 			s.runError = err
 			s.fallbackToNextEndpoint()
 			continue
 		}
+		// Close previous client, if connection was successful.
+		currClient.Close()
 		log.Infof("Connected to new endpoint: %s", logs.MaskCredentialsLogging(s.cfg.currHttpEndpoint.Url))
 		return
 	case <-s.ctx.Done():
 		log.Debug("Received cancelled context,closing existing powchain service")
 		return
```
```diff
@@ -80,10 +86,13 @@ func (s *Service) retryExecutionClientConnection(ctx context.Context, err error)
 	s.updateConnectedETH1(false)
 	// Back off for a while before redialing.
 	time.Sleep(backOffPeriod)
+	currClient := s.rpcClient
 	if err := s.setupExecutionClientConnections(ctx, s.cfg.currHttpEndpoint); err != nil {
 		s.runError = err
 		return
 	}
+	// Close previous client, if connection was successful.
+	currClient.Close()
 	// Reset run error in the event of a successful connection.
 	s.runError = nil
 }
```
```diff
@@ -99,10 +108,13 @@ func (s *Service) checkDefaultEndpoint(ctx context.Context) {
 		return
 	}
 
+	currClient := s.rpcClient
 	if err := s.setupExecutionClientConnections(ctx, primaryEndpoint); err != nil {
 		log.Debugf("Primary endpoint not ready: %v", err)
 		return
 	}
+	// Close previous client, if connection was successful.
+	currClient.Close()
 	s.updateCurrHttpEndpoint(primaryEndpoint)
 }
```
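The three powchain hunks above apply one idea in three places: capture the current RPC client before dialing, and close it only after the new connection has been established, so a failed dial never leaves the service without a working client. A rough sketch of that swap-and-close idea, using a hypothetical `dial` function and `conn` type rather than Prysm's real client:

```go
package main

import (
	"errors"
	"fmt"
)

// conn is a stand-in for an RPC client that must be closed when replaced.
type conn struct{ url string }

func (c *conn) Close() { fmt.Println("closed", c.url) }

// dial is a hypothetical connect function; it fails for an empty URL.
func dial(url string) (*conn, error) {
	if url == "" {
		return nil, errors.New("no endpoint configured")
	}
	return &conn{url: url}, nil
}

// reconnect swaps in a new connection and closes the previous one only
// after the new dial succeeded, mirroring the currClient.Close() pattern.
func reconnect(current *conn, url string) (*conn, error) {
	next, err := dial(url)
	if err != nil {
		return current, err // keep using the old connection on failure
	}
	if current != nil {
		current.Close()
	}
	return next, nil
}

func main() {
	c, _ := dial("http://primary:8545")
	c, err := reconnect(c, "http://fallback:8545")
	fmt.Println(c.url, err)
}
```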
```diff
@@ -246,7 +246,7 @@ func (s *Service) Start() {
 	s.isRunning = true
 
 	// Poll the execution client connection and fallback if errors occur.
-	go s.pollConnectionStatus(s.ctx)
+	s.pollConnectionStatus(s.ctx)
 
 	// Check transition configuration for the engine API client in the background.
 	go s.checkTransitionConfiguration(s.ctx, make(chan *feed.Event, 1))
```
```diff
@@ -355,6 +355,7 @@ func (s *Service) ETH1ConnectionErrors() []error {
 	for _, ep := range s.cfg.httpEndpoints {
 		client, err := s.newRPCClientWithAuth(s.ctx, ep)
 		if err != nil {
+			client.Close()
 			errs = append(errs, err)
 			continue
 		}
```
```diff
@@ -602,7 +603,9 @@ func (s *Service) initPOWService() {
 			}
 			s.chainStartData.GenesisBlock = genBlock
 			if err := s.savePowchainData(ctx); err != nil {
+				s.retryExecutionClientConnection(ctx, err)
 				errorLogger(err, "Unable to save execution client data")
+				continue
 			}
 		}
 		return
```
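The two added lines make a failed `savePowchainData` behave like the other failures in this startup loop: schedule a reconnection attempt and restart the iteration rather than falling through with partial state. A generic sketch of that retry-and-continue shape, with hypothetical `step` and `retryConnection` functions standing in for the real calls:

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	attempts := 0
	// step is a stand-in for work such as savePowchainData; it fails twice.
	step := func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient failure")
		}
		return nil
	}
	retryConnection := func(err error) { fmt.Println("retrying after:", err) }

	for {
		if err := step(); err != nil {
			retryConnection(err) // mirrors retryExecutionClientConnection
			continue             // retry the iteration instead of proceeding
		}
		fmt.Println("succeeded after", attempts, "attempts")
		return
	}
}
```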
```diff
@@ -624,17 +627,19 @@ func (s *Service) run(done <-chan struct{}) {
 		case <-done:
 			s.isRunning = false
 			s.runError = nil
 			s.rpcClient.Close()
 			s.updateConnectedETH1(false)
 			log.Debug("Context closed, exiting goroutine")
 			return
 		case <-s.headTicker.C:
 			head, err := s.eth1DataFetcher.HeaderByNumber(s.ctx, nil)
 			if err != nil {
 				s.pollConnectionStatus(s.ctx)
 				log.WithError(err).Debug("Could not fetch latest eth1 header")
 				continue
 			}
 			if eth1HeadIsBehind(head.Time) {
-				s.retryExecutionClientConnection(s.ctx, err)
+				s.pollConnectionStatus(s.ctx)
 				log.WithError(errFarBehind).Debug("Could not get an up to date eth1 header")
 				continue
 			}
```
```diff
@@ -141,7 +141,12 @@ func (vs *Server) CheckDoppelGanger(ctx context.Context, req *ethpb.DoppelGanger
 
 	// We request a state 32 slots ago. We are guaranteed to have
 	// currentSlot > 32 since we assume that we are in Altair's fork.
-	prevState, err := vs.ReplayerBuilder.ReplayerForSlot(headSlot - params.BeaconConfig().SlotsPerEpoch).ReplayBlocks(ctx)
+	prevStateSlot := headSlot - params.BeaconConfig().SlotsPerEpoch
+	prevEpochEnd, err := slots.EpochEnd(slots.ToEpoch(prevStateSlot))
+	if err != nil {
+		return nil, status.Error(codes.Internal, "Could not get previous epoch's end")
+	}
+	prevState, err := vs.ReplayerBuilder.ReplayerForSlot(prevEpochEnd).ReplayBlocks(ctx)
 	if err != nil {
 		return nil, status.Error(codes.Internal, "Could not get previous state")
 	}
```
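Instead of replaying to exactly `headSlot - SlotsPerEpoch`, the new code replays to the last slot of the epoch that contains that slot. Assuming mainnet's 32 slots per epoch, the arithmetic reduces to the sketch below; Prysm's `slots.ToEpoch` and `slots.EpochEnd` helpers perform the same calculation, with overflow checks omitted here.

```go
package main

import "fmt"

const slotsPerEpoch = 32 // mainnet value, assumed for this sketch

func toEpoch(slot uint64) uint64 { return slot / slotsPerEpoch }

// epochEnd returns the last slot of the given epoch.
func epochEnd(epoch uint64) uint64 { return (epoch+1)*slotsPerEpoch - 1 }

func main() {
	headSlot := uint64(100)
	prevStateSlot := headSlot - slotsPerEpoch        // 68, one epoch behind head
	prevEpochEnd := epochEnd(toEpoch(prevStateSlot)) // slot 68 is in epoch 2, which ends at slot 95
	fmt.Println(prevStateSlot, prevEpochEnd)         // 68 95
}
```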
```diff
@@ -171,7 +176,7 @@ func (vs *Server) CheckDoppelGanger(ctx context.Context, req *ethpb.DoppelGanger
 		// ago, the current doppelganger check will not be able to
 		// identify dopplelgangers since an attestation can take up to
 		// 31 slots to be included.
-		if v.Epoch+1 >= currEpoch {
+		if v.Epoch+2 >= currEpoch {
 			resp.Responses = append(resp.Responses,
 				&ethpb.DoppelGangerResponse_ValidatorResponse{
 					PublicKey: v.PublicKey,
```
```diff
@@ -376,8 +381,8 @@ func checkValidatorsAreRecent(headEpoch types.Epoch, req *ethpb.DoppelGangerRequ
 		// Due to how balances are reflected for individual
 		// validators, we can only effectively determine if a
 		// validator voted or not if we are able to look
-		// back more than 1 epoch into the past.
-		if v.Epoch+1 < headEpoch {
+		// back more than 2 epoch into the past.
+		if v.Epoch+2 < headEpoch {
 			validatorsAreRecent = false
 			// Zero out response if we encounter non-recent validators to
 			// guard against potential misuse.
```
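Both doppelganger changes widen the "too recent" window by one epoch: the server-side skip (`v.Epoch+2 >= currEpoch`) and the recency filter (`v.Epoch+2 < headEpoch`) now move together. A quick illustration of how the cutoff shifts, assuming a head epoch of 10:

```go
package main

import "fmt"

func main() {
	headEpoch := uint64(10)
	for _, vEpoch := range []uint64{6, 7, 8, 9} {
		oldTooRecent := vEpoch+1 >= headEpoch // old rule: epoch 9 and later are skipped
		newTooRecent := vEpoch+2 >= headEpoch // new rule: epoch 8 and later are skipped
		fmt.Printf("validator epoch %d: old=%t new=%t\n", vEpoch, oldTooRecent, newTooRecent)
	}
}
```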
```diff
@@ -961,7 +961,7 @@ func TestServer_CheckDoppelGanger(t *testing.T) {
 		svSetup: func(t *testing.T) (*Server, *ethpb.DoppelGangerRequest, *ethpb.DoppelGangerResponse) {
 			hs, ps, keys := createStateSetupAltair(t, 3)
 			rb := mockstategen.NewMockReplayerBuilder()
-			rb.SetMockStateForSlot(ps, 20)
+			rb.SetMockStateForSlot(ps, 23)
 			vs := &Server{
 				HeadFetcher: &mockChain.ChainService{
 					State: hs,
```
```diff
@@ -993,7 +993,7 @@ func TestServer_CheckDoppelGanger(t *testing.T) {
 		svSetup: func(t *testing.T) (*Server, *ethpb.DoppelGangerRequest, *ethpb.DoppelGangerResponse) {
 			hs, ps, keys := createStateSetupAltair(t, 3)
 			rb := mockstategen.NewMockReplayerBuilder()
-			rb.SetMockStateForSlot(ps, 20)
+			rb.SetMockStateForSlot(ps, 23)
 			currentIndices := make([]byte, 64)
 			currentIndices[2] = 1
 			require.NoError(t, hs.SetCurrentParticipationBits(currentIndices))
```
```diff
@@ -1024,7 +1024,7 @@ func TestServer_CheckDoppelGanger(t *testing.T) {
 			// Add in for duplicate validator
 			request.ValidatorRequests = append(request.ValidatorRequests, &ethpb.DoppelGangerRequest_ValidatorRequest{
 				PublicKey:  keys[2].PublicKey().Marshal(),
-				Epoch:      1,
+				Epoch:      0,
 				SignedRoot: []byte{'A'},
 			})
 			response.Responses = append(response.Responses, &ethpb.DoppelGangerResponse_ValidatorResponse{
```
```diff
@@ -1043,7 +1043,7 @@ func TestServer_CheckDoppelGanger(t *testing.T) {
 			prevIndices[2] = 1
 			require.NoError(t, ps.SetPreviousParticipationBits(prevIndices))
 			rb := mockstategen.NewMockReplayerBuilder()
-			rb.SetMockStateForSlot(ps, 20)
+			rb.SetMockStateForSlot(ps, 23)
 
 			vs := &Server{
 				HeadFetcher: &mockChain.ChainService{
```
```diff
@@ -1071,7 +1071,7 @@ func TestServer_CheckDoppelGanger(t *testing.T) {
 			// Add in for duplicate validator
 			request.ValidatorRequests = append(request.ValidatorRequests, &ethpb.DoppelGangerRequest_ValidatorRequest{
 				PublicKey:  keys[2].PublicKey().Marshal(),
-				Epoch:      1,
+				Epoch:      0,
 				SignedRoot: []byte{'A'},
 			})
 			response.Responses = append(response.Responses, &ethpb.DoppelGangerResponse_ValidatorResponse{
```
```diff
@@ -1091,7 +1091,7 @@ func TestServer_CheckDoppelGanger(t *testing.T) {
 			currentIndices[11] = 2
 			require.NoError(t, hs.SetPreviousParticipationBits(currentIndices))
 			rb := mockstategen.NewMockReplayerBuilder()
-			rb.SetMockStateForSlot(ps, 20)
+			rb.SetMockStateForSlot(ps, 23)
 
 			prevIndices := make([]byte, 64)
 			for i := 12; i < 20; i++ {
```
```diff
@@ -1114,7 +1114,7 @@ func TestServer_CheckDoppelGanger(t *testing.T) {
 			// Add in for duplicate validator
 			request.ValidatorRequests = append(request.ValidatorRequests, &ethpb.DoppelGangerRequest_ValidatorRequest{
 				PublicKey:  keys[i].PublicKey().Marshal(),
-				Epoch:      1,
+				Epoch:      0,
 				SignedRoot: []byte{'A'},
 			})
 			response.Responses = append(response.Responses, &ethpb.DoppelGangerResponse_ValidatorResponse{
```
```diff
@@ -1143,8 +1143,11 @@ func TestServer_CheckDoppelGanger(t *testing.T) {
 		svSetup: func(t *testing.T) (*Server, *ethpb.DoppelGangerRequest, *ethpb.DoppelGangerResponse) {
 			hs, ps, keys := createStateSetupAltair(t, 3)
 			rb := mockstategen.NewMockReplayerBuilder()
-			rb.SetMockStateForSlot(ps, 20)
+
+			rb.SetMockStateForSlot(ps, 23)
 			currentIndices := make([]byte, 64)
+			currentIndices[0] = 1
+			currentIndices[1] = 2
 			require.NoError(t, hs.SetPreviousParticipationBits(currentIndices))
 			vs := &Server{
 				HeadFetcher: &mockChain.ChainService{
 					State: hs,
```
```diff
@@ -1171,6 +1174,43 @@ func TestServer_CheckDoppelGanger(t *testing.T) {
 			return vs, request, response
 		},
 	},
+	{
+		name:    "attesters are too recent(previous state)",
+		wantErr: false,
+		svSetup: func(t *testing.T) (*Server, *ethpb.DoppelGangerRequest, *ethpb.DoppelGangerResponse) {
+			hs, ps, keys := createStateSetupAltair(t, 3)
+			rb := mockstategen.NewMockReplayerBuilder()
+			rb.SetMockStateForSlot(ps, 23)
+			currentIndices := make([]byte, 64)
+			currentIndices[0] = 1
+			currentIndices[1] = 2
+			require.NoError(t, ps.SetPreviousParticipationBits(currentIndices))
+			vs := &Server{
+				HeadFetcher: &mockChain.ChainService{
+					State: hs,
+				},
+				SyncChecker:     &mockSync.Sync{IsSyncing: false},
+				ReplayerBuilder: rb,
+			}
+			request := &ethpb.DoppelGangerRequest{
+				ValidatorRequests: make([]*ethpb.DoppelGangerRequest_ValidatorRequest, 0),
+			}
+			response := &ethpb.DoppelGangerResponse{Responses: make([]*ethpb.DoppelGangerResponse_ValidatorResponse, 0)}
+			for i := 0; i < 15; i++ {
+				request.ValidatorRequests = append(request.ValidatorRequests, &ethpb.DoppelGangerRequest_ValidatorRequest{
+					PublicKey:  keys[i].PublicKey().Marshal(),
+					Epoch:      1,
+					SignedRoot: []byte{'A'},
+				})
+				response.Responses = append(response.Responses, &ethpb.DoppelGangerResponse_ValidatorResponse{
+					PublicKey:       keys[i].PublicKey().Marshal(),
+					DuplicateExists: false,
+				})
+			}
+
+			return vs, request, response
+		},
+	},
 	{
 		name:    "exit early for Phase 0",
 		wantErr: false,
```
```diff
@@ -112,7 +112,7 @@ func (s *State) LoadBlocks(ctx context.Context, startSlot, endSlot types.Slot, e
 	}
 
 	if blockRoots[length-1] != endBlockRoot {
-		return nil, errors.New("end block roots don't match")
+		return nil, errors.Errorf("end block roots don't match: got %#x but wanted %#x for %d to %d at length %d", blockRoots[length-1], endBlockRoot, startSlot, endSlot, length)
 	}
 
 	filteredBlocks := []block.SignedBeaconBlock{blocks[length-1]}
```
```diff
@@ -8,7 +8,6 @@ import (
 
 	"github.com/pkg/errors"
 	types "github.com/prysmaticlabs/eth2-types"
-	"github.com/prysmaticlabs/prysm/async"
 	"github.com/prysmaticlabs/prysm/beacon-chain/core/signing"
 	fieldparams "github.com/prysmaticlabs/prysm/config/fieldparams"
 	"github.com/prysmaticlabs/prysm/config/params"
@@ -21,7 +20,6 @@ import (
 	"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
 	"github.com/prysmaticlabs/prysm/runtime/version"
 	prysmTime "github.com/prysmaticlabs/prysm/time"
-	"github.com/prysmaticlabs/prysm/validator/client/iface"
 	"github.com/sirupsen/logrus"
 	"go.opencensus.io/trace"
 	"google.golang.org/protobuf/types/known/emptypb"
```
```diff
@@ -46,10 +44,6 @@ func (v *validator) ProposeBlock(ctx context.Context, slot types.Slot, pubKey [f
 	ctx, span := trace.StartSpan(ctx, "validator.ProposeBlock")
 	defer span.End()
 
-	lock := async.NewMultilock(fmt.Sprint(iface.RoleProposer), string(pubKey[:]))
-	lock.Lock()
-	defer lock.Unlock()
-
 	fmtKey := fmt.Sprintf("%#x", pubKey[:])
 	span.AddAttributes(trace.StringAttribute("validator", fmtKey))
 	log := log.WithField("pubKey", fmt.Sprintf("%#x", bytesutil.Trunc(pubKey[:])))
@@ -131,6 +125,34 @@ func (v *validator) ProposeBlock(ctx context.Context, slot types.Slot, pubKey [f
 		}
 		return
 	}
+	hasProposerNextSlot := false
+	hasProposerPrevSlot := false
+	hasProposerPrevPrevSlot := false
+	for _, d := range v.duties.Duties {
+		if slot < 6 {
+			continue
+		}
+		if len(d.ProposerSlots) > 0 && d.ProposerSlots[0] == slot+1 {
+			hasProposerNextSlot = true
+		}
+		if len(d.ProposerSlots) > 0 && d.ProposerSlots[0] == slot-1 {
+			hasProposerPrevSlot = true
+		}
+		if len(d.ProposerSlots) > 0 && d.ProposerSlots[0] == slot-2 {
+			hasProposerPrevPrevSlot = true
+		}
+	}
+	if hasProposerNextSlot && !hasProposerPrevSlot {
+		log.Infof("waiting for slot %d", slot+1)
+		<-v.propChan
+		log.Infof("received from slot %d", slot+1)
+	}
+	if hasProposerPrevSlot && !hasProposerPrevPrevSlot {
+		log.Infof("sending to slot %d", slot-1)
+		v.propChan <- true
+		log.Infof("sent to slot %d", slot-1)
+	}
+	ctx = context.Background()
 	blkResp, err := v.validatorClient.ProposeBeaconBlock(ctx, proposal)
 	if err != nil {
 		log.WithError(err).Error("Failed to propose block")
```
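The inserted block is a test-only ordering hack: a validator about to propose in slot N+1 blocks on `propChan` until the proposer of slot N signals that its proposal went out, which serializes consecutive proposals during the e2e run. The rendezvous itself is just a buffered-channel hand-off, sketched here with two goroutines standing in for the two proposer duties:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	propChan := make(chan bool, 1) // buffered, like the validator's propChan
	var wg sync.WaitGroup
	wg.Add(2)

	// Proposer for slot N: finishes its work, then signals the next slot.
	go func() {
		defer wg.Done()
		fmt.Println("slot N: proposed, sending to slot N+1")
		propChan <- true
	}()

	// Proposer for slot N+1: waits for the signal before proposing.
	go func() {
		defer wg.Done()
		<-propChan
		fmt.Println("slot N+1: received signal, proposing now")
	}()

	wg.Wait()
}
```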
```diff
@@ -207,6 +207,7 @@ func (v *ValidatorService) Start() {
 		Web3SignerConfig:        v.Web3SignerConfig,
 		feeRecipientConfig:      v.feeRecipientConfig,
 		walletIntializedChannel: make(chan *wallet.Wallet, 1),
+		propChan:                make(chan bool, 1),
 	}
 	// To resolve a race condition at startup due to the interface
 	// nature of the abstracted block type. We initialize
```
```diff
@@ -80,6 +80,7 @@ type validator struct {
 	domainDataCache   *ristretto.Cache
 	highestValidSlot  types.Slot
 	genesisTime       uint64
+	propChan          chan bool
 	blockFeed         *event.Feed
 	interopKeysConfig *local.InteropKeymanagerConfig
 	wallet            *wallet.Wallet
```