Initsync skip local blobs (#13827)

* wip - init-sync skip available blob req

* satisfy deep source

* gaz

* don't need to sort blobs; simplify blobRequest stack

* wip debug log to watch blob skip behavior

* unit tests for new blob req generator

* refactor to reduce blob req func count

* log when WaitForSummarizer fails

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
Author: kasey
Date: 2024-04-05 14:09:43 -05:00
Committed by: GitHub
Parent: be1bfcce63
Commit: 04f231a400
12 changed files with 460 additions and 254 deletions
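The tests in this diff pin down the new init-sync behavior: before issuing a blobs-by-range request, the fetcher counts each block's KZG commitments and consults local blob storage, so slots whose blobs are already on disk are trimmed from the request window. A minimal self-contained sketch of the trimming idea, using simplified stand-in types rather than Prysm's actual API:

package main

import "fmt"

// commitmentCount stands in for the per-block record used in this diff: the
// block's slot plus whether all of its blobs are already on disk (decided
// via filesystem.BlobStorageSummarizer in the real code).
type commitmentCount struct {
    slot      uint64
    allOnDisk bool
}

// blobRange is the inclusive slot window that still has to be fetched.
type blobRange struct{ low, high uint64 }

// neededRange trims fully available blocks from both ends of the batch. Only
// the ends can be trimmed: a by-range request is one contiguous window, so a
// fully available block in the middle is covered by the request regardless.
func neededRange(cc []commitmentCount) *blobRange {
    lo, hi := 0, len(cc)-1
    for lo <= hi && cc[lo].allOnDisk {
        lo++
    }
    for hi >= lo && cc[hi].allOnDisk {
        hi--
    }
    if lo > hi {
        return nil // everything is local; skip the request entirely
    }
    return &blobRange{low: cc[lo].slot, high: cc[hi].slot}
}

func main() {
    batch := []commitmentCount{
        {slot: 0, allOnDisk: true},
        {slot: 5},
        {slot: 15, allOnDisk: true},
    }
    fmt.Println(*neededRange(batch)) // {5 5}: only slot 5 still needs blobs
}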


@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"math"
"math/rand"
"sort"
"sync"
"testing"
@@ -14,13 +13,14 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
dbtest "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing"
p2pm "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
p2pt "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
beaconsync "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync/verify"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
@@ -960,28 +960,7 @@ func TestTimeToWait(t *testing.T) {
    }
}

-func TestSortBlobs(t *testing.T) {
-    _, blobs := util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, 10)
-    shuffled := make([]blocks.ROBlob, len(blobs))
-    for i := range blobs {
-        shuffled[i] = blobs[i]
-    }
-    rand.Shuffle(len(shuffled), func(i, j int) {
-        shuffled[i], shuffled[j] = shuffled[j], shuffled[i]
-    })
-    sorted := sortBlobs(shuffled)
-    require.Equal(t, len(sorted), len(shuffled))
-    for i := range blobs {
-        expect := blobs[i]
-        actual := sorted[i]
-        require.Equal(t, expect.Slot(), actual.Slot())
-        require.Equal(t, expect.Index, actual.Index)
-        require.Equal(t, bytesutil.ToBytes48(expect.KzgCommitment), bytesutil.ToBytes48(actual.KzgCommitment))
-        require.Equal(t, expect.BlockRoot(), actual.BlockRoot())
-    }
-}
-
-func TestLowestSlotNeedsBlob(t *testing.T) {
+func TestBlobRangeForBlocks(t *testing.T) {
    blks, _ := util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, 10)
    sbbs := make([]interfaces.ReadOnlySignedBeaconBlock, len(blks))
    for i := range blks {
@@ -990,12 +969,12 @@ func TestLowestSlotNeedsBlob(t *testing.T) {
    retentionStart := primitives.Slot(5)
    bwb, err := sortedBlockWithVerifiedBlobSlice(sbbs)
    require.NoError(t, err)
-   lowest := lowestSlotNeedsBlob(retentionStart, bwb)
-   require.Equal(t, retentionStart, *lowest)
+   bounds := countCommitments(bwb, retentionStart).blobRange(nil)
+   require.Equal(t, retentionStart, bounds.low)
    higher := primitives.Slot(len(blks) + 1)
-   lowest = lowestSlotNeedsBlob(higher, bwb)
-   var nilSlot *primitives.Slot
-   require.Equal(t, nilSlot, lowest)
+   bounds = countCommitments(bwb, higher).blobRange(nil)
+   var nilBounds *blobRange
+   require.Equal(t, nilBounds, bounds)
    blks, _ = util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, 10)
    sbbs = make([]interfaces.ReadOnlySignedBeaconBlock, len(blks))
@@ -1008,14 +987,14 @@ func TestLowestSlotNeedsBlob(t *testing.T) {
    next := bwb[6].Block.Block().Slot()
    skip := bwb[5].Block.Block()
    bwb[5].Block, _ = util.GenerateTestDenebBlockWithSidecar(t, skip.ParentRoot(), skip.Slot(), 0)
-   lowest = lowestSlotNeedsBlob(retentionStart, bwb)
-   require.Equal(t, next, *lowest)
+   bounds = countCommitments(bwb, retentionStart).blobRange(nil)
+   require.Equal(t, next, bounds.low)
}
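TestBlobRequest below pins the conversion from a slot window to a wire request: a nil window yields a nil request, and Count covers the window inclusively. A hedged sketch of that arithmetic, using hypothetical stand-ins for blobRange and ethpb.BlobSidecarsByRangeRequest:

// slotWindow is a hypothetical stand-in for this diff's blobRange type.
type slotWindow struct{ low, high uint64 }

// byRangeReq is a hypothetical stand-in for ethpb.BlobSidecarsByRangeRequest.
type byRangeReq struct{ StartSlot, Count uint64 }

func (w *slotWindow) request() *byRangeReq {
    if w == nil {
        return nil // no block needs blobs, so no request is made
    }
    // +1 because the window includes both its low and high slots.
    return &byRangeReq{StartSlot: w.low, Count: w.high - w.low + 1}
}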
func TestBlobRequest(t *testing.T) {
    var nilReq *ethpb.BlobSidecarsByRangeRequest
    // no blocks
-   req := blobRequest([]blocks.BlockWithROBlobs{}, 0)
+   req := countCommitments([]blocks.BlockWithROBlobs{}, 0).blobRange(nil).Request()
    require.Equal(t, nilReq, req)
    blks, _ := util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, 10)
    sbbs := make([]interfaces.ReadOnlySignedBeaconBlock, len(blks))
@@ -1027,26 +1006,180 @@ func TestBlobRequest(t *testing.T) {
    maxBlkSlot := primitives.Slot(len(blks) - 1)
    tooHigh := primitives.Slot(len(blks) + 1)
-   req = blobRequest(bwb, tooHigh)
+   req = countCommitments(bwb, tooHigh).blobRange(nil).Request()
    require.Equal(t, nilReq, req)
-   req = blobRequest(bwb, maxBlkSlot)
+   req = countCommitments(bwb, maxBlkSlot).blobRange(nil).Request()
    require.Equal(t, uint64(1), req.Count)
    require.Equal(t, maxBlkSlot, req.StartSlot)
    halfway := primitives.Slot(5)
-   req = blobRequest(bwb, halfway)
+   req = countCommitments(bwb, halfway).blobRange(nil).Request()
    require.Equal(t, halfway, req.StartSlot)
    // adding 1 to include the halfway slot itself
    require.Equal(t, uint64(1+maxBlkSlot-halfway), req.Count)
    before := bwb[0].Block.Block().Slot()
    allAfter := bwb[1:]
-   req = blobRequest(allAfter, before)
+   req = countCommitments(allAfter, before).blobRange(nil).Request()
    require.Equal(t, allAfter[0].Block.Block().Slot(), req.StartSlot)
    require.Equal(t, len(allAfter), int(req.Count))
}
+
+func TestCountCommitments(t *testing.T) {
+    // no blocks
+    // blocks before retention start filtered
+    // blocks without commitments filtered
+    // pre-deneb filtered
+    // variety of commitment counts are accurate, from 1 to max
+    type testcase struct {
+        name      string
+        bwb       func(t *testing.T, c testcase) []blocks.BlockWithROBlobs
+        numBlocks int
+        retStart  primitives.Slot
+        resCount  int
+    }
+    cases := []testcase{
+        {
+            name: "nil blocks is safe",
+            bwb: func(t *testing.T, c testcase) []blocks.BlockWithROBlobs {
+                return nil
+            },
+            retStart: 0,
+        },
+    }
+    for _, c := range cases {
+        t.Run(c.name, func(t *testing.T) {
+            bwb := c.bwb(t, c)
+            cc := countCommitments(bwb, c.retStart)
+            require.Equal(t, c.resCount, len(cc))
+        })
+    }
+}
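The comment list at the top of TestCountCommitments above names the filtering rules its cases are meant to cover. A hedged sketch of that filter with a simplified block type (blockMeta is hypothetical; the real code walks blocks.BlockWithROBlobs):

// blockMeta is a simplified stand-in for a block in the batch.
type blockMeta struct {
    slot        uint64
    commitments int  // number of blob KZG commitments in the block body
    postDeneb   bool // pre-Deneb blocks cannot carry blobs
}

// keepNeedingBlobs keeps only blocks that can still need blobs: inside the
// retention window, Deneb or later, and with at least one commitment.
func keepNeedingBlobs(blks []blockMeta, retentionStart uint64) []blockMeta {
    var kept []blockMeta
    for _, b := range blks {
        if b.slot < retentionStart || !b.postDeneb || b.commitments == 0 {
            continue
        }
        kept = append(kept, b)
    }
    return kept
}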
+
+func TestCommitmentCountList(t *testing.T) {
+    cases := []struct {
+        name     string
+        cc       commitmentCountList
+        bss      func(*testing.T) filesystem.BlobStorageSummarizer
+        expected *blobRange
+        request  *ethpb.BlobSidecarsByRangeRequest
+    }{
+        {
+            name:     "nil commitmentCount is safe",
+            cc:       nil,
+            expected: nil,
+            request:  nil,
+        },
+        {
+            name: "nil bss, single slot",
+            cc: []commitmentCount{
+                {slot: 11235, count: 1},
+            },
+            expected: &blobRange{low: 11235, high: 11235},
+            request:  &ethpb.BlobSidecarsByRangeRequest{StartSlot: 11235, Count: 1},
+        },
+        {
+            name: "nil bss, sparse slots",
+            cc: []commitmentCount{
+                {slot: 11235, count: 1},
+                {slot: 11240, count: fieldparams.MaxBlobsPerBlock},
+                {slot: 11250, count: 3},
+            },
+            expected: &blobRange{low: 11235, high: 11250},
+            request:  &ethpb.BlobSidecarsByRangeRequest{StartSlot: 11235, Count: 16},
+        },
+        {
+            name: "AllAvailable in middle, some avail low, none high",
+            bss: func(t *testing.T) filesystem.BlobStorageSummarizer {
+                onDisk := map[[32]byte][]int{
+                    bytesutil.ToBytes32([]byte("0")): {0, 1},
+                    bytesutil.ToBytes32([]byte("1")): {0, 1, 2, 3, 4, 5},
+                }
+                return filesystem.NewMockBlobStorageSummarizer(t, onDisk)
+            },
+            cc: []commitmentCount{
+                {slot: 0, count: 3, root: bytesutil.ToBytes32([]byte("0"))},
+                {slot: 5, count: fieldparams.MaxBlobsPerBlock, root: bytesutil.ToBytes32([]byte("1"))},
+                {slot: 15, count: 3},
+            },
+            expected: &blobRange{low: 0, high: 15},
+            request:  &ethpb.BlobSidecarsByRangeRequest{StartSlot: 0, Count: 16},
+        },
+        {
+            name: "AllAvailable at high and low",
+            bss: func(t *testing.T) filesystem.BlobStorageSummarizer {
+                onDisk := map[[32]byte][]int{
+                    bytesutil.ToBytes32([]byte("0")): {0, 1},
+                    bytesutil.ToBytes32([]byte("2")): {0, 1, 2, 3, 4, 5},
+                }
+                return filesystem.NewMockBlobStorageSummarizer(t, onDisk)
+            },
+            cc: []commitmentCount{
+                {slot: 0, count: 2, root: bytesutil.ToBytes32([]byte("0"))},
+                {slot: 5, count: 3},
+                {slot: 15, count: fieldparams.MaxBlobsPerBlock, root: bytesutil.ToBytes32([]byte("2"))},
+            },
+            expected: &blobRange{low: 5, high: 5},
+            request:  &ethpb.BlobSidecarsByRangeRequest{StartSlot: 5, Count: 1},
+        },
+        {
+            name: "AllAvailable at high and low, adjacent range in middle",
+            bss: func(t *testing.T) filesystem.BlobStorageSummarizer {
+                onDisk := map[[32]byte][]int{
+                    bytesutil.ToBytes32([]byte("0")): {0, 1},
+                    bytesutil.ToBytes32([]byte("2")): {0, 1, 2, 3, 4, 5},
+                }
+                return filesystem.NewMockBlobStorageSummarizer(t, onDisk)
+            },
+            cc: []commitmentCount{
+                {slot: 0, count: 2, root: bytesutil.ToBytes32([]byte("0"))},
+                {slot: 5, count: 3},
+                {slot: 6, count: 3},
+                {slot: 15, count: fieldparams.MaxBlobsPerBlock, root: bytesutil.ToBytes32([]byte("2"))},
+            },
+            expected: &blobRange{low: 5, high: 6},
+            request:  &ethpb.BlobSidecarsByRangeRequest{StartSlot: 5, Count: 2},
+        },
+        {
+            name: "AllAvailable at high and low, range in middle",
+            bss: func(t *testing.T) filesystem.BlobStorageSummarizer {
+                onDisk := map[[32]byte][]int{
+                    bytesutil.ToBytes32([]byte("0")): {0, 1},
+                    bytesutil.ToBytes32([]byte("1")): {0, 1},
+                    bytesutil.ToBytes32([]byte("2")): {0, 1, 2, 3, 4, 5},
+                }
+                return filesystem.NewMockBlobStorageSummarizer(t, onDisk)
+            },
+            cc: []commitmentCount{
+                {slot: 0, count: 2, root: bytesutil.ToBytes32([]byte("0"))},
+                {slot: 5, count: 3, root: bytesutil.ToBytes32([]byte("1"))},
+                {slot: 10, count: 3},
+                {slot: 15, count: fieldparams.MaxBlobsPerBlock, root: bytesutil.ToBytes32([]byte("2"))},
+            },
+            expected: &blobRange{low: 5, high: 10},
+            request:  &ethpb.BlobSidecarsByRangeRequest{StartSlot: 5, Count: 6},
+        },
+    }
+    for _, c := range cases {
+        t.Run(c.name, func(t *testing.T) {
+            var bss filesystem.BlobStorageSummarizer
+            if c.bss != nil {
+                bss = c.bss(t)
+            }
+            br := c.cc.blobRange(bss)
+            require.DeepEqual(t, c.expected, br)
+            if c.request == nil {
+                require.IsNil(t, br.Request())
+            } else {
+                req := br.Request()
+                require.DeepEqual(t, req.StartSlot, c.request.StartSlot)
+                require.DeepEqual(t, req.Count, c.request.Count)
+            }
+        })
+    }
+}
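The mock summarizer in the cases above maps a block root to the blob indices already on disk, and the expectations imply the availability rule: a block with N commitments counts as fully available only when indices 0 through N-1 are all present. A hedged sketch of that check (allAvailableSketch is hypothetical):

// allAvailableSketch reports whether every commitment index for root is
// already stored locally, mirroring what the cases above expect.
func allAvailableSketch(onDisk map[[32]byte][]int, root [32]byte, count int) bool {
    have := make(map[int]bool, count)
    for _, idx := range onDisk[root] {
        have[idx] = true
    }
    for i := 0; i < count; i++ {
        if !have[i] {
            return false // at least one blob must still come from the network
        }
    }
    return true
}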
func testSequenceBlockWithBlob(t *testing.T, nblocks int) ([]blocks.BlockWithROBlobs, []blocks.ROBlob) {
    blks, blobs := util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, nblocks)
    sbbs := make([]interfaces.ReadOnlySignedBeaconBlock, len(blks))
@@ -1058,91 +1191,75 @@ func testSequenceBlockWithBlob(t *testing.T, nblocks int) ([]blocks.BlockWithROBlobs, []blocks.ROBlob) {
    return bwb, blobs
}

+func testReqFromResp(bwb []blocks.BlockWithROBlobs) *ethpb.BlobSidecarsByRangeRequest {
+    return &ethpb.BlobSidecarsByRangeRequest{
+        StartSlot: bwb[0].Block.Block().Slot(),
+        Count:     uint64(bwb[len(bwb)-1].Block.Block().Slot()-bwb[0].Block.Block().Slot()) + 1,
+    }
+}
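A note on testReqFromResp above: it reconstructs the by-range request implied by a block sequence, and verifyAndPopulateBlobs now receives that request so it can judge which blobs a peer was actually obliged to return. The window is inclusive on both ends, hence the +1; for example, blocks spanning slots 3 through 12 yield StartSlot 3 and Count 10.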
func TestVerifyAndPopulateBlobs(t *testing.T) {
-    bwb, blobs := testSequenceBlockWithBlob(t, 10)
-    lastBlobIdx := len(blobs) - 1
-    // Blocks are all before the retention window, blobs argument is ignored.
-    windowAfter := bwb[len(bwb)-1].Block.Block().Slot() + 1
-    _, err := verifyAndPopulateBlobs(bwb, nil, windowAfter)
-    require.NoError(t, err)
-    firstBlockSlot := bwb[0].Block.Block().Slot()
-    // slice off blobs for the last block so we hit the out of bounds / blob exhaustion check.
-    _, err = verifyAndPopulateBlobs(bwb, blobs[0:len(blobs)-6], firstBlockSlot)
-    require.ErrorIs(t, err, errMissingBlobsForBlockCommitments)
-    bwb, blobs = testSequenceBlockWithBlob(t, 10)
-    // Misalign the slots of the blobs for the first block to simulate them being missing from the response.
-    offByOne := blobs[0].Slot()
-    for i := range blobs {
-        if blobs[i].Slot() == offByOne {
-            blobs[i].SignedBlockHeader.Header.Slot = offByOne + 1
-        }
-    }
-    _, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot)
-    require.ErrorIs(t, err, verify.ErrBlobBlockMisaligned)
-    bwb, blobs = testSequenceBlockWithBlob(t, 10)
-    blobs[lastBlobIdx], err = blocks.NewROBlobWithRoot(blobs[lastBlobIdx].BlobSidecar, blobs[0].BlockRoot())
-    require.NoError(t, err)
-    _, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot)
-    require.ErrorIs(t, err, verify.ErrBlobBlockMisaligned)
-    bwb, blobs = testSequenceBlockWithBlob(t, 10)
-    blobs[lastBlobIdx].Index = 100
-    _, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot)
-    require.ErrorIs(t, err, verify.ErrIncorrectBlobIndex)
-    bwb, blobs = testSequenceBlockWithBlob(t, 10)
-    blobs[lastBlobIdx].SignedBlockHeader.Header.ProposerIndex = 100
-    blobs[lastBlobIdx], err = blocks.NewROBlob(blobs[lastBlobIdx].BlobSidecar)
-    require.NoError(t, err)
-    _, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot)
-    require.ErrorIs(t, err, verify.ErrBlobBlockMisaligned)
-    bwb, blobs = testSequenceBlockWithBlob(t, 10)
-    blobs[lastBlobIdx].SignedBlockHeader.Header.ParentRoot = blobs[0].SignedBlockHeader.Header.ParentRoot
-    blobs[lastBlobIdx], err = blocks.NewROBlob(blobs[lastBlobIdx].BlobSidecar)
-    require.NoError(t, err)
-    _, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot)
-    require.ErrorIs(t, err, verify.ErrBlobBlockMisaligned)
-    var emptyKzg [48]byte
-    bwb, blobs = testSequenceBlockWithBlob(t, 10)
-    blobs[lastBlobIdx].KzgCommitment = emptyKzg[:]
-    blobs[lastBlobIdx], err = blocks.NewROBlob(blobs[lastBlobIdx].BlobSidecar)
-    require.NoError(t, err)
-    _, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot)
-    require.ErrorIs(t, err, verify.ErrMismatchedBlobCommitments)
-    // happy path
-    bwb, blobs = testSequenceBlockWithBlob(t, 10)
-    expectedCommits := make(map[[48]byte]bool)
-    for _, bl := range blobs {
-        expectedCommits[bytesutil.ToBytes48(bl.KzgCommitment)] = true
-    }
-    // The assertions using this map expect all commitments to be unique, so make sure that stays true.
-    require.Equal(t, len(blobs), len(expectedCommits))
-    bwb, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot)
-    require.NoError(t, err)
-    for _, bw := range bwb {
-        commits, err := bw.Block.Block().Body().BlobKzgCommitments()
-        require.NoError(t, err)
-        require.Equal(t, len(commits), len(bw.Blobs))
-        for i := range commits {
-            bc := bytesutil.ToBytes48(commits[i])
-            require.Equal(t, bc, bytesutil.ToBytes48(bw.Blobs[i].KzgCommitment))
-            // Since we delete entries we've seen, duplicates will cause an error here.
-            _, ok := expectedCommits[bc]
-            // Make sure this was an expected delete, then delete it from the map so we can make sure we saw all of them.
-            require.Equal(t, true, ok)
-            delete(expectedCommits, bc)
-        }
-    }
-    // We delete each entry we've seen, so if we see all expected commits, the map should be empty at the end.
-    require.Equal(t, 0, len(expectedCommits))
+    t.Run("happy path", func(t *testing.T) {
+        bwb, blobs := testSequenceBlockWithBlob(t, 10)
+        expectedCommits := make(map[[48]byte]bool)
+        for _, bl := range blobs {
+            expectedCommits[bytesutil.ToBytes48(bl.KzgCommitment)] = true
+        }
+        // The assertions using this map expect all commitments to be unique, so make sure that stays true.
+        require.Equal(t, len(blobs), len(expectedCommits))
+        bwb, err := verifyAndPopulateBlobs(bwb, blobs, testReqFromResp(bwb), nil)
+        require.NoError(t, err)
+        for _, bw := range bwb {
+            commits, err := bw.Block.Block().Body().BlobKzgCommitments()
+            require.NoError(t, err)
+            require.Equal(t, len(commits), len(bw.Blobs))
+            for i := range commits {
+                bc := bytesutil.ToBytes48(commits[i])
+                require.Equal(t, bc, bytesutil.ToBytes48(bw.Blobs[i].KzgCommitment))
+                // Since we delete entries we've seen, duplicates will cause an error here.
+                _, ok := expectedCommits[bc]
+                // Make sure this was an expected delete, then delete it from the map so we can make sure we saw all of them.
+                require.Equal(t, true, ok)
+                delete(expectedCommits, bc)
+            }
+        }
+        // We delete each entry we've seen, so if we see all expected commits, the map should be empty at the end.
+        require.Equal(t, 0, len(expectedCommits))
+    })
+    t.Run("missing blobs", func(t *testing.T) {
+        bwb, blobs := testSequenceBlockWithBlob(t, 10)
+        _, err := verifyAndPopulateBlobs(bwb, blobs[1:], testReqFromResp(bwb), nil)
+        require.ErrorIs(t, err, errMissingBlobsForBlockCommitments)
+    })
+    t.Run("no blobs for last block", func(t *testing.T) {
+        bwb, blobs := testSequenceBlockWithBlob(t, 10)
+        lastIdx := len(bwb) - 1
+        lastBlk := bwb[lastIdx].Block
+        cmts, err := lastBlk.Block().Body().BlobKzgCommitments()
+        require.NoError(t, err)
+        blobs = blobs[0 : len(blobs)-len(cmts)]
+        lastBlk, _ = util.GenerateTestDenebBlockWithSidecar(t, lastBlk.Block().ParentRoot(), lastBlk.Block().Slot(), 0)
+        bwb[lastIdx].Block = lastBlk
+        _, err = verifyAndPopulateBlobs(bwb, blobs, testReqFromResp(bwb), nil)
+        require.NoError(t, err)
+    })
+    t.Run("blobs not copied if all locally available", func(t *testing.T) {
+        bwb, blobs := testSequenceBlockWithBlob(t, 10)
+        // r1 only has some blobs locally available, so we'll still copy them all.
+        // r7 has all blobs locally available, so we shouldn't copy them.
+        i1, i7 := 1, 7
+        r1, r7 := bwb[i1].Block.Root(), bwb[i7].Block.Root()
+        onDisk := map[[32]byte][]int{
+            r1: {0, 1},
+            r7: {0, 1, 2, 3, 4, 5},
+        }
+        bss := filesystem.NewMockBlobStorageSummarizer(t, onDisk)
+        bwb, err := verifyAndPopulateBlobs(bwb, blobs, testReqFromResp(bwb), bss)
+        require.NoError(t, err)
+        require.Equal(t, 6, len(bwb[i1].Blobs))
+        require.Equal(t, 0, len(bwb[i7].Blobs))
+    })
}
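The last subtest above fixes the copy-avoidance rule: verified blobs are retained for a block only when local storage does not already satisfy all of the block's commitments; a partially available block still gets the full set. A hedged sketch of that selection (attachBlobs and both maps are hypothetical):

// attachBlobs keeps responses only for blocks whose commitments are not
// already fully covered by blobs on disk, mirroring the assertion above
// that bwb[i7].Blobs ends up empty while bwb[i1].Blobs keeps all 6.
func attachBlobs(resp map[[32]byte][]int, fullyOnDisk map[[32]byte]bool) map[[32]byte][]int {
    kept := make(map[[32]byte][]int, len(resp))
    for root, idxs := range resp {
        if fullyOnDisk[root] {
            continue // on-disk copies already cover this block's commitments
        }
        kept[root] = idxs
    }
    return kept
}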
func TestBatchLimit(t *testing.T) {