From 55b9448d416c04d8b52637a4d06f7236ba2f1731 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Wed, 15 Oct 2025 14:16:05 +0200 Subject: [PATCH] `dataColumnSidecarsByRangeRPCHandler`: Gracefully close the stream if no data to return. (#15866) * `TestDataColumnSidecarsByRangeRPCHandler`: Remove commented code. * Remove double import * `dataColumnSidecarsByRangeRPCHandler`: Gracefully close the stream if no data to return. * Tests: Change `require` to `assert` in goroutines in tests. https://pkg.go.dev/github.com/stretchr/testify/require#hdr-Assertions * Add changelog. --- .../sync/rpc_data_column_sidecars_by_range.go | 1 + .../rpc_data_column_sidecars_by_range_test.go | 91 ++++++++++++++----- changelog/manu-gracefully-close-stream.md | 2 + 3 files changed, 69 insertions(+), 25 deletions(-) create mode 100644 changelog/manu-gracefully-close-stream.md diff --git a/beacon-chain/sync/rpc_data_column_sidecars_by_range.go b/beacon-chain/sync/rpc_data_column_sidecars_by_range.go index e64ef8ef64..a2be4b4587 100644 --- a/beacon-chain/sync/rpc_data_column_sidecars_by_range.go +++ b/beacon-chain/sync/rpc_data_column_sidecars_by_range.go @@ -70,6 +70,7 @@ func (s *Service) dataColumnSidecarsByRangeRPCHandler(ctx context.Context, msg i log.Trace("Serving data column sidecars by range") if rangeParameters == nil { + closeStream(stream, log) return nil } diff --git a/beacon-chain/sync/rpc_data_column_sidecars_by_range_test.go b/beacon-chain/sync/rpc_data_column_sidecars_by_range_test.go index b0620918cc..07d0f2a66a 100644 --- a/beacon-chain/sync/rpc_data_column_sidecars_by_range_test.go +++ b/beacon-chain/sync/rpc_data_column_sidecars_by_range_test.go @@ -23,16 +23,15 @@ import ( fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/blocks" - consensusblocks "github.com/OffchainLabs/prysm/v6/consensus-types/blocks" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" pb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" + "github.com/OffchainLabs/prysm/v6/testing/assert" "github.com/OffchainLabs/prysm/v6/testing/util" ) func TestDataColumnSidecarsByRangeRPCHandler(t *testing.T) { params.SetupTestConfigCleanup(t) beaconConfig := params.BeaconConfig() - //beaconConfig.FuluForkEpoch = beaconConfig.ElectraForkEpoch + 100 beaconConfig.FuluForkEpoch = 0 params.OverrideBeaconConfig(beaconConfig) params.BeaconConfig().InitializeForkSchedule() @@ -47,6 +46,7 @@ func TestDataColumnSidecarsByRangeRPCHandler(t *testing.T) { ctxMap, err := ContextByteVersionsForValRoot(params.BeaconConfig().GenesisValidatorsRoot) require.NoError(t, err) + t.Run("invalid request", func(t *testing.T) { slot := primitives.Slot(400) mockNower.SetSlot(t, clock, slot) @@ -72,8 +72,8 @@ func TestDataColumnSidecarsByRangeRPCHandler(t *testing.T) { remoteP2P.BHost.SetStreamHandler(protocolID, func(stream network.Stream) { defer wg.Done() code, _, err := readStatusCodeNoDeadline(stream, localP2P.Encoding()) - require.NoError(t, err) - require.Equal(t, responseCodeInvalidRequest, code) + assert.NoError(t, err) + assert.Equal(t, responseCodeInvalidRequest, code) }) localP2P.Connect(remoteP2P) @@ -94,6 +94,48 @@ func TestDataColumnSidecarsByRangeRPCHandler(t *testing.T) { } }) + t.Run("in the future", func(t *testing.T) { + slot := primitives.Slot(400) + mockNower.SetSlot(t, clock, slot) + + localP2P, remoteP2P := p2ptest.NewTestP2P(t), p2ptest.NewTestP2P(t) + protocolID := protocol.ID(fmt.Sprintf("%s/ssz_snappy", p2p.RPCDataColumnSidecarsByRangeTopicV1)) + + service := &Service{ + cfg: &config{ + p2p: localP2P, + chain: &chainMock.ChainService{ + Slot: &slot, + }, + clock: clock, + }, + rateLimiter: newRateLimiter(localP2P), + } + + var wg sync.WaitGroup + wg.Add(1) + + remoteP2P.BHost.SetStreamHandler(protocolID, func(stream network.Stream) { + defer wg.Done() + + _, err := readChunkedDataColumnSidecar(stream, remoteP2P, ctxMap) + assert.Equal(t, true, errors.Is(err, io.EOF)) + }) + + localP2P.Connect(remoteP2P) + stream, err := localP2P.BHost.NewStream(ctx, remoteP2P.BHost.ID(), protocolID) + require.NoError(t, err) + + msg := &pb.DataColumnSidecarsByRangeRequest{ + StartSlot: slot + 1, + Count: 50, + Columns: []uint64{1, 2, 3, 4, 6, 7, 8, 9, 10}, + } + + err = service.dataColumnSidecarsByRangeRPCHandler(ctx, msg, stream) + require.NoError(t, err) + }) + t.Run("nominal", func(t *testing.T) { slot := primitives.Slot(400) @@ -133,12 +175,12 @@ func TestDataColumnSidecarsByRangeRPCHandler(t *testing.T) { signedBeaconBlockPb.Block.ParentRoot = roots[i-1][:] } - signedBeaconBlock, err := consensusblocks.NewSignedBeaconBlock(signedBeaconBlockPb) + signedBeaconBlock, err := blocks.NewSignedBeaconBlock(signedBeaconBlockPb) require.NoError(t, err) // There is a discrepancy between the root of the beacon block and the rodata column root, // but for the sake of this test, we actually don't care. - roblock, err := consensusblocks.NewROBlockWithRoot(signedBeaconBlock, roots[i]) + roblock, err := blocks.NewROBlockWithRoot(signedBeaconBlock, roots[i]) require.NoError(t, err) roBlocks = append(roBlocks, roblock) @@ -178,28 +220,28 @@ func TestDataColumnSidecarsByRangeRPCHandler(t *testing.T) { break } - require.NoError(t, err) + assert.NoError(t, err) sidecars = append(sidecars, sidecar) } - require.Equal(t, 8, len(sidecars)) - require.Equal(t, root0, sidecars[0].BlockRoot()) - require.Equal(t, root0, sidecars[1].BlockRoot()) - require.Equal(t, root0, sidecars[2].BlockRoot()) - require.Equal(t, root3, sidecars[3].BlockRoot()) - require.Equal(t, root3, sidecars[4].BlockRoot()) - require.Equal(t, root5, sidecars[5].BlockRoot()) - require.Equal(t, root5, sidecars[6].BlockRoot()) - require.Equal(t, root5, sidecars[7].BlockRoot()) + assert.Equal(t, 8, len(sidecars)) + assert.Equal(t, root0, sidecars[0].BlockRoot()) + assert.Equal(t, root0, sidecars[1].BlockRoot()) + assert.Equal(t, root0, sidecars[2].BlockRoot()) + assert.Equal(t, root3, sidecars[3].BlockRoot()) + assert.Equal(t, root3, sidecars[4].BlockRoot()) + assert.Equal(t, root5, sidecars[5].BlockRoot()) + assert.Equal(t, root5, sidecars[6].BlockRoot()) + assert.Equal(t, root5, sidecars[7].BlockRoot()) - require.Equal(t, uint64(1), sidecars[0].Index) - require.Equal(t, uint64(2), sidecars[1].Index) - require.Equal(t, uint64(3), sidecars[2].Index) - require.Equal(t, uint64(4), sidecars[3].Index) - require.Equal(t, uint64(6), sidecars[4].Index) - require.Equal(t, uint64(7), sidecars[5].Index) - require.Equal(t, uint64(8), sidecars[6].Index) - require.Equal(t, uint64(9), sidecars[7].Index) + assert.Equal(t, uint64(1), sidecars[0].Index) + assert.Equal(t, uint64(2), sidecars[1].Index) + assert.Equal(t, uint64(3), sidecars[2].Index) + assert.Equal(t, uint64(4), sidecars[3].Index) + assert.Equal(t, uint64(6), sidecars[4].Index) + assert.Equal(t, uint64(7), sidecars[5].Index) + assert.Equal(t, uint64(8), sidecars[6].Index) + assert.Equal(t, uint64(9), sidecars[7].Index) }) localP2P.Connect(remoteP2P) @@ -215,7 +257,6 @@ func TestDataColumnSidecarsByRangeRPCHandler(t *testing.T) { err = service.dataColumnSidecarsByRangeRPCHandler(ctx, msg, stream) require.NoError(t, err) }) - } func TestValidateDataColumnsByRange(t *testing.T) { diff --git a/changelog/manu-gracefully-close-stream.md b/changelog/manu-gracefully-close-stream.md new file mode 100644 index 0000000000..236706058e --- /dev/null +++ b/changelog/manu-gracefully-close-stream.md @@ -0,0 +1,2 @@ +### Fixed +- `dataColumnSidecarsByRangeRPCHandler`: Gracefully close the stream if no data to return.