mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 15:37:56 -05:00
Native Blocks Ep. 2 - Switch usages to new package (#10885)
* panic in SizeSSZ
* moving slowly
* adapt old code to new interfaces
* return interfaces from factory functions
* replace the rest of WrappedSignedBeaconBlock
* WrappedBeaconBlock
* WrappedBeaconBlockBody
* miscellaneous
* Test_BeaconBlockIsNil
* replace usages of BeaconBlockIsNil
* replace usages of mutator
* fix all build errors
* fix some more issues
* mutator changes
* relax assertions when initializing
* revert changes in object_mapping.go
* allow calling Proto on nil
* Revert "allow calling Proto on nil"
This reverts commit ecc84e4553.
* modify Copy and Proto methods
* remove unused var
* fix block batch tests
* correct BUILD file
* Error when initializing nil objects
* one more error fix
* add missing comma
* rename alias to blocktest
* add logging
* error when SignedBeaconBlock is nil
* fix last test
* import fix
* broken
* working
* test fixes
* reduce complexity of processPendingBlocks
* simplified
This commit is contained in:
@@ -12,9 +12,9 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
|
||||
p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
|
||||
"github.com/prysmaticlabs/prysm/config/params"
|
||||
"github.com/prysmaticlabs/prysm/consensus-types/blocks"
|
||||
"github.com/prysmaticlabs/prysm/consensus-types/interfaces"
|
||||
types "github.com/prysmaticlabs/prysm/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/consensus-types/wrapper"
|
||||
"github.com/prysmaticlabs/prysm/crypto/rand"
|
||||
"github.com/prysmaticlabs/prysm/encoding/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/encoding/ssz/equality"
|
||||
@@ -99,11 +99,12 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
|
||||
span.End()
|
||||
return err
|
||||
}
|
||||
|
||||
inDB := s.cfg.beaconDB.HasBlock(ctx, blkRoot)
|
||||
// No need to process the same block twice.
|
||||
if inDB {
|
||||
s.pendingQueueLock.Lock()
|
||||
if err := s.deleteBlockFromPendingQueue(slot, b, blkRoot); err != nil {
|
||||
if err = s.deleteBlockFromPendingQueue(slot, b, blkRoot); err != nil {
|
||||
s.pendingQueueLock.Unlock()
|
||||
return err
|
||||
}
|
||||
@@ -116,22 +117,11 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
|
||||
inPendingQueue := s.seenPendingBlocks[bytesutil.ToBytes32(b.Block().ParentRoot())]
|
||||
s.pendingQueueLock.RUnlock()
|
||||
|
||||
parentIsBad := s.hasBadBlock(bytesutil.ToBytes32(b.Block().ParentRoot()))
|
||||
blockIsBad := s.hasBadBlock(blkRoot)
|
||||
// Check if parent is a bad block.
|
||||
if parentIsBad || blockIsBad {
|
||||
// Set block as bad if its parent block is bad too.
|
||||
if parentIsBad {
|
||||
s.setBadBlock(ctx, blkRoot)
|
||||
}
|
||||
// Remove block from queue.
|
||||
s.pendingQueueLock.Lock()
|
||||
if err := s.deleteBlockFromPendingQueue(slot, b, blkRoot); err != nil {
|
||||
s.pendingQueueLock.Unlock()
|
||||
return err
|
||||
}
|
||||
s.pendingQueueLock.Unlock()
|
||||
span.End()
|
||||
keepProcessing, err := s.checkIfBlockIsBad(ctx, span, slot, b, blkRoot)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !keepProcessing {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -188,8 +178,13 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
|
||||
s.setSeenBlockIndexSlot(b.Block().Slot(), b.Block().ProposerIndex())
|
||||
|
||||
// Broadcasting the block again once a node is able to process it.
|
||||
if err := s.cfg.p2p.Broadcast(ctx, b.Proto()); err != nil {
|
||||
log.WithError(err).Debug("Could not broadcast block")
|
||||
pb, err := b.Proto()
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("Could not get protobuf block")
|
||||
} else {
|
||||
if err := s.cfg.p2p.Broadcast(ctx, pb); err != nil {
|
||||
log.WithError(err).Debug("Could not broadcast block")
|
||||
}
|
||||
}
|
||||
|
||||
s.pendingQueueLock.Lock()
|
||||
@@ -211,6 +206,35 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
|
||||
return s.sendBatchRootRequest(ctx, parentRoots, randGen)
|
||||
}
|
||||
|
||||
func (s *Service) checkIfBlockIsBad(
|
||||
ctx context.Context,
|
||||
span *trace.Span,
|
||||
slot types.Slot,
|
||||
b interfaces.SignedBeaconBlock,
|
||||
blkRoot [32]byte,
|
||||
) (keepProcessing bool, err error) {
|
||||
parentIsBad := s.hasBadBlock(bytesutil.ToBytes32(b.Block().ParentRoot()))
|
||||
blockIsBad := s.hasBadBlock(blkRoot)
|
||||
// Check if parent is a bad block.
|
||||
if parentIsBad || blockIsBad {
|
||||
// Set block as bad if its parent block is bad too.
|
||||
if parentIsBad {
|
||||
s.setBadBlock(ctx, blkRoot)
|
||||
}
|
||||
// Remove block from queue.
|
||||
s.pendingQueueLock.Lock()
|
||||
if err = s.deleteBlockFromPendingQueue(slot, b, blkRoot); err != nil {
|
||||
s.pendingQueueLock.Unlock()
|
||||
return false, err
|
||||
}
|
||||
s.pendingQueueLock.Unlock()
|
||||
span.End()
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (s *Service) sendBatchRootRequest(ctx context.Context, roots [][32]byte, randGen *rand.Rand) error {
|
||||
ctx, span := trace.StartSpan(ctx, "sendBatchRootRequest")
|
||||
defer span.End()
|
||||
@@ -337,13 +361,21 @@ func (s *Service) deleteBlockFromPendingQueue(slot types.Slot, b interfaces.Sign
|
||||
}
|
||||
|
||||
// Defensive check to ignore nil blocks
|
||||
if err := wrapper.BeaconBlockIsNil(b); err != nil {
|
||||
if err := blocks.BeaconBlockIsNil(b); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
newBlks := make([]interfaces.SignedBeaconBlock, 0, len(blks))
|
||||
for _, blk := range blks {
|
||||
if equality.DeepEqual(blk.Proto(), b.Proto()) {
|
||||
blkPb, err := blk.Proto()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bPb, err := b.Proto()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if equality.DeepEqual(blkPb, bPb) {
|
||||
continue
|
||||
}
|
||||
newBlks = append(newBlks, blk)
|
||||
@@ -396,7 +428,7 @@ func (s *Service) pendingBlocksInCache(slot types.Slot) []interfaces.SignedBeaco
|
||||
|
||||
// This adds input signed beacon block to slotToPendingBlocks cache.
|
||||
func (s *Service) addPendingBlockToCache(b interfaces.SignedBeaconBlock) error {
|
||||
if err := wrapper.BeaconBlockIsNil(b); err != nil {
|
||||
if err := blocks.BeaconBlockIsNil(b); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user