From 4cbb69602f35c11611be446b1b0a06bd3c3c7e8e Mon Sep 17 00:00:00 2001 From: Potuz Date: Sat, 28 May 2022 08:56:58 -0300 Subject: [PATCH] Clean up onBlockBatch and prune forkchoice on init sync (#10768) Co-authored-by: terencechain --- beacon-chain/blockchain/process_block.go | 48 +++++++++---------- beacon-chain/blockchain/process_block_test.go | 12 ++--- beacon-chain/blockchain/receive_block.go | 3 +- 3 files changed, 30 insertions(+), 33 deletions(-) diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index 20527ad41c..580ce6e7a6 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -335,33 +335,33 @@ func getStateVersionAndPayload(st state.BeaconState) (int, *ethpb.ExecutionPaylo } func (s *Service) onBlockBatch(ctx context.Context, blks []interfaces.SignedBeaconBlock, - blockRoots [][32]byte) ([]*ethpb.Checkpoint, []*ethpb.Checkpoint, error) { + blockRoots [][32]byte) error { ctx, span := trace.StartSpan(ctx, "blockChain.onBlockBatch") defer span.End() if len(blks) == 0 || len(blockRoots) == 0 { - return nil, nil, errors.New("no blocks provided") + return errors.New("no blocks provided") } if len(blks) != len(blockRoots) { - return nil, nil, errWrongBlockCount + return errWrongBlockCount } if err := wrapper.BeaconBlockIsNil(blks[0]); err != nil { - return nil, nil, invalidBlock{err} + return invalidBlock{err} } b := blks[0].Block() // Retrieve incoming block's pre state. if err := s.verifyBlkPreState(ctx, b); err != nil { - return nil, nil, err + return err } preState, err := s.cfg.StateGen.StateByRootInitialSync(ctx, bytesutil.ToBytes32(b.ParentRoot())) if err != nil { - return nil, nil, err + return err } if preState == nil || preState.IsNil() { - return nil, nil, fmt.Errorf("nil pre state for slot %d", b.Slot()) + return fmt.Errorf("nil pre state for slot %d", b.Slot()) } jCheckpoints := make([]*ethpb.Checkpoint, len(blks)) @@ -382,7 +382,7 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []interfaces.SignedBeac for i, b := range blks { v, h, err := getStateVersionAndPayload(preState) if err != nil { - return nil, nil, err + return err } preVersionAndHeaders[i] = &versionAndHeader{ version: v, @@ -391,7 +391,7 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []interfaces.SignedBeac set, preState, err = transition.ExecuteStateTransitionNoVerifyAnySig(ctx, preState, b) if err != nil { - return nil, nil, invalidBlock{err} + return invalidBlock{err} } // Save potential boundary states. if slots.IsEpochStart(preState.Slot()) { @@ -402,7 +402,7 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []interfaces.SignedBeac v, h, err = getStateVersionAndPayload(preState) if err != nil { - return nil, nil, err + return err } postVersionAndHeaders[i] = &versionAndHeader{ version: v, @@ -412,10 +412,10 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []interfaces.SignedBeac } verify, err := sigSet.Verify() if err != nil { - return nil, nil, invalidBlock{err} + return invalidBlock{err} } if !verify { - return nil, nil, errors.New("batch block signature verification failed") + return errors.New("batch block signature verification failed") } // blocks have been verified, add them to forkchoice and call the engine @@ -424,40 +424,40 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []interfaces.SignedBeac postVersionAndHeaders[i].version, postVersionAndHeaders[i].header, b) if err != nil { - return nil, nil, err + return err } if isValidPayload { if err := s.validateMergeTransitionBlock(ctx, preVersionAndHeaders[i].version, preVersionAndHeaders[i].header, b); err != nil { - return nil, nil, err + return err } } if err := s.insertBlockToForkChoiceStore(ctx, b.Block(), blockRoots[i], fCheckpoints[i], jCheckpoints[i]); err != nil { - return nil, nil, err + return err } if isValidPayload { if err := s.cfg.ForkChoiceStore.SetOptimisticToValid(ctx, blockRoots[i]); err != nil { - return nil, nil, errors.Wrap(err, "could not set optimistic block to valid") + return errors.Wrap(err, "could not set optimistic block to valid") } } s.saveInitSyncBlock(blockRoots[i], b) if err = s.handleBlockAfterBatchVerify(ctx, b, blockRoots[i], fCheckpoints[i], jCheckpoints[i]); err != nil { tracing.AnnotateError(span, err) - return nil, nil, err + return err } } for r, st := range boundaries { if err := s.cfg.StateGen.SaveState(ctx, r, st); err != nil { - return nil, nil, err + return err } } // Also saves the last post state which to be used as pre state for the next batch. lastB := blks[len(blks)-1] lastBR := blockRoots[len(blockRoots)-1] if err := s.cfg.StateGen.SaveState(ctx, lastBR, preState); err != nil { - return nil, nil, err + return err } arg := ¬ifyForkchoiceUpdateArg{ headState: preState, @@ -465,12 +465,9 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []interfaces.SignedBeac headBlock: lastB.Block(), } if _, err := s.notifyForkchoiceUpdate(ctx, arg); err != nil { - return nil, nil, err + return err } - if err := s.saveHeadNoDB(ctx, lastB, lastBR, preState); err != nil { - return nil, nil, err - } - return fCheckpoints, jCheckpoints, nil + return s.saveHeadNoDB(ctx, lastB, lastBR, preState) } // handles a block after the block's batch has been verified, where we can save blocks @@ -521,6 +518,9 @@ func (s *Service) handleBlockAfterBatchVerify(ctx context.Context, signed interf return err } s.store.SetFinalizedCheckptAndPayloadHash(fCheckpoint, h) + if err := s.cfg.ForkChoiceStore.Prune(ctx, bytesutil.ToBytes32(fCheckpoint.Root)); err != nil { + return errors.Wrap(err, "could not prune proto array fork choice nodes") + } } return nil } diff --git a/beacon-chain/blockchain/process_block_test.go b/beacon-chain/blockchain/process_block_test.go index 195bfa1aaf..48772093af 100644 --- a/beacon-chain/blockchain/process_block_test.go +++ b/beacon-chain/blockchain/process_block_test.go @@ -331,9 +331,9 @@ func TestStore_OnBlockBatch_ProtoArray(t *testing.T) { rBlock.Block.ParentRoot = gRoot[:] require.NoError(t, beaconDB.SaveBlock(context.Background(), blks[0])) require.NoError(t, service.cfg.StateGen.SaveState(ctx, blkRoots[0], firstState)) - _, _, err = service.onBlockBatch(ctx, blks, blkRoots[1:]) + err = service.onBlockBatch(ctx, blks, blkRoots[1:]) require.ErrorIs(t, errWrongBlockCount, err) - _, _, err = service.onBlockBatch(ctx, blks[1:], blkRoots[1:]) + err = service.onBlockBatch(ctx, blks[1:], blkRoots[1:]) require.NoError(t, err) } @@ -396,9 +396,9 @@ func TestStore_OnBlockBatch_DoublyLinkedTree(t *testing.T) { rBlock.Block.ParentRoot = gRoot[:] require.NoError(t, beaconDB.SaveBlock(context.Background(), blks[0])) require.NoError(t, service.cfg.StateGen.SaveState(ctx, blkRoots[0], firstState)) - _, _, err = service.onBlockBatch(ctx, blks, blkRoots[1:]) + err = service.onBlockBatch(ctx, blks, blkRoots[1:]) require.ErrorIs(t, errWrongBlockCount, err) - _, _, err = service.onBlockBatch(ctx, blks[1:], blkRoots[1:]) + err = service.onBlockBatch(ctx, blks[1:], blkRoots[1:]) require.NoError(t, err) } @@ -452,10 +452,8 @@ func TestStore_OnBlockBatch_NotifyNewPayload(t *testing.T) { rBlock.Block.ParentRoot = gRoot[:] require.NoError(t, beaconDB.SaveBlock(context.Background(), blks[0])) require.NoError(t, service.cfg.StateGen.SaveState(ctx, blkRoots[0], firstState)) - cp1, cp2, err := service.onBlockBatch(ctx, blks[1:], blkRoots[1:]) + err = service.onBlockBatch(ctx, blks[1:], blkRoots[1:]) require.NoError(t, err) - require.Equal(t, blkCount-1, len(cp1)) - require.Equal(t, blkCount-1, len(cp2)) } func TestRemoveStateSinceLastFinalized_EmptyStartSlot(t *testing.T) { diff --git a/beacon-chain/blockchain/receive_block.go b/beacon-chain/blockchain/receive_block.go index 384b8ac3f0..dc73a693f3 100644 --- a/beacon-chain/blockchain/receive_block.go +++ b/beacon-chain/blockchain/receive_block.go @@ -89,8 +89,7 @@ func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []interfaces.Sig defer span.End() // Apply state transition on the incoming newly received block batches, one by one. - _, _, err := s.onBlockBatch(ctx, blocks, blkRoots) - if err != nil { + if err := s.onBlockBatch(ctx, blocks, blkRoots); err != nil { err := errors.Wrap(err, "could not process block in batch") tracing.AnnotateError(span, err) return err