mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 07:58:22 -05:00
Sort Blocks Before Returning them To Peer (#5894)
* add tests and check * lint * Merge refs/heads/master into sortBlocks
This commit is contained in:
@@ -26,6 +26,7 @@ go_library(
|
||||
"subscriber_beacon_blocks.go",
|
||||
"subscriber_committee_index_beacon_attestation.go",
|
||||
"subscriber_handlers.go",
|
||||
"utils.go",
|
||||
"validate_aggregate_proof.go",
|
||||
"validate_attester_slashing.go",
|
||||
"validate_beacon_blocks.go",
|
||||
@@ -112,6 +113,7 @@ go_test(
|
||||
"subscriber_committee_index_beacon_attestation_test.go",
|
||||
"subscriber_test.go",
|
||||
"sync_test.go",
|
||||
"utils_test.go",
|
||||
"validate_aggregate_proof_test.go",
|
||||
"validate_attester_slashing_test.go",
|
||||
"validate_beacon_blocks_test.go",
|
||||
|
||||
@@ -144,6 +144,7 @@ func (r *Service) writeBlockRangeToStream(ctx context.Context, startSlot, endSlo
|
||||
blks = append([]*ethpb.SignedBeaconBlock{genBlock}, blks...)
|
||||
roots = append([][32]byte{genRoot}, roots...)
|
||||
}
|
||||
blks, roots = r.sortBlocksAndRoots(blks, roots)
|
||||
checkpoint, err := r.db.FinalizedCheckpoint(ctx)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to retrieve finalized checkpoint")
|
||||
|
||||
@@ -83,6 +83,66 @@ func TestRPCBeaconBlocksByRange_RPCHandlerReturnsBlocks(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRPCBeaconBlocksByRange_RPCHandlerReturnsSortedBlocks(t *testing.T) {
|
||||
p1 := p2ptest.NewTestP2P(t)
|
||||
p2 := p2ptest.NewTestP2P(t)
|
||||
p1.Connect(p2)
|
||||
if len(p1.Host.Network().Peers()) != 1 {
|
||||
t.Error("Expected peers to be connected")
|
||||
}
|
||||
d := db.SetupDB(t)
|
||||
|
||||
req := &pb.BeaconBlocksByRangeRequest{
|
||||
StartSlot: 200,
|
||||
Step: 21,
|
||||
Count: 33,
|
||||
}
|
||||
|
||||
endSlot := req.StartSlot + (req.Step * (req.Count - 1))
|
||||
// Populate the database with blocks that would match the request.
|
||||
for i := endSlot; i >= req.StartSlot; i -= req.Step {
|
||||
if err := d.SaveBlock(context.Background(), ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Slot: i}}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Start service with 160 as allowed blocks capacity (and almost zero capacity recovery).
|
||||
r := &Service{p2p: p1, db: d, blocksRateLimiter: leakybucket.NewCollector(0.000001, int64(req.Count*10), false)}
|
||||
pcl := protocol.ID("/testing")
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
p2.Host.SetStreamHandler(pcl, func(stream network.Stream) {
|
||||
defer wg.Done()
|
||||
prevSlot := uint64(0)
|
||||
for i := req.StartSlot; i < req.StartSlot+req.Count*req.Step; i += req.Step {
|
||||
expectSuccess(t, r, stream)
|
||||
res := ðpb.SignedBeaconBlock{}
|
||||
if err := r.p2p.Encoding().DecodeWithLength(stream, res); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if res.Block.Slot < prevSlot {
|
||||
t.Errorf("Received block is unsorted with slot %d lower than previous slot %d", res.Block.Slot, prevSlot)
|
||||
}
|
||||
prevSlot = res.Block.Slot
|
||||
}
|
||||
})
|
||||
|
||||
stream1, err := p1.Host.NewStream(context.Background(), p2.Host.ID(), pcl)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = r.beaconBlocksByRangeRPCHandler(context.Background(), req, stream1)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if testutil.WaitTimeout(&wg, 1*time.Second) {
|
||||
t.Fatal("Did not receive stream within 1 sec")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRPCBeaconBlocksByRange_ReturnsGenesisBlock(t *testing.T) {
|
||||
p1 := p2ptest.NewTestP2P(t)
|
||||
p2 := p2ptest.NewTestP2P(t)
|
||||
|
||||
39
beacon-chain/sync/utils.go
Normal file
39
beacon-chain/sync/utils.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"sort"
|
||||
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
)
|
||||
|
||||
// A type to represent beacon blocks and roots which have methods
|
||||
// which satisfy the Interface in `Sort` so that this type can
|
||||
// be sorted in ascending order.
|
||||
type sortedObj struct {
|
||||
blks []*ethpb.SignedBeaconBlock
|
||||
roots [][32]byte
|
||||
}
|
||||
|
||||
func (s sortedObj) Less(i, j int) bool {
|
||||
return s.blks[i].Block.Slot < s.blks[j].Block.Slot
|
||||
}
|
||||
|
||||
func (s sortedObj) Swap(i, j int) {
|
||||
s.blks[i], s.blks[j] = s.blks[j], s.blks[i]
|
||||
s.roots[i], s.roots[j] = s.roots[j], s.roots[i]
|
||||
}
|
||||
|
||||
func (s sortedObj) Len() int {
|
||||
return len(s.blks)
|
||||
}
|
||||
|
||||
// sort the provided blocks and roots in ascending order. This method assumes that the size of
|
||||
// block slice and root slice is equal.
|
||||
func (r *Service) sortBlocksAndRoots(blks []*ethpb.SignedBeaconBlock, roots [][32]byte) ([]*ethpb.SignedBeaconBlock, [][32]byte) {
|
||||
obj := sortedObj{
|
||||
blks: blks,
|
||||
roots: roots,
|
||||
}
|
||||
sort.Sort(obj)
|
||||
return obj.blks, obj.roots
|
||||
}
|
||||
42
beacon-chain/sync/utils_test.go
Normal file
42
beacon-chain/sync/utils_test.go
Normal file
@@ -0,0 +1,42 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
)
|
||||
|
||||
func TestSortedObj_SortBlocksRoots(t *testing.T) {
|
||||
source := rand.NewSource(33)
|
||||
randGen := rand.New(source)
|
||||
blks := []*ethpb.SignedBeaconBlock{}
|
||||
roots := [][32]byte{}
|
||||
randFunc := func() int64 {
|
||||
return randGen.Int63n(50)
|
||||
}
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
slot := uint64(randFunc())
|
||||
newBlk := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Slot: slot}}
|
||||
blks = append(blks, newBlk)
|
||||
root := bytesutil.ToBytes32(bytesutil.Bytes32(slot))
|
||||
roots = append(roots, root)
|
||||
}
|
||||
|
||||
r := &Service{}
|
||||
|
||||
newBlks, newRoots := r.sortBlocksAndRoots(blks, roots)
|
||||
|
||||
previousSlot := uint64(0)
|
||||
for i, b := range newBlks {
|
||||
if b.Block.Slot < previousSlot {
|
||||
t.Errorf("Block list is not sorted as %d is smaller than previousSlot %d", b.Block.Slot, previousSlot)
|
||||
}
|
||||
if bytesutil.FromBytes8(newRoots[i][:]) != b.Block.Slot {
|
||||
t.Errorf("root doesn't match stored slot in block: wanted %d but got %d", b.Block.Slot, bytesutil.FromBytes8(newRoots[i][:]))
|
||||
}
|
||||
previousSlot = b.Block.Slot
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user