dataColumnSidecarsByRangeRPCHandler: Gracefully close the stream if no data to return. (#15866)

* `TestDataColumnSidecarsByRangeRPCHandler`: Remove commented code.

* Remove double import

* `dataColumnSidecarsByRangeRPCHandler`: Gracefully close the stream if no data to return.

* Tests: Change `require` to `assert` in goroutines in tests.

https://pkg.go.dev/github.com/stretchr/testify/require#hdr-Assertions

* Add changelog.
This commit is contained in:
Manu NALEPA
2025-10-15 14:16:05 +02:00
committed by GitHub
parent 10f8d8c26e
commit 55b9448d41
3 changed files with 69 additions and 25 deletions

View File

@@ -70,6 +70,7 @@ func (s *Service) dataColumnSidecarsByRangeRPCHandler(ctx context.Context, msg i
log.Trace("Serving data column sidecars by range")
if rangeParameters == nil {
closeStream(stream, log)
return nil
}

View File

@@ -23,16 +23,15 @@ import (
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
consensusblocks "github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
pb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/testing/assert"
"github.com/OffchainLabs/prysm/v6/testing/util"
)
func TestDataColumnSidecarsByRangeRPCHandler(t *testing.T) {
params.SetupTestConfigCleanup(t)
beaconConfig := params.BeaconConfig()
//beaconConfig.FuluForkEpoch = beaconConfig.ElectraForkEpoch + 100
beaconConfig.FuluForkEpoch = 0
params.OverrideBeaconConfig(beaconConfig)
params.BeaconConfig().InitializeForkSchedule()
@@ -47,6 +46,7 @@ func TestDataColumnSidecarsByRangeRPCHandler(t *testing.T) {
ctxMap, err := ContextByteVersionsForValRoot(params.BeaconConfig().GenesisValidatorsRoot)
require.NoError(t, err)
t.Run("invalid request", func(t *testing.T) {
slot := primitives.Slot(400)
mockNower.SetSlot(t, clock, slot)
@@ -72,8 +72,8 @@ func TestDataColumnSidecarsByRangeRPCHandler(t *testing.T) {
remoteP2P.BHost.SetStreamHandler(protocolID, func(stream network.Stream) {
defer wg.Done()
code, _, err := readStatusCodeNoDeadline(stream, localP2P.Encoding())
require.NoError(t, err)
require.Equal(t, responseCodeInvalidRequest, code)
assert.NoError(t, err)
assert.Equal(t, responseCodeInvalidRequest, code)
})
localP2P.Connect(remoteP2P)
@@ -94,6 +94,48 @@ func TestDataColumnSidecarsByRangeRPCHandler(t *testing.T) {
}
})
t.Run("in the future", func(t *testing.T) {
slot := primitives.Slot(400)
mockNower.SetSlot(t, clock, slot)
localP2P, remoteP2P := p2ptest.NewTestP2P(t), p2ptest.NewTestP2P(t)
protocolID := protocol.ID(fmt.Sprintf("%s/ssz_snappy", p2p.RPCDataColumnSidecarsByRangeTopicV1))
service := &Service{
cfg: &config{
p2p: localP2P,
chain: &chainMock.ChainService{
Slot: &slot,
},
clock: clock,
},
rateLimiter: newRateLimiter(localP2P),
}
var wg sync.WaitGroup
wg.Add(1)
remoteP2P.BHost.SetStreamHandler(protocolID, func(stream network.Stream) {
defer wg.Done()
_, err := readChunkedDataColumnSidecar(stream, remoteP2P, ctxMap)
assert.Equal(t, true, errors.Is(err, io.EOF))
})
localP2P.Connect(remoteP2P)
stream, err := localP2P.BHost.NewStream(ctx, remoteP2P.BHost.ID(), protocolID)
require.NoError(t, err)
msg := &pb.DataColumnSidecarsByRangeRequest{
StartSlot: slot + 1,
Count: 50,
Columns: []uint64{1, 2, 3, 4, 6, 7, 8, 9, 10},
}
err = service.dataColumnSidecarsByRangeRPCHandler(ctx, msg, stream)
require.NoError(t, err)
})
t.Run("nominal", func(t *testing.T) {
slot := primitives.Slot(400)
@@ -133,12 +175,12 @@ func TestDataColumnSidecarsByRangeRPCHandler(t *testing.T) {
signedBeaconBlockPb.Block.ParentRoot = roots[i-1][:]
}
signedBeaconBlock, err := consensusblocks.NewSignedBeaconBlock(signedBeaconBlockPb)
signedBeaconBlock, err := blocks.NewSignedBeaconBlock(signedBeaconBlockPb)
require.NoError(t, err)
// There is a discrepancy between the root of the beacon block and the rodata column root,
// but for the sake of this test, we actually don't care.
roblock, err := consensusblocks.NewROBlockWithRoot(signedBeaconBlock, roots[i])
roblock, err := blocks.NewROBlockWithRoot(signedBeaconBlock, roots[i])
require.NoError(t, err)
roBlocks = append(roBlocks, roblock)
@@ -178,28 +220,28 @@ func TestDataColumnSidecarsByRangeRPCHandler(t *testing.T) {
break
}
require.NoError(t, err)
assert.NoError(t, err)
sidecars = append(sidecars, sidecar)
}
require.Equal(t, 8, len(sidecars))
require.Equal(t, root0, sidecars[0].BlockRoot())
require.Equal(t, root0, sidecars[1].BlockRoot())
require.Equal(t, root0, sidecars[2].BlockRoot())
require.Equal(t, root3, sidecars[3].BlockRoot())
require.Equal(t, root3, sidecars[4].BlockRoot())
require.Equal(t, root5, sidecars[5].BlockRoot())
require.Equal(t, root5, sidecars[6].BlockRoot())
require.Equal(t, root5, sidecars[7].BlockRoot())
assert.Equal(t, 8, len(sidecars))
assert.Equal(t, root0, sidecars[0].BlockRoot())
assert.Equal(t, root0, sidecars[1].BlockRoot())
assert.Equal(t, root0, sidecars[2].BlockRoot())
assert.Equal(t, root3, sidecars[3].BlockRoot())
assert.Equal(t, root3, sidecars[4].BlockRoot())
assert.Equal(t, root5, sidecars[5].BlockRoot())
assert.Equal(t, root5, sidecars[6].BlockRoot())
assert.Equal(t, root5, sidecars[7].BlockRoot())
require.Equal(t, uint64(1), sidecars[0].Index)
require.Equal(t, uint64(2), sidecars[1].Index)
require.Equal(t, uint64(3), sidecars[2].Index)
require.Equal(t, uint64(4), sidecars[3].Index)
require.Equal(t, uint64(6), sidecars[4].Index)
require.Equal(t, uint64(7), sidecars[5].Index)
require.Equal(t, uint64(8), sidecars[6].Index)
require.Equal(t, uint64(9), sidecars[7].Index)
assert.Equal(t, uint64(1), sidecars[0].Index)
assert.Equal(t, uint64(2), sidecars[1].Index)
assert.Equal(t, uint64(3), sidecars[2].Index)
assert.Equal(t, uint64(4), sidecars[3].Index)
assert.Equal(t, uint64(6), sidecars[4].Index)
assert.Equal(t, uint64(7), sidecars[5].Index)
assert.Equal(t, uint64(8), sidecars[6].Index)
assert.Equal(t, uint64(9), sidecars[7].Index)
})
localP2P.Connect(remoteP2P)
@@ -215,7 +257,6 @@ func TestDataColumnSidecarsByRangeRPCHandler(t *testing.T) {
err = service.dataColumnSidecarsByRangeRPCHandler(ctx, msg, stream)
require.NoError(t, err)
})
}
func TestValidateDataColumnsByRange(t *testing.T) {

View File

@@ -0,0 +1,2 @@
### Fixed
- `dataColumnSidecarsByRangeRPCHandler`: Gracefully close the stream if no data to return.