Implement SendDataColumnSidecarsByRangeRequest and SendDataColumnSidecarsByRootRequest. (#15430)

* Implement `SendDataColumnSidecarsByRangeRequest` and `SendDataColumnSidecarsByRootRequest`.

* Fix James's comment.

* Fix James's comment.

* Fix James' comment.

* Fix marshaller.
This commit is contained in:
Manu NALEPA
2025-06-24 23:40:16 +02:00
committed by GitHub
parent e8625cd89d
commit 121914d0d7
4 changed files with 807 additions and 21 deletions

View File

@@ -11,7 +11,9 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
p2pTypes "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
p2ptypes "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
@@ -885,6 +887,544 @@ func TestErrInvalidFetchedDataDistinction(t *testing.T) {
require.Equal(t, false, errors.Is(ErrInvalidFetchedData, verification.ErrBlobInvalid))
}
func TestSendDataColumnSidecarsByRangeRequest(t *testing.T) {
nilTestCases := []struct {
name string
request *ethpb.DataColumnSidecarsByRangeRequest
}{
{
name: "nil request",
request: nil,
},
{
name: "count is 0",
request: &ethpb.DataColumnSidecarsByRangeRequest{},
},
{
name: "columns is nil",
request: &ethpb.DataColumnSidecarsByRangeRequest{Count: 1},
},
}
for _, tc := range nilTestCases {
t.Run(tc.name, func(t *testing.T) {
actual, err := SendDataColumnSidecarsByRangeRequest(t.Context(), nil, nil, "aRandomPID", nil, tc.request)
require.NoError(t, err)
require.IsNil(t, actual)
})
}
t.Run("too many columns in request", func(t *testing.T) {
params.SetupTestConfigCleanup(t)
beaconConfig := params.BeaconConfig()
beaconConfig.MaxRequestDataColumnSidecars = 0
params.OverrideBeaconConfig(beaconConfig)
request := &ethpb.DataColumnSidecarsByRangeRequest{Count: 1, Columns: []uint64{1, 2, 3}}
_, err := SendDataColumnSidecarsByRangeRequest(t.Context(), nil, nil, "aRandomPID", nil, request)
require.ErrorContains(t, errMaxRequestDataColumnSidecarsExceeded.Error(), err)
})
type slotIndex struct {
Slot primitives.Slot
Index uint64
}
createSidecar := func(slotIndex slotIndex) *ethpb.DataColumnSidecar {
const count = 4
kzgCommitmentsInclusionProof := make([][]byte, 0, count)
for range count {
kzgCommitmentsInclusionProof = append(kzgCommitmentsInclusionProof, make([]byte, 32))
}
return &ethpb.DataColumnSidecar{
Index: slotIndex.Index,
SignedBlockHeader: &ethpb.SignedBeaconBlockHeader{
Header: &ethpb.BeaconBlockHeader{
Slot: slotIndex.Slot,
ParentRoot: make([]byte, fieldparams.RootLength),
StateRoot: make([]byte, fieldparams.RootLength),
BodyRoot: make([]byte, fieldparams.RootLength),
},
Signature: make([]byte, fieldparams.BLSSignatureLength),
},
KzgCommitmentsInclusionProof: kzgCommitmentsInclusionProof,
}
}
testCases := []struct {
name string
slotIndices []slotIndex
expectedError error
}{
{
name: "too many responses",
slotIndices: []slotIndex{
{Slot: 0, Index: 1},
{Slot: 0, Index: 2},
{Slot: 0, Index: 3},
{Slot: 1, Index: 1},
{Slot: 1, Index: 2},
{Slot: 1, Index: 3},
{Slot: 0, Index: 3}, // Duplicate
},
expectedError: errMaxResponseDataColumnSidecarsExceeded,
},
{
name: "perfect match",
slotIndices: []slotIndex{
{Slot: 0, Index: 1},
{Slot: 0, Index: 2},
{Slot: 0, Index: 3},
{Slot: 1, Index: 1},
{Slot: 1, Index: 2},
{Slot: 1, Index: 3},
},
},
{
name: "few responses than maximum possible",
slotIndices: []slotIndex{
{Slot: 0, Index: 1},
{Slot: 0, Index: 2},
{Slot: 0, Index: 3},
{Slot: 1, Index: 1},
{Slot: 1, Index: 2},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
protocol := fmt.Sprintf("%s/ssz_snappy", p2p.RPCDataColumnSidecarsByRangeTopicV1)
clock := startup.NewClock(time.Now(), [fieldparams.RootLength]byte{})
p1, p2 := p2ptest.NewTestP2P(t), p2ptest.NewTestP2P(t)
p1.Connect(p2)
expected := make([]*ethpb.DataColumnSidecar, 0, len(tc.slotIndices))
for _, slotIndex := range tc.slotIndices {
sidecar := createSidecar(slotIndex)
expected = append(expected, sidecar)
}
requestSent := &ethpb.DataColumnSidecarsByRangeRequest{
StartSlot: 0,
Count: 2,
Columns: []uint64{1, 3, 2},
}
var wg sync.WaitGroup
wg.Add(1)
p2.SetStreamHandler(protocol, func(stream network.Stream) {
wg.Done()
requestReceived := new(ethpb.DataColumnSidecarsByRangeRequest)
err := p2.Encoding().DecodeWithMaxLength(stream, requestReceived)
assert.NoError(t, err)
assert.DeepSSZEqual(t, requestSent, requestReceived)
for _, sidecar := range expected {
err := WriteDataColumnSidecarChunk(stream, clock, p2.Encoding(), sidecar)
assert.NoError(t, err)
}
err = stream.CloseWrite()
assert.NoError(t, err)
})
ctx := t.Context()
ctxMap := ContextByteVersions{[4]byte{245, 165, 253, 66}: version.Fulu}
actual, err := SendDataColumnSidecarsByRangeRequest(ctx, clock, p1, p2.PeerID(), ctxMap, requestSent)
if tc.expectedError != nil {
require.ErrorContains(t, tc.expectedError.Error(), err)
if util.WaitTimeout(&wg, time.Second) {
t.Fatal("Did not receive stream within 1 sec")
}
return
}
require.Equal(t, len(expected), len(actual))
for i := range expected {
require.DeepSSZEqual(t, expected[i], actual[i].DataColumnSidecar)
}
})
}
}
func TestIsSidecarSlotWithinBounds(t *testing.T) {
request := &ethpb.DataColumnSidecarsByRangeRequest{
StartSlot: 10,
Count: 10,
}
validator, err := isSidecarSlotWithinBounds(request)
require.NoError(t, err)
testCases := []struct {
name string
slot primitives.Slot
isErrorExpected bool
}{
{
name: "too soon",
slot: 9,
isErrorExpected: true,
},
{
name: "too late",
slot: 20,
isErrorExpected: true,
},
{
name: "within bounds",
slot: 15,
isErrorExpected: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
const count = 4
kzgCommitmentsInclusionProof := make([][]byte, 0, count)
for range count {
kzgCommitmentsInclusionProof = append(kzgCommitmentsInclusionProof, make([]byte, 32))
}
sidecarPb := &ethpb.DataColumnSidecar{
SignedBlockHeader: &ethpb.SignedBeaconBlockHeader{
Header: &ethpb.BeaconBlockHeader{
Slot: tc.slot,
ParentRoot: make([]byte, fieldparams.RootLength),
StateRoot: make([]byte, fieldparams.RootLength),
BodyRoot: make([]byte, fieldparams.RootLength),
},
Signature: make([]byte, fieldparams.BLSSignatureLength),
},
KzgCommitmentsInclusionProof: kzgCommitmentsInclusionProof,
}
sidecar, err := blocks.NewRODataColumn(sidecarPb)
require.NoError(t, err)
err = validator(sidecar)
if tc.isErrorExpected {
require.NotNil(t, err)
return
}
require.NoError(t, err)
})
}
}
func TestIsSidecarIndexRequested(t *testing.T) {
request := &ethpb.DataColumnSidecarsByRangeRequest{
Columns: []uint64{2, 9, 4},
}
validator := isSidecarIndexRequested(request)
testCases := []struct {
name string
index uint64
isErrorExpected bool
}{
{
name: "not requested",
index: 1,
isErrorExpected: true,
},
{
name: "requested",
index: 9,
isErrorExpected: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
const count = 4
kzgCommitmentsInclusionProof := make([][]byte, 0, count)
for range count {
kzgCommitmentsInclusionProof = append(kzgCommitmentsInclusionProof, make([]byte, 32))
}
sidecarPb := &ethpb.DataColumnSidecar{
SignedBlockHeader: &ethpb.SignedBeaconBlockHeader{
Header: &ethpb.BeaconBlockHeader{
Slot: 0,
ParentRoot: make([]byte, fieldparams.RootLength),
StateRoot: make([]byte, fieldparams.RootLength),
BodyRoot: make([]byte, fieldparams.RootLength),
},
Signature: make([]byte, fieldparams.BLSSignatureLength),
},
KzgCommitmentsInclusionProof: kzgCommitmentsInclusionProof,
Index: tc.index,
}
sidecar, err := blocks.NewRODataColumn(sidecarPb)
require.NoError(t, err)
err = validator(sidecar)
if tc.isErrorExpected {
require.NotNil(t, err)
return
}
require.NoError(t, err)
})
}
}
func TestSendDataColumnSidecarsByRootRequest(t *testing.T) {
nilTestCases := []struct {
name string
request p2ptypes.DataColumnsByRootIdentifiers
}{
{
name: "nil request",
request: nil,
},
{
name: "count is 0",
request: p2ptypes.DataColumnsByRootIdentifiers{{}, {}},
},
}
for _, tc := range nilTestCases {
t.Run(tc.name, func(t *testing.T) {
actual, err := SendDataColumnSidecarsByRootRequest(t.Context(), nil, nil, "aRandomPID", nil, tc.request)
require.NoError(t, err)
require.IsNil(t, actual)
})
}
t.Run("too many columns in request", func(t *testing.T) {
params.SetupTestConfigCleanup(t)
beaconConfig := params.BeaconConfig()
beaconConfig.MaxRequestDataColumnSidecars = 4
params.OverrideBeaconConfig(beaconConfig)
request := p2ptypes.DataColumnsByRootIdentifiers{
{Columns: []uint64{1, 2, 3}},
{Columns: []uint64{4, 5, 6}},
}
_, err := SendDataColumnSidecarsByRootRequest(t.Context(), nil, nil, "aRandomPID", nil, request)
require.ErrorContains(t, errMaxRequestDataColumnSidecarsExceeded.Error(), err)
})
type slotIndex struct {
Slot primitives.Slot
Index uint64
}
createSidecar := func(rootIndex slotIndex) blocks.RODataColumn {
const count = 4
kzgCommitmentsInclusionProof := make([][]byte, 0, count)
for range count {
kzgCommitmentsInclusionProof = append(kzgCommitmentsInclusionProof, make([]byte, 32))
}
sidecarPb := &ethpb.DataColumnSidecar{
Index: rootIndex.Index,
SignedBlockHeader: &ethpb.SignedBeaconBlockHeader{
Header: &ethpb.BeaconBlockHeader{
ParentRoot: make([]byte, fieldparams.RootLength),
StateRoot: make([]byte, fieldparams.RootLength),
BodyRoot: make([]byte, fieldparams.RootLength),
},
Signature: make([]byte, fieldparams.BLSSignatureLength),
},
KzgCommitmentsInclusionProof: kzgCommitmentsInclusionProof,
}
roSidecar, err := blocks.NewRODataColumn(sidecarPb)
require.NoError(t, err)
return roSidecar
}
testCases := []struct {
name string
slotIndices []slotIndex
expectedError error
}{
{
name: "too many responses",
slotIndices: []slotIndex{
{Slot: 1, Index: 1},
{Slot: 1, Index: 2},
{Slot: 1, Index: 3},
{Slot: 2, Index: 1},
{Slot: 2, Index: 2},
{Slot: 2, Index: 3},
{Slot: 1, Index: 3}, // Duplicate
},
expectedError: errMaxResponseDataColumnSidecarsExceeded,
},
{
name: "perfect match",
slotIndices: []slotIndex{
{Slot: 1, Index: 1},
{Slot: 1, Index: 2},
{Slot: 1, Index: 3},
{Slot: 2, Index: 1},
{Slot: 2, Index: 2},
{Slot: 2, Index: 3},
},
},
{
name: "few responses than maximum possible",
slotIndices: []slotIndex{
{Slot: 1, Index: 1},
{Slot: 1, Index: 2},
{Slot: 1, Index: 3},
{Slot: 2, Index: 1},
{Slot: 2, Index: 2},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
protocol := fmt.Sprintf("%s/ssz_snappy", p2p.RPCDataColumnSidecarsByRootTopicV1)
clock := startup.NewClock(time.Now(), [fieldparams.RootLength]byte{})
p1, p2 := p2ptest.NewTestP2P(t), p2ptest.NewTestP2P(t)
p1.Connect(p2)
expected := make([]blocks.RODataColumn, 0, len(tc.slotIndices))
for _, slotIndex := range tc.slotIndices {
roSidecar := createSidecar(slotIndex)
expected = append(expected, roSidecar)
}
blockRoot1, blockRoot2 := expected[0].BlockRoot(), expected[3].BlockRoot()
sentRequest := p2ptypes.DataColumnsByRootIdentifiers{
{BlockRoot: blockRoot1[:], Columns: []uint64{1, 2, 3}},
{BlockRoot: blockRoot2[:], Columns: []uint64{1, 2, 3}},
}
var wg sync.WaitGroup
wg.Add(1)
p2.SetStreamHandler(protocol, func(stream network.Stream) {
wg.Done()
requestReceived := new(p2ptypes.DataColumnsByRootIdentifiers)
err := p2.Encoding().DecodeWithMaxLength(stream, requestReceived)
assert.NoError(t, err)
require.Equal(t, len(sentRequest), len(*requestReceived))
for i := range sentRequest {
require.DeepSSZEqual(t, (sentRequest)[i], (*requestReceived)[i])
}
for _, sidecar := range expected {
err := WriteDataColumnSidecarChunk(stream, clock, p2.Encoding(), sidecar.DataColumnSidecar)
assert.NoError(t, err)
}
err = stream.CloseWrite()
assert.NoError(t, err)
})
ctx := t.Context()
ctxMap := ContextByteVersions{[4]byte{245, 165, 253, 66}: version.Fulu}
actual, err := SendDataColumnSidecarsByRootRequest(ctx, clock, p1, p2.PeerID(), ctxMap, sentRequest)
if tc.expectedError != nil {
require.ErrorContains(t, tc.expectedError.Error(), err)
if util.WaitTimeout(&wg, time.Second) {
t.Fatal("Did not receive stream within 1 sec")
}
return
}
require.Equal(t, len(expected), len(actual))
for i := range expected {
require.DeepSSZEqual(t, expected[i], actual[i])
}
})
}
}
func TestIsSidecarIndexRootRequested(t *testing.T) {
testCases := []struct {
name string
root [fieldparams.RootLength]byte
index uint64
isErrorExpected bool
}{
{
name: "non requested root",
root: [fieldparams.RootLength]byte{2},
isErrorExpected: true,
},
{
name: "non requested index",
root: [fieldparams.RootLength]byte{1},
index: 3,
isErrorExpected: true,
},
{
name: "nominal",
root: [fieldparams.RootLength]byte{1},
index: 2,
isErrorExpected: false,
},
}
request := types.DataColumnsByRootIdentifiers{
{BlockRoot: []byte{1}, Columns: []uint64{1, 2}},
}
validator := isSidecarIndexRootRequested(request)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
const count = 4
kzgCommitmentsInclusionProof := make([][]byte, 0, count)
for range count {
kzgCommitmentsInclusionProof = append(kzgCommitmentsInclusionProof, make([]byte, 32))
}
sidecarPb := &ethpb.DataColumnSidecar{
SignedBlockHeader: &ethpb.SignedBeaconBlockHeader{
Header: &ethpb.BeaconBlockHeader{
ParentRoot: make([]byte, fieldparams.RootLength),
StateRoot: make([]byte, fieldparams.RootLength),
BodyRoot: make([]byte, fieldparams.RootLength),
},
Signature: make([]byte, fieldparams.BLSSignatureLength),
},
KzgCommitmentsInclusionProof: kzgCommitmentsInclusionProof,
Index: tc.index,
}
// There is a discrepancy between `tc.root` and the real root,
// but we don't care about it here.
sidecar, err := blocks.NewRODataColumnWithRoot(sidecarPb, tc.root)
require.NoError(t, err)
err = validator(sidecar)
if tc.isErrorExpected {
require.NotNil(t, err)
return
}
require.NoError(t, err)
})
}
}
func TestReadChunkedDataColumnSidecar(t *testing.T) {
t.Run("non nil status code", func(t *testing.T) {
const reason = "a dummy reason"