StatusV2: Send correct earliest available slot. (#15818)

* `buildStatusFromStream`: Use parent context.

* Status tests: Use `t.Context` everywhere.

* `buildStatusFromStream`: Respond statusV2 only if Fulu is enabled.

Without doing so, earliest available slot is never defined, and then `s.cfg.p2p.EarliestAvailableSlot` will block until the context is canceled.

* Send our real earliest available slot when sending a Status request post Fulu instead of `0`.

* Add changelog.
This commit is contained in:
Manu NALEPA
2025-10-07 20:45:47 +02:00
committed by GitHub
parent b45e87abd6
commit 537f3cb863
3 changed files with 81 additions and 43 deletions

View File

@@ -160,7 +160,10 @@ func (s *Service) sendRPCStatusRequest(ctx context.Context, peer peer.ID) error
}
cp := s.cfg.chain.FinalizedCheckpt()
status := s.buildStatusFromEpoch(currentEpoch, forkDigest, cp.Root, cp.Epoch, headRoot)
status, err := s.buildStatusFromEpoch(ctx, currentEpoch, forkDigest, cp.Root, cp.Epoch, headRoot)
if err != nil {
return errors.Wrap(err, "build status from epoch")
}
stream, err := s.cfg.p2p.Send(ctx, status, topic, peer)
if err != nil {
@@ -312,7 +315,7 @@ func (s *Service) respondWithStatus(ctx context.Context, stream network.Stream)
}
cp := s.cfg.chain.FinalizedCheckpt()
status, err := s.buildStatusFromStream(stream, forkDigest, cp.Root, cp.Epoch, headRoot)
status, err := s.buildStatusFromStream(ctx, stream, forkDigest, cp.Root, cp.Epoch, headRoot)
if err != nil {
return errors.Wrap(err, "build status")
}
@@ -329,6 +332,7 @@ func (s *Service) respondWithStatus(ctx context.Context, stream network.Stream)
}
func (s *Service) buildStatusFromStream(
ctx context.Context,
stream libp2pcore.Stream,
forkDigest [4]byte,
finalizedRoot []byte,
@@ -353,8 +357,8 @@ func (s *Service) buildStatusFromStream(
return nil, err
}
if streamVersion == p2p.SchemaVersionV2 {
earliestAvailableSlot, err := s.cfg.p2p.EarliestAvailableSlot(s.ctx)
if params.FuluEnabled() && streamVersion == p2p.SchemaVersionV2 {
earliestAvailableSlot, err := s.cfg.p2p.EarliestAvailableSlot(ctx)
if err != nil {
return nil, errors.Wrap(err, "earliest available slot")
}
@@ -383,24 +387,30 @@ func (s *Service) buildStatusFromStream(
}
func (s *Service) buildStatusFromEpoch(
ctx context.Context,
epoch primitives.Epoch,
forkDigest [4]byte,
finalizedRoot []byte,
FinalizedEpoch primitives.Epoch,
headRoot []byte,
) ssz.Marshaler {
) (ssz.Marshaler, error) {
// Get the stream version from the protocol.
if epoch >= params.BeaconConfig().FuluForkEpoch {
earliestAvailableSlot, err := s.cfg.p2p.EarliestAvailableSlot(ctx)
if err != nil {
return nil, errors.Wrap(err, "earliest available slot")
}
status := &pb.StatusV2{
ForkDigest: forkDigest[:],
FinalizedRoot: finalizedRoot,
FinalizedEpoch: FinalizedEpoch,
HeadRoot: headRoot,
HeadSlot: s.cfg.chain.HeadSlot(),
EarliestAvailableSlot: 0,
EarliestAvailableSlot: earliestAvailableSlot,
}
return status
return status, nil
}
status := &pb.Status{
@@ -411,7 +421,7 @@ func (s *Service) buildStatusFromEpoch(
HeadSlot: s.cfg.chain.HeadSlot(),
}
return status
return status, nil
}
func (s *Service) validateStatusMessage(ctx context.Context, genericMsg interface{}) error {

View File

@@ -40,6 +40,8 @@ import (
)
func TestStatusRPCHandler_Disconnects_OnForkVersionMismatch(t *testing.T) {
ctx := t.Context()
p1 := p2ptest.NewTestP2P(t)
p2 := p2ptest.NewTestP2P(t)
p1.Connect(p2)
@@ -96,9 +98,9 @@ func TestStatusRPCHandler_Disconnects_OnForkVersionMismatch(t *testing.T) {
assert.NoError(t, stream.Close())
})
stream1, err := p1.BHost.NewStream(t.Context(), p2.BHost.ID(), pcl)
stream1, err := p1.BHost.NewStream(ctx, p2.BHost.ID(), pcl)
require.NoError(t, err)
assert.NoError(t, r.statusRPCHandler(t.Context(), &ethpb.Status{ForkDigest: bytesutil.PadTo([]byte("f"), 4), HeadRoot: make([]byte, 32), FinalizedRoot: make([]byte, 32)}, stream1))
assert.NoError(t, r.statusRPCHandler(ctx, &ethpb.Status{ForkDigest: bytesutil.PadTo([]byte("f"), 4), HeadRoot: make([]byte, 32), FinalizedRoot: make([]byte, 32)}, stream1))
if util.WaitTimeout(&wg, 1*time.Second) {
t.Fatal("Did not receive stream within 1 sec")
@@ -111,6 +113,8 @@ func TestStatusRPCHandler_Disconnects_OnForkVersionMismatch(t *testing.T) {
}
func TestStatusRPCHandler_ConnectsOnGenesis(t *testing.T) {
ctx := t.Context()
p1 := p2ptest.NewTestP2P(t)
p2 := p2ptest.NewTestP2P(t)
p1.Connect(p2)
@@ -153,12 +157,12 @@ func TestStatusRPCHandler_ConnectsOnGenesis(t *testing.T) {
assert.DeepEqual(t, root[:], out.FinalizedRoot)
})
stream1, err := p1.BHost.NewStream(t.Context(), p2.BHost.ID(), pcl)
stream1, err := p1.BHost.NewStream(ctx, p2.BHost.ID(), pcl)
require.NoError(t, err)
digest, err := r.currentForkDigest()
require.NoError(t, err)
err = r.statusRPCHandler(t.Context(), &ethpb.Status{ForkDigest: digest[:], FinalizedRoot: params.BeaconConfig().ZeroHash[:]}, stream1)
err = r.statusRPCHandler(ctx, &ethpb.Status{ForkDigest: digest[:], FinalizedRoot: params.BeaconConfig().ZeroHash[:]}, stream1)
assert.NoError(t, err)
if util.WaitTimeout(&wg, 1*time.Second) {
@@ -169,6 +173,8 @@ func TestStatusRPCHandler_ConnectsOnGenesis(t *testing.T) {
}
func TestStatusRPCHandler_ReturnsHelloMessage(t *testing.T) {
ctx := t.Context()
p1 := p2ptest.NewTestP2P(t)
p2 := p2ptest.NewTestP2P(t)
p1.Connect(p2)
@@ -185,12 +191,12 @@ func TestStatusRPCHandler_ReturnsHelloMessage(t *testing.T) {
finalized.Block.Slot = blkSlot
finalizedRoot, err := finalized.Block.HashTreeRoot()
require.NoError(t, err)
genesisState, err := transition.GenesisBeaconState(t.Context(), nil, 0, &ethpb.Eth1Data{})
genesisState, err := transition.GenesisBeaconState(ctx, nil, 0, &ethpb.Eth1Data{})
require.NoError(t, err)
require.NoError(t, genesisState.SetSlot(111))
require.NoError(t, genesisState.UpdateBlockRootAtIndex(111%uint64(params.BeaconConfig().SlotsPerHistoricalRoot), headRoot))
util.SaveBlock(t, t.Context(), db, finalized)
require.NoError(t, db.SaveGenesisBlockRoot(t.Context(), finalizedRoot))
util.SaveBlock(t, ctx, db, finalized)
require.NoError(t, db.SaveGenesisBlockRoot(ctx, finalizedRoot))
finalizedCheckpt := &ethpb.Checkpoint{
Epoch: 3,
Root: finalizedRoot[:],
@@ -247,10 +253,10 @@ func TestStatusRPCHandler_ReturnsHelloMessage(t *testing.T) {
t.Errorf("Did not receive expected message. Got %+v wanted %+v", out, expected)
}
})
stream1, err := p1.BHost.NewStream(t.Context(), p2.BHost.ID(), pcl)
stream1, err := p1.BHost.NewStream(ctx, p2.BHost.ID(), pcl)
require.NoError(t, err)
err = r.statusRPCHandler(t.Context(), &ethpb.Status{
err = r.statusRPCHandler(ctx, &ethpb.Status{
ForkDigest: digest[:],
FinalizedRoot: finalizedRoot[:],
FinalizedEpoch: 3,
@@ -434,7 +440,13 @@ func TestHandshakeHandlers_Roundtrip(t *testing.T) {
}
func TestStatusRPCRequest_RequestSent(t *testing.T) {
const (
earliestAvailableSlot = primitives.Slot(50)
custodyGroupCount = uint64(4)
)
beaconConfig := params.BeaconConfig()
ctx := t.Context()
testCases := []struct {
name string
@@ -465,7 +477,7 @@ func TestStatusRPCRequest_RequestSent(t *testing.T) {
t.Errorf("Did not receive expected message. Got %+v wanted %+v", out, expected)
}
err = service.respondWithStatus(context.Background(), stream)
err = service.respondWithStatus(ctx, stream)
require.NoError(t, err)
},
},
@@ -486,14 +498,14 @@ func TestStatusRPCRequest_RequestSent(t *testing.T) {
HeadRoot: headRoot,
FinalizedEpoch: 5,
FinalizedRoot: finalizedRoot,
EarliestAvailableSlot: 0,
EarliestAvailableSlot: earliestAvailableSlot,
}
if !proto.Equal(out, expected) {
t.Errorf("Did not receive expected message. Got %+v wanted %+v", out, expected)
}
err = service.respondWithStatus(t.Context(), stream)
err = service.respondWithStatus(ctx, stream)
require.NoError(t, err)
},
},
@@ -510,6 +522,11 @@ func TestStatusRPCRequest_RequestSent(t *testing.T) {
p1, p2 := p2ptest.NewTestP2P(t), p2ptest.NewTestP2P(t)
p1.Connect(p2)
updatedEas, updatedCgc, err := p1.UpdateCustodyInfo(earliestAvailableSlot, custodyGroupCount)
require.NoError(t, err)
require.Equal(t, earliestAvailableSlot, updatedEas)
require.Equal(t, custodyGroupCount, updatedCgc)
// Set up a head state with data we expect.
head := util.NewBeaconBlock()
head.Block.Slot = 111
@@ -521,7 +538,7 @@ func TestStatusRPCRequest_RequestSent(t *testing.T) {
finalizedRoot, err := finalized.Block.HashTreeRoot()
require.NoError(t, err)
genesisState, err := transition.GenesisBeaconState(context.Background(), nil, 0, &ethpb.Eth1Data{})
genesisState, err := transition.GenesisBeaconState(ctx, nil, 0, &ethpb.Eth1Data{})
require.NoError(t, err)
require.NoError(t, genesisState.SetSlot(111))
@@ -550,7 +567,7 @@ func TestStatusRPCRequest_RequestSent(t *testing.T) {
chain: chain,
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
},
ctx: context.Background(),
ctx: ctx,
rateLimiter: newRateLimiter(p1),
}
@@ -566,7 +583,7 @@ func TestStatusRPCRequest_RequestSent(t *testing.T) {
tc.streamHandler(r, stream, genesisState, chain.Root, headRoot[:], finalizedRoot[:])
})
err = r.sendRPCStatusRequest(context.Background(), p2.BHost.ID())
err = r.sendRPCStatusRequest(ctx, p2.BHost.ID())
require.ErrorIs(t, err, p2ptypes.ErrInvalidEpoch)
if util.WaitTimeout(&wg, 1*time.Hour) {
@@ -579,6 +596,8 @@ func TestStatusRPCRequest_RequestSent(t *testing.T) {
}
func TestStatusRPCRequest_FinalizedBlockExists(t *testing.T) {
ctx := t.Context()
p1 := p2ptest.NewTestP2P(t)
p2 := p2ptest.NewTestP2P(t)
db := testingDB.SetupDB(t)
@@ -593,14 +612,14 @@ func TestStatusRPCRequest_FinalizedBlockExists(t *testing.T) {
finalized.Block.Slot = blkSlot
finalizedRoot, err := finalized.Block.HashTreeRoot()
require.NoError(t, err)
genesisState, err := transition.GenesisBeaconState(t.Context(), nil, 0, &ethpb.Eth1Data{DepositRoot: make([]byte, 32), BlockHash: make([]byte, 32)})
genesisState, err := transition.GenesisBeaconState(ctx, nil, 0, &ethpb.Eth1Data{DepositRoot: make([]byte, 32), BlockHash: make([]byte, 32)})
require.NoError(t, err)
require.NoError(t, genesisState.SetSlot(111))
require.NoError(t, genesisState.UpdateBlockRootAtIndex(111%uint64(params.BeaconConfig().SlotsPerHistoricalRoot), headRoot))
blk := util.NewBeaconBlock()
blk.Block.Slot = blkSlot
util.SaveBlock(t, t.Context(), db, blk)
require.NoError(t, db.SaveGenesisBlockRoot(t.Context(), finalizedRoot))
util.SaveBlock(t, ctx, db, blk)
require.NoError(t, db.SaveGenesisBlockRoot(ctx, finalizedRoot))
finalizedCheckpt := &ethpb.Checkpoint{
Epoch: 3,
Root: finalizedRoot[:],
@@ -628,7 +647,7 @@ func TestStatusRPCRequest_FinalizedBlockExists(t *testing.T) {
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
stateNotifier: chain.StateNotifier(),
},
ctx: t.Context(),
ctx: ctx,
rateLimiter: newRateLimiter(p1),
}
chain2 := &mock.ChainService{
@@ -653,7 +672,7 @@ func TestStatusRPCRequest_FinalizedBlockExists(t *testing.T) {
beaconDB: db,
stateNotifier: chain.StateNotifier(),
},
ctx: t.Context(),
ctx: ctx,
rateLimiter: newRateLimiter(p1),
}
@@ -667,7 +686,7 @@ func TestStatusRPCRequest_FinalizedBlockExists(t *testing.T) {
defer wg.Done()
out := &ethpb.Status{}
assert.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, out))
assert.NoError(t, r2.validateStatusMessage(t.Context(), out))
assert.NoError(t, r2.validateStatusMessage(ctx, out))
})
p1.AddConnectionHandler(r.sendRPCStatusRequest, nil)
@@ -681,9 +700,11 @@ func TestStatusRPCRequest_FinalizedBlockExists(t *testing.T) {
}
func TestStatusRPCRequest_FinalizedBlockSkippedSlots(t *testing.T) {
db, err := kv.NewKVStore(t.Context(), t.TempDir())
ctx := t.Context()
db, err := kv.NewKVStore(ctx, t.TempDir())
require.NoError(t, err)
bState, err := transition.GenesisBeaconState(t.Context(), nil, 0, &ethpb.Eth1Data{DepositRoot: make([]byte, 32), BlockHash: make([]byte, 32)})
bState, err := transition.GenesisBeaconState(ctx, nil, 0, &ethpb.Eth1Data{DepositRoot: make([]byte, 32), BlockHash: make([]byte, 32)})
require.NoError(t, err)
blk := util.NewBeaconBlock()
@@ -693,10 +714,10 @@ func TestStatusRPCRequest_FinalizedBlockSkippedSlots(t *testing.T) {
wsb, err := consensusblocks.NewSignedBeaconBlock(blk)
require.NoError(t, err)
require.NoError(t, db.SaveBlock(t.Context(), wsb))
require.NoError(t, db.SaveGenesisBlockRoot(t.Context(), genRoot))
require.NoError(t, db.SaveBlock(ctx, wsb))
require.NoError(t, db.SaveGenesisBlockRoot(ctx, genRoot))
blocksTillHead := makeBlocks(t, 1, 1000, genRoot)
require.NoError(t, db.SaveBlocks(t.Context(), blocksTillHead))
require.NoError(t, db.SaveBlocks(ctx, blocksTillHead))
stateSummaries := make([]*ethpb.StateSummary, len(blocksTillHead))
for i, b := range blocksTillHead {
@@ -707,7 +728,7 @@ func TestStatusRPCRequest_FinalizedBlockSkippedSlots(t *testing.T) {
Root: bRoot[:],
}
}
require.NoError(t, db.SaveStateSummaries(t.Context(), stateSummaries))
require.NoError(t, db.SaveStateSummaries(ctx, stateSummaries))
rootFetcher := func(slot primitives.Slot) [32]byte {
rt, err := blocksTillHead[slot-1].Block().HashTreeRoot()
@@ -788,7 +809,7 @@ func TestStatusRPCRequest_FinalizedBlockSkippedSlots(t *testing.T) {
Epoch: tt.remoteFinalizedEpoch,
Root: tt.remoteFinalizedRoot[:],
}
require.NoError(t, db.SaveFinalizedCheckpoint(t.Context(), finalizedCheckpt))
require.NoError(t, db.SaveFinalizedCheckpoint(ctx, finalizedCheckpt))
epoch := expectedFinalizedEpoch.Add(2)
totalSec := uint64(params.BeaconConfig().SlotsPerEpoch.Mul(uint64(epoch) * params.BeaconConfig().SecondsPerSlot))
@@ -816,7 +837,7 @@ func TestStatusRPCRequest_FinalizedBlockSkippedSlots(t *testing.T) {
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
stateNotifier: chain.StateNotifier(),
},
ctx: t.Context(),
ctx: ctx,
rateLimiter: newRateLimiter(p1),
}
chain2 := &mock.ChainService{
@@ -843,7 +864,7 @@ func TestStatusRPCRequest_FinalizedBlockSkippedSlots(t *testing.T) {
stateNotifier: chain.StateNotifier(),
},
ctx: t.Context(),
ctx: ctx,
rateLimiter: newRateLimiter(p1),
}
@@ -857,7 +878,7 @@ func TestStatusRPCRequest_FinalizedBlockSkippedSlots(t *testing.T) {
defer wg.Done()
out := &ethpb.Status{}
assert.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, out))
assert.Equal(t, tt.expectError, r2.validateStatusMessage(t.Context(), out) != nil)
assert.Equal(t, tt.expectError, r2.validateStatusMessage(ctx, out) != nil)
})
p1.AddConnectionHandler(r.sendRPCStatusRequest, nil)
@@ -973,6 +994,8 @@ func TestStatusRPCRequest_BadPeerHandshake(t *testing.T) {
}
func TestStatusRPC_ValidGenesisMessage(t *testing.T) {
ctx := t.Context()
// Set up a head state with data we expect.
head := util.NewBeaconBlock()
head.Block.Slot = 111
@@ -983,7 +1006,7 @@ func TestStatusRPC_ValidGenesisMessage(t *testing.T) {
finalized.Block.Slot = blkSlot
finalizedRoot, err := finalized.Block.HashTreeRoot()
require.NoError(t, err)
genesisState, err := transition.GenesisBeaconState(t.Context(), nil, 0, &ethpb.Eth1Data{})
genesisState, err := transition.GenesisBeaconState(ctx, nil, 0, &ethpb.Eth1Data{})
require.NoError(t, err)
require.NoError(t, genesisState.SetSlot(111))
require.NoError(t, genesisState.UpdateBlockRootAtIndex(111%uint64(params.BeaconConfig().SlotsPerHistoricalRoot), headRoot))
@@ -1008,7 +1031,7 @@ func TestStatusRPC_ValidGenesisMessage(t *testing.T) {
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
stateNotifier: chain.StateNotifier(),
},
ctx: t.Context(),
ctx: ctx,
}
digest, err := r.currentForkDigest()
require.NoError(t, err)
@@ -1025,6 +1048,8 @@ func TestStatusRPC_ValidGenesisMessage(t *testing.T) {
}
func TestShouldResync(t *testing.T) {
ctx := t.Context()
type args struct {
genesis time.Time
syncing bool
@@ -1073,7 +1098,7 @@ func TestShouldResync(t *testing.T) {
},
}
for _, tt := range tests {
headState, err := transition.GenesisBeaconState(t.Context(), nil, 0, &ethpb.Eth1Data{})
headState, err := transition.GenesisBeaconState(ctx, nil, 0, &ethpb.Eth1Data{})
require.NoError(t, err)
require.NoError(t, headState.SetSlot(tt.args.headSlot))
chain := &mock.ChainService{
@@ -1087,7 +1112,7 @@ func TestShouldResync(t *testing.T) {
initialSync: &mockSync.Sync{IsSyncing: tt.args.syncing},
stateNotifier: chain.StateNotifier(),
},
ctx: t.Context(),
ctx: ctx,
}
t.Run(tt.name, func(t *testing.T) {
if got := r.shouldReSync(); got != tt.want {

3
changelog/manu-eas.md Normal file
View File

@@ -0,0 +1,3 @@
### Fixed
- `buildStatusFromStream`: Respond `statusV2` only if Fulu is enabled.
- Send our real earliest available slot when sending a Status request post Fulu instead of `0`.